From 6ecbe7bdda9702e4013cbb004f9c8baa63576a21 Mon Sep 17 00:00:00 2001 From: Peter Date: Tue, 15 Mar 2016 16:13:40 -0400 Subject: [PATCH] Adding KmipEngine support for Get This change adds support for the Get operation to the KmipEngine. New exceptions and test cases are included. --- kmip/core/exceptions.py | 56 ++ kmip/core/factories/secrets.py | 25 +- kmip/services/server/engine.py | 188 ++++- .../tests/unit/services/server/test_engine.py | 642 ++++++++++++++---- 4 files changed, 758 insertions(+), 153 deletions(-) diff --git a/kmip/core/exceptions.py b/kmip/core/exceptions.py index 5420da9..4d6c541 100644 --- a/kmip/core/exceptions.py +++ b/kmip/core/exceptions.py @@ -129,6 +129,44 @@ class ItemNotFound(KmipError): ) +class KeyCompressionTypeNotSupported(KmipError): + """ + An error generated when dealing with unsupported key compression types + and operations. + """ + + def __init__(self, message): + """ + Create a KeyCompressionTypeNotSupported exception. + + Args: + message (string): A string containing information about the error. + """ + super(KeyCompressionTypeNotSupported, self).__init__( + reason=enums.ResultReason.KEY_COMPRESSION_TYPE_NOT_SUPPORTED, + message=message + ) + + +class KeyFormatTypeNotSupported(KmipError): + """ + An error generated when dealing with unsupported key formats + and operations. + """ + + def __init__(self, message): + """ + Create a KeyFormatTypeNotSupported exception. + + Args: + message (string): A string containing information about the error. + """ + super(KeyFormatTypeNotSupported, self).__init__( + reason=enums.ResultReason.KEY_FORMAT_TYPE_NOT_SUPPORTED, + message=message + ) + + class OperationNotSupported(KmipError): """ An error generated when an unsupported operation is invoked. @@ -147,6 +185,24 @@ class OperationNotSupported(KmipError): ) +class PermissionDenied(KmipError): + """ + An error generated when permission constraints are violated. + """ + + def __init__(self, message): + """ + Create a PermissionDenied exception. + + Args: + message (string): A string containing information about the error. + """ + super(PermissionDenied, self).__init__( + reason=enums.ResultReason.PERMISSION_DENIED, + message=message + ) + + class InvalidKmipEncoding(Exception): """ An exception raised when processing invalid KMIP message encodings. diff --git a/kmip/core/factories/secrets.py b/kmip/core/factories/secrets.py index 394df81..70bafc3 100644 --- a/kmip/core/factories/secrets.py +++ b/kmip/core/factories/secrets.py @@ -20,6 +20,7 @@ from kmip.core.attributes import CryptographicLength from kmip.core.enums import ObjectType from kmip.core.errors import ErrorStrings + from kmip.core.misc import KeyFormatType from kmip.core.objects import Attribute @@ -69,7 +70,7 @@ class SecretFactory(object): SymmetricKey(...) """ if secret_type is ObjectType.CERTIFICATE: - return self._create_certificate() + return self._create_certificate(value) elif secret_type is ObjectType.SYMMETRIC_KEY: return self._create_symmetric_key(value) elif secret_type is ObjectType.PUBLIC_KEY: @@ -88,8 +89,14 @@ class SecretFactory(object): raise TypeError("Unrecognized secret type: {0}".format( secret_type)) - def _create_certificate(self): - return Certificate() + def _create_certificate(self, value): + if value: + return Certificate( + certificate_type=value.get('certificate_type'), + certificate_value=value.get('certificate_value') + ) + else: + return Certificate() def _create_symmetric_key(self, value): if value is None: @@ -164,8 +171,16 @@ class SecretFactory(object): key_material = KeyMaterial(key_value) key_value = KeyValue(key_material) - crypto_algorithm = CryptographicAlgorithm(cryptographic_algorithm) - crypto_length = CryptographicLength(cryptographic_length) + + crypto_algorithm = None + if cryptographic_algorithm is not None: + crypto_algorithm = CryptographicAlgorithm( + cryptographic_algorithm + ) + + crypto_length = None + if cryptographic_length is not None: + crypto_length = CryptographicLength(cryptographic_length) key_wrap_data = None if key_wrapping_data is not None: diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index a6465d2..3265c57 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -26,12 +26,14 @@ import kmip from kmip.core import attributes from kmip.core import enums from kmip.core import exceptions +from kmip.core.factories import secrets from kmip.core.messages import contents from kmip.core.messages import messages from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import get from kmip.core.messages.payloads import query from kmip.core import misc @@ -51,13 +53,16 @@ class KmipEngine(object): client connections. Features that are not supported: - * KMIP versions > 1.1 + * KMIP versions > 1.2 * Numerous operations, objects, and attributes. * User authentication * Batch processing options: UNDO * Asynchronous operations * Operation policies * Object archival + * Key compression + * Key wrapping + * Key format conversions """ def __init__(self): @@ -376,29 +381,7 @@ class KmipEngine(object): return response_batch - def _process_operation(self, operation, payload): - if operation == enums.Operation.DESTROY: - return self._process_destroy(payload) - if operation == enums.Operation.QUERY: - return self._process_query(payload) - elif operation == enums.Operation.DISCOVER_VERSIONS: - return self._process_discover_versions(payload) - else: - raise exceptions.OperationNotSupported( - "{0} operation is not supported by the server.".format( - operation.name.title() - ) - ) - - @_kmip_version_supported('1.0') - def _process_destroy(self, payload): - self._logger.info("Processing operation: Destroy") - - if payload.unique_identifier: - unique_identifier = payload.unique_identifier.value - else: - unique_identifier = self._id_placeholder - + def _get_object_type(self, unique_identifier): try: object_type = self._data_session.query( objects.ManagedObject._object_type @@ -423,23 +406,169 @@ class KmipEngine(object): ) raise e - table = self._object_map.get(object_type) - if table is None: + class_type = self._object_map.get(object_type) + if class_type is None: name = object_type.name raise exceptions.InvalidField( "The {0} object type is not supported.".format( ''.join( - [x.capitalize() for x in name[9:].split('_')] + [x.capitalize() for x in name.split('_')] ) ) ) + return class_type + + def _build_core_object(self, obj): + try: + object_type = obj._object_type + except Exception: + raise exceptions.InvalidField( + "Cannot build an unsupported object type." + ) + + value = {} + + if object_type == enums.ObjectType.CERTIFICATE: + value = { + 'certificate_type': obj.certificate_type, + 'certificate_value': obj.value + } + elif object_type == enums.ObjectType.SYMMETRIC_KEY: + value = { + 'cryptographic_algorithm': obj.cryptographic_algorithm, + 'cryptographic_length': obj.cryptographic_length, + 'key_format_type': obj.key_format_type, + 'key_value': obj.value + } + elif object_type == enums.ObjectType.PUBLIC_KEY: + value = { + 'cryptographic_algorithm': obj.cryptographic_algorithm, + 'cryptographic_length': obj.cryptographic_length, + 'key_format_type': obj.key_format_type, + 'key_value': obj.value + } + elif object_type == enums.ObjectType.PRIVATE_KEY: + value = { + 'cryptographic_algorithm': obj.cryptographic_algorithm, + 'cryptographic_length': obj.cryptographic_length, + 'key_format_type': obj.key_format_type, + 'key_value': obj.value + } + elif object_type == enums.ObjectType.SECRET_DATA: + value = { + 'key_format_type': enums.KeyFormatType.OPAQUE, + 'key_value': obj.value, + 'secret_data_type': obj.data_type + } + elif object_type == enums.ObjectType.OPAQUE_DATA: + value = { + 'opaque_data_type': obj.opaque_type, + 'opaque_data_value': obj.value + } + else: + name = object_type.name + raise exceptions.InvalidField( + "The {0} object type is not supported.".format( + ''.join( + [x.capitalize() for x in name.split('_')] + ) + ) + ) + + secret_factory = secrets.SecretFactory() + return secret_factory.create(object_type, value) + + def _process_operation(self, operation, payload): + if operation == enums.Operation.GET: + return self._process_get(payload) + elif operation == enums.Operation.DESTROY: + return self._process_destroy(payload) + elif operation == enums.Operation.QUERY: + return self._process_query(payload) + elif operation == enums.Operation.DISCOVER_VERSIONS: + return self._process_discover_versions(payload) + else: + raise exceptions.OperationNotSupported( + "{0} operation is not supported by the server.".format( + operation.name.title() + ) + ) + + @_kmip_version_supported('1.0') + def _process_get(self, payload): + self._logger.info("Processing operation: Get") + + unique_identifier = self._id_placeholder + if payload.unique_identifier: + unique_identifier = payload.unique_identifier.value + + key_format_type = None + if payload.key_format_type: + key_format_type = payload.key_format_type.value + + if payload.key_compression_type: + raise exceptions.KeyCompressionTypeNotSupported( + "Key compression is not supported." + ) + + if payload.key_wrapping_specification: + raise exceptions.PermissionDenied( + "Key wrapping is not supported." + ) + + # TODO (peterhamilton) Process key wrapping information + # 1. Error check wrapping keys for accessibility and usability + + object_type = self._get_object_type(unique_identifier) + + managed_object = self._data_session.query(object_type).filter( + object_type.unique_identifier == unique_identifier + ).one() + + if key_format_type: + if not hasattr(managed_object, 'key_format_type'): + raise exceptions.KeyFormatTypeNotSupported( + "Key format is not applicable to the specified object." + ) + + # TODO (peterhamilton) Convert key to desired format if possible + if key_format_type != managed_object.key_format_type: + raise exceptions.KeyFormatTypeNotSupported( + "Key format conversion from {0} to {1} is " + "unsupported.".format( + managed_object.key_format_type.name, + key_format_type.name + ) + ) + + core_secret = self._build_core_object(managed_object) + + response_payload = get.GetResponsePayload( + object_type=attributes.ObjectType(managed_object._object_type), + unique_identifier=attributes.UniqueIdentifier(unique_identifier), + secret=core_secret + ) + + return response_payload + + @_kmip_version_supported('1.0') + def _process_destroy(self, payload): + self._logger.info("Processing operation: Destroy") + + if payload.unique_identifier: + unique_identifier = payload.unique_identifier.value + else: + unique_identifier = self._id_placeholder + + object_type = self._get_object_type(unique_identifier) + # TODO (peterhamilton) Process attributes to see if destroy possible # 1. Check object state. If invalid, error out. # 2. Check object deactivation date. If invalid, error out. - self._data_session.query(table).filter( - table.unique_identifier == unique_identifier + self._data_session.query(object_type).filter( + object_type.unique_identifier == unique_identifier ).delete() response_payload = destroy.DestroyResponsePayload( @@ -463,6 +592,7 @@ class KmipEngine(object): if enums.QueryFunction.QUERY_OPERATIONS in queries: operations = list([ + contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.DESTROY), contents.Operation(enums.Operation.QUERY) ]) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index d566106..a8c3b96 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -28,12 +28,14 @@ from kmip.core import enums from kmip.core import exceptions from kmip.core import misc from kmip.core import objects +from kmip.core import secrets from kmip.core.messages import contents from kmip.core.messages import messages from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import get from kmip.core.messages.payloads import query from kmip.pie import objects as pie_objects @@ -638,14 +640,17 @@ class TestKmipEngine(testtools.TestCase): e = engine.KmipEngine() e._logger = mock.MagicMock() + e._process_get = mock.MagicMock() e._process_destroy = mock.MagicMock() e._process_query = mock.MagicMock() e._process_discover_versions = mock.MagicMock() + e._process_operation(enums.Operation.GET, None) e._process_operation(enums.Operation.DESTROY, None) e._process_operation(enums.Operation.QUERY, None) e._process_operation(enums.Operation.DISCOVER_VERSIONS, None) + e._process_get.assert_called_with(None) e._process_destroy.assert_called_with(None) e._process_query.assert_called_with(None) e._process_discover_versions.assert_called_with(None) @@ -669,6 +674,515 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_get_object_type(self): + """ + Test that the object type of a stored object can be retrieved + correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + + object_type = e._get_object_type(id_a) + e._data_session.commit() + + self.assertEqual(pie_objects.OpaqueObject, object_type) + + def test_get_object_type_missing_object(self): + """ + Test that an ItemNotFound error is generated when attempting to + retrieve the object type of an object that does not exist. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + args = ('1', ) + regex = "Could not locate object: 1" + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._get_object_type, + *args + ) + e._data_session.commit() + e._logger.warning.assert_called_once_with( + "Could not identify object type for object: 1" + ) + self.assertTrue(e._logger.exception.called) + + def test_get_object_type_multiple_objects(self): + """ + Test that a sqlalchemy.orm.exc.MultipleResultsFound error is generated + when getting the object type of multiple objects map to the same + object ID. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + test_exception = exc.MultipleResultsFound() + e._data_session.query = mock.MagicMock(side_effect=test_exception) + e._logger = mock.MagicMock() + + args = ('1', ) + self.assertRaises( + exc.MultipleResultsFound, + e._get_object_type, + *args + ) + e._data_session.commit() + e._logger.warning.assert_called_once_with( + "Multiple objects found for ID: 1" + ) + + def test_get_object_type_unsupported_type(self): + """ + Test that an InvalidField error is generated when attempting to + get the object type of an object with an unsupported object type. + This should never happen by definition, but "Safety first!" + """ + e = engine.KmipEngine() + e._object_map = {enums.ObjectType.OPAQUE_DATA: None} + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + + args = (id_a, ) + name = enums.ObjectType.OPAQUE_DATA.name + regex = "The {0} object type is not supported.".format( + ''.join( + [x.capitalize() for x in name.split('_')] + ) + ) + + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._get_object_type, + *args + ) + e._data_session.commit() + + def test_build_core_object(self): + """ + Test that kmip.core objects can be built from simpler kmip.pie + objects. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() + + # Test building a Certificate. + managed_object = pie_objects.X509Certificate(value=b'') + core_object = e._build_core_object(managed_object) + + self.assertIsInstance(core_object, secrets.Certificate) + self.assertEqual( + b'', + core_object.certificate_value.value + ) + self.assertEqual( + enums.CertificateTypeEnum.X_509, + core_object.certificate_type.value + ) + + # Test building a Symmetric Key. + managed_object = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + core_object = e._build_core_object(managed_object) + + self.assertIsInstance(core_object, secrets.SymmetricKey) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + core_object.key_block.cryptographic_algorithm.value + ) + self.assertEqual( + 0, + core_object.key_block.cryptographic_length.value + ) + self.assertEqual( + b'', + core_object.key_block.key_value.key_material.value + ) + + # Test building a Public Key. + managed_object = pie_objects.PublicKey( + enums.CryptographicAlgorithm.RSA, + 0, + b'' + ) + core_object = e._build_core_object(managed_object) + + self.assertIsInstance(core_object, secrets.PublicKey) + self.assertEqual( + enums.CryptographicAlgorithm.RSA, + core_object.key_block.cryptographic_algorithm.value + ) + self.assertEqual( + 0, + core_object.key_block.cryptographic_length.value + ) + self.assertEqual( + b'', + core_object.key_block.key_value.key_material.value + ) + + # Test building a Private Key. + managed_object = pie_objects.PrivateKey( + enums.CryptographicAlgorithm.RSA, + 0, + b'', + enums.KeyFormatType.PKCS_8 + ) + core_object = e._build_core_object(managed_object) + + self.assertIsInstance(core_object, secrets.PrivateKey) + self.assertEqual( + enums.CryptographicAlgorithm.RSA, + core_object.key_block.cryptographic_algorithm.value + ) + self.assertEqual( + 0, + core_object.key_block.cryptographic_length.value + ) + self.assertEqual( + b'', + core_object.key_block.key_value.key_material.value + ) + self.assertEqual( + enums.KeyFormatType.PKCS_8, + core_object.key_block.key_format_type.value + ) + + # Test building a Secret Data. + managed_object = pie_objects.SecretData( + b'', + enums.SecretDataType.PASSWORD + ) + core_object = e._build_core_object(managed_object) + + self.assertIsInstance(core_object, secrets.SecretData) + self.assertEqual( + enums.SecretDataType.PASSWORD, + core_object.secret_data_type.value + ) + self.assertEqual( + b'', + core_object.key_block.key_value.key_material.value + ) + + # Test building an Opaque Data. + managed_object = pie_objects.OpaqueObject( + b'', + enums.OpaqueDataType.NONE + ) + core_object = e._build_core_object(managed_object) + + self.assertIsInstance(core_object, secrets.OpaqueObject) + self.assertEqual( + enums.OpaqueDataType.NONE, + core_object.opaque_data_type.value + ) + self.assertEqual( + b'', + core_object.opaque_data_value.value + ) + + def test_build_core_object_unsupported_type(self): + """ + Test that an InvalidField error is generated when building + kmip.core objects that are unsupported. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() + + args = (None, ) + regex = "Cannot build an unsupported object type." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._build_core_object, + *args + ) + + class DummyObject: + def __init__(self): + self._object_type = enums.ObjectType.SPLIT_KEY + + args = (DummyObject(), ) + regex = "The SplitKey object type is not supported." + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._build_core_object, + *args + ) + + def test_get(self): + """ + Test that a Get request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + + e._data_session.add(obj_a) + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + id_b = str(obj_b.unique_identifier) + + # Test by specifying the ID of the object to get. + payload = get.GetRequestPayload( + unique_identifier=attributes.UniqueIdentifier(id_a) + ) + + response_payload = e._process_get(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + self.assertEqual( + enums.ObjectType.OPAQUE_DATA, + response_payload.object_type.value + ) + self.assertEqual(str(id_a), response_payload.unique_identifier.value) + self.assertIsInstance(response_payload.secret, secrets.OpaqueObject) + self.assertEqual( + enums.OpaqueDataType.NONE, + response_payload.secret.opaque_data_type.value + ) + self.assertEqual( + b'', + response_payload.secret.opaque_data_value.value + ) + + e._data_session.commit() + e._data_store_session_factory() + e._logger.reset_mock() + e._id_placeholder = str(id_b) + + # Test by using the ID placeholder to specify the object to get. + payload = get.GetRequestPayload() + + response_payload = e._process_get(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + self.assertEqual( + enums.ObjectType.OPAQUE_DATA, + response_payload.object_type.value + ) + self.assertEqual(str(id_b), response_payload.unique_identifier.value) + self.assertIsInstance(response_payload.secret, secrets.OpaqueObject) + self.assertEqual( + enums.OpaqueDataType.NONE, + response_payload.secret.opaque_data_type.value + ) + self.assertEqual( + b'', + response_payload.secret.opaque_data_value.value + ) + + e._data_session.commit() + + def test_get_with_unsupported_features(self): + """ + Test that the right errors are generated when unsupported features + are used in a Get request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + # Test that specifying the key compression type generates an error. + payload = get.GetRequestPayload( + key_compression_type=get.GetRequestPayload.KeyCompressionType( + enums.KeyCompressionType.EC_PUBLIC_KEY_TYPE_UNCOMPRESSED + ) + ) + + args = (payload, ) + regex = "Key compression is not supported." + self.assertRaisesRegexp( + exceptions.KeyCompressionTypeNotSupported, + regex, + e._process_get, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + + e._logger.reset_mock() + + # Test that specifying the key wrapping specification generates an + # error. + payload = get.GetRequestPayload( + key_wrapping_specification=objects.KeyWrappingSpecification() + ) + + args = (payload, ) + regex = "Key wrapping is not supported." + self.assertRaisesRegexp( + exceptions.PermissionDenied, + regex, + e._process_get, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + + def test_get_with_key_format_type(self): + """ + Test that the key format type is handled properly in a Get request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + obj_a = pie_objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 0, + b'' + ) + + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + + # Test that a key can be retrieved with the right key format. + payload = get.GetRequestPayload( + unique_identifier=attributes.UniqueIdentifier(id_a), + key_format_type=get.GetRequestPayload.KeyFormatType( + enums.KeyFormatType.RAW + ) + ) + + response_payload = e._process_get(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + + self.assertIsInstance(response_payload.secret, secrets.SymmetricKey) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + response_payload.secret.key_block.cryptographic_algorithm.value + ) + self.assertEqual( + 0, + response_payload.secret.key_block.cryptographic_length.value + ) + self.assertEqual( + b'', + response_payload.secret.key_block.key_value.key_material.value + ) + self.assertEqual( + enums.KeyFormatType.RAW, + response_payload.secret.key_block.key_format_type.value + ) + + # Test that an error is generated when a key format conversion is + # required. + e._logger.reset_mock() + + payload = get.GetRequestPayload( + unique_identifier=attributes.UniqueIdentifier(id_a), + key_format_type=get.GetRequestPayload.KeyFormatType( + enums.KeyFormatType.OPAQUE + ) + ) + + args = (payload, ) + regex = "Key format conversion from RAW to OPAQUE is unsupported." + self.assertRaisesRegexp( + exceptions.KeyFormatTypeNotSupported, + regex, + e._process_get, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + + # Test that an error is generated when a key format is requested but + # does not apply to the given managed object. + e._data_session = e._data_store_session_factory() + e._logger.reset_mock() + + obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_b = str(obj_b.unique_identifier) + + payload = get.GetRequestPayload( + unique_identifier=attributes.UniqueIdentifier(id_b), + key_format_type=get.GetRequestPayload.KeyFormatType( + enums.KeyFormatType.RAW + ) + ) + + args = (payload, ) + regex = "Key format is not applicable to the specified object." + self.assertRaisesRegexp( + exceptions.KeyFormatTypeNotSupported, + regex, + e._process_get, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + def test_destroy(self): """ Test that a Destroy request can be processed correctly. @@ -735,112 +1249,6 @@ class TestKmipEngine(testtools.TestCase): e._data_session.commit() - def test_destroy_missing_object(self): - """ - Test that an ItemNotFound error is generated when attempting to - destroy an object that does not exist. - """ - e = engine.KmipEngine() - e._data_store = self.engine - e._data_store_session_factory = self.session_factory - e._data_session = e._data_store_session_factory() - e._logger = mock.MagicMock() - - payload = destroy.DestroyRequestPayload( - unique_identifier=attributes.UniqueIdentifier('1') - ) - - args = (payload, ) - regex = "Could not locate object: 1" - self.assertRaisesRegexp( - exceptions.ItemNotFound, - regex, - e._process_destroy, - *args - ) - e._data_session.commit() - e._logger.info.assert_called_once_with( - "Processing operation: Destroy" - ) - e._logger.warning.assert_called_once_with( - "Could not identify object type for object: 1" - ) - self.assertTrue(e._logger.exception.called) - - def test_destroy_multiple_objects(self): - """ - Test that a sqlalchemy.orm.exc.MultipleResultsFound error is generated - when multiple objects map to the same object ID. - """ - e = engine.KmipEngine() - e._data_store = self.engine - e._data_store_session_factory = self.session_factory - e._data_session = e._data_store_session_factory() - test_exception = exc.MultipleResultsFound() - e._data_session.query = mock.MagicMock(side_effect=test_exception) - e._logger = mock.MagicMock() - - payload = destroy.DestroyRequestPayload( - unique_identifier=attributes.UniqueIdentifier('1') - ) - - args = (payload, ) - self.assertRaises( - exc.MultipleResultsFound, - e._process_destroy, - *args - ) - e._data_session.commit() - e._logger.info.assert_called_once_with( - "Processing operation: Destroy" - ) - e._logger.warning.assert_called_once_with( - "Multiple objects found for ID: 1" - ) - - def test_destroy_unsupported_object_type(self): - """ - Test that an InvalidField error is generated when attempting to - destroy an unsupported object type. - """ - e = engine.KmipEngine() - e._object_map = {enums.ObjectType.OPAQUE_DATA: None} - e._data_store = self.engine - e._data_store_session_factory = self.session_factory - e._data_session = e._data_store_session_factory() - e._logger = mock.MagicMock() - - obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) - - e._data_session.add(obj_a) - e._data_session.commit() - e._data_session = e._data_store_session_factory() - - id_a = str(obj_a.unique_identifier) - - payload = destroy.DestroyRequestPayload( - unique_identifier=attributes.UniqueIdentifier(id_a) - ) - - args = (payload, ) - name = enums.ObjectType.OPAQUE_DATA.name - regex = "The {0} object type is not supported.".format( - ''.join( - [x.capitalize() for x in name[9:].split('_')] - ) - ) - - self.assertRaisesRegexp( - exceptions.InvalidField, - regex, - e._process_destroy, - *args - ) - e._data_session.commit() - e._logger.info.assert_called_once_with( - "Processing operation: Destroy" - ) - def test_query(self): """ Test that a Query request can be processed correctly, for different @@ -870,15 +1278,19 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(2, len(result.operations)) + self.assertEqual(3, len(result.operations)) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.GET, result.operations[0].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[1].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[2].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -897,18 +1309,10 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsNotNone(result.operations) - self.assertEqual(3, len(result.operations)) - self.assertEqual( - enums.Operation.DESTROY, - result.operations[0].value - ) - self.assertEqual( - enums.Operation.QUERY, - result.operations[1].value - ) + self.assertEqual(4, len(result.operations)) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, - result.operations[2].value + result.operations[-1].value ) def test_discover_versions(self):