mirror of https://github.com/OpenKMIP/PyKMIP.git
Add state check for Destroy operation.
This commit is contained in:
parent
e26e0748af
commit
3897455469
|
@ -1509,19 +1509,27 @@ class KmipEngine(object):
|
|||
else:
|
||||
unique_identifier = self._id_placeholder
|
||||
|
||||
self._get_object_with_access_controls(
|
||||
managed_object = self._get_object_with_access_controls(
|
||||
unique_identifier,
|
||||
enums.Operation.DESTROY
|
||||
)
|
||||
|
||||
# TODO (peterhamilton) Process attributes to see if destroy possible
|
||||
# 1. Check object state. If invalid, error out.
|
||||
# 2. Check object deactivation date. If invalid, error out.
|
||||
# TODO If in "ACTIVE" state, we need to check its "Deactivation date"
|
||||
# to see whether it can be destroyed or not
|
||||
if hasattr(managed_object, 'state'):
|
||||
if managed_object.state == enums.State.ACTIVE:
|
||||
raise exceptions.PermissionDenied(
|
||||
"Object is active and cannot be destroyed."
|
||||
)
|
||||
|
||||
# 'OpaqueObject' object has no attribute 'state'
|
||||
if hasattr(managed_object, 'state') and \
|
||||
managed_object.state == enums.State.COMPROMISED:
|
||||
managed_object.state = enums.State.DESTROYED_COMPROMISED
|
||||
|
||||
self._logger.info(
|
||||
"Destroying an object with ID: {0}".format(unique_identifier)
|
||||
)
|
||||
|
||||
self._data_session.query(objects.ManagedObject).filter(
|
||||
objects.ManagedObject.unique_identifier == unique_identifier
|
||||
).delete()
|
||||
|
|
|
@ -4373,14 +4373,21 @@ class TestKmipEngine(testtools.TestCase):
|
|||
|
||||
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||
obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00')
|
||||
algorithm = enums.CryptographicAlgorithm.AES
|
||||
obj_c = pie_objects.SymmetricKey(algorithm, 128, key)
|
||||
obj_c.state = enums.State.COMPROMISED
|
||||
|
||||
e._data_session.add(obj_a)
|
||||
e._data_session.add(obj_b)
|
||||
e._data_session.add(obj_c)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
id_a = str(obj_a.unique_identifier)
|
||||
id_b = str(obj_b.unique_identifier)
|
||||
id_c = str(obj_c.unique_identifier)
|
||||
|
||||
# Test by specifying the ID of the object to destroy.
|
||||
payload = destroy.DestroyRequestPayload(
|
||||
|
@ -4434,6 +4441,33 @@ class TestKmipEngine(testtools.TestCase):
|
|||
)
|
||||
|
||||
e._data_session.commit()
|
||||
e._data_store_session_factory()
|
||||
e._logger.reset_mock()
|
||||
|
||||
# Test that compromised object can be destroyed properly
|
||||
payload = destroy.DestroyRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(id_c)
|
||||
)
|
||||
response_payload = e._process_destroy(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_any_call(
|
||||
"Processing operation: Destroy"
|
||||
)
|
||||
self.assertEqual(str(id_c), response_payload.unique_identifier.value)
|
||||
|
||||
args = (payload, )
|
||||
regex = "Could not locate object: {0}".format(id_c)
|
||||
six.assertRaisesRegex(
|
||||
self,
|
||||
exceptions.ItemNotFound,
|
||||
regex,
|
||||
e._process_destroy,
|
||||
*args
|
||||
)
|
||||
|
||||
e._data_session.commit()
|
||||
|
||||
def test_destroy_not_allowed_by_policy(self):
|
||||
"""
|
||||
|
@ -4468,6 +4502,44 @@ class TestKmipEngine(testtools.TestCase):
|
|||
*args
|
||||
)
|
||||
|
||||
def test_destroy_active_state(self):
|
||||
"""
|
||||
Test that the right error is generated when destroy request is
|
||||
received for an object that is in 'active' state.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00')
|
||||
algorithm = enums.CryptographicAlgorithm.AES
|
||||
obj = pie_objects.SymmetricKey(algorithm, 128, key)
|
||||
obj.state = enums.State.ACTIVE
|
||||
|
||||
e._data_session.add(obj)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
id = str(obj.unique_identifier)
|
||||
|
||||
# Test by specifying the ID of the object to destroy.
|
||||
payload = destroy.DestroyRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(id)
|
||||
)
|
||||
|
||||
args = (payload, )
|
||||
regex = "Object is active and cannot be destroyed."
|
||||
six.assertRaisesRegex(
|
||||
self,
|
||||
exceptions.PermissionDenied,
|
||||
regex,
|
||||
e._process_destroy,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_query_1_0(self):
|
||||
"""
|
||||
Test that a Query request can be processed correctly, for KMIP 1.0.
|
||||
|
|
Loading…
Reference in New Issue