diff --git a/kmip/pie/objects.py b/kmip/pie/objects.py index 023214b..f71615f 100644 --- a/kmip/pie/objects.py +++ b/kmip/pie/objects.py @@ -97,7 +97,8 @@ class ManagedObject(sql.Base): _names = sqlalchemy.orm.relationship( "ManagedObjectName", back_populates="mo", - cascade="all, delete-orphan" + cascade="all, delete-orphan", + order_by="ManagedObjectName.id" ) names = association_proxy('_names', 'name') operation_policy_name = Column( @@ -112,12 +113,14 @@ class ManagedObject(sql.Base): "ApplicationSpecificInformation", secondary=app_specific_info_map, back_populates="managed_objects", + order_by="ApplicationSpecificInformation.id", passive_deletes=True ) object_groups = sqlalchemy.orm.relationship( "ObjectGroup", secondary=object_group_map, back_populates="managed_objects", + order_by="ObjectGroup.id", passive_deletes=True ) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index c2f2a17..a4654ba 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -843,6 +843,86 @@ class KmipEngine(object): "The {0} attribute is unsupported.".format(attribute_name) ) + def _delete_attribute_from_managed_object(self, managed_object, attribute): + attribute_name, attribute_index, attribute_value = attribute + object_type = managed_object._object_type + if not self._attribute_policy.is_attribute_applicable_to_object_type( + attribute_name, + object_type + ): + raise exceptions.ItemNotFound( + "The '{}' attribute is not applicable to '{}' objects.".format( + attribute_name, + ''.join( + [x.capitalize() for x in object_type.name.split('_')] + ) + ) + ) + if not self._attribute_policy.is_attribute_deletable_by_client( + attribute_name + ): + raise exceptions.PermissionDenied( + "Cannot delete a required attribute." + ) + + if self._attribute_policy.is_attribute_multivalued(attribute_name): + # Get the specific attribute collection and attribute objects. + attribute_list = [] + if attribute_name == "Name": + attribute_list = managed_object.names + if attribute_value is not None: + attribute_value = attribute_value.value + elif attribute_name == "Application Specific Information": + attribute_list = managed_object.app_specific_info + if attribute_value is not None: + namespace = attribute_value.application_namespace + attribute_value = objects.ApplicationSpecificInformation( + application_namespace=namespace, + application_data=attribute_value.application_data + ) + elif attribute_name == "Object Group": + attribute_list = managed_object.object_groups + if attribute_value is not None: + attribute_value = objects.ObjectGroup( + object_group=attribute_value.value + ) + else: + raise exceptions.InvalidField( + "The '{}' attribute is not supported.".format( + attribute_name + ) + ) + + # Generically handle attribute deletion. + if attribute_value: + if attribute_list.count(attribute_value): + attribute_list.remove(attribute_value) + else: + raise exceptions.ItemNotFound( + "Could not locate the attribute instance with the " + "specified value: {}".format(attribute_value) + ) + elif attribute_index is not None: + if attribute_index < len(attribute_list): + attribute_list.pop(attribute_index) + else: + raise exceptions.ItemNotFound( + "Could not locate the attribute instance with the " + "specified index: {}".format(attribute_index) + ) + else: + # If no attribute index is provided, this is not a KMIP + # 1.* request. If no attribute value is provided, this + # must be a KMIP 2.0 attribute reference request, so + # delete all instances of the attribute. + attribute_list[:] = [] + else: + # The server does not currently support any single-instance, + # client deletable attributes. + raise exceptions.InvalidField( + "The '{}' attribute is not supported.".format(attribute_name) + ) + def _is_allowed_by_operation_policy( self, policy_name, @@ -1075,6 +1155,8 @@ class KmipEngine(object): return self._process_create(payload) elif operation == enums.Operation.CREATE_KEY_PAIR: return self._process_create_key_pair(payload) + elif operation == enums.Operation.DELETE_ATTRIBUTE: + return self._process_delete_attribute(payload) elif operation == enums.Operation.REGISTER: return self._process_register(payload) elif operation == enums.Operation.DERIVE_KEY: @@ -1378,6 +1460,91 @@ class KmipEngine(object): self._id_placeholder = str(private_key.unique_identifier) return response_payload + @_kmip_version_supported('1.0') + def _process_delete_attribute(self, payload): + self._logger.info("Processing operation: DeleteAttribute") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier + + managed_object = self._get_object_with_access_controls( + unique_identifier, + enums.Operation.DELETE_ATTRIBUTE + ) + deleted_attribute = None + + attribute_name = None + attribute_index = None + attribute_value = None + + if self._protocol_version >= contents.ProtocolVersion(2, 0): + # If the current attribute is defined, use that. Otherwise, use + # the attribute reference. + if payload.current_attribute: + try: + attribute_name = enums.convert_attribute_tag_to_name( + payload.current_attribute.attribute.tag + ) + attribute_value = payload.current_attribute.attribute + except ValueError as e: + self._logger.exception(e) + raise exceptions.ItemNotFound( + "No attribute with the specified name exists." + ) + elif payload.attribute_reference: + attribute_name = payload.attribute_reference.attribute_name + else: + raise exceptions.InvalidMessage( + "The DeleteAttribute request must specify the current " + "attribute or an attribute reference." + ) + else: + # Build a partial attribute from the attribute name and index. + if payload.attribute_name: + attribute_name = payload.attribute_name + else: + raise exceptions.InvalidMessage( + "The DeleteAttribute request must specify the attribute " + "name." + ) + if payload.attribute_index: + attribute_index = payload.attribute_index + else: + attribute_index = 0 + + # Grab a copy of the attribute before deleting it. + existing_attributes = self._get_attributes_from_managed_object( + managed_object, + [payload.attribute_name] + ) + if len(existing_attributes) > 0: + if not attribute_index: + deleted_attribute = existing_attributes[0] + else: + if attribute_index < len(existing_attributes): + deleted_attribute = existing_attributes[ + attribute_index + ] + else: + raise exceptions.ItemNotFound( + "Could not locate the attribute instance with the " + "specified index: {}".format(attribute_index) + ) + + self._delete_attribute_from_managed_object( + managed_object, + (attribute_name, attribute_index, attribute_value) + ) + self._data_session.commit() + + response_payload = payloads.DeleteAttributeResponsePayload( + unique_identifier=unique_identifier, + attribute=deleted_attribute + ) + + return response_payload + @_kmip_version_supported('1.0') def _process_register(self, payload): self._logger.info("Processing operation: Register") diff --git a/kmip/services/server/policy.py b/kmip/services/server/policy.py index 40d5292..08f58b1 100644 --- a/kmip/services/server/policy.py +++ b/kmip/services/server/policy.py @@ -129,6 +129,7 @@ class AttributePolicy(object): """ self._version = version + # TODO (peterhamilton) Alphabetize these self._attribute_rule_sets = { 'Unique Identifier': AttributeRuleSet( True, @@ -872,9 +873,9 @@ class AttributePolicy(object): 'Object Group': AttributeRuleSet( False, ('server', 'client'), - False, - False, - False, + True, + True, + True, True, ( enums.Operation.CREATE, @@ -1116,6 +1117,20 @@ class AttributePolicy(object): else: return False + def is_attribute_deletable_by_client(self, attribute): + """ + Check if the attribute can be deleted by the client. + + Args: + attribute (string): The name of the attribute (e.g., "Name"). + + Returns: + bool: True if the attribute can be deleted by the client. False + otherwise. + """ + rule_set = self._attribute_rule_sets.get(attribute) + return rule_set.deletable_by_client + def is_attribute_applicable_to_object_type(self, attribute, object_type): """ Check if the attribute is supported by the given object type. diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 0665a0d..57ed296 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -702,8 +702,11 @@ class TestKmipEngine(testtools.TestCase): e = engine.KmipEngine() e._logger = mock.MagicMock() + # TODO (peterhamilton) Alphatize these. e._process_create = mock.MagicMock() e._process_create_key_pair = mock.MagicMock() + e._process_delete_attribute = mock.MagicMock() + e._process_derive_key = mock.MagicMock() e._process_register = mock.MagicMock() e._process_locate = mock.MagicMock() e._process_get = mock.MagicMock() @@ -722,6 +725,8 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.CREATE, None) e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) + e._process_operation(enums.Operation.DELETE_ATTRIBUTE, None) + e._process_operation(enums.Operation.DERIVE_KEY, None) e._process_operation(enums.Operation.REGISTER, None) e._process_operation(enums.Operation.LOCATE, None) e._process_operation(enums.Operation.GET, None) @@ -740,6 +745,8 @@ class TestKmipEngine(testtools.TestCase): e._process_create.assert_called_with(None) e._process_create_key_pair.assert_called_with(None) + e._process_delete_attribute.assert_called_with(None) + e._process_derive_key.assert_called_with(None) e._process_register.assert_called_with(None) e._process_locate.assert_called_with(None) e._process_get.assert_called_with(None) @@ -1918,6 +1925,343 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_delete_attribute_from_managed_object(self): + """ + Test that various attributes can be deleted correctly from a given + managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + name_1 = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + "Name 1", + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + name_2 = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + "Name 2", + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + app_specific_info_1 = attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "Namespace 1", + "application_data": "Data 1" + } + ) + app_specific_info_2 = attribute_factory.create_attribute( + enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION, + { + "application_namespace": "Namespace 2", + "application_data": "Data 2" + } + ) + object_group_1 = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Object Group 1" + ) + object_group_2 = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Object Group 2" + ) + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + managed_object.names.clear() + + self.assertEqual(0, len(managed_object.names)) + self.assertEqual(0, len(managed_object.app_specific_info)) + self.assertEqual(0, len(managed_object.object_groups)) + + e._set_attribute_on_managed_object( + managed_object, + ( + "Name", + [ + name_1.attribute_value, + name_2.attribute_value + ] + ) + ) + e._set_attribute_on_managed_object( + managed_object, + ( + "Application Specific Information", + [ + app_specific_info_1.attribute_value, + app_specific_info_2.attribute_value + ] + ) + ) + e._set_attribute_on_managed_object( + managed_object, + ( + "Object Group", + [ + object_group_1.attribute_value, + object_group_2.attribute_value + ] + ) + ) + + self.assertEqual(2, len(managed_object.names)) + self.assertEqual(2, len(managed_object.app_specific_info)) + self.assertEqual(2, len(managed_object.object_groups)) + + e._delete_attribute_from_managed_object( + managed_object, + ( + "Application Specific Information", + 0, + None + ) + ) + + self.assertEqual(1, len(managed_object.app_specific_info)) + + e._delete_attribute_from_managed_object( + managed_object, + ( + "Application Specific Information", + 0, + app_specific_info_2.attribute_value + ) + ) + + self.assertEqual(0, len(managed_object.app_specific_info)) + + e._delete_attribute_from_managed_object( + managed_object, + ( + "Name", + None, + primitives.TextString(value="Name 2", tag=enums.Tags.NAME) + ) + ) + + self.assertEqual(1, len(managed_object.names)) + self.assertEqual("Name 1", managed_object.names[0]) + + e._delete_attribute_from_managed_object( + managed_object, + ( + "Object Group", + None, + None + ) + ) + + self.assertEqual(0, len(managed_object.object_groups)) + + e._set_attribute_on_managed_object( + managed_object, + ( + "Object Group", + [object_group_1.attribute_value] + ) + ) + + self.assertEqual(1, len(managed_object.object_groups)) + + e._delete_attribute_from_managed_object( + managed_object, + ( + "Object Group", + None, + primitives.TextString( + value="Object Group 1", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ) + + self.assertEqual(0, len(managed_object.object_groups)) + + def test_delete_attribute_from_managed_object_unsupported_attribute(self): + """ + Test that an ItemNotFound error is raised when attempting to delete an + unsupported attribute from a managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + args = (managed_object, ("Digital Signature Algorithm", None, None)) + self.assertRaisesRegex( + exceptions.ItemNotFound, + "The 'Digital Signature Algorithm' attribute is not applicable " + "to 'SymmetricKey' objects.", + e._delete_attribute_from_managed_object, + *args + ) + + def test_delete_attribute_from_managed_object_undeletable_attribute(self): + """ + Test that a PermissionDenied error is raised when attempting to delete + a required attribute from a managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + args = (managed_object, ("Cryptographic Algorithm", None, None)) + self.assertRaisesRegex( + exceptions.PermissionDenied, + "Cannot delete a required attribute.", + e._delete_attribute_from_managed_object, + *args + ) + + def test_delete_attribute_from_managed_object_unsupported_multivalue(self): + """ + Test that an InvalidField error is raised when attempting to delete an + unsupported multivalued attribute. + """ + # TODO (peterhamilton) Remove this test once all multivalued attributes + # are supported. + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + args = (managed_object, ("Link", None, None)) + self.assertRaisesRegex( + exceptions.InvalidField, + "The 'Link' attribute is not supported.", + e._delete_attribute_from_managed_object, + *args + ) + + def test_delete_attribute_from_managed_object_bad_attribute_value(self): + """ + Test that an ItemNotFound error is raised when attempting to delete + an attribute by value that cannot be found on a managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + args = ( + managed_object, + ( + "Object Group", + None, + primitives.TextString( + value="invalid", + tag=enums.Tags.OBJECT_GROUP + ) + ) + ) + self.assertRaisesRegex( + exceptions.ItemNotFound, + "Could not locate the attribute instance with the specified " + "value: {'object_group': 'invalid'}", + e._delete_attribute_from_managed_object, + *args + ) + + def test_delete_attribute_from_managed_object_bad_attribute_index(self): + """ + Test that an ItemNotFound error is raised when attempting to delete + an attribute by index that cannot be found on a managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + args = ( + managed_object, + ( + "Object Group", + 3, + None + ) + ) + self.assertRaisesRegex( + exceptions.ItemNotFound, + "Could not locate the attribute instance with the specified " + "index: 3", + e._delete_attribute_from_managed_object, + *args + ) + + def test_delete_attribute_from_managed_object_bad_single_value(self): + """ + Test that an InvalidField error is raised when attempting to delete + a single-valued attribute from a managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + args = ( + managed_object, + ( + "Contact Information", + None, + None + ) + ) + self.assertRaisesRegex( + exceptions.InvalidField, + "The 'Contact Information' attribute is not supported", + e._delete_attribute_from_managed_object, + *args + ) + def test_is_allowed_by_operation_policy_granted(self): """ Test that access granted by operation policy is processed correctly. @@ -3393,6 +3737,379 @@ class TestKmipEngine(testtools.TestCase): ) e._logger.reset_mock() + def test_delete_attribute(self): + """ + Test that a DeleteAttribute request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + object_group_1 = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Object Group 1" + ) + object_group_2 = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Object Group 2" + ) + + e._data_session.add(secret) + e._set_attribute_on_managed_object( + secret, + ( + "Object Group", + [ + object_group_1.attribute_value, + object_group_2.attribute_value + ] + ) + ) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute was actually added by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.DELETE_ATTRIBUTE + ) + self.assertEqual(2, len(managed_object.object_groups)) + + payload = payloads.DeleteAttributeRequestPayload( + unique_identifier="1", + attribute_name="Object Group", + attribute_index=1 + ) + + response_payload = e._process_delete_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: DeleteAttribute" + ) + self.assertEqual( + "1", + response_payload.unique_identifier + ) + self.assertEqual( + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Object Group 2", + 1 + ), + response_payload.attribute + ) + + # Confirm that the attribute was actually deleted by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.DELETE_ATTRIBUTE + ) + + self.assertEqual(1, len(managed_object.object_groups)) + payload = payloads.DeleteAttributeRequestPayload( + unique_identifier="1", + attribute_name="Object Group" + ) + + response_payload = e._process_delete_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: DeleteAttribute" + ) + self.assertEqual( + "1", + response_payload.unique_identifier + ) + self.assertEqual( + attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Object Group 1", + 0 + ), + response_payload.attribute + ) + + # Confirm that the attribute was actually deleted by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.DELETE_ATTRIBUTE + ) + self.assertEqual(0, len(managed_object.object_groups)) + + def test_delete_attribute_with_kmip_2_0(self): + """ + Test that a DeleteAttribute request can be processed correctly + when using KMIP 2.0 payload features. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + object_group = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + "Object Group 1" + ) + + e._data_session.add(secret) + e._set_attribute_on_managed_object( + secret, + ( + "Object Group", + [object_group.attribute_value] + ) + ) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + # Confirm that the attribute was actually added by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + "1", + enums.Operation.DELETE_ATTRIBUTE + ) + self.assertEqual(1, len(managed_object.names)) + self.assertEqual(1, len(managed_object.object_groups)) + + payload = payloads.DeleteAttributeRequestPayload( + unique_identifier="1", + current_attribute=objects.CurrentAttribute( + attribute=primitives.TextString( + value="Object Group 1", + tag=enums.Tags.OBJECT_GROUP + ) + ), + attribute_reference=objects.AttributeReference( + vendor_identification="Vendor 1", + attribute_name="Object Group" + ) + ) + + response_payload = e._process_delete_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: DeleteAttribute" + ) + self.assertEqual( + "1", + response_payload.unique_identifier + ) + self.assertIsNone(response_payload.attribute) + + # Confirm that the attribute was actually deleted by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.DELETE_ATTRIBUTE + ) + self.assertEqual(0, len(managed_object.object_groups)) + + payload = payloads.DeleteAttributeRequestPayload( + unique_identifier="1", + attribute_reference=objects.AttributeReference( + vendor_identification="Vendor 1", + attribute_name="Name" + ) + ) + + response_payload = e._process_delete_attribute(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: DeleteAttribute" + ) + self.assertEqual( + "1", + response_payload.unique_identifier + ) + self.assertIsNone(response_payload.attribute) + + # Confirm that the attribute was actually deleted by fetching the + # managed object fresh from the database and checking it. + managed_object = e._get_object_with_access_controls( + response_payload.unique_identifier, + enums.Operation.DELETE_ATTRIBUTE + ) + self.assertEqual(0, len(managed_object.names)) + + def test_delete_attribute_with_invalid_current_attribute(self): + """ + Test that an ItemNotFound error is raised when attempting to delete + an invalid current attribute from a managed object. + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + current_attribute = objects.CurrentAttribute() + current_attribute._attribute = primitives.TextString( + value="Object Group 1", + tag=enums.Tags.CURRENT_ATTRIBUTE + ) + args = ( + payloads.DeleteAttributeRequestPayload( + unique_identifier="1", + current_attribute=current_attribute + ), + ) + + self.assertRaisesRegex( + exceptions.ItemNotFound, + "No attribute with the specified name exists.", + e._process_delete_attribute, + *args + ) + + def test_delete_attribute_with_missing_current_attribute_reference(self): + """ + Test that an InvalidMessage error is raised when attempting to delete + an attribute without specifying a current attribute or an attribute + reference (under KMIP 2.0+). + """ + e = engine.KmipEngine() + e._protocol_version = contents.ProtocolVersion(2, 0) + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.DeleteAttributeRequestPayload(unique_identifier="1"), + ) + + self.assertRaisesRegex( + exceptions.InvalidMessage, + "The DeleteAttribute request must specify the current attribute " + "or an attribute reference.", + e._process_delete_attribute, + *args + ) + + def test_delete_attribute_with_missing_attribute_name(self): + """ + Test that an InvalidMessage error is raised when attempting to delete + an attribute without specifying the attribute name (under KMIP 1.0+). + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.DeleteAttributeRequestPayload(unique_identifier="1"), + ) + + self.assertRaisesRegex( + exceptions.InvalidMessage, + "The DeleteAttribute request must specify the attribute name.", + e._process_delete_attribute, + *args + ) + + def test_delete_attribute_with_invalid_attribute_index(self): + """ + Test that an ItemNotFound error is raised when attempting to delete + an attribute when specifying an invalid attribute index. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._is_allowed_by_operation_policy = mock.Mock(return_value=True) + e._logger = mock.MagicMock() + + secret = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(secret) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + args = ( + payloads.DeleteAttributeRequestPayload( + unique_identifier="1", + attribute_name="Name", + attribute_index=20), + ) + + self.assertRaisesRegex( + exceptions.ItemNotFound, + "Could not locate the attribute instance with the specified " + "index: 20", + e._process_delete_attribute, + *args + ) + def test_register(self): """ Test that a Register request can be processed correctly. diff --git a/kmip/tests/unit/services/server/test_policy.py b/kmip/tests/unit/services/server/test_policy.py index 90af6ed..3778430 100644 --- a/kmip/tests/unit/services/server/test_policy.py +++ b/kmip/tests/unit/services/server/test_policy.py @@ -76,6 +76,20 @@ class TestAttributePolicy(testtools.TestCase): result = rules.is_attribute_deprecated(attribute_b) self.assertTrue(result) + def test_is_attribute_deletable_by_client(self): + """ + Test that is_attribute_deletable_by_client returns the expected + results in all cases. + """ + rules = policy.AttributePolicy(contents.ProtocolVersion(1, 0)) + + self.assertFalse( + rules.is_attribute_deletable_by_client("Cryptographic Algorithm") + ) + self.assertTrue( + rules.is_attribute_deletable_by_client("Contact Information") + ) + def test_is_attribute_applicable_to_object_type(self): """ Test that is_attribute_applicable_to_object_type returns the