diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index eff7d6c..73f5a25 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -31,6 +31,7 @@ from kmip.services.results import MACResult from kmip.core import attributes as attr +from kmip.core import enums from kmip.core.enums import AuthenticationSuite from kmip.core.enums import ConformanceClause from kmip.core.enums import CredentialType @@ -364,6 +365,107 @@ class KMIPProxy: return result + def check(self, + uuid=None, + usage_limits_count=None, + cryptographic_usage_mask=None, + lease_time=None, + credential=None): + """ + Check object usage according to specific constraints. + + Args: + uuid (string): The unique identifier of a managed cryptographic + object that should be checked. Optional, defaults to None. + usage_limits_count (int): An integer specifying the number of + items that can be secured with the specified cryptographic + object. Optional, defaults to None. + cryptographic_usage_mask (list): A list of CryptographicUsageMask + enumerations specifying the operations possible with the + specified cryptographic object. Optional, defaults to None. + lease_time (int): The number of seconds that can be leased for the + specified cryptographic object. Optional, defaults to None. + credential (Credential): A Credential struct containing a set of + authorization parameters for the operation. Optional, defaults + to None. + + Returns: + dict: The results of the check operation, containing the following + key/value pairs: + + Key | Value + ---------------------------|----------------------------------- + 'unique_identifier' | (string) The unique ID of the + | checked cryptographic object. + 'usage_limits_count' | (int) The value provided as input + | if the value exceeds server + | constraints. + 'cryptographic_usage_mask' | (list) The value provided as input + | if the value exceeds server + | constraints. + 'lease_time' | (int) The value provided as input + | if the value exceeds server + | constraints. + 'result_status' | (ResultStatus) An enumeration + | indicating the status of the + | operation result. + 'result_reason' | (ResultReason) An enumeration + | providing context for the result + | status. + 'result_message' | (string) A message providing + | additional context for the + | operation result. + """ + # TODO (peter-hamilton) Push this into the Check request. + mask = 0 + for m in cryptographic_usage_mask: + mask |= m.value + + operation = Operation(OperationEnum.CHECK) + request_payload = payloads.CheckRequestPayload( + unique_identifier=uuid, + usage_limits_count=usage_limits_count, + cryptographic_usage_mask=mask, + lease_time=lease_time + ) + batch_item = messages.RequestBatchItem( + operation=operation, + request_payload=request_payload + ) + + request = self._build_request_message(credential, [batch_item]) + response = self._send_and_receive_message(request) + batch_item = response.batch_items[0] + payload = batch_item.response_payload + + result = {} + + if payload: + result['unique_identifier'] = payload.unique_identifier + if payload.usage_limits_count is not None: + result['usage_limits_count'] = payload.usage_limits_count + if payload.cryptographic_usage_mask is not None: + # TODO (peter-hamilton) Push this into the Check response. + masks = [] + for enumeration in enums.CryptographicUsageMask: + if payload.cryptographic_usage_mask & enumeration.value: + masks.append(enumeration) + result['cryptographic_usage_mask'] = masks + if payload.lease_time is not None: + result['lease_time'] = payload.lease_time + + result['result_status'] = batch_item.result_status.value + try: + result['result_reason'] = batch_item.result_reason.value + except Exception: + result['result_reason'] = batch_item.result_reason + try: + result['result_message'] = batch_item.result_message.value + except Exception: + result['result_message'] = batch_item.result_message + + return result + def get(self, uuid=None, key_format_type=None, key_compression_type=None, key_wrapping_specification=None, credential=None): return self._get( diff --git a/kmip/tests/unit/services/test_kmip_client.py b/kmip/tests/unit/services/test_kmip_client.py index 2a6a172..fb9cbab 100644 --- a/kmip/tests/unit/services/test_kmip_client.py +++ b/kmip/tests/unit/services/test_kmip_client.py @@ -707,6 +707,60 @@ class TestKMIPClient(TestCase): self.client._create_socket(sock) self.assertEqual(ssl.SSLSocket, type(self.client.socket)) + @mock.patch( + 'kmip.services.kmip_client.KMIPProxy._build_request_message' + ) + @mock.patch( + 'kmip.services.kmip_client.KMIPProxy._send_and_receive_message' + ) + def test_check(self, send_mock, build_mock): + """ + Test that the client can correctly build, send, and process a Check + request. + """ + payload = payloads.CheckResponsePayload( + unique_identifier='1', + usage_limits_count=100, + cryptographic_usage_mask=12, + lease_time=10000 + ) + batch_item = ResponseBatchItem( + operation=Operation(OperationEnum.CHECK), + result_status=ResultStatus(ResultStatusEnum.SUCCESS), + response_payload=payload + ) + response = ResponseMessage(batch_items=[batch_item]) + + build_mock.return_value = None + send_mock.return_value = response + + result = self.client.check( + '1', + 100, + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ], + 10000 + ) + + self.assertEqual('1', result.get('unique_identifier')) + self.assertEqual(100, result.get('usage_limits_count')) + self.assertEqual( + [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ], + result.get('cryptographic_usage_mask') + ) + self.assertEqual(10000, result.get('lease_time')) + self.assertEqual( + ResultStatusEnum.SUCCESS, + result.get('result_status') + ) + self.assertEqual(None, result.get('result_reason')) + self.assertEqual(None, result.get('result_message')) + @mock.patch( 'kmip.services.kmip_client.KMIPProxy._build_request_message' )