diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 5d73fc7..414fcb7 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1567,19 +1567,27 @@ class KmipEngine(object): else: unique_identifier = self._id_placeholder - self._get_object_with_access_controls( + managed_object = self._get_object_with_access_controls( unique_identifier, enums.Operation.DESTROY ) - # TODO (peterhamilton) Process attributes to see if destroy possible - # 1. Check object state. If invalid, error out. - # 2. Check object deactivation date. If invalid, error out. + # TODO If in "ACTIVE" state, we need to check its "Deactivation date" + # to see whether it can be destroyed or not + if hasattr(managed_object, 'state'): + if managed_object.state == enums.State.ACTIVE: + raise exceptions.PermissionDenied( + "Object is active and cannot be destroyed." + ) + + # 'OpaqueObject' object has no attribute 'state' + if hasattr(managed_object, 'state') and \ + managed_object.state == enums.State.COMPROMISED: + managed_object.state = enums.State.DESTROYED_COMPROMISED self._logger.info( "Destroying an object with ID: {0}".format(unique_identifier) ) - self._data_session.query(objects.ManagedObject).filter( objects.ManagedObject.unique_identifier == unique_identifier ).delete() diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index c0c5824..a19c96b 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -4644,14 +4644,21 @@ class TestKmipEngine(testtools.TestCase): obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00') + algorithm = enums.CryptographicAlgorithm.AES + obj_c = pie_objects.SymmetricKey(algorithm, 128, key) + obj_c.state = enums.State.COMPROMISED e._data_session.add(obj_a) e._data_session.add(obj_b) + e._data_session.add(obj_c) e._data_session.commit() e._data_session = e._data_store_session_factory() id_a = str(obj_a.unique_identifier) id_b = str(obj_b.unique_identifier) + id_c = str(obj_c.unique_identifier) # Test by specifying the ID of the object to destroy. payload = destroy.DestroyRequestPayload( @@ -4705,6 +4712,33 @@ class TestKmipEngine(testtools.TestCase): ) e._data_session.commit() + e._data_store_session_factory() + e._logger.reset_mock() + + # Test that compromised object can be destroyed properly + payload = destroy.DestroyRequestPayload( + unique_identifier=attributes.UniqueIdentifier(id_c) + ) + response_payload = e._process_destroy(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: Destroy" + ) + self.assertEqual(str(id_c), response_payload.unique_identifier.value) + + args = (payload, ) + regex = "Could not locate object: {0}".format(id_c) + six.assertRaisesRegex( + self, + exceptions.ItemNotFound, + regex, + e._process_destroy, + *args + ) + + e._data_session.commit() def test_destroy_not_allowed_by_policy(self): """ @@ -4739,6 +4773,44 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_destroy_active_state(self): + """ + Test that the right error is generated when destroy request is + received for an object that is in 'active' state. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00') + algorithm = enums.CryptographicAlgorithm.AES + obj = pie_objects.SymmetricKey(algorithm, 128, key) + obj.state = enums.State.ACTIVE + + e._data_session.add(obj) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id = str(obj.unique_identifier) + + # Test by specifying the ID of the object to destroy. + payload = destroy.DestroyRequestPayload( + unique_identifier=attributes.UniqueIdentifier(id) + ) + + args = (payload, ) + regex = "Object is active and cannot be destroyed." + six.assertRaisesRegex( + self, + exceptions.PermissionDenied, + regex, + e._process_destroy, + *args + ) + def test_query_1_0(self): """ Test that a Query request can be processed correctly, for KMIP 1.0.