Merge pull request #422 from OpenKMIP/feat/add-rekey-pie

Add Rekey support to the ProxyKmipClient
This commit is contained in:
Peter Hamilton 2018-04-16 13:32:50 -04:00 committed by GitHub
commit f1ccdf9c5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 200 additions and 0 deletions

View File

@ -389,6 +389,94 @@ class ProxyKmipClient(object):
message = result.result_message.value message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message) raise exceptions.KmipOperationFailure(status, reason, message)
@is_connected
def rekey(self,
uid=None,
offset=None,
**kwargs):
"""
Rekey an existing key.
Args:
uid (string): The unique ID of the symmetric key to rekey.
Optional, defaults to None.
offset (int): The time delta, in seconds, between the new key's
initialization date and activation date. Optional, defaults
to None.
**kwargs (various): A placeholder for object attributes that
should be set on the newly rekeyed key. Currently
supported attributes include:
activation_date (int)
process_start_date (int)
protect_stop_date (int)
deactivation_date (int)
Returns:
string: The unique ID of the newly rekeyed key.
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input arguments are invalid
"""
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("The unique identifier must be a string.")
if offset is not None:
if not isinstance(offset, six.integer_types):
raise TypeError("The offset must be an integer.")
# TODO (peter-hamilton) Unify attribute handling across operations
attributes = []
if kwargs.get('activation_date'):
attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.ACTIVATION_DATE,
kwargs.get('activation_date')
)
)
if kwargs.get('process_start_date'):
attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.PROCESS_START_DATE,
kwargs.get('process_start_date')
)
)
if kwargs.get('protect_stop_date'):
attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.PROTECT_STOP_DATE,
kwargs.get('protect_stop_date')
)
)
if kwargs.get('deactivation_date'):
attributes.append(
self.attribute_factory.create_attribute(
enums.AttributeType.DEACTIVATION_DATE,
kwargs.get('deactivation_date')
)
)
template_attribute = cobjects.TemplateAttribute(
attributes=attributes
)
# Derive the new key/data and handle the results
result = self.proxy.rekey(
uuid=uid,
offset=offset,
template_attribute=template_attribute
)
status = result.get('result_status')
if status == enums.ResultStatus.SUCCESS:
return result.get('unique_identifier')
else:
raise exceptions.KmipOperationFailure(
status,
result.get('result_reason'),
result.get('result_message')
)
@is_connected @is_connected
def derive_key(self, def derive_key(self,
object_type, object_type,

View File

@ -702,6 +702,118 @@ class TestProxyKmipClient(testtools.TestCase):
KmipOperationFailure, error_msg, KmipOperationFailure, error_msg,
client.create_key_pair, *args) client.create_key_pair, *args)
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)
def test_rekey(self):
"""
Test that the client can rekey an object.
"""
result = {
'unique_identifier': '2',
'result_status': enums.ResultStatus.SUCCESS
}
client = ProxyKmipClient()
client.open()
client.proxy.rekey.return_value = result
checked_id = client.rekey(
uid='1',
offset=0,
activation_date=1000000,
process_start_date=1000001,
protect_stop_date=1000002,
deactivation_date=1000003
)
self.assertEqual('2', checked_id)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_rekey_on_invalid_unique_identifier(self):
"""
Test that a TypeError exception is raised when trying to rekey an
object with an invalid unique identifier.
"""
kwargs = {'uid': 0}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"The unique identifier must be a string.",
client.rekey,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_rekey_on_invalid_offset(self):
"""
Test that a TypeError exception is raised when trying to rekey an
object with an invalid offset.
"""
kwargs = {'offset': 'invalid'}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"The offset must be an integer.",
client.rekey,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_rekey_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to rekey an object on an unopened client connection.
"""
client = ProxyKmipClient()
kwargs = {
'uid': '1',
'offset': 10
}
self.assertRaises(
ClientConnectionNotOpen,
client.rekey,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_rekey_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to rekey a key.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = {
'result_status': status,
'result_reason': reason,
'result_message': message
}
error_message = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.rekey.return_value = result
kwargs = {
'uid': '1',
'offset': 1,
'deactivation_date': 10000
}
self.assertRaisesRegexp(
KmipOperationFailure,
error_message,
client.rekey,
**kwargs
)
@mock.patch( @mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy) 'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
) )