From 29750cbda6282bd512c4bfe6978d19721daf8ed9 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 20 Sep 2019 13:36:01 -0400 Subject: [PATCH] Add SplitKey server integration tests This change adds integration tests that test registering, retrieving, and destroying SplitKey objects with the server. Minor updates are included for the client and server to ensure that SplitKey operations function as expected. Partially implements #545 --- kmip/core/factories/secrets.py | 14 ++- kmip/pie/factory.py | 9 +- kmip/services/server/engine.py | 13 +++ .../integration/services/test_integration.py | 103 ++++++++++++++++++ .../services/test_proxykmipclient.py | 54 +++++++++ .../tests/unit/services/server/test_engine.py | 4 +- 6 files changed, 191 insertions(+), 6 deletions(-) diff --git a/kmip/core/factories/secrets.py b/kmip/core/factories/secrets.py index 628c9cc..2880631 100644 --- a/kmip/core/factories/secrets.py +++ b/kmip/core/factories/secrets.py @@ -31,6 +31,7 @@ from kmip.core.secrets import OpaqueObject from kmip.core.secrets import PrivateKey from kmip.core.secrets import PublicKey from kmip.core.secrets import SecretData +from kmip.core.secrets import SplitKey from kmip.core.secrets import SymmetricKey from kmip.core.secrets import Template @@ -116,7 +117,18 @@ class SecretFactory(object): return PrivateKey(key_block) def _create_split_key(self, value): - raise NotImplementedError() + if value is None: + return SplitKey() + else: + key_block = self._build_key_block(value) + return SplitKey( + split_key_parts=value.get("split_key_parts"), + key_part_identifier=value.get("key_part_identifier"), + split_key_threshold=value.get("split_key_threshold"), + split_key_method=value.get("split_key_method"), + prime_field_size=value.get("prime_field_size"), + key_block=key_block + ) def _create_template(self, value): if value is None: diff --git a/kmip/pie/factory.py b/kmip/pie/factory.py index 3752a0a..c986193 100644 --- a/kmip/pie/factory.py +++ b/kmip/pie/factory.py @@ -194,6 +194,11 @@ class ObjectFactory: def _build_core_split_key(self, secret): key_material = cobjects.KeyMaterial(secret.value) key_value = cobjects.KeyValue(key_material) + key_wrapping_data = None + if secret.key_wrapping_data: + key_wrapping_data = cobjects.KeyWrappingData( + **secret.key_wrapping_data + ) key_block = cobjects.KeyBlock( key_format_type=misc.KeyFormatType(secret.key_format_type), key_compression_type=None, @@ -204,9 +209,7 @@ class ObjectFactory: cryptographic_length=attributes.CryptographicLength( secret.cryptographic_length ), - key_wrapping_data=cobjects.KeyWrappingData( - **secret.key_wrapping_data - ) + key_wrapping_data=key_wrapping_data ) return secrets.SplitKey( split_key_parts=secret.split_key_parts, diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index dd7b99d..d410fc5 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -519,6 +519,19 @@ class KmipEngine(object): 'opaque_data_type': obj.opaque_type, 'opaque_data_value': obj.value } + elif object_type == enums.ObjectType.SPLIT_KEY: + value = { + "cryptographic_algorithm": obj.cryptographic_algorithm, + "cryptographic_length": obj.cryptographic_length, + "key_format_type": obj.key_format_type, + "key_value": obj.value, + "key_wrapping_data": obj.key_wrapping_data, + "split_key_parts": obj.split_key_parts, + "key_part_identifier": obj.key_part_identifier, + "split_key_threshold": obj.split_key_threshold, + "split_key_method": obj.split_key_method, + "prime_field_size": obj.prime_field_size + } else: name = object_type.name raise exceptions.InvalidField( diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index a51912c..c2866d9 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -54,6 +54,7 @@ from kmip.core.secrets import PublicKey from kmip.core.secrets import Certificate from kmip.core.secrets import SecretData from kmip.core.secrets import OpaqueObject +from kmip.core.secrets import SplitKey @pytest.mark.usefixtures("client") @@ -1675,3 +1676,105 @@ class TestIntegration(testtools.TestCase): ResultStatus.OPERATION_FAILED, result.result_status.value ) + + def test_split_key_register_get_destroy(self): + """ + Tests that split keys are properly registered, retrieved, and + destroyed. + """ + usage_mask = self.attr_factory.create_attribute( + AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [CryptographicUsageMask.ENCRYPT, CryptographicUsageMask.DECRYPT] + ) + key_name = "Integration Test - Register-Get-Destroy Split Key" + name = self.attr_factory.create_attribute(AttributeType.NAME, key_name) + template_attribute = TemplateAttribute(attributes=[usage_mask, name]) + + key_data = ( + b'\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + key_block = KeyBlock( + key_format_type=KeyFormatType(KeyFormatTypeEnum.RAW), + key_compression_type=None, + key_value=KeyValue(KeyMaterial(key_data)), + cryptographic_algorithm=CryptographicAlgorithm( + CryptoAlgorithmEnum.AES + ), + cryptographic_length=CryptographicLength(128), + key_wrapping_data=None + ) + + secret = SplitKey( + split_key_parts=3, + key_part_identifier=1, + split_key_threshold=2, + split_key_method=enums.SplitKeyMethod.XOR, + prime_field_size=None, + key_block=key_block + ) + + result = self.client.register( + ObjectType.SPLIT_KEY, + template_attribute, + secret, + credential=None + ) + + self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS) + self._check_uuid(result.uuid, str) + + # Check that the returned key bytes match what was provided + uuid = result.uuid + result = self.client.get(uuid=uuid, credential=None) + + self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS) + self._check_object_type( + result.object_type, + ObjectType, + ObjectType.SPLIT_KEY + ) + self._check_uuid(result.uuid, str) + + self.assertEqual(3, result.secret.split_key_parts) + self.assertEqual(1, result.secret.key_part_identifier) + self.assertEqual(2, result.secret.split_key_threshold) + self.assertEqual( + enums.SplitKeyMethod.XOR, + result.secret.split_key_method + ) + self.assertIsNone(result.secret.prime_field_size) + + # Check the secret type + self.assertIsInstance(result.secret, SplitKey) + self.assertEqual( + key_data, + result.secret.key_block.key_value.key_material.value + ) + + self.logger.debug( + 'Destroying key: ' + key_name + '\nWith UUID: ' + result.uuid + ) + + result = self.client.destroy(result.uuid) + self._check_result_status( + result, + ResultStatus, + ResultStatus.SUCCESS + ) + self._check_uuid(result.uuid.value, str) + + # Verify the secret was destroyed + result = self.client.get(uuid=uuid, credential=None) + + self._check_result_status( + result, + ResultStatus, + ResultStatus.OPERATION_FAILED + ) + self.assertIsInstance(result.result_reason.value, ResultReason) + self.assertEqual( + ResultReason.ITEM_NOT_FOUND, + result.result_reason.value + ) diff --git a/kmip/tests/integration/services/test_proxykmipclient.py b/kmip/tests/integration/services/test_proxykmipclient.py index 1df64a8..5d22a97 100644 --- a/kmip/tests/integration/services/test_proxykmipclient.py +++ b/kmip/tests/integration/services/test_proxykmipclient.py @@ -1295,3 +1295,57 @@ class TestProxyKmipClientIntegration(testtools.TestCase): # Clean up the keys self.client.destroy(a_id) self.client.destroy(b_id) + + def test_split_key_register_get_destroy(self): + """ + Test that the ProxyKmipClient can register, retrieve, and destroy a + split key. + """ + key = objects.SplitKey( + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + cryptographic_length=128, + key_value=( + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F' + ), + name="Test Split Key", + cryptographic_usage_masks=[enums.CryptographicUsageMask.EXPORT], + key_format_type=enums.KeyFormatType.RAW, + key_wrapping_data=None, + split_key_parts=3, + key_part_identifier=1, + split_key_threshold=2, + split_key_method=enums.SplitKeyMethod.XOR, + prime_field_size=None + ) + + uid = self.client.register(key) + self.assertIsInstance(uid, six.string_types) + + try: + result = self.client.get(uid) + self.assertIsInstance(result, objects.SplitKey) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + result.cryptographic_algorithm + ) + self.assertEqual(128, result.cryptographic_length) + self.assertEqual( + ( + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F' + ), + result.value + ) + self.assertEqual(enums.KeyFormatType.RAW, result.key_format_type) + self.assertEqual(3, result.split_key_parts) + self.assertEqual(1, result.key_part_identifier) + self.assertEqual(2, result.split_key_threshold) + self.assertEqual(enums.SplitKeyMethod.XOR, result.split_key_method) + self.assertIsNone(result.prime_field_size) + finally: + self.client.destroy(uid) + self.assertRaises( + exceptions.KmipOperationFailure, self.client.get, uid) + self.assertRaises( + exceptions.KmipOperationFailure, self.client.destroy, uid) diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 1dbc4c2..50f1e06 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -1038,10 +1038,10 @@ class TestKmipEngine(testtools.TestCase): class DummyObject: def __init__(self): - self._object_type = enums.ObjectType.SPLIT_KEY + self._object_type = enums.ObjectType.TEMPLATE args = (DummyObject(), ) - regex = "The SplitKey object type is not supported." + regex = "The Template object type is not supported." six.assertRaisesRegex( self, exceptions.InvalidField,