mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-04-08 19:25:06 +02:00
Add SplitKey server integration tests
This change adds integration tests that test registering, retrieving, and destroying SplitKey objects with the server. Minor updates are included for the client and server to ensure that SplitKey operations function as expected. Partially implements #545
This commit is contained in:
parent
a8713fc909
commit
29750cbda6
@ -31,6 +31,7 @@ from kmip.core.secrets import OpaqueObject
|
||||
from kmip.core.secrets import PrivateKey
|
||||
from kmip.core.secrets import PublicKey
|
||||
from kmip.core.secrets import SecretData
|
||||
from kmip.core.secrets import SplitKey
|
||||
from kmip.core.secrets import SymmetricKey
|
||||
from kmip.core.secrets import Template
|
||||
|
||||
@ -116,7 +117,18 @@ class SecretFactory(object):
|
||||
return PrivateKey(key_block)
|
||||
|
||||
def _create_split_key(self, value):
|
||||
raise NotImplementedError()
|
||||
if value is None:
|
||||
return SplitKey()
|
||||
else:
|
||||
key_block = self._build_key_block(value)
|
||||
return SplitKey(
|
||||
split_key_parts=value.get("split_key_parts"),
|
||||
key_part_identifier=value.get("key_part_identifier"),
|
||||
split_key_threshold=value.get("split_key_threshold"),
|
||||
split_key_method=value.get("split_key_method"),
|
||||
prime_field_size=value.get("prime_field_size"),
|
||||
key_block=key_block
|
||||
)
|
||||
|
||||
def _create_template(self, value):
|
||||
if value is None:
|
||||
|
@ -194,6 +194,11 @@ class ObjectFactory:
|
||||
def _build_core_split_key(self, secret):
|
||||
key_material = cobjects.KeyMaterial(secret.value)
|
||||
key_value = cobjects.KeyValue(key_material)
|
||||
key_wrapping_data = None
|
||||
if secret.key_wrapping_data:
|
||||
key_wrapping_data = cobjects.KeyWrappingData(
|
||||
**secret.key_wrapping_data
|
||||
)
|
||||
key_block = cobjects.KeyBlock(
|
||||
key_format_type=misc.KeyFormatType(secret.key_format_type),
|
||||
key_compression_type=None,
|
||||
@ -204,9 +209,7 @@ class ObjectFactory:
|
||||
cryptographic_length=attributes.CryptographicLength(
|
||||
secret.cryptographic_length
|
||||
),
|
||||
key_wrapping_data=cobjects.KeyWrappingData(
|
||||
**secret.key_wrapping_data
|
||||
)
|
||||
key_wrapping_data=key_wrapping_data
|
||||
)
|
||||
return secrets.SplitKey(
|
||||
split_key_parts=secret.split_key_parts,
|
||||
|
@ -519,6 +519,19 @@ class KmipEngine(object):
|
||||
'opaque_data_type': obj.opaque_type,
|
||||
'opaque_data_value': obj.value
|
||||
}
|
||||
elif object_type == enums.ObjectType.SPLIT_KEY:
|
||||
value = {
|
||||
"cryptographic_algorithm": obj.cryptographic_algorithm,
|
||||
"cryptographic_length": obj.cryptographic_length,
|
||||
"key_format_type": obj.key_format_type,
|
||||
"key_value": obj.value,
|
||||
"key_wrapping_data": obj.key_wrapping_data,
|
||||
"split_key_parts": obj.split_key_parts,
|
||||
"key_part_identifier": obj.key_part_identifier,
|
||||
"split_key_threshold": obj.split_key_threshold,
|
||||
"split_key_method": obj.split_key_method,
|
||||
"prime_field_size": obj.prime_field_size
|
||||
}
|
||||
else:
|
||||
name = object_type.name
|
||||
raise exceptions.InvalidField(
|
||||
|
@ -54,6 +54,7 @@ from kmip.core.secrets import PublicKey
|
||||
from kmip.core.secrets import Certificate
|
||||
from kmip.core.secrets import SecretData
|
||||
from kmip.core.secrets import OpaqueObject
|
||||
from kmip.core.secrets import SplitKey
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("client")
|
||||
@ -1675,3 +1676,105 @@ class TestIntegration(testtools.TestCase):
|
||||
ResultStatus.OPERATION_FAILED,
|
||||
result.result_status.value
|
||||
)
|
||||
|
||||
def test_split_key_register_get_destroy(self):
|
||||
"""
|
||||
Tests that split keys are properly registered, retrieved, and
|
||||
destroyed.
|
||||
"""
|
||||
usage_mask = self.attr_factory.create_attribute(
|
||||
AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[CryptographicUsageMask.ENCRYPT, CryptographicUsageMask.DECRYPT]
|
||||
)
|
||||
key_name = "Integration Test - Register-Get-Destroy Split Key"
|
||||
name = self.attr_factory.create_attribute(AttributeType.NAME, key_name)
|
||||
template_attribute = TemplateAttribute(attributes=[usage_mask, name])
|
||||
|
||||
key_data = (
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
)
|
||||
|
||||
key_block = KeyBlock(
|
||||
key_format_type=KeyFormatType(KeyFormatTypeEnum.RAW),
|
||||
key_compression_type=None,
|
||||
key_value=KeyValue(KeyMaterial(key_data)),
|
||||
cryptographic_algorithm=CryptographicAlgorithm(
|
||||
CryptoAlgorithmEnum.AES
|
||||
),
|
||||
cryptographic_length=CryptographicLength(128),
|
||||
key_wrapping_data=None
|
||||
)
|
||||
|
||||
secret = SplitKey(
|
||||
split_key_parts=3,
|
||||
key_part_identifier=1,
|
||||
split_key_threshold=2,
|
||||
split_key_method=enums.SplitKeyMethod.XOR,
|
||||
prime_field_size=None,
|
||||
key_block=key_block
|
||||
)
|
||||
|
||||
result = self.client.register(
|
||||
ObjectType.SPLIT_KEY,
|
||||
template_attribute,
|
||||
secret,
|
||||
credential=None
|
||||
)
|
||||
|
||||
self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS)
|
||||
self._check_uuid(result.uuid, str)
|
||||
|
||||
# Check that the returned key bytes match what was provided
|
||||
uuid = result.uuid
|
||||
result = self.client.get(uuid=uuid, credential=None)
|
||||
|
||||
self._check_result_status(result, ResultStatus, ResultStatus.SUCCESS)
|
||||
self._check_object_type(
|
||||
result.object_type,
|
||||
ObjectType,
|
||||
ObjectType.SPLIT_KEY
|
||||
)
|
||||
self._check_uuid(result.uuid, str)
|
||||
|
||||
self.assertEqual(3, result.secret.split_key_parts)
|
||||
self.assertEqual(1, result.secret.key_part_identifier)
|
||||
self.assertEqual(2, result.secret.split_key_threshold)
|
||||
self.assertEqual(
|
||||
enums.SplitKeyMethod.XOR,
|
||||
result.secret.split_key_method
|
||||
)
|
||||
self.assertIsNone(result.secret.prime_field_size)
|
||||
|
||||
# Check the secret type
|
||||
self.assertIsInstance(result.secret, SplitKey)
|
||||
self.assertEqual(
|
||||
key_data,
|
||||
result.secret.key_block.key_value.key_material.value
|
||||
)
|
||||
|
||||
self.logger.debug(
|
||||
'Destroying key: ' + key_name + '\nWith UUID: ' + result.uuid
|
||||
)
|
||||
|
||||
result = self.client.destroy(result.uuid)
|
||||
self._check_result_status(
|
||||
result,
|
||||
ResultStatus,
|
||||
ResultStatus.SUCCESS
|
||||
)
|
||||
self._check_uuid(result.uuid.value, str)
|
||||
|
||||
# Verify the secret was destroyed
|
||||
result = self.client.get(uuid=uuid, credential=None)
|
||||
|
||||
self._check_result_status(
|
||||
result,
|
||||
ResultStatus,
|
||||
ResultStatus.OPERATION_FAILED
|
||||
)
|
||||
self.assertIsInstance(result.result_reason.value, ResultReason)
|
||||
self.assertEqual(
|
||||
ResultReason.ITEM_NOT_FOUND,
|
||||
result.result_reason.value
|
||||
)
|
||||
|
@ -1295,3 +1295,57 @@ class TestProxyKmipClientIntegration(testtools.TestCase):
|
||||
# Clean up the keys
|
||||
self.client.destroy(a_id)
|
||||
self.client.destroy(b_id)
|
||||
|
||||
def test_split_key_register_get_destroy(self):
|
||||
"""
|
||||
Test that the ProxyKmipClient can register, retrieve, and destroy a
|
||||
split key.
|
||||
"""
|
||||
key = objects.SplitKey(
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.AES,
|
||||
cryptographic_length=128,
|
||||
key_value=(
|
||||
b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
|
||||
),
|
||||
name="Test Split Key",
|
||||
cryptographic_usage_masks=[enums.CryptographicUsageMask.EXPORT],
|
||||
key_format_type=enums.KeyFormatType.RAW,
|
||||
key_wrapping_data=None,
|
||||
split_key_parts=3,
|
||||
key_part_identifier=1,
|
||||
split_key_threshold=2,
|
||||
split_key_method=enums.SplitKeyMethod.XOR,
|
||||
prime_field_size=None
|
||||
)
|
||||
|
||||
uid = self.client.register(key)
|
||||
self.assertIsInstance(uid, six.string_types)
|
||||
|
||||
try:
|
||||
result = self.client.get(uid)
|
||||
self.assertIsInstance(result, objects.SplitKey)
|
||||
self.assertEqual(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
result.cryptographic_algorithm
|
||||
)
|
||||
self.assertEqual(128, result.cryptographic_length)
|
||||
self.assertEqual(
|
||||
(
|
||||
b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
|
||||
),
|
||||
result.value
|
||||
)
|
||||
self.assertEqual(enums.KeyFormatType.RAW, result.key_format_type)
|
||||
self.assertEqual(3, result.split_key_parts)
|
||||
self.assertEqual(1, result.key_part_identifier)
|
||||
self.assertEqual(2, result.split_key_threshold)
|
||||
self.assertEqual(enums.SplitKeyMethod.XOR, result.split_key_method)
|
||||
self.assertIsNone(result.prime_field_size)
|
||||
finally:
|
||||
self.client.destroy(uid)
|
||||
self.assertRaises(
|
||||
exceptions.KmipOperationFailure, self.client.get, uid)
|
||||
self.assertRaises(
|
||||
exceptions.KmipOperationFailure, self.client.destroy, uid)
|
||||
|
@ -1038,10 +1038,10 @@ class TestKmipEngine(testtools.TestCase):
|
||||
|
||||
class DummyObject:
|
||||
def __init__(self):
|
||||
self._object_type = enums.ObjectType.SPLIT_KEY
|
||||
self._object_type = enums.ObjectType.TEMPLATE
|
||||
|
||||
args = (DummyObject(), )
|
||||
regex = "The SplitKey object type is not supported."
|
||||
regex = "The Template object type is not supported."
|
||||
six.assertRaisesRegex(
|
||||
self,
|
||||
exceptions.InvalidField,
|
||||
|
Loading…
x
Reference in New Issue
Block a user