diff --git a/kmip/pie/client.py b/kmip/pie/client.py index 8f7f83c..ffa0c03 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -1000,6 +1000,75 @@ class ProxyKmipClient(api.KmipClient): result.get('result_message') ) + def signature_verify(self, message, signature, uid=None, + cryptographic_parameters=None): + """ + Verify a message signature using the specified signing key. + + Args: + message (bytes): The bytes of the signed message. Required. + signature (bytes): The bytes of the message signature. Required. + uid (string): The unique ID of the signing key to use. + Optional, defaults to None. + cryptographic_parameters (dict): A dictionary containing various + cryptographic settings to be used for signature verification + (e.g., cryptographic algorithm, hashing algorithm, and/or + digital signature algorithm). Optional, defaults to None. + + Returns: + ValidityIndicator: An enumeration indicating whether or not the + signature was valid. + + Raises: + ClientConnectionNotOpen: if the client connection is unusable + KmipOperationFailure: if the operation result is a failure + TypeError: if the input arguments are invalid + + Notes: + The cryptographic_parameters argument is a dictionary that can + contain various key/value pairs. For a list of allowed pairs, + see the documentation for encrypt/decrypt. + """ + # Check input + if not isinstance(message, six.binary_type): + raise TypeError("Message must be bytes.") + if not isinstance(signature, six.binary_type): + raise TypeError("Signature must be bytes.") + if uid is not None: + if not isinstance(uid, six.string_types): + raise TypeError("Unique identifier must be a string.") + if cryptographic_parameters is not None: + if not isinstance(cryptographic_parameters, dict): + raise TypeError( + "Cryptographic parameters must be a dictionary." + ) + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + cryptographic_parameters = self._build_cryptographic_parameters( + cryptographic_parameters + ) + + # Decrypt the provided data and handle the results + result = self.proxy.signature_verify( + message, + signature, + uid, + cryptographic_parameters + ) + + status = result.get('result_status') + if status == enums.ResultStatus.SUCCESS: + return result.get('validity_indicator') + else: + raise exceptions.KmipOperationFailure( + status, + result.get('result_reason'), + result.get('result_message') + ) + def mac(self, data, uid=None, algorithm=None): """ Get the message authentication code for data. diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 8ab39a4..5a829ea 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -64,6 +64,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import signature_verify from kmip.core.messages.payloads import mac from kmip.services.server.kmip_protocol import KMIPProtocol @@ -671,6 +672,81 @@ class KMIPProxy(KMIP): return result + def signature_verify(self, + message, + signature, + unique_identifier=None, + cryptographic_parameters=None, + credential=None): + """ + Verify a message signature using the specified signing key. + + Args: + message (bytes): The bytes of the signed message. Required. + signature (bytes): The bytes of the message signature. Required. + unique_identifier (string): The unique ID of the signing key to + use. Optional, defaults to None. + cryptographic_parameters (CryptographicParameters): A structure + containing various cryptographic settings to be used for + signature verification. Optional, defaults to None. + credential (Credential): A credential object containing a set of + authorization parameters for the operation. Optional, defaults + to None. + + Returns: + dict: The results of the signature verify operation, containing the + following key/value pairs: + + Key | Value + ---------------------|----------------------------------------- + 'unique_identifier' | (string) The unique ID of the signing + | key used to verify the signature. + 'validity_indicator' | (ValidityIndicator) An enumeration + | indicating the result of signature + | verification. + 'result_status' | (ResultStatus) An enumeration indicating + | the status of the operation result. + 'result_reason' | (ResultReason) An enumeration providing + | context for the result status. + 'result_message' | (string) A message providing additional + | context for the operation result. + """ + operation = Operation(OperationEnum.SIGNATURE_VERIFY) + + request_payload = signature_verify.SignatureVerifyRequestPayload( + unique_identifier=unique_identifier, + cryptographic_parameters=cryptographic_parameters, + data=message, + signature_data=signature + ) + batch_item = messages.RequestBatchItem( + operation=operation, + request_payload=request_payload + ) + + request = self._build_request_message(credential, [batch_item]) + response = self._send_and_receive_message(request) + batch_item = response.batch_items[0] + payload = batch_item.response_payload + + result = {} + + if payload: + result['unique_identifier'] = payload.unique_identifier + result['validity_indicator'] = payload.validity_indicator + + result['result_status'] = batch_item.result_status.value + try: + result['result_reason'] = batch_item.result_reason.value + except: + result['result_reason'] = batch_item.result_reason + try: + result['result_message'] = batch_item.result_message.value + except: + result['result_message'] = batch_item.result_message + + return result + def mac(self, data, unique_identifier=None, cryptographic_parameters=None, credential=None): return self._mac( diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index aa014d7..2ec47ed 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -1945,6 +1945,181 @@ class TestProxyKmipClient(testtools.TestCase): **kwargs ) + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_signature_verify(self): + """ + Test that the client can verify a signature. + """ + result = { + 'unique_identifier': '1', + 'validity_indicator': enums.ValidityIndicator.VALID, + 'result_status': enums.ResultStatus.SUCCESS + } + + client = ProxyKmipClient() + client.open() + client.proxy.signature_verify.return_value = result + + validity = client.signature_verify( + ( + b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6' + b'\x05\xB1\x56\xE2\x74\x03\x97\x93' + b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9' + b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B' + ), + ( + b'\x00\x00\x00\x00\x00\x00\x00\x00' + ), + uid='1', + cryptographic_parameters={ + 'block_cipher_mode': enums.BlockCipherMode.CBC, + 'padding_method': enums.PaddingMethod.PKCS5, + 'cryptographic_algorithm': + enums.CryptographicAlgorithm.BLOWFISH + } + ) + + self.assertEqual(enums.ValidityIndicator.VALID, validity) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_signature_verify_on_invalid_inputs(self): + """ + Test that TypeError exception are raised when trying to verify + signatures with invalid parameters. + """ + client = ProxyKmipClient() + client.open() + client.proxy.signature_verify.return_value = {} + args = [ + [], + b'' + ] + kwargs = { + 'uid': '1', + 'cryptographic_parameters': {} + } + + self.assertRaisesRegexp( + TypeError, + "Message must be bytes.", + client.signature_verify, + *args, + **kwargs + ) + + args = [ + b'\x01\x02\x03\x04', + [] + ] + kwargs = { + 'uid': '1', + 'cryptographic_parameters': {} + } + + self.assertRaisesRegexp( + TypeError, + "Signature must be bytes.", + client.signature_verify, + *args, + **kwargs + ) + + args = [ + b'\x01\x02\x03\x04', + b'\xFF\xFF\xFF\xFF' + ] + kwargs = { + 'uid': 0, + 'cryptographic_parameters': {} + } + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + client.signature_verify, + *args, + **kwargs + ) + + args = [ + b'\x01\x02\x03\x04', + b'\xFF\xFF\xFF\xFF' + ] + kwargs = { + 'uid': '1', + 'cryptographic_parameters': 'invalid' + } + self.assertRaisesRegexp( + TypeError, + "Cryptographic parameters must be a dictionary.", + client.signature_verify, + *args, + **kwargs + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_signature_verify_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to verify a signature on an unopened client connection. + """ + client = ProxyKmipClient() + args = [ + b'\x01\x02\x03\x04', + b'\xFF\xFF\xFF\xFF' + ] + kwargs = { + 'uid': '1', + 'cryptographic_parameters': {} + } + + self.assertRaises( + ClientConnectionNotOpen, + client.signature_verify, + *args, + **kwargs + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_signature_verify_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to verify a signature. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = { + 'result_status': status, + 'result_reason': reason, + 'result_message': message + } + error_message = str(KmipOperationFailure(status, reason, message)) + + client = ProxyKmipClient() + client.open() + client.proxy.signature_verify.return_value = result + args = [ + b'\x01\x02\x03\x04', + b'\xFF\xFF\xFF\xFF' + ] + kwargs = { + 'uid': '1', + 'cryptographic_parameters': {} + } + + self.assertRaisesRegexp( + KmipOperationFailure, + error_message, + client.signature_verify, + *args, + **kwargs + ) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_mac(self): diff --git a/kmip/tests/unit/services/test_kmip_client.py b/kmip/tests/unit/services/test_kmip_client.py index dded6d5..03c7966 100644 --- a/kmip/tests/unit/services/test_kmip_client.py +++ b/kmip/tests/unit/services/test_kmip_client.py @@ -55,6 +55,7 @@ from kmip.core.messages.payloads.query import \ QueryRequestPayload, QueryResponsePayload from kmip.core.messages.payloads.rekey_key_pair import \ RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload +from kmip.core.messages.payloads import signature_verify from kmip.core.misc import Offset from kmip.core.misc import QueryFunction @@ -906,6 +907,60 @@ class TestKMIPClient(TestCase): self.assertEqual(None, result.get('result_reason')) self.assertEqual(None, result.get('result_message')) + @mock.patch( + 'kmip.services.kmip_client.KMIPProxy._build_request_message' + ) + @mock.patch( + 'kmip.services.kmip_client.KMIPProxy._send_and_receive_message' + ) + def test_signature_verify(self, send_mock, build_mock): + """ + Test that the client can verify a signature. + """ + payload = signature_verify.SignatureVerifyResponsePayload( + unique_identifier='1', + validity_indicator=enums.ValidityIndicator.INVALID + ) + batch_item = ResponseBatchItem( + operation=Operation(OperationEnum.SIGNATURE_VERIFY), + result_status=ResultStatus(ResultStatusEnum.SUCCESS), + response_payload=payload + ) + response = ResponseMessage(batch_items=[batch_item]) + + build_mock.return_value = None + send_mock.return_value = response + + result = self.client.signature_verify( + ( + b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6' + b'\x05\xB1\x56\xE2\x74\x03\x97\x93' + b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9' + b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B' + ), + ( + b'\x11\x11\x11\x11\x11\x11\x11\x11' + ), + unique_identifier='1', + cryptographic_parameters=CryptographicParameters( + padding_method=enums.PaddingMethod.PKCS1v15, + cryptographic_algorithm=enums.CryptographicAlgorithm.RSA, + hashing_algorithm=enums.HashingAlgorithm.SHA_224 + ) + ) + + self.assertEqual('1', result.get('unique_identifier')) + self.assertEqual( + enums.ValidityIndicator.INVALID, + result.get('validity_indicator') + ) + self.assertEqual( + ResultStatusEnum.SUCCESS, + result.get('result_status') + ) + self.assertEqual(None, result.get('result_reason')) + self.assertEqual(None, result.get('result_message')) + @mock.patch('kmip.services.kmip_client.KMIPProxy._send_message', mock.MagicMock()) @mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message',