Merge pull request #421 from OpenKMIP/feat/add-rekey-client

Add Rekey support to the KMIPProxy client
Peter Hamilton 2018-04-15 22:41:31 -04:00 committed by GitHub
commit be436ba519
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 178 additions and 0 deletions


@ -283,6 +283,83 @@ class KMIPProxy:
"""
return self._activate(uuid, credential=credential)
def rekey(self,
uuid=None,
offset=None,
template_attribute=None,
credential=None):
"""
Check object usage according to specific constraints.
Args:
uuid (string): The unique identifier of a managed cryptographic
object that should be checked. Optional, defaults to None.
offset (int): An integer specifying, in seconds, the difference
between the rekeyed objects initialization date and activation
date. Optional, defaults to None.
template_attribute (TemplateAttribute): A TemplateAttribute struct
containing the attributes to set on the newly rekeyed object.
Optional, defaults to None.
credential (Credential): A Credential struct containing a set of
authorization parameters for the operation. Optional, defaults
to None.
Returns:
dict: The results of the check operation, containing the following
key/value pairs:
Key | Value
---------------------------|-----------------------------------
'unique_identifier' | (string) The unique ID of the
| checked cryptographic object.
'template_attribute' | (TemplateAttribute) A struct
| containing attribute set by the
| server. Optional.
'result_status' | (ResultStatus) An enumeration
| indicating the status of the
| operation result.
'result_reason' | (ResultReason) An enumeration
| providing context for the result
| status.
'result_message' | (string) A message providing
| additional context for the
| operation result.
"""
operation = Operation(OperationEnum.REKEY)
request_payload = payloads.RekeyRequestPayload(
unique_identifier=uuid,
offset=offset,
template_attribute=template_attribute
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=request_payload
)
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
batch_item = response.batch_items[0]
payload = batch_item.response_payload
result = {}
if payload:
result['unique_identifier'] = payload.unique_identifier
if payload.template_attribute is not None:
result['template_attribute'] = payload.template_attribute
result['result_status'] = batch_item.result_status.value
try:
result['result_reason'] = batch_item.result_reason.value
except Exception:
result['result_reason'] = batch_item.result_reason
try:
result['result_message'] = batch_item.result_message.value
except Exception:
result['result_message'] = batch_item.result_message
return result
def derive_key(self,
object_type,
unique_identifiers,
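Review bot: The docstring of `rekey` still describes the Check operation: the summary reads "Check object usage according to specific constraints", `uuid` is an object "that should be checked", and the Returns table speaks of "the check operation" and "the checked cryptographic object". For a key-lifecycle call this hides the fact that a replacement key is created, so I would change the summary to `Rekey an existing symmetric key, creating a replacement key.` and describe `uuid` as the key to be rekeyed. The Returns table should also state that `'unique_identifier'` is only set when the server returned a payload (the `if payload:` branch) and presumably names the replacement key rather than the input key (worth confirming against the KMIP Rekey definition), so callers must check `'result_status'` before using it. The whole method lies inside this hunk; the `derive_key` signature after it continues outside.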


@ -49,10 +49,12 @@ from kmip.core.misc import QueryFunction
from kmip.core.misc import ServerInformation
from kmip.core.misc import VendorIdentification
from kmip.core import objects
from kmip.core.objects import TemplateAttribute
from kmip.core.objects import CommonTemplateAttribute
from kmip.core.objects import PrivateKeyTemplateAttribute
from kmip.core.objects import PublicKeyTemplateAttribute
from kmip.core import primitives
from kmip.services.kmip_client import KMIPProxy
@ -761,6 +763,105 @@ class TestKMIPClient(TestCase):
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
)
def test_rekey(self, send_mock, build_mock):
"""
Test that the client can correctly build, send, and process a Rekey
request.
"""
payload = payloads.RekeyResponsePayload(
unique_identifier='1',
template_attribute=objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Algorithm'
),
attribute_value=primitives.Enumeration(
enums.CryptographicAlgorithm,
value=enums.CryptographicAlgorithm.AES,
tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Length'
),
attribute_value=primitives.Integer(
value=128,
tag=enums.Tags.CRYPTOGRAPHIC_LENGTH
)
)
]
)
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.REKEY),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=payload
)
response = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response
result = self.client.rekey(
uuid='1',
offset=0,
template_attribute=objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Activation Date'
),
attribute_value=primitives.DateTime(
value=1136113200,
tag=enums.Tags.ACTIVATION_DATE
)
)
]
)
)
self.assertEqual('1', result.get('unique_identifier'))
self.assertEqual(
objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Algorithm'
),
attribute_value=primitives.Enumeration(
enums.CryptographicAlgorithm,
value=enums.CryptographicAlgorithm.AES,
tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Length'
),
attribute_value=primitives.Integer(
value=128,
tag=enums.Tags.CRYPTOGRAPHIC_LENGTH
)
)
]
),
result.get('template_attribute')
)
self.assertEqual(
ResultStatusEnum.SUCCESS,
result.get('result_status')
)
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
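Review bot: `test_rekey` asserts only on the processed response, so the "build" and "send" steps named in its docstring are not checked: nothing verifies that the `uuid`, `offset` and `template_attribute` arguments reach the `RekeyRequestPayload`. I would inspect `build_mock.call_args` and assert that the single batch item carries the Rekey operation and a request payload holding those three values, and that `send_mock` was called with the built message. A second test for a failed operation (no response payload, `result_reason` and `result_message` set) would cover the false side of `if payload:` and the `.value` lookups, which in this test only ever hit the `except` fallback because both fields are None. The hunk ends inside the decorators of the next test, which continues outside it.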