Updating the ProxyKmipClient to support getting wrapped keys

This change updates the ProxyKmipClient, allowing Get operation
calls to retrieve wrapped keys by providing key wrapping
specification information with the request. Unit tests have been
added and updated to reflect this change.
This commit is contained in:
Peter Hamilton 2017-09-30 12:11:19 -04:00
parent aa798d939c
commit 6fa4999cc5
3 changed files with 343 additions and 8 deletions

View File

@ -559,12 +559,15 @@ class ProxyKmipClient(api.KmipClient):
raise exceptions.KmipOperationFailure(status, reason, message) raise exceptions.KmipOperationFailure(status, reason, message)
@is_connected @is_connected
def get(self, uid=None): def get(self, uid=None, key_wrapping_specification=None):
""" """
Get a managed object from a KMIP appliance. Get a managed object from a KMIP appliance.
Args: Args:
uid (string): The unique ID of the managed object to retrieve. uid (string): The unique ID of the managed object to retrieve.
key_wrapping_specification (dict): A dictionary containing various
settings to be used when wrapping the key during retrieval.
See Note below. Optional, defaults to None.
Returns: Returns:
ManagedObject: The retrieved managed object object. ManagedObject: The retrieved managed object object.
@ -573,14 +576,48 @@ class ProxyKmipClient(api.KmipClient):
ClientConnectionNotOpen: if the client connection is unusable ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure KmipOperationFailure: if the operation result is a failure
TypeError: if the input argument is invalid TypeError: if the input argument is invalid
Notes:
The derivation_parameters argument is a dictionary that can
contain the following key/value pairs:
Key | Value
--------------------------------|---------------------------------
'wrapping_method' | A WrappingMethod enumeration
| that specifies how the object
| should be wrapped.
'encryption_key_information' | A dictionary containing the ID
| of the wrapping key and
| associated cryptographic
| parameters.
'mac_signature_key_information' | A dictionary containing the ID
| of the wrapping key and
| associated cryptographic
| parameters.
'attribute_names' | A list of strings representing
| the names of attributes that
| should be included with the
| wrapped object.
'encoding_option' | An EncodingOption enumeration
| that specifies the encoding of
| the object before it is wrapped.
""" """
# Check input # Check input
if uid is not None: if uid is not None:
if not isinstance(uid, six.string_types): if not isinstance(uid, six.string_types):
raise TypeError("uid must be a string") raise TypeError("uid must be a string")
if key_wrapping_specification is not None:
if not isinstance(key_wrapping_specification, dict):
raise TypeError(
"Key wrapping specification must be a dictionary."
)
spec = self._build_key_wrapping_specification(
key_wrapping_specification
)
# Get the managed object and handle the results # Get the managed object and handle the results
result = self.proxy.get(uid) result = self.proxy.get(uid, key_wrapping_specification=spec)
status = result.result_status.value status = result.result_status.value
if status == enums.ResultStatus.SUCCESS: if status == enums.ResultStatus.SUCCESS:
@ -1226,6 +1263,103 @@ class ProxyKmipClient(api.KmipClient):
) )
return cryptographic_parameters return cryptographic_parameters
def _build_encryption_key_information(self, value):
"""
Build an EncryptionKeyInformation struct from a dictionary.
Args:
value (dict): A dictionary containing the key/value pairs for a
EncryptionKeyInformation struct.
Returns:
EncryptionKeyInformation: an EncryptionKeyInformation struct
Raises:
TypeError: if the input argument is invalid
"""
if value is None:
return None
if not isinstance(value, dict):
raise TypeError("Encryption key information must be a dictionary.")
cryptographic_parameters = value.get('cryptographic_parameters')
if cryptographic_parameters:
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
encryption_key_information = cobjects.EncryptionKeyInformation(
unique_identifier=value.get('unique_identifier'),
cryptographic_parameters=cryptographic_parameters
)
return encryption_key_information
def _build_mac_signature_key_information(self, value):
"""
Build an MACSignatureKeyInformation struct from a dictionary.
Args:
value (dict): A dictionary containing the key/value pairs for a
MACSignatureKeyInformation struct.
Returns:
MACSignatureInformation: a MACSignatureKeyInformation struct
Raises:
TypeError: if the input argument is invalid
"""
if value is None:
return None
if not isinstance(value, dict):
raise TypeError(
"MAC/signature key information must be a dictionary."
)
cryptographic_parameters = value.get('cryptographic_parameters')
if cryptographic_parameters:
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
mac_signature_key_information = cobjects.MACSignatureKeyInformation(
unique_identifier=value.get('unique_identifier'),
cryptographic_parameters=cryptographic_parameters
)
return mac_signature_key_information
def _build_key_wrapping_specification(self, value):
"""
Build a KeyWrappingSpecification struct from a dictionary.
Args:
value (dict): A dictionary containing the key/value pairs for a
KeyWrappingSpecification struct.
Returns:
KeyWrappingSpecification: a KeyWrappingSpecification struct
Raises:
TypeError: if the input argument is invalid
"""
if value is None:
return None
if not isinstance(value, dict):
raise TypeError("Key wrapping specification must be a dictionary.")
encryption_key_info = self._build_encryption_key_information(
value.get('encryption_key_information')
)
mac_signature_key_info = self._build_mac_signature_key_information(
value.get('mac_signature_key_information')
)
key_wrapping_specification = cobjects.KeyWrappingSpecification(
wrapping_method=value.get('wrapping_method'),
encryption_key_information=encryption_key_info,
mac_signature_key_information=mac_signature_key_info,
attribute_names=value.get('attribute_names'),
encoding_option=value.get('encoding_option')
)
return key_wrapping_specification
def _build_common_attributes(self, operation_policy_name=None): def _build_common_attributes(self, operation_policy_name=None):
''' '''
Build a list of common attributes that are shared across Build a list of common attributes that are shared across

View File

@ -1059,18 +1059,14 @@ class KMIPProxy(KMIP):
credential=None): credential=None):
operation = Operation(OperationEnum.GET) operation = Operation(OperationEnum.GET)
kws = None
if key_format_type is not None: if key_format_type is not None:
key_format_type = key_format_type.value key_format_type = key_format_type.value
if key_wrapping_specification is not None:
kws = objects.KeyWrappingSpecification(key_wrapping_specification)
req_pl = payloads.GetRequestPayload( req_pl = payloads.GetRequestPayload(
unique_identifier=unique_identifier, unique_identifier=unique_identifier,
key_format_type=key_format_type, key_format_type=key_format_type,
key_compression_type=key_compression_type, key_compression_type=key_compression_type,
key_wrapping_specification=kws key_wrapping_specification=key_wrapping_specification
) )
batch_item = messages.RequestBatchItem(operation=operation, batch_item = messages.RequestBatchItem(operation=operation,

View File

@ -743,7 +743,9 @@ class TestProxyKmipClient(testtools.TestCase):
result = client.get('aaaaaaaa-1111-2222-3333-ffffffffffff') result = client.get('aaaaaaaa-1111-2222-3333-ffffffffffff')
client.proxy.get.assert_called_with( client.proxy.get.assert_called_with(
'aaaaaaaa-1111-2222-3333-ffffffffffff') 'aaaaaaaa-1111-2222-3333-ffffffffffff',
key_wrapping_specification=None
)
self.assertIsInstance(result, objects.SymmetricKey) self.assertIsInstance(result, objects.SymmetricKey)
self.assertEqual(result, secret) self.assertEqual(result, secret)
@ -758,6 +760,24 @@ class TestProxyKmipClient(testtools.TestCase):
with ProxyKmipClient() as client: with ProxyKmipClient() as client:
self.assertRaises(TypeError, client.get, *args) self.assertRaises(TypeError, client.get, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_on_invalid_key_wrapping_specification(self):
"""
Test that a TypeError exception is raised when trying to retrieve a
secret with an invalid key wrapping specification.
"""
args = ['1']
kwargs = {'key_wrapping_specification': 'invalid'}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"Key wrapping specification must be a dictionary.",
client.get,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy', @mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy)) mock.MagicMock(spec_set=KMIPProxy))
def test_get_on_closed(self): def test_get_on_closed(self):
@ -2594,3 +2614,188 @@ class TestProxyKmipClient(testtools.TestCase):
client._build_cryptographic_parameters, client._build_cryptographic_parameters,
*args *args
) )
def test_build_encryption_key_information(self):
"""
Test that an EncryptionKeyInformation struct can be built from a
dictionary.
"""
client = ProxyKmipClient()
# Test with no value
result = client._build_encryption_key_information(None)
self.assertEqual(None, result)
# Test with a value
result = client._build_encryption_key_information(
{
'unique_identifier': 'test',
'cryptographic_parameters': {
'block_cipher_mode': enums.BlockCipherMode.CBC
}
}
)
self.assertIsInstance(result, obj.EncryptionKeyInformation)
self.assertEqual('test', result.unique_identifier)
self.assertIsInstance(
result.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.BlockCipherMode.CBC,
result.cryptographic_parameters.block_cipher_mode
)
def test_build_encryption_key_information_invalid(self):
"""
Test that the right error is raised when attempting to build
an EncryptionKeyInformation struct with an invalid value.
"""
client = ProxyKmipClient()
args = ['invalid']
self.assertRaisesRegexp(
TypeError,
"Encryption key information must be a dictionary.",
client._build_encryption_key_information,
*args
)
def test_build_mac_signature_key_information(self):
"""
Test that a MACSignatureKeyInformation struct can be built from a
dictionary.
"""
client = ProxyKmipClient()
# Test with no value
result = client._build_mac_signature_key_information(None)
self.assertEqual(None, result)
# Test with a value
result = client._build_mac_signature_key_information(
{
'unique_identifier': '1',
'cryptographic_parameters': {
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
}
)
self.assertIsInstance(result, obj.MACSignatureKeyInformation)
self.assertEqual('1', result.unique_identifier)
self.assertIsInstance(
result.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
result.cryptographic_parameters.cryptographic_algorithm
)
def test_build_mac_signature_key_information_invalid(self):
"""
Test that the right error is raised when attempting to build
a MACSignatureKeyInformation struct with an invalid value.
"""
client = ProxyKmipClient()
args = ['invalid']
self.assertRaisesRegexp(
TypeError,
"MAC/signature key information must be a dictionary.",
client._build_mac_signature_key_information,
*args
)
def test_build_key_wrapping_specification(self):
"""
Test that a KeyWrappingSpecification can be built from a dictionary.
"""
client = ProxyKmipClient()
# Test with no value
result = client._build_key_wrapping_specification(None)
self.assertEqual(None, result)
# Test with a value
result = client._build_key_wrapping_specification(
{
'wrapping_method': enums.WrappingMethod.ENCRYPT,
'encryption_key_information': {
'unique_identifier': '1',
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES
}
},
'mac_signature_key_information': {
'unique_identifier': '2',
'cryptographic_parameters': {
'padding_method': enums.PaddingMethod.PKCS5
}
},
'attribute_names': [
'Cryptographic Algorithm',
'Cryptographic Length'
],
'encoding_option': enums.EncodingOption.NO_ENCODING
}
)
self.assertIsInstance(result, obj.KeyWrappingSpecification)
self.assertIsInstance(
result.encryption_key_information,
obj.EncryptionKeyInformation
)
info = result.encryption_key_information
self.assertEqual('1', info.unique_identifier)
self.assertIsInstance(
info.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
info.cryptographic_parameters.cryptographic_algorithm
)
self.assertIsInstance(
result.mac_signature_key_information,
obj.MACSignatureKeyInformation
)
info = result.mac_signature_key_information
self.assertEqual('2', info.unique_identifier)
self.assertIsInstance(
info.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.PaddingMethod.PKCS5,
info.cryptographic_parameters.padding_method
)
self.assertIsInstance(result.attribute_names, list)
self.assertEqual(2, len(result.attribute_names))
self.assertIn('Cryptographic Algorithm', result.attribute_names)
self.assertIn('Cryptographic Length', result.attribute_names)
self.assertEqual(
enums.EncodingOption.NO_ENCODING,
result.encoding_option
)
def test_build_key_wrapping_specification_invalid(self):
"""
Test that the right error is raised when attempting to build
a KeyWrappingSpecification struct with an invalid value.
"""
client = ProxyKmipClient()
args = ['invalid']
self.assertRaisesRegexp(
TypeError,
"Key wrapping specification must be a dictionary.",
client._build_key_wrapping_specification,
*args
)