Merge pull request #328 from OpenKMIP/feat/add-signature-verify-to-clients

Add SignatureVerify support to the clients
This commit is contained in:
Peter Hamilton 2017-08-30 10:36:23 -04:00 committed by GitHub
commit 139425143d
4 changed files with 375 additions and 0 deletions

View File

@ -1000,6 +1000,75 @@ class ProxyKmipClient(api.KmipClient):
result.get('result_message')
)
def signature_verify(self, message, signature, uid=None,
cryptographic_parameters=None):
"""
Verify a message signature using the specified signing key.
Args:
message (bytes): The bytes of the signed message. Required.
signature (bytes): The bytes of the message signature. Required.
uid (string): The unique ID of the signing key to use.
Optional, defaults to None.
cryptographic_parameters (dict): A dictionary containing various
cryptographic settings to be used for signature verification
(e.g., cryptographic algorithm, hashing algorithm, and/or
digital signature algorithm). Optional, defaults to None.
Returns:
ValidityIndicator: An enumeration indicating whether or not the
signature was valid.
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input arguments are invalid
Notes:
The cryptographic_parameters argument is a dictionary that can
contain various key/value pairs. For a list of allowed pairs,
see the documentation for encrypt/decrypt.
"""
# Check input
if not isinstance(message, six.binary_type):
raise TypeError("Message must be bytes.")
if not isinstance(signature, six.binary_type):
raise TypeError("Signature must be bytes.")
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("Unique identifier must be a string.")
if cryptographic_parameters is not None:
if not isinstance(cryptographic_parameters, dict):
raise TypeError(
"Cryptographic parameters must be a dictionary."
)
# Verify that operations can be given at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
# Decrypt the provided data and handle the results
result = self.proxy.signature_verify(
message,
signature,
uid,
cryptographic_parameters
)
status = result.get('result_status')
if status == enums.ResultStatus.SUCCESS:
return result.get('validity_indicator')
else:
raise exceptions.KmipOperationFailure(
status,
result.get('result_reason'),
result.get('result_message')
)
def mac(self, data, uid=None, algorithm=None):
"""
Get the message authentication code for data.

View File

@ -64,6 +64,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import signature_verify
from kmip.core.messages.payloads import mac
from kmip.services.server.kmip_protocol import KMIPProtocol
@ -671,6 +672,81 @@ class KMIPProxy(KMIP):
return result
def signature_verify(self,
message,
signature,
unique_identifier=None,
cryptographic_parameters=None,
credential=None):
"""
Verify a message signature using the specified signing key.
Args:
message (bytes): The bytes of the signed message. Required.
signature (bytes): The bytes of the message signature. Required.
unique_identifier (string): The unique ID of the signing key to
use. Optional, defaults to None.
cryptographic_parameters (CryptographicParameters): A structure
containing various cryptographic settings to be used for
signature verification. Optional, defaults to None.
credential (Credential): A credential object containing a set of
authorization parameters for the operation. Optional, defaults
to None.
Returns:
dict: The results of the signature verify operation, containing the
following key/value pairs:
Key | Value
---------------------|-----------------------------------------
'unique_identifier' | (string) The unique ID of the signing
| key used to verify the signature.
'validity_indicator' | (ValidityIndicator) An enumeration
| indicating the result of signature
| verification.
'result_status' | (ResultStatus) An enumeration indicating
| the status of the operation result.
'result_reason' | (ResultReason) An enumeration providing
| context for the result status.
'result_message' | (string) A message providing additional
| context for the operation result.
"""
operation = Operation(OperationEnum.SIGNATURE_VERIFY)
request_payload = signature_verify.SignatureVerifyRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=cryptographic_parameters,
data=message,
signature_data=signature
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=request_payload
)
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
batch_item = response.batch_items[0]
payload = batch_item.response_payload
result = {}
if payload:
result['unique_identifier'] = payload.unique_identifier
result['validity_indicator'] = payload.validity_indicator
result['result_status'] = batch_item.result_status.value
try:
result['result_reason'] = batch_item.result_reason.value
except:
result['result_reason'] = batch_item.result_reason
try:
result['result_message'] = batch_item.result_message.value
except:
result['result_message'] = batch_item.result_message
return result
def mac(self, data, unique_identifier=None,
cryptographic_parameters=None, credential=None):
return self._mac(

View File

@ -1945,6 +1945,181 @@ class TestProxyKmipClient(testtools.TestCase):
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_signature_verify(self):
"""
Test that the client can verify a signature.
"""
result = {
'unique_identifier': '1',
'validity_indicator': enums.ValidityIndicator.VALID,
'result_status': enums.ResultStatus.SUCCESS
}
client = ProxyKmipClient()
client.open()
client.proxy.signature_verify.return_value = result
validity = client.signature_verify(
(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
(
b'\x00\x00\x00\x00\x00\x00\x00\x00'
),
uid='1',
cryptographic_parameters={
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS5,
'cryptographic_algorithm':
enums.CryptographicAlgorithm.BLOWFISH
}
)
self.assertEqual(enums.ValidityIndicator.VALID, validity)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_signature_verify_on_invalid_inputs(self):
"""
Test that TypeError exception are raised when trying to verify
signatures with invalid parameters.
"""
client = ProxyKmipClient()
client.open()
client.proxy.signature_verify.return_value = {}
args = [
[],
b''
]
kwargs = {
'uid': '1',
'cryptographic_parameters': {}
}
self.assertRaisesRegexp(
TypeError,
"Message must be bytes.",
client.signature_verify,
*args,
**kwargs
)
args = [
b'\x01\x02\x03\x04',
[]
]
kwargs = {
'uid': '1',
'cryptographic_parameters': {}
}
self.assertRaisesRegexp(
TypeError,
"Signature must be bytes.",
client.signature_verify,
*args,
**kwargs
)
args = [
b'\x01\x02\x03\x04',
b'\xFF\xFF\xFF\xFF'
]
kwargs = {
'uid': 0,
'cryptographic_parameters': {}
}
self.assertRaisesRegexp(
TypeError,
"Unique identifier must be a string.",
client.signature_verify,
*args,
**kwargs
)
args = [
b'\x01\x02\x03\x04',
b'\xFF\xFF\xFF\xFF'
]
kwargs = {
'uid': '1',
'cryptographic_parameters': 'invalid'
}
self.assertRaisesRegexp(
TypeError,
"Cryptographic parameters must be a dictionary.",
client.signature_verify,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_signature_verify_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to verify a signature on an unopened client connection.
"""
client = ProxyKmipClient()
args = [
b'\x01\x02\x03\x04',
b'\xFF\xFF\xFF\xFF'
]
kwargs = {
'uid': '1',
'cryptographic_parameters': {}
}
self.assertRaises(
ClientConnectionNotOpen,
client.signature_verify,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_signature_verify_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to verify a signature.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = {
'result_status': status,
'result_reason': reason,
'result_message': message
}
error_message = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.signature_verify.return_value = result
args = [
b'\x01\x02\x03\x04',
b'\xFF\xFF\xFF\xFF'
]
kwargs = {
'uid': '1',
'cryptographic_parameters': {}
}
self.assertRaisesRegexp(
KmipOperationFailure,
error_message,
client.signature_verify,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_mac(self):

View File

@ -55,6 +55,7 @@ from kmip.core.messages.payloads.query import \
QueryRequestPayload, QueryResponsePayload
from kmip.core.messages.payloads.rekey_key_pair import \
RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload
from kmip.core.messages.payloads import signature_verify
from kmip.core.misc import Offset
from kmip.core.misc import QueryFunction
@ -906,6 +907,60 @@ class TestKMIPClient(TestCase):
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
)
def test_signature_verify(self, send_mock, build_mock):
"""
Test that the client can verify a signature.
"""
payload = signature_verify.SignatureVerifyResponsePayload(
unique_identifier='1',
validity_indicator=enums.ValidityIndicator.INVALID
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.SIGNATURE_VERIFY),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=payload
)
response = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response
result = self.client.signature_verify(
(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
(
b'\x11\x11\x11\x11\x11\x11\x11\x11'
),
unique_identifier='1',
cryptographic_parameters=CryptographicParameters(
padding_method=enums.PaddingMethod.PKCS1v15,
cryptographic_algorithm=enums.CryptographicAlgorithm.RSA,
hashing_algorithm=enums.HashingAlgorithm.SHA_224
)
)
self.assertEqual('1', result.get('unique_identifier'))
self.assertEqual(
enums.ValidityIndicator.INVALID,
result.get('validity_indicator')
)
self.assertEqual(
ResultStatusEnum.SUCCESS,
result.get('result_status')
)
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch('kmip.services.kmip_client.KMIPProxy._send_message',
mock.MagicMock())
@mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message',