diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index cc9e501..5ddb19b 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -725,7 +725,7 @@ class KmipEngine(object): elif attr_name == 'Archive Date': return None elif attr_name == 'Object Group': - return None + return [x.object_group for x in managed_object.object_groups] elif attr_name == 'Fresh': return None elif attr_name == 'Link': @@ -797,6 +797,11 @@ class KmipEngine(object): application_data=value.application_data ) ) + elif attribute_name == "Object Group": + for value in attribute_value: + managed_object.object_groups.append( + objects.ObjectGroup(object_group=value.value) + ) else: # TODO (peterhamilton) Remove when all attributes are supported raise exceptions.InvalidField( @@ -1698,6 +1703,16 @@ class KmipEngine(object): ) add_object = False break + elif name == "Object Group": + if value.value not in attribute: + self._logger.debug( + "Failed match: " + "the specified object group ('{}') does not " + "match any of the object's associated object " + "group attributes.".format(value.value) + ) + add_object = False + break elif name == "Name": if value not in attribute: self._logger.debug( diff --git a/kmip/services/server/policy.py b/kmip/services/server/policy.py index 1fa021d..40d5292 100644 --- a/kmip/services/server/policy.py +++ b/kmip/services/server/policy.py @@ -875,7 +875,7 @@ class AttributePolicy(object): False, False, False, - False, + True, ( enums.Operation.CREATE, enums.Operation.CREATE_KEY_PAIR, diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 371272a..0665a0d 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -1584,7 +1584,7 @@ class TestKmipEngine(testtools.TestCase): symmetric_key, 'Object Group' ) - self.assertEqual(None, result) + self.assertEqual([], result) result = e._get_attribute_from_managed_object( symmetric_key, @@ -1900,16 +1900,16 @@ class TestKmipEngine(testtools.TestCase): ) # Test that an unsupported attribute cannot be set. - object_group = attribute_factory.create_attribute( - enums.AttributeType.OBJECT_GROUP, - 'Test Group' + custom_attribute = attribute_factory.create_attribute( + enums.AttributeType.CUSTOM_ATTRIBUTE, + "Test Group" ) args = ( managed_object, - ('Object Group', object_group.attribute_value) + ("Custom Attribute", custom_attribute.attribute_value) ) - regex = "The Object Group attribute is unsupported." + regex = "The Custom Attribute attribute is unsupported." six.assertRaisesRegex( self, exceptions.InvalidField, @@ -2430,6 +2430,10 @@ class TestKmipEngine(testtools.TestCase): "application_namespace": "ssl", "application_data": "www.example.com" } + ), + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Group1" ) ] ) @@ -2489,6 +2493,8 @@ class TestKmipEngine(testtools.TestCase): "www.example.com", symmetric_key.app_specific_info[0].application_data ) + self.assertEqual(1, len(symmetric_key.object_groups)) + self.assertEqual("Group1", symmetric_key.object_groups[0].object_group) self.assertEqual(uid, e._id_placeholder) @@ -5735,6 +5741,110 @@ class TestKmipEngine(testtools.TestCase): self.assertEqual(1, len(response_payload.unique_identifiers)) self.assertIn(id_a, response_payload.unique_identifiers) + def test_locate_with_object_group(self): + """ + Test the Locate operation when the 'Object Group' + attribute is given. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + key = ( + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + object_group_a = pie_objects.ObjectGroup(object_group="Group1") + object_group_b = pie_objects.ObjectGroup(object_group="Group2") + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + key, + name='name1' + ) + obj_a.object_groups.append(object_group_a) + obj_b = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + obj_b.object_groups.append(object_group_a) + obj_b.object_groups.append(object_group_b) + obj_c = pie_objects.SecretData( + key, + enums.SecretDataType.PASSWORD + ) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.add(obj_c) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + attribute_factory = factory.AttributeFactory() + + # Locate the symmetric key objects based on their shared object group + # attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Group1" + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_a) + ) + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified object group ('Group1') does not match any " + "of the object's associated object group attributes." + ) + self.assertEqual(2, len(response_payload.unique_identifiers)) + self.assertIn(id_a, response_payload.unique_identifiers) + self.assertIn(id_b, response_payload.unique_identifiers) + + # Locate a single symmetric key object based on its unique object group + # attribute. + attrs = [ + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Group2" + ) + ] + payload = payloads.LocateRequestPayload(attributes=attrs) + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call("Processing operation: Locate") + e._logger.debug.assert_any_call( + "Locate filter matched object: {}".format(id_b) + ) + e._logger.debug.assert_any_call( + "Failed match: " + "the specified object group ('Group2') does not match any " + "of the object's associated object group attributes." + ) + self.assertEqual(1, len(response_payload.unique_identifiers)) + self.assertIn(id_b, response_payload.unique_identifiers) + def test_get(self): """ Test that a Get request can be processed correctly.