Add ObjectType filtering support for the Locate operation

This change updates Locate operation support in the PyKMIP server,
allowing users to filter objects based on the object's Object Type.
Unit tests and integration tests have been added to test and verify
the correctness of this feature.

Additionally, the Locate demo scripts have also been updated to
support Object Type filtering. Simply use the "--object-type" flag
to specify an Object Type enumeration for the Locate script to
filter on.
This commit is contained in:
Peter Hamilton 2019-07-30 15:54:52 -04:00 committed by Peter Hamilton
parent e5de55c0a8
commit d74b394261
7 changed files with 195 additions and 1 deletions

View File

@ -35,6 +35,7 @@ if __name__ == '__main__':
name = opts.name
initial_dates = opts.initial_dates
state = opts.state
object_type = opts.object_type
attribute_factory = AttributeFactory()
@ -84,6 +85,20 @@ if __name__ == '__main__':
else:
logger.error("Invalid state provided: {}".format(opts.state))
sys.exit(-3)
if object_type:
object_type = getattr(enums.ObjectType, object_type, None)
if object_type:
attributes.append(
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
object_type
)
)
else:
logger.error(
"Invalid object type provided: {}".format(opts.object_type)
)
sys.exit(-4)
# Build the client and connect to the server
with client.ProxyKmipClient(

View File

@ -38,6 +38,7 @@ if __name__ == '__main__':
name = opts.name
initial_dates = opts.initial_dates
state = opts.state
object_type = opts.object_type
attribute_factory = AttributeFactory()
credential_factory = CredentialFactory()
@ -107,7 +108,22 @@ if __name__ == '__main__':
else:
logger.error("Invalid state provided: {}".format(opts.state))
client.close()
sys.exit(-1)
sys.exit(-3)
if object_type:
object_type = getattr(enums.ObjectType, object_type, None)
if object_type:
attributes.append(
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
object_type
)
)
else:
logger.error(
"Invalid object type provided: {}".format(opts.object_type)
)
client.close()
sys.exit(-4)
result = client.locate(attributes=attributes, credential=credential)
client.close()

View File

@ -260,6 +260,17 @@ def build_cli_parser(operation=None):
dest="state",
help="The state of the secret (e.g., PRE_ACTIVE, ACTIVE)"
)
parser.add_option(
"--object-type",
action="store",
type="str",
default=None,
dest="object_type",
help=(
"The object type of the secret "
"(e.g., CERTIFICATE, SYMMETRIC_KEY)"
)
)
elif operation is Operation.REGISTER:
parser.add_option(
"-f",

View File

@ -1654,6 +1654,19 @@ class KmipEngine(object):
)
add_object = False
break
elif name == enums.AttributeType.OBJECT_TYPE.value:
value = value.value
if value != attribute:
self._logger.debug(
"Failed match: "
"the specified object type ({}) does not "
"match the object's object type ({}).".format(
value.name,
attribute.name
)
)
add_object = False
break
elif name == enums.AttributeType.INITIAL_DATE.value:
initial_date["value"] = attribute
self._track_date_attributes(

View File

@ -1359,6 +1359,20 @@ class TestIntegration(testtools.TestCase):
self.assertIn(uid_a, result.uuids)
self.assertIn(uid_b, result.uuids)
# Test locating each key based off of its object type.
result = self.client.locate(
attributes=[
self.attr_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
enums.ObjectType.SYMMETRIC_KEY
)
]
)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)
self.assertEqual(2, len(result.uuids))
self.assertIn(uid_a, result.uuids)
self.assertIn(uid_b, result.uuids)
# Clean up keys
result = self.client.destroy(uid_a)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.value)

View File

@ -997,6 +997,19 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
self.assertIn(a_id, result)
self.assertIn(b_id, result)
# Test locating each key by its object type.
result = self.client.locate(
attributes=[
self.attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
enums.ObjectType.SYMMETRIC_KEY
)
]
)
self.assertEqual(2, len(result))
self.assertIn(a_id, result)
self.assertIn(b_id, result)
# Clean up the keys
self.client.destroy(a_id)
self.client.destroy(b_id)

View File

@ -4777,6 +4777,118 @@ class TestKmipEngine(testtools.TestCase):
self.assertEqual(len(response_payload.unique_identifiers), 1)
self.assertIn(id_b, response_payload.unique_identifiers)
def test_locate_with_object_type(self):
"""
Test the Locate operation when the 'Object Type' attribute is given.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
key = (
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)
obj_a = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
key,
name='name1'
)
obj_b = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)
e._data_session.add(obj_a)
e._data_session.add(obj_b)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
id_b = str(obj_b.unique_identifier)
attribute_factory = factory.AttributeFactory()
# Locate the secret data object based on its type.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
enums.ObjectType.SECRET_DATA
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_b)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified object type (SECRET_DATA) does not "
"match the object's object type (SYMMETRIC_KEY)."
)
self.assertEqual(1, len(response_payload.unique_identifiers))
self.assertIn(id_b, response_payload.unique_identifiers)
# Locate the symmetric key object based on its type.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
enums.ObjectType.SYMMETRIC_KEY
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified object type (SYMMETRIC_KEY) does not "
"match the object's object type (SECRET_DATA)."
)
self.assertEqual(1, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
# Try to locate a non-existent object by its type.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.OBJECT_TYPE,
enums.ObjectType.PUBLIC_KEY
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Failed match: "
"the specified object type (PUBLIC_KEY) does not "
"match the object's object type (SYMMETRIC_KEY)."
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified object type (PUBLIC_KEY) does not "
"match the object's object type (SECRET_DATA)."
)
self.assertEqual(0, len(response_payload.unique_identifiers))
def test_get(self):
"""
Test that a Get request can be processed correctly.