Adding server support for the Activate operation

This change adds support for the Activate operation to the server,
including additional server unit tests, a new IllegalOperation
exception, and updated state attribute support in Pie objects.

Fixes #186
This commit is contained in:
Peter Hamilton 2016-08-28 12:39:00 -04:00
parent 2a7c45e97a
commit b7cc542d9a
4 changed files with 255 additions and 1 deletions

View File

@ -57,6 +57,26 @@ class CryptographicFailure(KmipError):
) )
class IllegalOperation(KmipError):
"""
An error generated when an improper operation is attempted. The operation
can be 'illegal' for various reasons, including invalid permissions or
literal object/operation mismatch (e.g., a Template item cannot be
activated with the Activate operation since it has no state).
"""
def __init__(self, message):
"""
Create an IllegalOperation exception.
Args:
message (string): A string containing information about the error.
"""
super(IllegalOperation, self).__init__(
reason=enums.ResultReason.ILLEGAL_OPERATION,
message=message
)
class IndexOutOfBounds(KmipError): class IndexOutOfBounds(KmipError):
""" """
An error generated when exceeding the attribute instance limit. An error generated when exceeding the attribute instance limit.

View File

@ -159,6 +159,7 @@ class CryptographicObject(ManagedObject):
primary_key=True) primary_key=True)
cryptographic_usage_masks = Column('cryptographic_usage_mask', cryptographic_usage_masks = Column('cryptographic_usage_mask',
sql.UsageMaskType) sql.UsageMaskType)
state = Column('state', sql.EnumType(enums.State))
__mapper_args__ = { __mapper_args__ = {
'polymorphic_identity': 'CryptographicObject' 'polymorphic_identity': 'CryptographicObject'
} }
@ -175,6 +176,7 @@ class CryptographicObject(ManagedObject):
super(CryptographicObject, self).__init__() super(CryptographicObject, self).__init__()
self.cryptographic_usage_masks = list() self.cryptographic_usage_masks = list()
self.state = enums.State.PRE_ACTIVE
# All remaining attributes are not considered part of the public API # All remaining attributes are not considered part of the public API
# and are subject to change. # and are subject to change.
@ -191,7 +193,6 @@ class CryptographicObject(ManagedObject):
self._lease_time = None self._lease_time = None
self._links = list() self._links = list()
self._revocation_reason = None self._revocation_reason = None
self._state = None
class Key(CryptographicObject): class Key(CryptographicObject):

View File

@ -34,6 +34,7 @@ from kmip.core.factories import secrets
from kmip.core.messages import contents from kmip.core.messages import contents
from kmip.core.messages import messages from kmip.core.messages import messages
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -1186,6 +1187,59 @@ class KmipEngine(object):
return response_payload return response_payload
@_kmip_version_supported('1.0')
def _process_activate(self, payload):
self._logger.info("Processing operation: Activate")
if payload.unique_identifier:
unique_identifier = payload.unique_identifier.value
else:
unique_identifier = self._id_placeholder
object_type = self._get_object_type(unique_identifier)
managed_object = self._data_session.query(object_type).filter(
object_type.unique_identifier == unique_identifier
).one()
# Determine if the request should be carried out under the object's
# operation policy. If not, feign ignorance of the object.
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object._object_type,
enums.Operation.ACTIVATE
)
if not is_allowed:
raise exceptions.ItemNotFound(
"Could not locate object: {0}".format(unique_identifier)
)
object_type = managed_object._object_type
if not hasattr(managed_object, 'state'):
raise exceptions.IllegalOperation(
"An {0} object has no state and cannot be activated.".format(
''.join(
[x.capitalize() for x in object_type.name.split('_')]
)
)
)
if managed_object.state != enums.State.PRE_ACTIVE:
raise exceptions.PermissionDenied(
"The object state is not pre-active and cannot be activated."
)
managed_object.state = enums.State.ACTIVE
self._data_session.commit()
response_payload = activate.ActivateResponsePayload(
unique_identifier=attributes.UniqueIdentifier(unique_identifier)
)
return response_payload
@_kmip_version_supported('1.0') @_kmip_version_supported('1.0')
def _process_destroy(self, payload): def _process_destroy(self, payload):
self._logger.info("Processing operation: Destroy") self._logger.info("Processing operation: Destroy")

View File

@ -37,6 +37,7 @@ from kmip.core.factories import attributes as factory
from kmip.core.messages import contents from kmip.core.messages import contents
from kmip.core.messages import messages from kmip.core.messages import messages
from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import destroy
@ -3253,6 +3254,184 @@ class TestKmipEngine(testtools.TestCase):
*args *args
) )
def test_activate(self):
"""
Test that an Activate request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
self.assertEqual(enums.State.PRE_ACTIVE, managed_object.state)
object_id = str(managed_object.unique_identifier)
# Test by specifying the ID of the object to activate.
payload = activate.ActivateRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id)
)
response_payload = e._process_activate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Activate"
)
self.assertEqual(
str(object_id),
response_payload.unique_identifier.value
)
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == object_id
).one()
self.assertEqual(enums.State.ACTIVE, symmetric_key.state)
args = (payload,)
regex = "The object state is not pre-active and cannot be activated."
self.assertRaisesRegexp(
exceptions.PermissionDenied,
regex,
e._process_activate,
*args
)
# Test that the ID placeholder can also be used to specify activation.
e._id_placeholder = str(object_id)
payload = activate.ActivateRequestPayload()
args = (payload,)
regex = "The object state is not pre-active and cannot be activated."
self.assertRaisesRegexp(
exceptions.PermissionDenied,
regex,
e._process_activate,
*args
)
def test_activate_on_static_object(self):
"""
Test that the right error is generated when an activation request is
received for an object that cannot be activated.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.OpaqueObject(
b'',
enums.OpaqueDataType.NONE
)
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
object_id = str(managed_object.unique_identifier)
# Test by specifying the ID of the object to activate.
payload = activate.ActivateRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id)
)
args = (payload,)
name = enums.ObjectType.OPAQUE_DATA.name
regex = "An {0} object has no state and cannot be activated.".format(
''.join(
[x.capitalize() for x in name.split('_')]
)
)
self.assertRaisesRegexp(
exceptions.IllegalOperation,
regex,
e._process_activate,
*args
)
def test_activate_on_active_object(self):
"""
Test that the right error is generated when an activation request is
received for an object that is not pre-active.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
managed_object.state = enums.State.ACTIVE
e._data_session.add(managed_object)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
object_id = str(managed_object.unique_identifier)
# Test by specifying the ID of the object to activate.
payload = activate.ActivateRequestPayload(
unique_identifier=attributes.UniqueIdentifier(object_id)
)
args = (payload,)
regex = "The object state is not pre-active and cannot be activated."
self.assertRaisesRegexp(
exceptions.PermissionDenied,
regex,
e._process_activate,
*args
)
def test_activate_not_allowed_by_policy(self):
"""
Test that an unallowed request is handled correctly by Activate.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
e._client_identity = 'test'
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_a._owner = 'admin'
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
payload = activate.ActivateRequestPayload(
unique_identifier=attributes.UniqueIdentifier(id_a)
)
# Test by specifying the ID of the object to activate.
args = [payload]
self.assertRaisesRegex(
exceptions.ItemNotFound,
"Could not locate object: {0}".format(id_a),
e._process_activate,
*args
)
def test_destroy(self): def test_destroy(self):
""" """
Test that a Destroy request can be processed correctly. Test that a Destroy request can be processed correctly.