Add Check support to the KMIPProxy client

This change adds Check operation support to the KMIPProxy client.
The client unit test suite has been updated to cover the new
additions.

Partially addresses #405
This commit is contained in:
Peter Hamilton 2018-04-06 00:02:36 -04:00
parent fdfbba8a0e
commit 822b889f2e
2 changed files with 156 additions and 0 deletions

View File

@ -31,6 +31,7 @@ from kmip.services.results import MACResult
from kmip.core import attributes as attr
from kmip.core import enums
from kmip.core.enums import AuthenticationSuite
from kmip.core.enums import ConformanceClause
from kmip.core.enums import CredentialType
@ -364,6 +365,107 @@ class KMIPProxy:
return result
def check(self,
uuid=None,
usage_limits_count=None,
cryptographic_usage_mask=None,
lease_time=None,
credential=None):
"""
Check object usage according to specific constraints.
Args:
uuid (string): The unique identifier of a managed cryptographic
object that should be checked. Optional, defaults to None.
usage_limits_count (int): An integer specifying the number of
items that can be secured with the specified cryptographic
object. Optional, defaults to None.
cryptographic_usage_mask (list): A list of CryptographicUsageMask
enumerations specifying the operations possible with the
specified cryptographic object. Optional, defaults to None.
lease_time (int): The number of seconds that can be leased for the
specified cryptographic object. Optional, defaults to None.
credential (Credential): A Credential struct containing a set of
authorization parameters for the operation. Optional, defaults
to None.
Returns:
dict: The results of the check operation, containing the following
key/value pairs:
Key | Value
---------------------------|-----------------------------------
'unique_identifier' | (string) The unique ID of the
| checked cryptographic object.
'usage_limits_count' | (int) The value provided as input
| if the value exceeds server
| constraints.
'cryptographic_usage_mask' | (list) The value provided as input
| if the value exceeds server
| constraints.
'lease_time' | (int) The value provided as input
| if the value exceeds server
| constraints.
'result_status' | (ResultStatus) An enumeration
| indicating the status of the
| operation result.
'result_reason' | (ResultReason) An enumeration
| providing context for the result
| status.
'result_message' | (string) A message providing
| additional context for the
| operation result.
"""
# TODO (peter-hamilton) Push this into the Check request.
mask = 0
for m in cryptographic_usage_mask:
mask |= m.value
operation = Operation(OperationEnum.CHECK)
request_payload = payloads.CheckRequestPayload(
unique_identifier=uuid,
usage_limits_count=usage_limits_count,
cryptographic_usage_mask=mask,
lease_time=lease_time
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=request_payload
)
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
batch_item = response.batch_items[0]
payload = batch_item.response_payload
result = {}
if payload:
result['unique_identifier'] = payload.unique_identifier
if payload.usage_limits_count is not None:
result['usage_limits_count'] = payload.usage_limits_count
if payload.cryptographic_usage_mask is not None:
# TODO (peter-hamilton) Push this into the Check response.
masks = []
for enumeration in enums.CryptographicUsageMask:
if payload.cryptographic_usage_mask & enumeration.value:
masks.append(enumeration)
result['cryptographic_usage_mask'] = masks
if payload.lease_time is not None:
result['lease_time'] = payload.lease_time
result['result_status'] = batch_item.result_status.value
try:
result['result_reason'] = batch_item.result_reason.value
except Exception:
result['result_reason'] = batch_item.result_reason
try:
result['result_message'] = batch_item.result_message.value
except Exception:
result['result_message'] = batch_item.result_message
return result
def get(self, uuid=None, key_format_type=None, key_compression_type=None,
key_wrapping_specification=None, credential=None):
return self._get(

View File

@ -707,6 +707,60 @@ class TestKMIPClient(TestCase):
self.client._create_socket(sock)
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
)
def test_check(self, send_mock, build_mock):
"""
Test that the client can correctly build, send, and process a Check
request.
"""
payload = payloads.CheckResponsePayload(
unique_identifier='1',
usage_limits_count=100,
cryptographic_usage_mask=12,
lease_time=10000
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.CHECK),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=payload
)
response = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response
result = self.client.check(
'1',
100,
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
],
10000
)
self.assertEqual('1', result.get('unique_identifier'))
self.assertEqual(100, result.get('usage_limits_count'))
self.assertEqual(
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
],
result.get('cryptographic_usage_mask')
)
self.assertEqual(10000, result.get('lease_time'))
self.assertEqual(
ResultStatusEnum.SUCCESS,
result.get('result_status')
)
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)