From 009e8cecc92b258383f521fad789546354dac92e Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Thu, 10 Oct 2019 17:34:28 -0400 Subject: [PATCH] Add ObjectGroup support to the server This change ObjectGroup attribute support to the server, allowing for the storage and retrieval of the new attribute in addition to object filtering based on its value. New unit tests have been added to cover the new changes. --- kmip/services/server/engine.py | 17 ++- kmip/services/server/policy.py | 2 +- .../tests/unit/services/server/test_engine.py | 122 +++++++++++++++++- 3 files changed, 133 insertions(+), 8 deletions(-) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index cc9e501..5ddb19b 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -725,7 +725,7 @@ class KmipEngine(object): elif attr_name == 'Archive Date': return None elif attr_name == 'Object Group': - return None + return [x.object_group for x in managed_object.object_groups] elif attr_name == 'Fresh': return None elif attr_name == 'Link': @@ -797,6 +797,11 @@ class KmipEngine(object): application_data=value.application_data ) ) + elif attribute_name == "Object Group": + for value in attribute_value: + managed_object.object_groups.append( + objects.ObjectGroup(object_group=value.value) + ) else: # TODO (peterhamilton) Remove when all attributes are supported raise exceptions.InvalidField( @@ -1698,6 +1703,16 @@ class KmipEngine(object): ) add_object = False break + elif name == "Object Group": + if value.value not in attribute: + self._logger.debug( + "Failed match: " + "the specified object group ('{}') does not " + "match any of the object's associated object " + "group attributes.".format(value.value) + ) + add_object = False + break elif name == "Name": if value not in attribute: self._logger.debug( diff --git a/kmip/services/server/policy.py b/kmip/services/server/policy.py index 1fa021d..40d5292 100644 --- a/kmip/services/server/policy.py +++ b/kmip/services/server/policy.py @@ -875,7 +875,7 @@ class AttributePolicy(object): False, False, False, - False, + True, ( enums.Operation.CREATE, enums.Operation.CREATE_KEY_PAIR, diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 371272a..0665a0d 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -1584,7 +1584,7 @@ class TestKmipEngine(testtools.TestCase): symmetric_key, 'Object Group' ) - self.assertEqual(None, result) + self.assertEqual([], result) result = e._get_attribute_from_managed_object( symmetric_key, @@ -1900,16 +1900,16 @@ class TestKmipEngine(testtools.TestCase): ) # Test that an unsupported attribute cannot be set. - object_group = attribute_factory.create_attribute( - enums.AttributeType.OBJECT_GROUP, - 'Test Group' + custom_attribute = attribute_factory.create_attribute( + enums.AttributeType.CUSTOM_ATTRIBUTE, + "Test Group" ) args = ( managed_object, - ('Object Group', object_group.attribute_value) + ("Custom Attribute", custom_attribute.attribute_value) ) - regex = "The Object Group attribute is unsupported." + regex = "The Custom Attribute attribute is unsupported." six.assertRaisesRegex( self, exceptions.InvalidField, @@ -2430,6 +2430,10 @@ class TestKmipEngine(testtools.TestCase): "application_namespace": "ssl", "application_data": "www.example.com" } + ), + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Group1" ) ] ) @@ -2489,6 +2493,8 @@ class TestKmipEngine(testtools.TestCase): "www.example.com", symmetric_key.app_specific_info[0].application_data ) + self.assertEqual(1, len(symmetric_key.object_groups)) + self.assertEqual("Group1", symmetric_key.object_groups[0].object_group) self.assertEqual(uid, e._id_placeholder) @@ -5735,6 +5741,110 @@ class TestKmipEngine(testtools.TestCase): self.assertEqual(1, len(response_payload.unique_identifiers)) self.assertIn(id_a, response_payload.unique_identifiers) + def test_locate_with_object_group(self): + """ + Test the Locate operation when the 'Object Group' + attribute is given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + object_group_a = pie_objects.ObjectGroup(object_group="Group1") + object_group_b = pie_objects.ObjectGroup(object_group="Group2") + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) + obj_a.object_groups.append(object_group_a) + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + obj_b.object_groups.append(object_group_a) + obj_b.object_groups.append(object_group_b) + obj_c = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.add(obj_c) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the symmetric key objects based on their shared object group + # attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Group1" + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified object group ('Group1') does not match any " + "of the object's associated object group attributes." + ) + self.assertEqual(2, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) + + # Locate a single symmetric key object based on its unique object group + # attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Group2" + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified object group ('Group2') does not match any " + "of the object's associated object group attributes." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_b, response_payload.unique_identifiers) + def test_get(self): """ Test that a Get request can be processed correctly.