diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 3265c57..8b15de6 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -14,6 +14,7 @@ # under the License. import logging +import six import sqlalchemy from sqlalchemy.orm import exc @@ -35,12 +36,15 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import query +from kmip.core.messages.payloads import register from kmip.core import misc -from kmip.pie import sqltypes +from kmip.pie import factory from kmip.pie import objects +from kmip.pie import sqltypes +from kmip.services.server import policy from kmip.services.server.crypto import engine @@ -63,6 +67,9 @@ class KmipEngine(object): * Key compression * Key wrapping * Key format conversions + * Registration of empty managed objects (e.g., Private Keys) + * Managed object state tracking + * Managed object usage limit tracking and enforcement """ def __init__(self): @@ -105,6 +112,8 @@ class KmipEngine(object): enums.ObjectType.OPAQUE_DATA: objects.OpaqueObject } + self._attribute_policy = policy.AttributePolicy(self._protocol_version) + def _kmip_version_supported(supported): def decorator(function): def wrapper(self, *args, **kwargs): @@ -133,6 +142,9 @@ class KmipEngine(object): def _set_protocol_version(self, protocol_version): if protocol_version in self._protocol_versions: self._protocol_version = protocol_version + self._attribute_policy = policy.AttributePolicy( + self._protocol_version + ) else: raise exceptions.InvalidMessage( "KMIP {0} is not supported by the server.".format( @@ -479,8 +491,134 @@ class KmipEngine(object): secret_factory = secrets.SecretFactory() return secret_factory.create(object_type, value) + def _process_template_attribute(self, template_attribute): + """ + Given a kmip.core TemplateAttribute object, extract the attribute + value data into a usable dictionary format. + """ + attributes = {} + + if len(template_attribute.names) > 0: + raise exceptions.ItemNotFound( + "Attribute templates are not supported." + ) + + for attribute in template_attribute.attributes: + name = attribute.attribute_name.value + + if not self._attribute_policy.is_attribute_supported(name): + raise exceptions.InvalidField( + "The {0} attribute is unsupported.".format(name) + ) + + if self._attribute_policy.is_attribute_multivalued(name): + values = attributes.get(name, list()) + if (not attribute.attribute_index) and len(values) > 0: + raise exceptions.InvalidField( + "Attribute index missing from multivalued attribute." + ) + + values.append(attribute.attribute_value) + attributes.update([(name, values)]) + else: + if attribute.attribute_index: + if attribute.attribute_index.value != 0: + raise exceptions.InvalidField( + "Non-zero attribute index found for " + "single-valued attribute." + ) + value = attributes.get(name, None) + if value: + raise exceptions.IndexOutOfBounds( + "Cannot set multiple instances of the " + "{0} attribute.".format(name) + ) + else: + attributes.update([(name, attribute.attribute_value)]) + + return attributes + + def _set_attributes_on_managed_object(self, managed_object, attributes): + """ + Given a kmip.pie object and a dictionary of attributes, attempt to set + the attribute values on the object. + """ + for attribute_name, attribute_value in six.iteritems(attributes): + object_type = managed_object._object_type + if self._attribute_policy.is_attribute_applicable_to_object_type( + attribute_name, + object_type): + self._set_attribute_on_managed_object( + managed_object, + (attribute_name, attribute_value) + ) + else: + name = object_type.name + raise exceptions.InvalidField( + "Cannot set {0} attribute on {1} object.".format( + attribute_name, + ''.join([x.capitalize() for x in name.split('_')]) + ) + ) + + def _set_attribute_on_managed_object(self, managed_object, attribute): + """ + Set the attribute value on the kmip.pie managed object. + """ + attribute_name = attribute[0] + attribute_value = attribute[1] + + if self._attribute_policy.is_attribute_multivalued(attribute_name): + if attribute_name == 'Name': + managed_object.names.extend( + [x.name_value.value for x in attribute_value] + ) + for name in managed_object.names: + if managed_object.names.count(name) > 1: + raise exceptions.InvalidField( + "Cannot set duplicate name values." + ) + else: + # TODO (peterhamilton) Remove when all attributes are supported + raise exceptions.InvalidField( + "The {0} attribute is unsupported.".format(attribute_name) + ) + else: + field = None + value = attribute_value.value + + if attribute_name == 'Cryptographic Algorithm': + field = 'cryptographic_algorithm' + elif attribute_name == 'Cryptographic Length': + field = 'cryptographic_length' + elif attribute_name == 'Cryptographic Usage Mask': + field = 'cryptographic_usage_masks' + value = list() + for e in enums.CryptographicUsageMask: + if e.value & attribute_value.value: + value.append(e) + + if field: + existing_value = getattr(managed_object, field) + if existing_value: + if existing_value != value: + raise exceptions.InvalidField( + "Cannot overwrite the {0} attribute.".format( + attribute_name + ) + ) + else: + setattr(managed_object, field, value) + else: + # TODO (peterhamilton) Remove when all attributes are supported + raise exceptions.InvalidField( + "The {0} attribute is unsupported.".format(attribute_name) + ) + def _process_operation(self, operation, payload): - if operation == enums.Operation.GET: + if operation == enums.Operation.REGISTER: + return self._process_register(payload) + elif operation == enums.Operation.GET: return self._process_get(payload) elif operation == enums.Operation.DESTROY: return self._process_destroy(payload) @@ -495,6 +633,65 @@ class KmipEngine(object): ) ) + @_kmip_version_supported('1.0') + def _process_register(self, payload): + self._logger.info("Processing operation: Register") + + object_type = payload.object_type.value + template_attribute = payload.template_attribute + + if self._object_map.get(object_type) is None: + name = object_type.name + raise exceptions.InvalidField( + "The {0} object type is not supported.".format( + ''.join( + [x.capitalize() for x in name.split('_')] + ) + ) + ) + + if payload.secret: + secret = payload.secret + else: + # TODO (peterhamilton) It is possible to register 'empty' secrets + # like Private Keys. For now, that feature is not supported. + raise exceptions.InvalidField( + "Cannot register a secret in absentia." + ) + + object_attributes = {} + if template_attribute: + object_attributes = self._process_template_attribute( + template_attribute + ) + + managed_object_factory = factory.ObjectFactory() + managed_object = managed_object_factory.convert(secret) + managed_object.names = [] + + self._set_attributes_on_managed_object( + managed_object, + object_attributes + ) + + # TODO (peterhamilton) Set additional server-only attributes. + + self._data_session.add(managed_object) + + # NOTE (peterhamilton) SQLAlchemy will *not* assign an ID until + # commit is called. This makes future support for UNDO problematic. + self._data_session.commit() + + response_payload = register.RegisterResponsePayload( + unique_identifier=attributes.UniqueIdentifier( + str(managed_object.unique_identifier) + ) + ) + + self._id_placeholder = str(managed_object.unique_identifier) + + return response_payload + @_kmip_version_supported('1.0') def _process_get(self, payload): self._logger.info("Processing operation: Get") @@ -592,6 +789,7 @@ class KmipEngine(object): if enums.QueryFunction.QUERY_OPERATIONS in queries: operations = list([ + contents.Operation(enums.Operation.REGISTER), contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.DESTROY), contents.Operation(enums.Operation.QUERY) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index a8c3b96..b287c35 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -30,6 +30,8 @@ from kmip.core import misc from kmip.core import objects from kmip.core import secrets +from kmip.core.factories import attributes as factory + from kmip.core.messages import contents from kmip.core.messages import messages @@ -37,6 +39,7 @@ from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get from kmip.core.messages.payloads import query +from kmip.core.messages.payloads import register from kmip.pie import objects as pie_objects from kmip.pie import sqltypes @@ -640,16 +643,19 @@ class TestKmipEngine(testtools.TestCase): e = engine.KmipEngine() e._logger = mock.MagicMock() + e._process_register = mock.MagicMock() e._process_get = mock.MagicMock() e._process_destroy = mock.MagicMock() e._process_query = mock.MagicMock() e._process_discover_versions = mock.MagicMock() + e._process_operation(enums.Operation.REGISTER, None) e._process_operation(enums.Operation.GET, None) e._process_operation(enums.Operation.DESTROY, None) e._process_operation(enums.Operation.QUERY, None) e._process_operation(enums.Operation.DISCOVER_VERSIONS, None) + e._process_register.assert_called_with(None) e._process_get.assert_called_with(None) e._process_destroy.assert_called_with(None) e._process_query.assert_called_with(None) @@ -942,6 +948,634 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_process_template_attribute(self): + """ + Test that a template attribute structure can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + name = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + algorithm = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + length = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 128 + ) + mask = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + ) + template_attribute = objects.TemplateAttribute( + attributes=[name, algorithm, length, mask] + ) + + result = e._process_template_attribute(template_attribute) + + self.assertIsInstance(result, dict) + self.assertEqual(4, len(result.keys())) + self.assertIn('Name', result.keys()) + self.assertIn('Cryptographic Algorithm', result.keys()) + self.assertIn('Cryptographic Length', result.keys()) + self.assertIn('Cryptographic Usage Mask', result.keys()) + + self.assertEqual([name.attribute_value], result.get('Name')) + self.assertEqual( + algorithm.attribute_value, + result.get('Cryptographic Algorithm') + ) + self.assertEqual( + length.attribute_value, + result.get('Cryptographic Length') + ) + self.assertEqual( + mask.attribute_value, + result.get('Cryptographic Usage Mask') + ) + + def test_process_template_attribute_unsupported_features(self): + """ + Test that the right errors are generated when unsupported features + are referenced while processing a template attribute. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + # Test that providing template names generates an InvalidField error. + template_attribute = objects.TemplateAttribute( + names=[ + attributes.Name.create( + 'invalid', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ] + ) + + args = (template_attribute, ) + regex = "Attribute templates are not supported." + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._process_template_attribute, + *args + ) + + # Test that an unrecognized attribute generates an InvalidField error. + name = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + name.attribute_name.value = 'invalid' + template_attribute = objects.TemplateAttribute(attributes=[name]) + + args = (template_attribute, ) + regex = "The invalid attribute is unsupported." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_template_attribute, + *args + ) + + # Test that missing indices generate an InvalidField error. + name_a = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + name_b = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + + template_attribute = objects.TemplateAttribute( + attributes=[name_a, name_b] + ) + + args = (template_attribute, ) + regex = "Attribute index missing from multivalued attribute." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_template_attribute, + *args + ) + + # Test that a non-zero index generates an InvalidField error. + algorithm = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES, + 1 + ) + template_attribute = objects.TemplateAttribute(attributes=[algorithm]) + + args = (template_attribute, ) + regex = "Non-zero attribute index found for single-valued attribute." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_template_attribute, + *args + ) + + # Test that setting multiple values for a single-value attribute + # generates an InvalidField error. + algorithm_a = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + algorithm_b = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.TRIPLE_DES + ) + + template_attribute = objects.TemplateAttribute( + attributes=[algorithm_a, algorithm_b] + ) + + args = (template_attribute, ) + regex = ( + "Cannot set multiple instances of the Cryptographic Algorithm " + "attribute." + ) + self.assertRaisesRegexp( + exceptions.IndexOutOfBounds, + regex, + e._process_template_attribute, + *args + ) + + def test_set_attributes_on_managed_object(self): + """ + Test that multiple attributes can be set on a given managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.SecretData( + b'', + enums.SecretDataType.PASSWORD + ) + managed_object.names = [] + attribute_factory = factory.AttributeFactory() + + name = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Secret Data', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + mask = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + ) + template_attribute = objects.TemplateAttribute( + attributes=[name, mask] + ) + object_attributes = e._process_template_attribute(template_attribute) + + self.assertEqual([], managed_object.names) + self.assertEqual([], managed_object.cryptographic_usage_masks) + + e._set_attributes_on_managed_object( + managed_object, + object_attributes + ) + + self.assertEqual(['Test Secret Data'], managed_object.names) + self.assertEqual( + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ], + managed_object.cryptographic_usage_masks + ) + + def test_set_attributes_on_managed_object_attribute_mismatch(self): + """ + Test that an InvalidField error is generated when attempting to set + an attribute that is not applicable for a given managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + managed_object = pie_objects.OpaqueObject( + b'', + enums.OpaqueDataType.NONE + ) + attribute_factory = factory.AttributeFactory() + + mask = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + ) + template_attribute = objects.TemplateAttribute(attributes=[mask]) + object_attributes = e._process_template_attribute(template_attribute) + + args = (managed_object, object_attributes) + regex = ( + "Cannot set Cryptographic Usage Mask attribute on OpaqueData " + "object." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._set_attributes_on_managed_object, + *args + ) + + def test_set_attribute_on_managed_object(self): + """ + Test that various attributes can be set correctly on a given + managed object. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + name = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + algorithm = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ) + length = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 0 + ) + mask = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + ) + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + managed_object.names = [] + + self.assertEqual([], managed_object.names) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + managed_object.cryptographic_algorithm + ) + self.assertEqual(0, managed_object.cryptographic_length) + self.assertEqual([], managed_object.cryptographic_usage_masks) + + e._set_attribute_on_managed_object( + managed_object, + ('Name', [name.attribute_value]) + ) + + self.assertEqual(['Test Symmetric Key'], managed_object.names) + + e._set_attribute_on_managed_object( + managed_object, + ('Cryptographic Algorithm', algorithm.attribute_value) + ) + + self.assertEqual( + enums.CryptographicAlgorithm.AES, + managed_object.cryptographic_algorithm + ) + + e._set_attribute_on_managed_object( + managed_object, + ('Cryptographic Length', length.attribute_value) + ) + + self.assertEqual(0, managed_object.cryptographic_length) + + e._set_attribute_on_managed_object( + managed_object, + ('Cryptographic Usage Mask', mask.attribute_value) + ) + + self.assertEqual( + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ], + managed_object.cryptographic_usage_masks + ) + + def test_set_attribute_on_managed_object_unsupported_features(self): + """ + Test that the right errors are generated when unsupported features + are referenced while setting managed object attributes. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 8, + b'\x00' + ) + + # Test that multiple duplicate names cannot be set on an object. + name_a = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + name_b = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + + args = ( + managed_object, + ('Name', [name_a.attribute_value, name_b.attribute_value]) + ) + regex = "Cannot set duplicate name values." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._set_attribute_on_managed_object, + *args + ) + + # Test that a multivalued, unsupported attribute cannot be set on an + # object. + name_a = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + name_b = attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + + args = ( + managed_object, + ('Digest', [name_a.attribute_value, name_b.attribute_value]) + ) + regex = "The Digest attribute is unsupported." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._set_attribute_on_managed_object, + *args + ) + + # Test that a set attribute cannot be overwritten. + length = attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 128 + ) + + args = ( + managed_object, + ('Cryptographic Length', length.attribute_value) + ) + regex = "Cannot overwrite the Cryptographic Length attribute." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._set_attribute_on_managed_object, + *args + ) + + # Test that an unsupported attribute cannot be set. + object_group = attribute_factory.create_attribute( + enums.AttributeType.OBJECT_GROUP, + 'Test Group' + ) + + args = ( + managed_object, + ('Object Group', object_group.attribute_value) + ) + regex = "The Object Group attribute is unsupported." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._set_attribute_on_managed_object, + *args + ) + + def test_register(self): + """ + Test that a Register request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + # Build a SymmetricKey for registration. + object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + template_attribute = objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 128 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + key_bytes = ( + b'\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00' + ) + secret = secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial(key_bytes) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.AES + ), + cryptographic_length=attributes.CryptographicLength(128) + ) + ) + + payload = register.RegisterRequestPayload( + object_type=object_type, + template_attribute=template_attribute, + secret=secret + ) + + response_payload = e._process_register(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Register" + ) + + uid = response_payload.unique_identifier.value + self.assertEqual('1', uid) + + # Retrieve the stored object and verify all attributes were set + # appropriately. + symmetric_key = e._data_session.query( + pie_objects.SymmetricKey + ).filter( + pie_objects.ManagedObject.unique_identifier == uid + ).one() + self.assertEqual( + enums.KeyFormatType.RAW, + symmetric_key.key_format_type + ) + self.assertEqual(1, len(symmetric_key.names)) + self.assertIn('Test Symmetric Key', symmetric_key.names) + self.assertEqual(key_bytes, symmetric_key.value) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + symmetric_key.cryptographic_algorithm + ) + self.assertEqual(128, symmetric_key.cryptographic_length) + self.assertEqual(2, len(symmetric_key.cryptographic_usage_masks)) + self.assertIn( + enums.CryptographicUsageMask.ENCRYPT, + symmetric_key.cryptographic_usage_masks + ) + self.assertIn( + enums.CryptographicUsageMask.DECRYPT, + symmetric_key.cryptographic_usage_masks + ) + + self.assertEqual(uid, e._id_placeholder) + + def test_register_unsupported_object_type(self): + """ + Test that an InvalidField error is generated when attempting to + register an unsupported object type. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + object_type = attributes.ObjectType(enums.ObjectType.SPLIT_KEY) + payload = register.RegisterRequestPayload(object_type=object_type) + + args = (payload, ) + regex = "The SplitKey object type is not supported." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_register, + *args + ) + + def test_request_omitting_secret(self): + """ + Test that an InvalidField error is generate when trying to register + a secret in absentia. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + payload = register.RegisterRequestPayload(object_type=object_type) + + args = (payload, ) + regex = "Cannot register a secret in absentia." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_register, + *args + ) + def test_get(self): """ Test that a Get request can be processed correctly. @@ -1278,19 +1912,23 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(3, len(result.operations)) + self.assertEqual(4, len(result.operations)) self.assertEqual( - enums.Operation.GET, + enums.Operation.REGISTER, result.operations[0].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.GET, result.operations[1].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[2].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[3].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -1309,7 +1947,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsNotNone(result.operations) - self.assertEqual(4, len(result.operations)) + self.assertEqual(5, len(result.operations)) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, result.operations[-1].value @@ -1380,3 +2018,141 @@ class TestKmipEngine(testtools.TestCase): "Processing operation: DiscoverVersions" ) self.assertEqual([], result.protocol_versions) + + def test_register_get_destroy(self): + """ + Test that a managed object can be registered, retrieved, and destroyed + without error. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + # Build a SymmetricKey for registration. + object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY) + template_attribute = objects.TemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Symmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.AES + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 128 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + key_bytes = ( + b'\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00' + ) + secret = secrets.SymmetricKey( + key_block=objects.KeyBlock( + key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW), + key_value=objects.KeyValue( + key_material=objects.KeyMaterial(key_bytes) + ), + cryptographic_algorithm=attributes.CryptographicAlgorithm( + enums.CryptographicAlgorithm.AES + ), + cryptographic_length=attributes.CryptographicLength(128) + ) + ) + + # Register the symmetric key with the corresponding attributes + payload = register.RegisterRequestPayload( + object_type=object_type, + template_attribute=template_attribute, + secret=secret + ) + + response_payload = e._process_register(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Register" + ) + + uid = response_payload.unique_identifier.value + self.assertEqual('1', uid) + + e._logger.reset_mock() + + # Retrieve the registered key using Get and verify all fields set + payload = get.GetRequestPayload( + unique_identifier=attributes.UniqueIdentifier(uid) + ) + + response_payload = e._process_get(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + self.assertEqual( + enums.ObjectType.SYMMETRIC_KEY, + response_payload.object_type.value + ) + self.assertEqual(str(uid), response_payload.unique_identifier.value) + self.assertIsInstance(response_payload.secret, secrets.SymmetricKey) + self.assertEqual( + key_bytes, + response_payload.secret.key_block.key_value.key_material.value + ) + self.assertEqual( + enums.KeyFormatType.RAW, + response_payload.secret.key_block.key_format_type.value + ) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + response_payload.secret.key_block.cryptographic_algorithm.value + ) + self.assertEqual( + 128, + response_payload.secret.key_block.cryptographic_length.value + ) + + e._logger.reset_mock() + + # Destroy the symmetric key and verify it cannot be accessed again + payload = destroy.DestroyRequestPayload( + unique_identifier=attributes.UniqueIdentifier(uid) + ) + + response_payload = e._process_destroy(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Destroy" + ) + self.assertEqual(str(uid), response_payload.unique_identifier.value) + self.assertRaises( + exc.NoResultFound, + e._data_session.query(pie_objects.OpaqueObject).filter( + pie_objects.ManagedObject.unique_identifier == uid + ).one + ) + + e._data_session.commit() + e._data_store_session_factory()