Add initial Revoke operation support for server.

See the TODO in _process_revoke for more info.
This commit is contained in:
Hao Shen 2017-03-31 10:18:00 -07:00
parent 2aabad714a
commit 0229a83acf
2 changed files with 329 additions and 0 deletions

View File

@ -39,6 +39,7 @@ from kmip.core.messages import contents
from kmip.core.messages import messages from kmip.core.messages import messages
from kmip.core.messages.payloads import activate from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -971,6 +972,8 @@ class KmipEngine(object):
return self._process_get_attribute_list(payload) return self._process_get_attribute_list(payload)
elif operation == enums.Operation.ACTIVATE: elif operation == enums.Operation.ACTIVATE:
return self._process_activate(payload) return self._process_activate(payload)
elif operation == enums.Operation.REVOKE:
return self._process_revoke(payload)
elif operation == enums.Operation.DESTROY: elif operation == enums.Operation.DESTROY:
return self._process_destroy(payload) return self._process_destroy(payload)
elif operation == enums.Operation.QUERY: elif operation == enums.Operation.QUERY:
@ -1500,6 +1503,61 @@ class KmipEngine(object):
return response_payload return response_payload
@_kmip_version_supported('1.0')
def _process_revoke(self, payload):
self._logger.info("Processing operation: Revoke")
revocation_code = None
if payload.revocation_reason and \
payload.revocation_reason.revocation_code:
revocation_code = payload.revocation_reason.revocation_code
else:
raise exceptions.InvalidField(
"revocation reason code must be specified"
)
if payload.unique_identifier:
unique_identifier = payload.unique_identifier.value
else:
unique_identifier = self._id_placeholder
managed_object = self._get_object_with_access_controls(
unique_identifier,
enums.Operation.REVOKE
)
object_type = managed_object._object_type
if not hasattr(managed_object, 'state'):
raise exceptions.IllegalOperation(
"An {0} object has no state and cannot be revoked.".format(
''.join(
[x.capitalize() for x in object_type.name.split('_')]
)
)
)
# TODO: need to set Compromise Date attribute or Deactivation Date
# attribute
if revocation_code.value is enums.RevocationReasonCode.KEY_COMPROMISE:
if managed_object.state == enums.State.DESTROYED:
managed_object.state = enums.State.DESTROYED_COMPROMISED
else:
managed_object.state = enums.State.COMPROMISED
else:
if managed_object.state != enums.State.ACTIVE:
raise exceptions.IllegalOperation(
"The object is not active and cannot be revoked with "
"reason other than KEY_COMPROMISE"
)
else:
managed_object.state = enums.State.DEACTIVATED
self._data_session.commit()
response_payload = revoke.RevokeResponsePayload(
unique_identifier=attributes.UniqueIdentifier(unique_identifier)
)
return response_payload
@_kmip_version_supported('1.0') @_kmip_version_supported('1.0')
def _process_destroy(self, payload): def _process_destroy(self, payload):
self._logger.info("Processing operation: Destroy") self._logger.info("Processing operation: Destroy")

View File

@ -31,6 +31,7 @@ from kmip.core import enums
from kmip.core import exceptions from kmip.core import exceptions
from kmip.core import misc from kmip.core import misc
from kmip.core import objects from kmip.core import objects
from kmip.core import primitives
from kmip.core import secrets from kmip.core import secrets
from kmip.core.factories import attributes as factory from kmip.core.factories import attributes as factory
@ -39,6 +40,7 @@ from kmip.core.messages import contents
from kmip.core.messages import messages from kmip.core.messages import messages
from kmip.core.messages.payloads import activate from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -4361,6 +4363,275 @@ class TestKmipEngine(testtools.TestCase):
*args *args
) )
def test_revoke(self):
"""
Test that an Revoke request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
managed_object.state = enums.State.ACTIVE
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
object_id = str(managed_object.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
reason_compromise = objects.RevocationReason(
code=enums.RevocationReasonCode.KEY_COMPROMISE)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
# Test that reason UNSPECIFIED will put object into state
# DEACTIVATED
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_unspecified,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.DEACTIVATED, symmetric_key.state)
# Test that reason KEY_COMPROMISE will put object not in DESTROYED
# state into state COMPROMISED
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_compromise,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.COMPROMISED, symmetric_key.state)
# Test that reason KEY_COMPROMISE will put object in DESTROYED
# state into state DESTROYED_COMPROMISED
symmetric_key.state = enums.State.DESTROYED
e._data_session.commit()
e._data_session = e._data_store_session_factory()
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_compromise,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.DESTROYED_COMPROMISED,
symmetric_key.state)
# Test that the ID placeholder can also be used to specify revocation.
symmetric_key.state = enums.State.ACTIVE
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._id_placeholder = str(object_id)
payload = revoke.RevokeRequestPayload(
revocation_reason=reason_unspecified,
compromise_date=date)
response_payload = e._process_revoke(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Revoke"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.DEACTIVATED, symmetric_key.state)
def test_revoke_on_not_active_object(self):
"""
Test that the right error is generated when an revocation request is
received for an object that is not active with the reason other than
KEY_COMPROMISE.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
self.assertEqual(enums.State.PRE_ACTIVE, managed_object.state)
object_id = str(managed_object.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_unspecified,
compromise_date=date)
args = (payload, )
regex = "The object is not active and cannot be revoked with " \
"reason other than KEY_COMPROMISE"
self.assertRaisesRegexp(
exceptions.IllegalOperation,
regex,
e._process_revoke,
*args
)
def test_revoke_on_static_object(self):
"""
Test that the right error is generated when an revoke request is
received for an object that cannot be revoked.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.OpaqueObject(
b'',
enums.OpaqueDataType.NONE
)
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
object_id = str(managed_object.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id),
revocation_reason=reason_unspecified,
compromise_date=date)
args = (payload,)
name = enums.ObjectType.OPAQUE_DATA.name
regex = "An {0} object has no state and cannot be revoked.".format(
''.join(
[x.capitalize() for x in name.split('_')]
)
)
self.assertRaisesRegexp(
exceptions.IllegalOperation,
regex,
e._process_revoke,
*args
)
def test_revoke_not_allowed_by_policy(self):
"""
Test that an unallowed request is handled correctly by Revoke.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._client_identity = 'test'
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_a._owner = 'admin'
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
reason_unspecified = objects.RevocationReason(
code=enums.RevocationReasonCode.UNSPECIFIED)
date = primitives.DateTime(
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
payload = revoke.RevokeRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a),
revocation_reason=reason_unspecified,
compromise_date=date)
args = [payload]
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate object: {0}".format(id_a),
e._process_revoke,
*args
)
def test_destroy(self): def test_destroy(self):
""" """
Test that a Destroy request can be processed correctly. Test that a Destroy request can be processed correctly.