Add DeriveKey support to the clients

This change adds DeriveKey operation support to the KMIPProxy and
ProxyKmipClient clients, including unit tests to cover the new
functionality.
This commit is contained in:
Peter Hamilton 2017-08-04 13:25:35 -04:00
parent 278a54320c
commit 6071c938b6
4 changed files with 626 additions and 60 deletions

View File

@ -23,6 +23,7 @@ from kmip.core import objects as cobjects
from kmip.core.factories import attributes
from kmip.core.attributes import CryptographicParameters
from kmip.core.attributes import DerivationParameters
from kmip.pie import api
from kmip.pie import exceptions
@ -342,6 +343,138 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def derive_key(self,
object_type,
unique_identifiers,
derivation_method,
derivation_parameters,
**kwargs):
"""
Derive a new key or secret data from existing managed objects.
Args:
object_type (ObjectType): An ObjectType enumeration specifying
what type of object to derive. Only SymmetricKeys and
SecretData can be specified. Required.
unique_identifiers (list): A list of strings specifying the
unique IDs of the existing managed objects to use for
derivation. Multiple objects can be specified to fit the
requirements of the given derivation method. Required.
derivation_method (DerivationMethod): A DerivationMethod
enumeration specifying how key derivation should be done.
Required.
derivation_parameters (dict): A dictionary containing various
settings for the key derivation process. See Note below.
Required.
**kwargs (various): A placeholder for object attributes that
should be set on the newly derived object. Currently
supported attributes include:
cryptographic_algorithm (enums.CryptographicAlgorithm)
cryptographic_length (int)
Returns:
string: The unique ID of the newly derived object.
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input arguments are invalid
Notes:
The derivation_parameters argument is a dictionary that can
contain the following key/value pairs:
Key | Value
---------------------------|---------------------------------------
'cryptographic_parameters' | A dictionary containing additional
| cryptographic settings. See the
| decrypt method for more information.
'initialization_vector' | Bytes to be used to initialize the key
| derivation function, if needed.
'derivation_data' | Bytes to be used as the basis for the
| key derivation process (e.g., the
| bytes to be encrypted, hashed, etc).
'salt' | Bytes to used as a salt value for the
| key derivation function, if needed.
| Usually used with PBKDF2.
'iteration_count' | An integer defining how many
| iterations should be used with the key
| derivation function, if needed.
| Usually used with PBKDF2.
"""
# Check input
if not isinstance(object_type, enums.ObjectType):
raise TypeError("Object type must be an ObjectType enumeration.")
if not isinstance(unique_identifiers, list):
raise TypeError("Unique identifiers must be a list of strings.")
else:
for unique_identifier in unique_identifiers:
if not isinstance(unique_identifier, six.string_types):
raise TypeError(
"Unique identifiers must be a list of strings."
)
if not isinstance(derivation_method, enums.DerivationMethod):
raise TypeError(
"Derivation method must be a DerivationMethod enumeration."
)
if not isinstance(derivation_parameters, dict):
raise TypeError("Derivation parameters must be a dictionary.")
# Verify that operations can be given at this time.
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
derivation_parameters = DerivationParameters(
cryptographic_parameters=self._build_cryptographic_parameters(
derivation_parameters.get('cryptographic_parameters')
),
initialization_vector=derivation_parameters.get(
'initialization_vector'
),
derivation_data=derivation_parameters.get('derivation_data'),
salt=derivation_parameters.get('salt'),
iteration_count=derivation_parameters.get('iteration_count')
)
# Handle object attributes
attributes = []
if kwargs.get('cryptographic_length'):
attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
kwargs.get('cryptographic_length')
)
)
if kwargs.get('cryptographic_algorithm'):
attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
kwargs.get('cryptographic_algorithm')
)
)
template_attribute = cobjects.TemplateAttribute(
attributes=attributes
)
# Derive the new key/data and handle the results
result = self.proxy.derive_key(
object_type,
unique_identifiers,
derivation_method,
derivation_parameters,
template_attribute
)
status = result.get('result_status')
if status == enums.ResultStatus.SUCCESS:
return result.get('unique_identifier')
else:
raise exceptions.KmipOperationFailure(
status,
result.get('result_reason'),
result.get('result_message')
)
def locate(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=None):
"""
@ -737,34 +870,8 @@ class ProxyKmipClient(api.KmipClient):
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
cryptographic_parameters = CryptographicParameters(
block_cipher_mode=cryptographic_parameters.get(
'block_cipher_mode'
),
padding_method=cryptographic_parameters.get('padding_method'),
hashing_algorithm=cryptographic_parameters.get(
'hashing_algorithm'
),
key_role_type=cryptographic_parameters.get('key_role_type'),
digital_signature_algorithm=cryptographic_parameters.get(
'digital_signature_algorithm'
),
cryptographic_algorithm=cryptographic_parameters.get(
'cryptographic_algorithm'
),
random_iv=cryptographic_parameters.get('random_iv'),
iv_length=cryptographic_parameters.get('iv_length'),
tag_length=cryptographic_parameters.get('tag_length'),
fixed_field_length=cryptographic_parameters.get(
'fixed_field_length'
),
invocation_field_length=cryptographic_parameters.get(
'invocation_field_length'
),
counter_length=cryptographic_parameters.get('counter_length'),
initial_counter_value=cryptographic_parameters.get(
'initial_counter_value'
)
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
# Encrypt the provided data and handle the results
@ -871,34 +978,8 @@ class ProxyKmipClient(api.KmipClient):
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
cryptographic_parameters = CryptographicParameters(
block_cipher_mode=cryptographic_parameters.get(
'block_cipher_mode'
),
padding_method=cryptographic_parameters.get('padding_method'),
hashing_algorithm=cryptographic_parameters.get(
'hashing_algorithm'
),
key_role_type=cryptographic_parameters.get('key_role_type'),
digital_signature_algorithm=cryptographic_parameters.get(
'digital_signature_algorithm'
),
cryptographic_algorithm=cryptographic_parameters.get(
'cryptographic_algorithm'
),
random_iv=cryptographic_parameters.get('random_iv'),
iv_length=cryptographic_parameters.get('iv_length'),
tag_length=cryptographic_parameters.get('tag_length'),
fixed_field_length=cryptographic_parameters.get(
'fixed_field_length'
),
invocation_field_length=cryptographic_parameters.get(
'invocation_field_length'
),
counter_length=cryptographic_parameters.get('counter_length'),
initial_counter_value=cryptographic_parameters.get(
'initial_counter_value'
)
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
# Decrypt the provided data and handle the results
@ -955,8 +1036,8 @@ class ProxyKmipClient(api.KmipClient):
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
parameters_attribute = CryptographicParameters(
cryptographic_algorithm=algorithm
parameters_attribute = self._build_cryptographic_parameters(
{'cryptographic_algorithm': algorithm}
)
# Get the message authentication code and handle the results
@ -993,6 +1074,42 @@ class ProxyKmipClient(api.KmipClient):
return [algorithm_attribute, length_attribute, mask_attribute]
def _build_cryptographic_parameters(self, value):
"""
Build a CryptographicParameters struct from a dictionary.
Args:
value (dict): A dictionary containing the key/value pairs for a
CryptographicParameters struct.
Returns:
CryptographicParameters: a CryptographicParameters struct
Raises:
TypeError: if the input argument is invalid
"""
if not isinstance(value, dict):
raise TypeError("Cryptographic parameters must be a dictionary.")
cryptographic_parameters = CryptographicParameters(
block_cipher_mode=value.get('block_cipher_mode'),
padding_method=value.get('padding_method'),
hashing_algorithm=value.get('hashing_algorithm'),
key_role_type=value.get('key_role_type'),
digital_signature_algorithm=value.get(
'digital_signature_algorithm'
),
cryptographic_algorithm=value.get('cryptographic_algorithm'),
random_iv=value.get('random_iv'),
iv_length=value.get('iv_length'),
tag_length=value.get('tag_length'),
fixed_field_length=value.get('fixed_field_length'),
invocation_field_length=value.get('invocation_field_length'),
counter_length=value.get('counter_length'),
initial_counter_value=value.get('initial_counter_value')
)
return cryptographic_parameters
def _build_common_attributes(self, operation_policy_name=None):
'''
Build a list of common attributes that are shared across

View File

@ -52,6 +52,7 @@ from kmip.core.messages.payloads import activate
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import decrypt
from kmip.core.messages.payloads import derive_key
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import encrypt
@ -299,6 +300,88 @@ class KMIPProxy(KMIP):
"""
return self._activate(uuid, credential=credential)
def derive_key(self,
object_type,
unique_identifiers,
derivation_method,
derivation_parameters,
template_attribute,
credential=None):
"""
Derive a new key or secret data from an existing managed object.
Args:
object_type (ObjectType): An ObjectType enumeration specifying
what type of object to create. Required.
unique_identifiers (list): A list of strings specifying the unique
IDs of the existing managed objects to use for key derivation.
Required.
derivation_method (DerivationMethod): A DerivationMethod
enumeration specifying what key derivation method to use.
Required.
derivation_parameters (DerivationParameters): A
DerivationParameters struct containing the settings and
options to use for key derivation.
template_attribute (TemplateAttribute): A TemplateAttribute struct
containing the attributes to set on the newly derived object.
credential (Credential): A Credential struct containing a set of
authorization parameters for the operation. Optional, defaults
to None.
Returns:
dict: The results of the derivation operation, containing the
following key/value pairs:
Key | Value
---------------------|-----------------------------------------
'unique_identifier' | (string) The unique ID of the newly
| derived object.
'template_attribute' | (TemplateAttribute) A struct containing
| any attributes set on the newly derived
| object.
'result_status' | (ResultStatus) An enumeration indicating
| the status of the operation result.
'result_reason' | (ResultReason) An enumeration providing
| context for the result status.
'result_message' | (string) A message providing additional
| context for the operation result.
"""
operation = Operation(OperationEnum.DERIVE_KEY)
request_payload = derive_key.DeriveKeyRequestPayload(
object_type=object_type,
unique_identifiers=unique_identifiers,
derivation_method=derivation_method,
derivation_parameters=derivation_parameters,
template_attribute=template_attribute
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=request_payload
)
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
batch_item = response.batch_items[0]
payload = batch_item.response_payload
result = {}
if payload:
result['unique_identifier'] = payload.unique_identifier
result['template_attribute'] = payload.template_attribute
result['result_status'] = batch_item.result_status.value
try:
result['result_reason'] = batch_item.result_reason.value
except:
result['result_reason'] = batch_item.result_reason
try:
result['result_message'] = batch_item.result_message.value
except:
result['result_message'] = batch_item.result_message
return result
def get(self, uuid=None, key_format_type=None, key_compression_type=None,
key_wrapping_specification=None, credential=None):
return self._get(

View File

@ -1318,6 +1318,297 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertEqual(opn.attribute_name.value, 'Operation Policy Name')
self.assertEqual(opn.attribute_value.value, 'test')
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)
def test_derive_key(self):
"""
Test that the client can derive a key.
"""
result = {
'unique_identifier': '1',
'result_status': enums.ResultStatus.SUCCESS
}
client = ProxyKmipClient()
client.open()
client.proxy.derive_key.return_value = result
derived_id = client.derive_key(
enums.ObjectType.SYMMETRIC_KEY,
['2', '3'],
enums.DerivationMethod.ENCRYPT,
{
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS1v15
},
'initialization_vector': b'\x01\x02\x03\x04',
'derivation_data': b'\xFF\xFE\xFE\xFC'
},
cryptographic_length=128,
cryptographic_algorithm=enums.CryptographicAlgorithm.AES
)
self.assertEqual('1', derived_id)
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)
def test_derive_key_invalid_object_type(self):
"""
Test that the right error is raised when attempting to derive a key
with an invalid object type.
"""
client = ProxyKmipClient()
client.open()
client.proxy.derive_key.return_value = {}
args = [
'invalid',
['2', '3'],
enums.DerivationMethod.ENCRYPT,
{
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS1v15
},
'initialization_vector': b'\x01\x02\x03\x04',
'derivation_data': b'\xFF\xFE\xFE\xFC'
}
]
kwargs = {
'cryptographic_length': 128,
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
self.assertRaisesRegexp(
TypeError,
"Object type must be an ObjectType enumeration.",
client.derive_key,
*args,
**kwargs
)
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)
def test_derive_key_invalid_unique_identifiers(self):
"""
Test that the right error is raised when attempting to derive a key
with an invalid list of unique identifiers.
"""
client = ProxyKmipClient()
client.open()
client.proxy.derive_key.return_value = {}
args = [
enums.ObjectType.SYMMETRIC_KEY,
'invalid',
enums.DerivationMethod.ENCRYPT,
{
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS1v15
},
'initialization_vector': b'\x01\x02\x03\x04',
'derivation_data': b'\xFF\xFE\xFE\xFC'
}
]
kwargs = {
'cryptographic_length': 128,
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
self.assertRaisesRegexp(
TypeError,
"Unique identifiers must be a list of strings.",
client.derive_key,
*args,
**kwargs
)
args = [
enums.ObjectType.SYMMETRIC_KEY,
[2, 3],
enums.DerivationMethod.ENCRYPT,
{
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS1v15
},
'initialization_vector': b'\x01\x02\x03\x04',
'derivation_data': b'\xFF\xFE\xFE\xFC'
}
]
self.assertRaisesRegexp(
TypeError,
"Unique identifiers must be a list of strings.",
client.derive_key,
*args,
**kwargs
)
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)
def test_derive_key_invalid_derivation_method(self):
"""
Test that the right error is raised when attempting to derive a key
with an invalid derivation method.
"""
client = ProxyKmipClient()
client.open()
client.proxy.derive_key.return_value = {}
args = [
enums.ObjectType.SYMMETRIC_KEY,
['2', '3'],
'invalid',
{
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS1v15
},
'initialization_vector': b'\x01\x02\x03\x04',
'derivation_data': b'\xFF\xFE\xFE\xFC'
}
]
kwargs = {
'cryptographic_length': 128,
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
self.assertRaisesRegexp(
TypeError,
"Derivation method must be a DerivationMethod enumeration.",
client.derive_key,
*args,
**kwargs
)
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)
def test_derive_key_invalid_derivation_parameters(self):
"""
Test that the right error is raised when attempting to derive a key
with an invalid derivation parameters.
"""
client = ProxyKmipClient()
client.open()
client.proxy.derive_key.return_value = {}
args = [
enums.ObjectType.SYMMETRIC_KEY,
['2', '3'],
enums.DerivationMethod.ENCRYPT,
'invalid'
]
kwargs = {
'cryptographic_length': 128,
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
self.assertRaisesRegexp(
TypeError,
"Derivation parameters must be a dictionary.",
client.derive_key,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_derive_key_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to derive a key on an unopened client connection.
"""
client = ProxyKmipClient()
args = [
enums.ObjectType.SYMMETRIC_KEY,
['2', '3'],
enums.DerivationMethod.ENCRYPT,
{
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS1v15
},
'initialization_vector': b'\x01\x02\x03\x04',
'derivation_data': b'\xFF\xFE\xFE\xFC'
}
]
kwargs = {
'cryptographic_length': 128,
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
self.assertRaises(
ClientConnectionNotOpen,
client.derive_key,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_derive_key_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to derive a key.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = {
'result_status': status,
'result_reason': reason,
'result_message': message
}
error_message = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.derive_key.return_value = result
args = [
enums.ObjectType.SYMMETRIC_KEY,
['2', '3'],
enums.DerivationMethod.ENCRYPT,
{
'cryptographic_parameters': {
'cryptographic_algorithm':
enums.CryptographicAlgorithm.AES,
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS1v15
},
'initialization_vector': b'\x01\x02\x03\x04',
'derivation_data': b'\xFF\xFE\xFE\xFC'
}
]
kwargs = {
'cryptographic_length': 128,
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
}
self.assertRaisesRegexp(
KmipOperationFailure,
error_message,
client.derive_key,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt(self):
@ -1972,3 +2263,18 @@ class TestProxyKmipClient(testtools.TestCase):
object_group_member, attributes]
self.assertRaises(
ClientConnectionNotOpen, client.locate, *args)
def test_build_cryptographic_parameters_invalid(self):
"""
Test that the right error is raised when attempting to build
cryptographic parameters with an invalid value.
"""
client = ProxyKmipClient()
args = ['invalid']
self.assertRaisesRegexp(
TypeError,
"Cryptographic parameters must be a dictionary.",
client._build_cryptographic_parameters,
*args
)

View File

@ -15,9 +15,9 @@
from testtools import TestCase
from kmip.core.attributes import PrivateKeyUniqueIdentifier
from kmip.core.attributes import CryptographicParameters
from kmip.core.attributes import DerivationParameters
from kmip.core.attributes import PrivateKeyUniqueIdentifier
from kmip.core import enums
from kmip.core.enums import AuthenticationSuite
@ -45,6 +45,7 @@ from kmip.core.messages.contents import ProtocolVersion
from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
from kmip.core.messages.payloads import decrypt
from kmip.core.messages.payloads import derive_key
from kmip.core.messages.payloads.discover_versions import \
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
from kmip.core.messages.payloads import encrypt
@ -60,6 +61,7 @@ from kmip.core.misc import QueryFunction
from kmip.core.misc import ServerInformation
from kmip.core.misc import VendorIdentification
from kmip.core.objects import TemplateAttribute
from kmip.core.objects import CommonTemplateAttribute
from kmip.core.objects import PrivateKeyTemplateAttribute
from kmip.core.objects import PublicKeyTemplateAttribute
@ -721,6 +723,64 @@ class TestKMIPClient(TestCase):
self.client._create_socket(sock)
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
)
def test_derive_key(self, send_mock, build_mock):
"""
Test that the client can derive a key.
"""
payload = derive_key.DeriveKeyResponsePayload(
unique_identifier='1',
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.DERIVE_KEY),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=payload
)
response = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response
result = self.client.derive_key(
object_type=enums.ObjectType.SYMMETRIC_KEY,
unique_identifiers=['2', '3'],
derivation_method=enums.DerivationMethod.ENCRYPT,
derivation_parameters=DerivationParameters(
cryptographic_parameters=CryptographicParameters(
block_cipher_mode=enums.BlockCipherMode.CBC,
padding_method=enums.PaddingMethod.PKCS1v15,
cryptographic_algorithm=enums.CryptographicAlgorithm.AES
),
initialization_vector=b'\x01\x02\x03\x04',
derivation_data=b'\xF1\xF2\xF3\xF4\xF5\xF6\xF7\xF8'
),
template_attribute=TemplateAttribute(
attributes=[
self.attr_factory.create_attribute(
'Cryptographic Length',
128
),
self.attr_factory.create_attribute(
'Cryptographic Algorithm',
enums.CryptographicAlgorithm.AES
)
]
),
)
self.assertEqual('1', result.get('unique_identifier'))
self.assertEqual(
ResultStatusEnum.SUCCESS,
result.get('result_status')
)
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)