mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #317 from OpenKMIP/feat/add-derive-key-to-clients
Add DeriveKey support to the clients
This commit is contained in:
commit
357ac022c4
|
@ -23,6 +23,7 @@ from kmip.core import objects as cobjects
|
|||
from kmip.core.factories import attributes
|
||||
|
||||
from kmip.core.attributes import CryptographicParameters
|
||||
from kmip.core.attributes import DerivationParameters
|
||||
|
||||
from kmip.pie import api
|
||||
from kmip.pie import exceptions
|
||||
|
@ -342,6 +343,138 @@ class ProxyKmipClient(api.KmipClient):
|
|||
message = result.result_message.value
|
||||
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||
|
||||
def derive_key(self,
|
||||
object_type,
|
||||
unique_identifiers,
|
||||
derivation_method,
|
||||
derivation_parameters,
|
||||
**kwargs):
|
||||
"""
|
||||
Derive a new key or secret data from existing managed objects.
|
||||
|
||||
Args:
|
||||
object_type (ObjectType): An ObjectType enumeration specifying
|
||||
what type of object to derive. Only SymmetricKeys and
|
||||
SecretData can be specified. Required.
|
||||
unique_identifiers (list): A list of strings specifying the
|
||||
unique IDs of the existing managed objects to use for
|
||||
derivation. Multiple objects can be specified to fit the
|
||||
requirements of the given derivation method. Required.
|
||||
derivation_method (DerivationMethod): A DerivationMethod
|
||||
enumeration specifying how key derivation should be done.
|
||||
Required.
|
||||
derivation_parameters (dict): A dictionary containing various
|
||||
settings for the key derivation process. See Note below.
|
||||
Required.
|
||||
**kwargs (various): A placeholder for object attributes that
|
||||
should be set on the newly derived object. Currently
|
||||
supported attributes include:
|
||||
cryptographic_algorithm (enums.CryptographicAlgorithm)
|
||||
cryptographic_length (int)
|
||||
|
||||
Returns:
|
||||
string: The unique ID of the newly derived object.
|
||||
|
||||
Raises:
|
||||
ClientConnectionNotOpen: if the client connection is unusable
|
||||
KmipOperationFailure: if the operation result is a failure
|
||||
TypeError: if the input arguments are invalid
|
||||
|
||||
Notes:
|
||||
The derivation_parameters argument is a dictionary that can
|
||||
contain the following key/value pairs:
|
||||
|
||||
Key | Value
|
||||
---------------------------|---------------------------------------
|
||||
'cryptographic_parameters' | A dictionary containing additional
|
||||
| cryptographic settings. See the
|
||||
| decrypt method for more information.
|
||||
'initialization_vector' | Bytes to be used to initialize the key
|
||||
| derivation function, if needed.
|
||||
'derivation_data' | Bytes to be used as the basis for the
|
||||
| key derivation process (e.g., the
|
||||
| bytes to be encrypted, hashed, etc).
|
||||
'salt' | Bytes to used as a salt value for the
|
||||
| key derivation function, if needed.
|
||||
| Usually used with PBKDF2.
|
||||
'iteration_count' | An integer defining how many
|
||||
| iterations should be used with the key
|
||||
| derivation function, if needed.
|
||||
| Usually used with PBKDF2.
|
||||
"""
|
||||
# Check input
|
||||
if not isinstance(object_type, enums.ObjectType):
|
||||
raise TypeError("Object type must be an ObjectType enumeration.")
|
||||
if not isinstance(unique_identifiers, list):
|
||||
raise TypeError("Unique identifiers must be a list of strings.")
|
||||
else:
|
||||
for unique_identifier in unique_identifiers:
|
||||
if not isinstance(unique_identifier, six.string_types):
|
||||
raise TypeError(
|
||||
"Unique identifiers must be a list of strings."
|
||||
)
|
||||
if not isinstance(derivation_method, enums.DerivationMethod):
|
||||
raise TypeError(
|
||||
"Derivation method must be a DerivationMethod enumeration."
|
||||
)
|
||||
if not isinstance(derivation_parameters, dict):
|
||||
raise TypeError("Derivation parameters must be a dictionary.")
|
||||
|
||||
# Verify that operations can be given at this time.
|
||||
if not self._is_open:
|
||||
raise exceptions.ClientConnectionNotOpen()
|
||||
|
||||
derivation_parameters = DerivationParameters(
|
||||
cryptographic_parameters=self._build_cryptographic_parameters(
|
||||
derivation_parameters.get('cryptographic_parameters')
|
||||
),
|
||||
initialization_vector=derivation_parameters.get(
|
||||
'initialization_vector'
|
||||
),
|
||||
derivation_data=derivation_parameters.get('derivation_data'),
|
||||
salt=derivation_parameters.get('salt'),
|
||||
iteration_count=derivation_parameters.get('iteration_count')
|
||||
)
|
||||
|
||||
# Handle object attributes
|
||||
attributes = []
|
||||
if kwargs.get('cryptographic_length'):
|
||||
attributes.append(
|
||||
self.attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
kwargs.get('cryptographic_length')
|
||||
)
|
||||
)
|
||||
if kwargs.get('cryptographic_algorithm'):
|
||||
attributes.append(
|
||||
self.attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
kwargs.get('cryptographic_algorithm')
|
||||
)
|
||||
)
|
||||
template_attribute = cobjects.TemplateAttribute(
|
||||
attributes=attributes
|
||||
)
|
||||
|
||||
# Derive the new key/data and handle the results
|
||||
result = self.proxy.derive_key(
|
||||
object_type,
|
||||
unique_identifiers,
|
||||
derivation_method,
|
||||
derivation_parameters,
|
||||
template_attribute
|
||||
)
|
||||
|
||||
status = result.get('result_status')
|
||||
if status == enums.ResultStatus.SUCCESS:
|
||||
return result.get('unique_identifier')
|
||||
else:
|
||||
raise exceptions.KmipOperationFailure(
|
||||
status,
|
||||
result.get('result_reason'),
|
||||
result.get('result_message')
|
||||
)
|
||||
|
||||
def locate(self, maximum_items=None, storage_status_mask=None,
|
||||
object_group_member=None, attributes=None):
|
||||
"""
|
||||
|
@ -737,34 +870,8 @@ class ProxyKmipClient(api.KmipClient):
|
|||
if not self._is_open:
|
||||
raise exceptions.ClientConnectionNotOpen()
|
||||
|
||||
cryptographic_parameters = CryptographicParameters(
|
||||
block_cipher_mode=cryptographic_parameters.get(
|
||||
'block_cipher_mode'
|
||||
),
|
||||
padding_method=cryptographic_parameters.get('padding_method'),
|
||||
hashing_algorithm=cryptographic_parameters.get(
|
||||
'hashing_algorithm'
|
||||
),
|
||||
key_role_type=cryptographic_parameters.get('key_role_type'),
|
||||
digital_signature_algorithm=cryptographic_parameters.get(
|
||||
'digital_signature_algorithm'
|
||||
),
|
||||
cryptographic_algorithm=cryptographic_parameters.get(
|
||||
'cryptographic_algorithm'
|
||||
),
|
||||
random_iv=cryptographic_parameters.get('random_iv'),
|
||||
iv_length=cryptographic_parameters.get('iv_length'),
|
||||
tag_length=cryptographic_parameters.get('tag_length'),
|
||||
fixed_field_length=cryptographic_parameters.get(
|
||||
'fixed_field_length'
|
||||
),
|
||||
invocation_field_length=cryptographic_parameters.get(
|
||||
'invocation_field_length'
|
||||
),
|
||||
counter_length=cryptographic_parameters.get('counter_length'),
|
||||
initial_counter_value=cryptographic_parameters.get(
|
||||
'initial_counter_value'
|
||||
)
|
||||
cryptographic_parameters = self._build_cryptographic_parameters(
|
||||
cryptographic_parameters
|
||||
)
|
||||
|
||||
# Encrypt the provided data and handle the results
|
||||
|
@ -871,34 +978,8 @@ class ProxyKmipClient(api.KmipClient):
|
|||
if not self._is_open:
|
||||
raise exceptions.ClientConnectionNotOpen()
|
||||
|
||||
cryptographic_parameters = CryptographicParameters(
|
||||
block_cipher_mode=cryptographic_parameters.get(
|
||||
'block_cipher_mode'
|
||||
),
|
||||
padding_method=cryptographic_parameters.get('padding_method'),
|
||||
hashing_algorithm=cryptographic_parameters.get(
|
||||
'hashing_algorithm'
|
||||
),
|
||||
key_role_type=cryptographic_parameters.get('key_role_type'),
|
||||
digital_signature_algorithm=cryptographic_parameters.get(
|
||||
'digital_signature_algorithm'
|
||||
),
|
||||
cryptographic_algorithm=cryptographic_parameters.get(
|
||||
'cryptographic_algorithm'
|
||||
),
|
||||
random_iv=cryptographic_parameters.get('random_iv'),
|
||||
iv_length=cryptographic_parameters.get('iv_length'),
|
||||
tag_length=cryptographic_parameters.get('tag_length'),
|
||||
fixed_field_length=cryptographic_parameters.get(
|
||||
'fixed_field_length'
|
||||
),
|
||||
invocation_field_length=cryptographic_parameters.get(
|
||||
'invocation_field_length'
|
||||
),
|
||||
counter_length=cryptographic_parameters.get('counter_length'),
|
||||
initial_counter_value=cryptographic_parameters.get(
|
||||
'initial_counter_value'
|
||||
)
|
||||
cryptographic_parameters = self._build_cryptographic_parameters(
|
||||
cryptographic_parameters
|
||||
)
|
||||
|
||||
# Decrypt the provided data and handle the results
|
||||
|
@ -955,8 +1036,8 @@ class ProxyKmipClient(api.KmipClient):
|
|||
if not self._is_open:
|
||||
raise exceptions.ClientConnectionNotOpen()
|
||||
|
||||
parameters_attribute = CryptographicParameters(
|
||||
cryptographic_algorithm=algorithm
|
||||
parameters_attribute = self._build_cryptographic_parameters(
|
||||
{'cryptographic_algorithm': algorithm}
|
||||
)
|
||||
|
||||
# Get the message authentication code and handle the results
|
||||
|
@ -993,6 +1074,42 @@ class ProxyKmipClient(api.KmipClient):
|
|||
|
||||
return [algorithm_attribute, length_attribute, mask_attribute]
|
||||
|
||||
def _build_cryptographic_parameters(self, value):
|
||||
"""
|
||||
Build a CryptographicParameters struct from a dictionary.
|
||||
|
||||
Args:
|
||||
value (dict): A dictionary containing the key/value pairs for a
|
||||
CryptographicParameters struct.
|
||||
|
||||
Returns:
|
||||
CryptographicParameters: a CryptographicParameters struct
|
||||
|
||||
Raises:
|
||||
TypeError: if the input argument is invalid
|
||||
"""
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError("Cryptographic parameters must be a dictionary.")
|
||||
|
||||
cryptographic_parameters = CryptographicParameters(
|
||||
block_cipher_mode=value.get('block_cipher_mode'),
|
||||
padding_method=value.get('padding_method'),
|
||||
hashing_algorithm=value.get('hashing_algorithm'),
|
||||
key_role_type=value.get('key_role_type'),
|
||||
digital_signature_algorithm=value.get(
|
||||
'digital_signature_algorithm'
|
||||
),
|
||||
cryptographic_algorithm=value.get('cryptographic_algorithm'),
|
||||
random_iv=value.get('random_iv'),
|
||||
iv_length=value.get('iv_length'),
|
||||
tag_length=value.get('tag_length'),
|
||||
fixed_field_length=value.get('fixed_field_length'),
|
||||
invocation_field_length=value.get('invocation_field_length'),
|
||||
counter_length=value.get('counter_length'),
|
||||
initial_counter_value=value.get('initial_counter_value')
|
||||
)
|
||||
return cryptographic_parameters
|
||||
|
||||
def _build_common_attributes(self, operation_policy_name=None):
|
||||
'''
|
||||
Build a list of common attributes that are shared across
|
||||
|
|
|
@ -52,6 +52,7 @@ from kmip.core.messages.payloads import activate
|
|||
from kmip.core.messages.payloads import create
|
||||
from kmip.core.messages.payloads import create_key_pair
|
||||
from kmip.core.messages.payloads import decrypt
|
||||
from kmip.core.messages.payloads import derive_key
|
||||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import encrypt
|
||||
|
@ -299,6 +300,88 @@ class KMIPProxy(KMIP):
|
|||
"""
|
||||
return self._activate(uuid, credential=credential)
|
||||
|
||||
def derive_key(self,
|
||||
object_type,
|
||||
unique_identifiers,
|
||||
derivation_method,
|
||||
derivation_parameters,
|
||||
template_attribute,
|
||||
credential=None):
|
||||
"""
|
||||
Derive a new key or secret data from an existing managed object.
|
||||
|
||||
Args:
|
||||
object_type (ObjectType): An ObjectType enumeration specifying
|
||||
what type of object to create. Required.
|
||||
unique_identifiers (list): A list of strings specifying the unique
|
||||
IDs of the existing managed objects to use for key derivation.
|
||||
Required.
|
||||
derivation_method (DerivationMethod): A DerivationMethod
|
||||
enumeration specifying what key derivation method to use.
|
||||
Required.
|
||||
derivation_parameters (DerivationParameters): A
|
||||
DerivationParameters struct containing the settings and
|
||||
options to use for key derivation.
|
||||
template_attribute (TemplateAttribute): A TemplateAttribute struct
|
||||
containing the attributes to set on the newly derived object.
|
||||
credential (Credential): A Credential struct containing a set of
|
||||
authorization parameters for the operation. Optional, defaults
|
||||
to None.
|
||||
|
||||
Returns:
|
||||
dict: The results of the derivation operation, containing the
|
||||
following key/value pairs:
|
||||
|
||||
Key | Value
|
||||
---------------------|-----------------------------------------
|
||||
'unique_identifier' | (string) The unique ID of the newly
|
||||
| derived object.
|
||||
'template_attribute' | (TemplateAttribute) A struct containing
|
||||
| any attributes set on the newly derived
|
||||
| object.
|
||||
'result_status' | (ResultStatus) An enumeration indicating
|
||||
| the status of the operation result.
|
||||
'result_reason' | (ResultReason) An enumeration providing
|
||||
| context for the result status.
|
||||
'result_message' | (string) A message providing additional
|
||||
| context for the operation result.
|
||||
"""
|
||||
operation = Operation(OperationEnum.DERIVE_KEY)
|
||||
request_payload = derive_key.DeriveKeyRequestPayload(
|
||||
object_type=object_type,
|
||||
unique_identifiers=unique_identifiers,
|
||||
derivation_method=derivation_method,
|
||||
derivation_parameters=derivation_parameters,
|
||||
template_attribute=template_attribute
|
||||
)
|
||||
batch_item = messages.RequestBatchItem(
|
||||
operation=operation,
|
||||
request_payload=request_payload
|
||||
)
|
||||
|
||||
request = self._build_request_message(credential, [batch_item])
|
||||
response = self._send_and_receive_message(request)
|
||||
batch_item = response.batch_items[0]
|
||||
payload = batch_item.response_payload
|
||||
|
||||
result = {}
|
||||
|
||||
if payload:
|
||||
result['unique_identifier'] = payload.unique_identifier
|
||||
result['template_attribute'] = payload.template_attribute
|
||||
|
||||
result['result_status'] = batch_item.result_status.value
|
||||
try:
|
||||
result['result_reason'] = batch_item.result_reason.value
|
||||
except:
|
||||
result['result_reason'] = batch_item.result_reason
|
||||
try:
|
||||
result['result_message'] = batch_item.result_message.value
|
||||
except:
|
||||
result['result_message'] = batch_item.result_message
|
||||
|
||||
return result
|
||||
|
||||
def get(self, uuid=None, key_format_type=None, key_compression_type=None,
|
||||
key_wrapping_specification=None, credential=None):
|
||||
return self._get(
|
||||
|
|
|
@ -1318,6 +1318,297 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||
self.assertEqual(opn.attribute_name.value, 'Operation Policy Name')
|
||||
self.assertEqual(opn.attribute_value.value, 'test')
|
||||
|
||||
@mock.patch(
|
||||
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
|
||||
)
|
||||
def test_derive_key(self):
|
||||
"""
|
||||
Test that the client can derive a key.
|
||||
"""
|
||||
result = {
|
||||
'unique_identifier': '1',
|
||||
'result_status': enums.ResultStatus.SUCCESS
|
||||
}
|
||||
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.derive_key.return_value = result
|
||||
|
||||
derived_id = client.derive_key(
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
['2', '3'],
|
||||
enums.DerivationMethod.ENCRYPT,
|
||||
{
|
||||
'cryptographic_parameters': {
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.PKCS1v15
|
||||
},
|
||||
'initialization_vector': b'\x01\x02\x03\x04',
|
||||
'derivation_data': b'\xFF\xFE\xFE\xFC'
|
||||
},
|
||||
cryptographic_length=128,
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.AES
|
||||
)
|
||||
|
||||
self.assertEqual('1', derived_id)
|
||||
|
||||
@mock.patch(
|
||||
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
|
||||
)
|
||||
def test_derive_key_invalid_object_type(self):
|
||||
"""
|
||||
Test that the right error is raised when attempting to derive a key
|
||||
with an invalid object type.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.derive_key.return_value = {}
|
||||
args = [
|
||||
'invalid',
|
||||
['2', '3'],
|
||||
enums.DerivationMethod.ENCRYPT,
|
||||
{
|
||||
'cryptographic_parameters': {
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.PKCS1v15
|
||||
},
|
||||
'initialization_vector': b'\x01\x02\x03\x04',
|
||||
'derivation_data': b'\xFF\xFE\xFE\xFC'
|
||||
}
|
||||
]
|
||||
kwargs = {
|
||||
'cryptographic_length': 128,
|
||||
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
|
||||
}
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"Object type must be an ObjectType enumeration.",
|
||||
client.derive_key,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch(
|
||||
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
|
||||
)
|
||||
def test_derive_key_invalid_unique_identifiers(self):
|
||||
"""
|
||||
Test that the right error is raised when attempting to derive a key
|
||||
with an invalid list of unique identifiers.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.derive_key.return_value = {}
|
||||
args = [
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
'invalid',
|
||||
enums.DerivationMethod.ENCRYPT,
|
||||
{
|
||||
'cryptographic_parameters': {
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.PKCS1v15
|
||||
},
|
||||
'initialization_vector': b'\x01\x02\x03\x04',
|
||||
'derivation_data': b'\xFF\xFE\xFE\xFC'
|
||||
}
|
||||
]
|
||||
kwargs = {
|
||||
'cryptographic_length': 128,
|
||||
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
|
||||
}
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"Unique identifiers must be a list of strings.",
|
||||
client.derive_key,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
args = [
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
[2, 3],
|
||||
enums.DerivationMethod.ENCRYPT,
|
||||
{
|
||||
'cryptographic_parameters': {
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.PKCS1v15
|
||||
},
|
||||
'initialization_vector': b'\x01\x02\x03\x04',
|
||||
'derivation_data': b'\xFF\xFE\xFE\xFC'
|
||||
}
|
||||
]
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"Unique identifiers must be a list of strings.",
|
||||
client.derive_key,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch(
|
||||
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
|
||||
)
|
||||
def test_derive_key_invalid_derivation_method(self):
|
||||
"""
|
||||
Test that the right error is raised when attempting to derive a key
|
||||
with an invalid derivation method.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.derive_key.return_value = {}
|
||||
args = [
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
['2', '3'],
|
||||
'invalid',
|
||||
{
|
||||
'cryptographic_parameters': {
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.PKCS1v15
|
||||
},
|
||||
'initialization_vector': b'\x01\x02\x03\x04',
|
||||
'derivation_data': b'\xFF\xFE\xFE\xFC'
|
||||
}
|
||||
]
|
||||
kwargs = {
|
||||
'cryptographic_length': 128,
|
||||
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
|
||||
}
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"Derivation method must be a DerivationMethod enumeration.",
|
||||
client.derive_key,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch(
|
||||
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
|
||||
)
|
||||
def test_derive_key_invalid_derivation_parameters(self):
|
||||
"""
|
||||
Test that the right error is raised when attempting to derive a key
|
||||
with an invalid derivation parameters.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.derive_key.return_value = {}
|
||||
args = [
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
['2', '3'],
|
||||
enums.DerivationMethod.ENCRYPT,
|
||||
'invalid'
|
||||
]
|
||||
kwargs = {
|
||||
'cryptographic_length': 128,
|
||||
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
|
||||
}
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"Derivation parameters must be a dictionary.",
|
||||
client.derive_key,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_derive_key_on_closed(self):
|
||||
"""
|
||||
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||
to derive a key on an unopened client connection.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
args = [
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
['2', '3'],
|
||||
enums.DerivationMethod.ENCRYPT,
|
||||
{
|
||||
'cryptographic_parameters': {
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.PKCS1v15
|
||||
},
|
||||
'initialization_vector': b'\x01\x02\x03\x04',
|
||||
'derivation_data': b'\xFF\xFE\xFE\xFC'
|
||||
}
|
||||
]
|
||||
kwargs = {
|
||||
'cryptographic_length': 128,
|
||||
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
|
||||
}
|
||||
|
||||
self.assertRaises(
|
||||
ClientConnectionNotOpen,
|
||||
client.derive_key,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_derive_key_on_operation_failure(self):
|
||||
"""
|
||||
Test that a KmipOperationFailure exception is raised when the
|
||||
backend fails to derive a key.
|
||||
"""
|
||||
status = enums.ResultStatus.OPERATION_FAILED
|
||||
reason = enums.ResultReason.GENERAL_FAILURE
|
||||
message = "Test failure message"
|
||||
|
||||
result = {
|
||||
'result_status': status,
|
||||
'result_reason': reason,
|
||||
'result_message': message
|
||||
}
|
||||
error_message = str(KmipOperationFailure(status, reason, message))
|
||||
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.derive_key.return_value = result
|
||||
args = [
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
['2', '3'],
|
||||
enums.DerivationMethod.ENCRYPT,
|
||||
{
|
||||
'cryptographic_parameters': {
|
||||
'cryptographic_algorithm':
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'padding_method': enums.PaddingMethod.PKCS1v15
|
||||
},
|
||||
'initialization_vector': b'\x01\x02\x03\x04',
|
||||
'derivation_data': b'\xFF\xFE\xFE\xFC'
|
||||
}
|
||||
]
|
||||
kwargs = {
|
||||
'cryptographic_length': 128,
|
||||
'cryptographic_algorithm': enums.CryptographicAlgorithm.AES
|
||||
}
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
KmipOperationFailure,
|
||||
error_message,
|
||||
client.derive_key,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_encrypt(self):
|
||||
|
@ -1972,3 +2263,18 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||
object_group_member, attributes]
|
||||
self.assertRaises(
|
||||
ClientConnectionNotOpen, client.locate, *args)
|
||||
|
||||
def test_build_cryptographic_parameters_invalid(self):
|
||||
"""
|
||||
Test that the right error is raised when attempting to build
|
||||
cryptographic parameters with an invalid value.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
args = ['invalid']
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"Cryptographic parameters must be a dictionary.",
|
||||
client._build_cryptographic_parameters,
|
||||
*args
|
||||
)
|
||||
|
|
|
@ -15,9 +15,9 @@
|
|||
|
||||
from testtools import TestCase
|
||||
|
||||
from kmip.core.attributes import PrivateKeyUniqueIdentifier
|
||||
from kmip.core.attributes import CryptographicParameters
|
||||
|
||||
from kmip.core.attributes import DerivationParameters
|
||||
from kmip.core.attributes import PrivateKeyUniqueIdentifier
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.core.enums import AuthenticationSuite
|
||||
|
@ -45,6 +45,7 @@ from kmip.core.messages.contents import ProtocolVersion
|
|||
from kmip.core.messages.payloads.create_key_pair import \
|
||||
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
|
||||
from kmip.core.messages.payloads import decrypt
|
||||
from kmip.core.messages.payloads import derive_key
|
||||
from kmip.core.messages.payloads.discover_versions import \
|
||||
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
|
||||
from kmip.core.messages.payloads import encrypt
|
||||
|
@ -60,6 +61,7 @@ from kmip.core.misc import QueryFunction
|
|||
from kmip.core.misc import ServerInformation
|
||||
from kmip.core.misc import VendorIdentification
|
||||
|
||||
from kmip.core.objects import TemplateAttribute
|
||||
from kmip.core.objects import CommonTemplateAttribute
|
||||
from kmip.core.objects import PrivateKeyTemplateAttribute
|
||||
from kmip.core.objects import PublicKeyTemplateAttribute
|
||||
|
@ -721,6 +723,64 @@ class TestKMIPClient(TestCase):
|
|||
self.client._create_socket(sock)
|
||||
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
|
||||
|
||||
@mock.patch(
|
||||
'kmip.services.kmip_client.KMIPProxy._build_request_message'
|
||||
)
|
||||
@mock.patch(
|
||||
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
|
||||
)
|
||||
def test_derive_key(self, send_mock, build_mock):
|
||||
"""
|
||||
Test that the client can derive a key.
|
||||
"""
|
||||
payload = derive_key.DeriveKeyResponsePayload(
|
||||
unique_identifier='1',
|
||||
)
|
||||
batch_item = ResponseBatchItem(
|
||||
operation=Operation(OperationEnum.DERIVE_KEY),
|
||||
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
|
||||
response_payload=payload
|
||||
)
|
||||
response = ResponseMessage(batch_items=[batch_item])
|
||||
|
||||
build_mock.return_value = None
|
||||
send_mock.return_value = response
|
||||
|
||||
result = self.client.derive_key(
|
||||
object_type=enums.ObjectType.SYMMETRIC_KEY,
|
||||
unique_identifiers=['2', '3'],
|
||||
derivation_method=enums.DerivationMethod.ENCRYPT,
|
||||
derivation_parameters=DerivationParameters(
|
||||
cryptographic_parameters=CryptographicParameters(
|
||||
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS1v15,
|
||||
cryptographic_algorithm=enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
initialization_vector=b'\x01\x02\x03\x04',
|
||||
derivation_data=b'\xF1\xF2\xF3\xF4\xF5\xF6\xF7\xF8'
|
||||
),
|
||||
template_attribute=TemplateAttribute(
|
||||
attributes=[
|
||||
self.attr_factory.create_attribute(
|
||||
'Cryptographic Length',
|
||||
128
|
||||
),
|
||||
self.attr_factory.create_attribute(
|
||||
'Cryptographic Algorithm',
|
||||
enums.CryptographicAlgorithm.AES
|
||||
)
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
self.assertEqual('1', result.get('unique_identifier'))
|
||||
self.assertEqual(
|
||||
ResultStatusEnum.SUCCESS,
|
||||
result.get('result_status')
|
||||
)
|
||||
self.assertEqual(None, result.get('result_reason'))
|
||||
self.assertEqual(None, result.get('result_message'))
|
||||
|
||||
@mock.patch(
|
||||
'kmip.services.kmip_client.KMIPProxy._build_request_message'
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue