From 934fc7b93e70ef2b5c315b6aa878c828280ee8de Mon Sep 17 00:00:00 2001 From: Peter Date: Tue, 22 Mar 2016 16:22:27 -0400 Subject: [PATCH] Adding KmipEngine support for CreateKeyPair This change adds support for the CreateKeyPair operation to the KmipEngine. New exceptions and test cases are included. --- kmip/services/server/engine.py | 174 +++- .../tests/unit/services/server/test_engine.py | 966 +++++++++++++++++- 2 files changed, 1111 insertions(+), 29 deletions(-) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 8536510..bcc9900 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -33,6 +33,7 @@ from kmip.core.messages import contents from kmip.core.messages import messages from kmip.core.messages.payloads import create +from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get @@ -71,6 +72,7 @@ class KmipEngine(object): * Registration of empty managed objects (e.g., Private Keys) * Managed object state tracking * Managed object usage limit tracking and enforcement + * Cryptographic usage mask enforcement per object type """ def __init__(self): @@ -619,6 +621,8 @@ class KmipEngine(object): def _process_operation(self, operation, payload): if operation == enums.Operation.CREATE: return self._process_create(payload) + elif operation == enums.Operation.CREATE_KEY_PAIR: + return self._process_create_key_pair(payload) elif operation == enums.Operation.REGISTER: return self._process_register(payload) elif operation == enums.Operation.GET: @@ -722,6 +726,167 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.0') + def _process_create_key_pair(self, payload): + self._logger.info("Processing operation: CreateKeyPair") + + algorithm = None + length = None + + # Process attribute sets + public_key_attributes = {} + private_key_attributes = {} + common_attributes = {} + if payload.public_key_template_attribute: + public_key_attributes = self._process_template_attribute( + payload.public_key_template_attribute + ) + if payload.private_key_template_attribute: + private_key_attributes = self._process_template_attribute( + payload.private_key_template_attribute + ) + if payload.common_template_attribute: + common_attributes = self._process_template_attribute( + payload.common_template_attribute + ) + + # Propagate common attributes if not overridden by the public/private + # attribute sets + for key, value in six.iteritems(common_attributes): + if key not in public_key_attributes.keys(): + public_key_attributes.update([(key, value)]) + if key not in private_key_attributes.keys(): + private_key_attributes.update([(key, value)]) + + # Error check for required attributes. + public_algorithm = public_key_attributes.get('Cryptographic Algorithm') + if public_algorithm: + public_algorithm = public_algorithm.value + else: + raise exceptions.InvalidField( + "The cryptographic algorithm must be specified as an " + "attribute for the public key." + ) + + public_length = public_key_attributes.get('Cryptographic Length') + if public_length: + public_length = public_length.value + else: + # TODO (peterhamilton) The cryptographic length is technically not + # required per the spec. Update the CryptographyEngine to accept a + # None length, allowing it to pick the length dynamically. Default + # to the strongest key size allowed for the algorithm type. + raise exceptions.InvalidField( + "The cryptographic length must be specified as an attribute " + "for the public key." + ) + + public_usage_mask = public_key_attributes.get( + 'Cryptographic Usage Mask' + ) + if public_usage_mask is None: + raise exceptions.InvalidField( + "The cryptographic usage mask must be specified as an " + "attribute for the public key." + ) + + private_algorithm = private_key_attributes.get( + 'Cryptographic Algorithm' + ) + if private_algorithm: + private_algorithm = private_algorithm.value + else: + raise exceptions.InvalidField( + "The cryptographic algorithm must be specified as an " + "attribute for the private key." + ) + + private_length = private_key_attributes.get('Cryptographic Length') + if private_length: + private_length = private_length.value + else: + # TODO (peterhamilton) The cryptographic length is technically not + # required per the spec. Update the CryptographyEngine to accept a + # None length, allowing it to pick the length dynamically. Default + # to the strongest key size allowed for the algorithm type. + raise exceptions.InvalidField( + "The cryptographic length must be specified as an attribute " + "for the private key." + ) + + private_usage_mask = private_key_attributes.get( + 'Cryptographic Usage Mask' + ) + if private_usage_mask is None: + raise exceptions.InvalidField( + "The cryptographic usage mask must be specified as an " + "attribute for the private key." + ) + + if public_algorithm == private_algorithm: + algorithm = public_algorithm + else: + raise exceptions.InvalidField( + "The public and private key algorithms must be the same." + ) + + if public_length == private_length: + length = public_length + else: + raise exceptions.InvalidField( + "The public and private key lengths must be the same." + ) + + public, private = self._cryptography_engine.create_asymmetric_key_pair( + algorithm, + length + ) + + public_key = objects.PublicKey( + algorithm, + length, + public.get('value'), + public.get('format') + ) + private_key = objects.PrivateKey( + algorithm, + length, + private.get('value'), + private.get('format') + ) + public_key.names = [] + private_key.names = [] + + self._set_attributes_on_managed_object( + public_key, + public_key_attributes + ) + self._set_attributes_on_managed_object( + private_key, + private_key_attributes + ) + + # TODO (peterhamilton) Set additional server-only attributes. + + self._data_session.add(public_key) + self._data_session.add(private_key) + + # NOTE (peterhamilton) SQLAlchemy will *not* assign an ID until + # commit is called. This makes future support for UNDO problematic. + self._data_session.commit() + + response_payload = create_key_pair.CreateKeyPairResponsePayload( + private_key_uuid=attributes.PrivateKeyUniqueIdentifier( + str(private_key.unique_identifier) + ), + public_key_uuid=attributes.PublicKeyUniqueIdentifier( + str(public_key.unique_identifier) + ) + ) + + self._id_placeholder = str(private_key.unique_identifier) + return response_payload + @_kmip_version_supported('1.0') def _process_register(self, payload): self._logger.info("Processing operation: Register") @@ -847,20 +1012,22 @@ class KmipEngine(object): else: unique_identifier = self._id_placeholder - object_type = self._get_object_type(unique_identifier) + self._get_object_type(unique_identifier) # TODO (peterhamilton) Process attributes to see if destroy possible # 1. Check object state. If invalid, error out. # 2. Check object deactivation date. If invalid, error out. - self._data_session.query(object_type).filter( - object_type.unique_identifier == unique_identifier + self._data_session.query(objects.ManagedObject).filter( + objects.ManagedObject.unique_identifier == unique_identifier ).delete() response_payload = destroy.DestroyResponsePayload( unique_identifier=attributes.UniqueIdentifier(unique_identifier) ) + self._data_session.commit() + return response_payload @_kmip_version_supported('1.0') @@ -879,6 +1046,7 @@ class KmipEngine(object): if enums.QueryFunction.QUERY_OPERATIONS in queries: operations = list([ contents.Operation(enums.Operation.CREATE), + contents.Operation(enums.Operation.CREATE_KEY_PAIR), contents.Operation(enums.Operation.REGISTER), contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.DESTROY), diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 2f52207..edcdf47 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -36,6 +36,7 @@ from kmip.core.messages import contents from kmip.core.messages import messages from kmip.core.messages.payloads import create +from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get @@ -645,6 +646,7 @@ class TestKmipEngine(testtools.TestCase): e._logger = mock.MagicMock() e._process_create = mock.MagicMock() + e._process_create_key_pair = mock.MagicMock() e._process_register = mock.MagicMock() e._process_get = mock.MagicMock() e._process_destroy = mock.MagicMock() @@ -652,6 +654,7 @@ class TestKmipEngine(testtools.TestCase): e._process_discover_versions = mock.MagicMock() e._process_operation(enums.Operation.CREATE, None) + e._process_operation(enums.Operation.CREATE_KEY_PAIR, None) e._process_operation(enums.Operation.REGISTER, None) e._process_operation(enums.Operation.GET, None) e._process_operation(enums.Operation.DESTROY, None) @@ -659,6 +662,7 @@ class TestKmipEngine(testtools.TestCase): e._process_operation(enums.Operation.DISCOVER_VERSIONS, None) e._process_create.assert_called_with(None) + e._process_create_key_pair.assert_called_with(None) e._process_register.assert_called_with(None) e._process_get.assert_called_with(None) e._process_destroy.assert_called_with(None) @@ -1690,6 +1694,682 @@ class TestKmipEngine(testtools.TestCase): ) e._logger.reset_mock() + def test_create_key_pair(self): + """ + Test that a CreateKeyPair request can be processed correctly. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + response_payload = e._process_create_key_pair(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + + public_id = response_payload.public_key_uuid.value + self.assertEqual('1', public_id) + private_id = response_payload.private_key_uuid.value + self.assertEqual('2', private_id) + + # Retrieve the stored public key and verify all attributes were set + # appropriately. + public_key = e._data_session.query( + pie_objects.PublicKey + ).filter( + pie_objects.ManagedObject.unique_identifier == public_id + ).one() + self.assertEqual( + enums.KeyFormatType.PKCS_1, + public_key.key_format_type + ) + self.assertEqual(1, len(public_key.names)) + self.assertIn('Test Asymmetric Key', public_key.names) + self.assertEqual( + enums.CryptographicAlgorithm.RSA, + public_key.cryptographic_algorithm + ) + self.assertEqual(2048, public_key.cryptographic_length) + self.assertEqual(1, len(public_key.cryptographic_usage_masks)) + self.assertIn( + enums.CryptographicUsageMask.ENCRYPT, + public_key.cryptographic_usage_masks + ) + + # Retrieve the stored private key and verify all attributes were set + # appropriately. + private_key = e._data_session.query( + pie_objects.PrivateKey + ).filter( + pie_objects.ManagedObject.unique_identifier == private_id + ).one() + self.assertEqual( + enums.KeyFormatType.PKCS_8, + private_key.key_format_type + ) + self.assertEqual(1, len(private_key.names)) + self.assertIn('Test Asymmetric Key', private_key.names) + self.assertEqual( + enums.CryptographicAlgorithm.RSA, + private_key.cryptographic_algorithm + ) + self.assertEqual(2048, private_key.cryptographic_length) + self.assertEqual(1, len(private_key.cryptographic_usage_masks)) + self.assertIn( + enums.CryptographicUsageMask.DECRYPT, + private_key.cryptographic_usage_masks + ) + + self.assertEqual(private_id, e._id_placeholder) + + def test_create_key_pair_omitting_attributes(self): + """ + Test that the right errors are generated when required attributes + are missing from a CreateKeyPair request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + # Test that a missing PublicKey CryptographicAlgorithm raises an error + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The cryptographic algorithm must be specified as an attribute " + "for the public key." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + + # Test that a missing PrivateKey CryptographicAlgorithm raises an error + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The cryptographic algorithm must be specified as an attribute " + "for the private key." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + + # Test that a missing PublicKey CryptographicLength raises an error + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The cryptographic length must be specified as an attribute for " + "the public key." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + + # Test that a missing PrivateKey CryptographicLength raises an error + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The cryptographic length must be specified as an attribute for " + "the private key." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + + # Test that a missing PublicKey CryptographicUsageMask raises an error + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The cryptographic usage mask must be specified as an attribute " + "for the public key." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + + # Test that a missing PrivateKey CryptographicUsageMask raises an error + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The cryptographic usage mask must be specified as an attribute " + "for the private key." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + + def test_create_key_pair_mismatched_attributes(self): + """ + Test that the right errors are generated when required attributes + are mismatched in a CreateKeyPair request. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + # Test that mismatched CryptographicAlgorithms raise an error. + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.DSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The public and private key algorithms must be the same." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + + # Test that mismatched CryptographicAlgorithms raise an error. + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 4096 + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + args = (payload, ) + regex = ( + "The public and private key lengths must be the same." + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + regex, + e._process_create_key_pair, + *args + ) + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + e._logger.reset_mock() + def test_register(self): """ Test that a Register request can be processed correctly. @@ -2116,11 +2796,14 @@ class TestKmipEngine(testtools.TestCase): "Processing operation: Destroy" ) self.assertEqual(str(id_a), response_payload.unique_identifier.value) - self.assertRaises( - exc.NoResultFound, - e._data_session.query(pie_objects.OpaqueObject).filter( - pie_objects.ManagedObject.unique_identifier == id_a - ).one + + args = (payload, ) + regex = "Could not locate object: {0}".format(id_a) + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._process_destroy, + *args ) e._data_session.commit() @@ -2139,11 +2822,14 @@ class TestKmipEngine(testtools.TestCase): "Processing operation: Destroy" ) self.assertEqual(str(id_b), response_payload.unique_identifier.value) - self.assertRaises( - exc.NoResultFound, - e._data_session.query(pie_objects.OpaqueObject).filter( - pie_objects.ManagedObject.unique_identifier == id_b - ).one + + args = (payload, ) + regex = "Could not locate object: {0}".format(id_b) + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._process_destroy, + *args ) e._data_session.commit() @@ -2177,27 +2863,31 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(5, len(result.operations)) + self.assertEqual(6, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value ) self.assertEqual( - enums.Operation.REGISTER, + enums.Operation.CREATE_KEY_PAIR, result.operations[1].value ) self.assertEqual( - enums.Operation.GET, + enums.Operation.REGISTER, result.operations[2].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.GET, result.operations[3].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[4].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[5].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -2216,7 +2906,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsNotNone(result.operations) - self.assertEqual(6, len(result.operations)) + self.assertEqual(7, len(result.operations)) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, result.operations[-1].value @@ -2401,11 +3091,232 @@ class TestKmipEngine(testtools.TestCase): "Processing operation: Destroy" ) self.assertEqual(str(uid), response_payload.unique_identifier.value) - self.assertRaises( - exc.NoResultFound, - e._data_session.query(pie_objects.OpaqueObject).filter( - pie_objects.ManagedObject.unique_identifier == uid - ).one + + args = (payload, ) + regex = "Could not locate object: {0}".format(uid) + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._process_destroy, + *args + ) + + e._data_session.commit() + e._data_store_session_factory() + + def test_create_key_pair_get_destroy(self): + """ + Test that a key pair can be created, retrieved, and destroyed without + error. + """ + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + attribute_factory = factory.AttributeFactory() + + common_template = objects.CommonTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.NAME, + attributes.Name.create( + 'Test Asymmetric Key', + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + enums.CryptographicAlgorithm.RSA + ), + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + 2048 + ) + ] + ) + public_template = objects.PublicKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.ENCRYPT + ] + ) + ] + ) + private_template = objects.PrivateKeyTemplateAttribute( + attributes=[ + attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [ + enums.CryptographicUsageMask.DECRYPT + ] + ) + ] + ) + payload = create_key_pair.CreateKeyPairRequestPayload( + common_template, + private_template, + public_template + ) + + response_payload = e._process_create_key_pair(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: CreateKeyPair" + ) + + public_id = response_payload.public_key_uuid.value + self.assertEqual('1', public_id) + private_id = response_payload.private_key_uuid.value + self.assertEqual('2', private_id) + + e._logger.reset_mock() + + # Retrieve the created public key using Get and verify all fields set + payload = get.GetRequestPayload( + unique_identifier=attributes.UniqueIdentifier(public_id) + ) + + response_payload = e._process_get(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + self.assertEqual( + enums.ObjectType.PUBLIC_KEY, + response_payload.object_type.value + ) + self.assertEqual( + str(public_id), + response_payload.unique_identifier.value + ) + self.assertIsInstance(response_payload.secret, secrets.PublicKey) + + key_block = response_payload.secret.key_block + self.assertEqual( + enums.KeyFormatType.PKCS_1, + key_block.key_format_type.value + ) + self.assertEqual( + enums.CryptographicAlgorithm.RSA, + key_block.cryptographic_algorithm.value + ) + self.assertEqual( + 2048, + key_block.cryptographic_length.value + ) + + e._logger.reset_mock() + + # Retrieve the created private key using Get and verify all fields set + payload = get.GetRequestPayload( + unique_identifier=attributes.UniqueIdentifier(private_id) + ) + + response_payload = e._process_get(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Get" + ) + self.assertEqual( + enums.ObjectType.PRIVATE_KEY, + response_payload.object_type.value + ) + self.assertEqual( + str(private_id), + response_payload.unique_identifier.value + ) + self.assertIsInstance(response_payload.secret, secrets.PrivateKey) + + key_block = response_payload.secret.key_block + self.assertEqual( + enums.KeyFormatType.PKCS_8, + key_block.key_format_type.value + ) + self.assertEqual( + enums.CryptographicAlgorithm.RSA, + key_block.cryptographic_algorithm.value + ) + self.assertEqual( + 2048, + key_block.cryptographic_length.value + ) + + e._data_session.commit() + e._data_store_session_factory() + e._logger.reset_mock() + + # Destroy the public key and verify it cannot be accessed again + payload = destroy.DestroyRequestPayload( + unique_identifier=attributes.UniqueIdentifier(public_id) + ) + + response_payload = e._process_destroy(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Destroy" + ) + self.assertEqual( + str(public_id), + response_payload.unique_identifier.value + ) + + e._data_session.commit() + e._data_store_session_factory() + e._logger.reset_mock() + + args = (payload, ) + regex = "Could not locate object: {0}".format(public_id) + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._process_destroy, + *args + ) + + e._data_session.commit() + e._data_store_session_factory() + e._logger.reset_mock() + + # Destroy the private key and verify it cannot be accessed again + payload = destroy.DestroyRequestPayload( + unique_identifier=attributes.UniqueIdentifier(private_id) + ) + + response_payload = e._process_destroy(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_called_once_with( + "Processing operation: Destroy" + ) + self.assertEqual( + str(private_id), + response_payload.unique_identifier.value + ) + + e._data_session.commit() + e._data_store_session_factory() + e._logger.reset_mock() + + args = (payload, ) + regex = "Could not locate object: {0}".format(private_id) + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._process_destroy, + *args ) e._data_session.commit() @@ -2539,11 +3450,14 @@ class TestKmipEngine(testtools.TestCase): "Processing operation: Destroy" ) self.assertEqual(str(uid), response_payload.unique_identifier.value) - self.assertRaises( - exc.NoResultFound, - e._data_session.query(pie_objects.OpaqueObject).filter( - pie_objects.ManagedObject.unique_identifier == uid - ).one + + args = (payload, ) + regex = "Could not locate object: {0}".format(uid) + self.assertRaisesRegexp( + exceptions.ItemNotFound, + regex, + e._process_destroy, + *args ) e._data_session.commit()