mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-06-28 17:54:23 +02:00
Merge pull request #274 from vbnmmnbv/destroy_state_permission_server
Add state check for Destroy operation.
This commit is contained in:
commit
a7c627d28f
@ -1567,19 +1567,27 @@ class KmipEngine(object):
|
|||||||
else:
|
else:
|
||||||
unique_identifier = self._id_placeholder
|
unique_identifier = self._id_placeholder
|
||||||
|
|
||||||
self._get_object_with_access_controls(
|
managed_object = self._get_object_with_access_controls(
|
||||||
unique_identifier,
|
unique_identifier,
|
||||||
enums.Operation.DESTROY
|
enums.Operation.DESTROY
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO (peterhamilton) Process attributes to see if destroy possible
|
# TODO If in "ACTIVE" state, we need to check its "Deactivation date"
|
||||||
# 1. Check object state. If invalid, error out.
|
# to see whether it can be destroyed or not
|
||||||
# 2. Check object deactivation date. If invalid, error out.
|
if hasattr(managed_object, 'state'):
|
||||||
|
if managed_object.state == enums.State.ACTIVE:
|
||||||
|
raise exceptions.PermissionDenied(
|
||||||
|
"Object is active and cannot be destroyed."
|
||||||
|
)
|
||||||
|
|
||||||
|
# 'OpaqueObject' object has no attribute 'state'
|
||||||
|
if hasattr(managed_object, 'state') and \
|
||||||
|
managed_object.state == enums.State.COMPROMISED:
|
||||||
|
managed_object.state = enums.State.DESTROYED_COMPROMISED
|
||||||
|
|
||||||
self._logger.info(
|
self._logger.info(
|
||||||
"Destroying an object with ID: {0}".format(unique_identifier)
|
"Destroying an object with ID: {0}".format(unique_identifier)
|
||||||
)
|
)
|
||||||
|
|
||||||
self._data_session.query(objects.ManagedObject).filter(
|
self._data_session.query(objects.ManagedObject).filter(
|
||||||
objects.ManagedObject.unique_identifier == unique_identifier
|
objects.ManagedObject.unique_identifier == unique_identifier
|
||||||
).delete()
|
).delete()
|
||||||
|
@ -4644,14 +4644,21 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
|
|
||||||
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||||
obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
|
||||||
|
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x00\x00\x00\x00\x00')
|
||||||
|
algorithm = enums.CryptographicAlgorithm.AES
|
||||||
|
obj_c = pie_objects.SymmetricKey(algorithm, 128, key)
|
||||||
|
obj_c.state = enums.State.COMPROMISED
|
||||||
|
|
||||||
e._data_session.add(obj_a)
|
e._data_session.add(obj_a)
|
||||||
e._data_session.add(obj_b)
|
e._data_session.add(obj_b)
|
||||||
|
e._data_session.add(obj_c)
|
||||||
e._data_session.commit()
|
e._data_session.commit()
|
||||||
e._data_session = e._data_store_session_factory()
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
id_a = str(obj_a.unique_identifier)
|
id_a = str(obj_a.unique_identifier)
|
||||||
id_b = str(obj_b.unique_identifier)
|
id_b = str(obj_b.unique_identifier)
|
||||||
|
id_c = str(obj_c.unique_identifier)
|
||||||
|
|
||||||
# Test by specifying the ID of the object to destroy.
|
# Test by specifying the ID of the object to destroy.
|
||||||
payload = destroy.DestroyRequestPayload(
|
payload = destroy.DestroyRequestPayload(
|
||||||
@ -4705,6 +4712,33 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
e._data_session.commit()
|
e._data_session.commit()
|
||||||
|
e._data_store_session_factory()
|
||||||
|
e._logger.reset_mock()
|
||||||
|
|
||||||
|
# Test that compromised object can be destroyed properly
|
||||||
|
payload = destroy.DestroyRequestPayload(
|
||||||
|
unique_identifier=attributes.UniqueIdentifier(id_c)
|
||||||
|
)
|
||||||
|
response_payload = e._process_destroy(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_any_call(
|
||||||
|
"Processing operation: Destroy"
|
||||||
|
)
|
||||||
|
self.assertEqual(str(id_c), response_payload.unique_identifier.value)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
regex = "Could not locate object: {0}".format(id_c)
|
||||||
|
six.assertRaisesRegex(
|
||||||
|
self,
|
||||||
|
exceptions.ItemNotFound,
|
||||||
|
regex,
|
||||||
|
e._process_destroy,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
e._data_session.commit()
|
||||||
|
|
||||||
def test_destroy_not_allowed_by_policy(self):
|
def test_destroy_not_allowed_by_policy(self):
|
||||||
"""
|
"""
|
||||||
@ -4739,6 +4773,44 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
*args
|
*args
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_destroy_active_state(self):
|
||||||
|
"""
|
||||||
|
Test that the right error is generated when destroy request is
|
||||||
|
received for an object that is in 'active' state.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
key = (b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
b'\x00\x00\x00\x00\x00')
|
||||||
|
algorithm = enums.CryptographicAlgorithm.AES
|
||||||
|
obj = pie_objects.SymmetricKey(algorithm, 128, key)
|
||||||
|
obj.state = enums.State.ACTIVE
|
||||||
|
|
||||||
|
e._data_session.add(obj)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
id = str(obj.unique_identifier)
|
||||||
|
|
||||||
|
# Test by specifying the ID of the object to destroy.
|
||||||
|
payload = destroy.DestroyRequestPayload(
|
||||||
|
unique_identifier=attributes.UniqueIdentifier(id)
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
regex = "Object is active and cannot be destroyed."
|
||||||
|
six.assertRaisesRegex(
|
||||||
|
self,
|
||||||
|
exceptions.PermissionDenied,
|
||||||
|
regex,
|
||||||
|
e._process_destroy,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
def test_query_1_0(self):
|
def test_query_1_0(self):
|
||||||
"""
|
"""
|
||||||
Test that a Query request can be processed correctly, for KMIP 1.0.
|
Test that a Query request can be processed correctly, for KMIP 1.0.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user