Merge pull request #346 from OpenKMIP/feat/update-client-get

Updating the ProxyKmipClient to support getting wrapped keys
This commit is contained in:
Peter Hamilton 2017-10-02 01:16:01 -04:00 committed by GitHub
commit 7961d8820c
3 changed files with 343 additions and 8 deletions

View File

@ -559,12 +559,15 @@ class ProxyKmipClient(api.KmipClient):
raise exceptions.KmipOperationFailure(status, reason, message)
@is_connected
def get(self, uid=None):
def get(self, uid=None, key_wrapping_specification=None):
"""
Get a managed object from a KMIP appliance.
Args:
uid (string): The unique ID of the managed object to retrieve.
key_wrapping_specification (dict): A dictionary containing various
settings to be used when wrapping the key during retrieval.
See Note below. Optional, defaults to None.
Returns:
ManagedObject: The retrieved managed object object.
@ -573,14 +576,48 @@ class ProxyKmipClient(api.KmipClient):
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input argument is invalid
Notes:
The derivation_parameters argument is a dictionary that can
contain the following key/value pairs:
Key | Value
--------------------------------|---------------------------------
'wrapping_method' | A WrappingMethod enumeration
| that specifies how the object
| should be wrapped.
'encryption_key_information' | A dictionary containing the ID
| of the wrapping key and
| associated cryptographic
| parameters.
'mac_signature_key_information' | A dictionary containing the ID
| of the wrapping key and
| associated cryptographic
| parameters.
'attribute_names' | A list of strings representing
| the names of attributes that
| should be included with the
| wrapped object.
'encoding_option' | An EncodingOption enumeration
| that specifies the encoding of
| the object before it is wrapped.
"""
# Check input
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("uid must be a string")
if key_wrapping_specification is not None:
if not isinstance(key_wrapping_specification, dict):
raise TypeError(
"Key wrapping specification must be a dictionary."
)
spec = self._build_key_wrapping_specification(
key_wrapping_specification
)
# Get the managed object and handle the results
result = self.proxy.get(uid)
result = self.proxy.get(uid, key_wrapping_specification=spec)
status = result.result_status.value
if status == enums.ResultStatus.SUCCESS:
@ -1226,6 +1263,103 @@ class ProxyKmipClient(api.KmipClient):
)
return cryptographic_parameters
def _build_encryption_key_information(self, value):
"""
Build an EncryptionKeyInformation struct from a dictionary.
Args:
value (dict): A dictionary containing the key/value pairs for a
EncryptionKeyInformation struct.
Returns:
EncryptionKeyInformation: an EncryptionKeyInformation struct
Raises:
TypeError: if the input argument is invalid
"""
if value is None:
return None
if not isinstance(value, dict):
raise TypeError("Encryption key information must be a dictionary.")
cryptographic_parameters = value.get('cryptographic_parameters')
if cryptographic_parameters:
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
encryption_key_information = cobjects.EncryptionKeyInformation(
unique_identifier=value.get('unique_identifier'),
cryptographic_parameters=cryptographic_parameters
)
return encryption_key_information
def _build_mac_signature_key_information(self, value):
"""
Build an MACSignatureKeyInformation struct from a dictionary.
Args:
value (dict): A dictionary containing the key/value pairs for a
MACSignatureKeyInformation struct.
Returns:
MACSignatureInformation: a MACSignatureKeyInformation struct
Raises:
TypeError: if the input argument is invalid
"""
if value is None:
return None
if not isinstance(value, dict):
raise TypeError(
"MAC/signature key information must be a dictionary."
)
cryptographic_parameters = value.get('cryptographic_parameters')
if cryptographic_parameters:
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
mac_signature_key_information = cobjects.MACSignatureKeyInformation(
unique_identifier=value.get('unique_identifier'),
cryptographic_parameters=cryptographic_parameters
)
return mac_signature_key_information
def _build_key_wrapping_specification(self, value):
"""
Build a KeyWrappingSpecification struct from a dictionary.
Args:
value (dict): A dictionary containing the key/value pairs for a
KeyWrappingSpecification struct.
Returns:
KeyWrappingSpecification: a KeyWrappingSpecification struct
Raises:
TypeError: if the input argument is invalid
"""
if value is None:
return None
if not isinstance(value, dict):
raise TypeError("Key wrapping specification must be a dictionary.")
encryption_key_info = self._build_encryption_key_information(
value.get('encryption_key_information')
)
mac_signature_key_info = self._build_mac_signature_key_information(
value.get('mac_signature_key_information')
)
key_wrapping_specification = cobjects.KeyWrappingSpecification(
wrapping_method=value.get('wrapping_method'),
encryption_key_information=encryption_key_info,
mac_signature_key_information=mac_signature_key_info,
attribute_names=value.get('attribute_names'),
encoding_option=value.get('encoding_option')
)
return key_wrapping_specification
def _build_common_attributes(self, operation_policy_name=None):
'''
Build a list of common attributes that are shared across

View File

@ -1059,18 +1059,14 @@ class KMIPProxy(KMIP):
credential=None):
operation = Operation(OperationEnum.GET)
kws = None
if key_format_type is not None:
key_format_type = key_format_type.value
if key_wrapping_specification is not None:
kws = objects.KeyWrappingSpecification(key_wrapping_specification)
req_pl = payloads.GetRequestPayload(
unique_identifier=unique_identifier,
key_format_type=key_format_type,
key_compression_type=key_compression_type,
key_wrapping_specification=kws
key_wrapping_specification=key_wrapping_specification
)
batch_item = messages.RequestBatchItem(operation=operation,

View File

@ -743,7 +743,9 @@ class TestProxyKmipClient(testtools.TestCase):
result = client.get('aaaaaaaa-1111-2222-3333-ffffffffffff')
client.proxy.get.assert_called_with(
'aaaaaaaa-1111-2222-3333-ffffffffffff')
'aaaaaaaa-1111-2222-3333-ffffffffffff',
key_wrapping_specification=None
)
self.assertIsInstance(result, objects.SymmetricKey)
self.assertEqual(result, secret)
@ -758,6 +760,24 @@ class TestProxyKmipClient(testtools.TestCase):
with ProxyKmipClient() as client:
self.assertRaises(TypeError, client.get, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_on_invalid_key_wrapping_specification(self):
"""
Test that a TypeError exception is raised when trying to retrieve a
secret with an invalid key wrapping specification.
"""
args = ['1']
kwargs = {'key_wrapping_specification': 'invalid'}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"Key wrapping specification must be a dictionary.",
client.get,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_on_closed(self):
@ -2594,3 +2614,188 @@ class TestProxyKmipClient(testtools.TestCase):
client._build_cryptographic_parameters,
*args
)
def test_build_encryption_key_information(self):
"""
Test that an EncryptionKeyInformation struct can be built from a
dictionary.
"""
client = ProxyKmipClient()
# Test with no value
result = client._build_encryption_key_information(None)
self.assertEqual(None, result)
# Test with a value
result = client._build_encryption_key_information(
{
'unique_identifier': 'test',
'cryptographic_parameters': {
'block_cipher_mode': enums.BlockCipherMode.CBC
}
}
)
self.assertIsInstance(result, obj.EncryptionKeyInformation)
self.assertEqual('test', result.unique_identifier)
self.assertIsInstance(
result.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.BlockCipherMode.CBC,
result.cryptographic_parameters.block_cipher_mode
)
def test_build_encryption_key_information_invalid(self):
"""
Test that the right error is raised when attempting to build
an EncryptionKeyInformation struct with an invalid value.
"""
client = ProxyKmipClient()
args = ['invalid']
self.assertRaisesRegexp(
TypeError,
"Encryption key information must be a dictionary.",
client._build_encryption_key_information,
*args
)
def test_build_mac_signature_key_information(self):
"""
Test that a MACSignatureKeyInformation struct can be built from a
dictionary.
"""
client = ProxyKmipClient()
# Test with no value
result = client._build_mac_signature_key_information(None)
self.assertEqual(None, result)
# Test with a value
result = client._build_mac_signature_key_information(
{
'unique_identifier': '1',
'cryptographic_parameters': {
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
}
)
self.assertIsInstance(result, obj.MACSignatureKeyInformation)
self.assertEqual('1', result.unique_identifier)
self.assertIsInstance(
result.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
result.cryptographic_parameters.cryptographic_algorithm
)
def test_build_mac_signature_key_information_invalid(self):
"""
Test that the right error is raised when attempting to build
a MACSignatureKeyInformation struct with an invalid value.
"""
client = ProxyKmipClient()
args = ['invalid']
self.assertRaisesRegexp(
TypeError,
"MAC/signature key information must be a dictionary.",
client._build_mac_signature_key_information,
*args
)
def test_build_key_wrapping_specification(self):
"""
Test that a KeyWrappingSpecification can be built from a dictionary.
"""
client = ProxyKmipClient()
# Test with no value
result = client._build_key_wrapping_specification(None)
self.assertEqual(None, result)
# Test with a value
result = client._build_key_wrapping_specification(
{
'wrapping_method': enums.WrappingMethod.ENCRYPT,
'encryption_key_information': {
'unique_identifier': '1',
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES
}
},
'mac_signature_key_information': {
'unique_identifier': '2',
'cryptographic_parameters': {
'padding_method': enums.PaddingMethod.PKCS5
}
},
'attribute_names': [
'Cryptographic Algorithm',
'Cryptographic Length'
],
'encoding_option': enums.EncodingOption.NO_ENCODING
}
)
self.assertIsInstance(result, obj.KeyWrappingSpecification)
self.assertIsInstance(
result.encryption_key_information,
obj.EncryptionKeyInformation
)
info = result.encryption_key_information
self.assertEqual('1', info.unique_identifier)
self.assertIsInstance(
info.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
info.cryptographic_parameters.cryptographic_algorithm
)
self.assertIsInstance(
result.mac_signature_key_information,
obj.MACSignatureKeyInformation
)
info = result.mac_signature_key_information
self.assertEqual('2', info.unique_identifier)
self.assertIsInstance(
info.cryptographic_parameters,
obj.CryptographicParameters
)
self.assertEqual(
enums.PaddingMethod.PKCS5,
info.cryptographic_parameters.padding_method
)
self.assertIsInstance(result.attribute_names, list)
self.assertEqual(2, len(result.attribute_names))
self.assertIn('Cryptographic Algorithm', result.attribute_names)
self.assertIn('Cryptographic Length', result.attribute_names)
self.assertEqual(
enums.EncodingOption.NO_ENCODING,
result.encoding_option
)
def test_build_key_wrapping_specification_invalid(self):
"""
Test that the right error is raised when attempting to build
a KeyWrappingSpecification struct with an invalid value.
"""
client = ProxyKmipClient()
args = ['invalid']
self.assertRaisesRegexp(
TypeError,
"Key wrapping specification must be a dictionary.",
client._build_key_wrapping_specification,
*args
)