Merge pull request #330 from danetrain/feat/sign-add-client-support

Add Sign operation support to clients.
This commit is contained in:
Peter Hamilton 2017-09-12 12:25:00 -04:00 committed by GitHub
commit 5c20fe9b3e
4 changed files with 251 additions and 0 deletions

View File

@ -1069,6 +1069,64 @@ class ProxyKmipClient(api.KmipClient):
result.get('result_message') result.get('result_message')
) )
def sign(self, data, uid=None, cryptographic_parameters=None):
"""
Create a digital signature for data using the specified signing key.
Args:
data (bytes): The bytes of the data to be signed. Required.
uid (string): The unique ID of the signing key to use.
Optional, defaults to None.
cryptographic_parameters (dict): A dictionary containing various
cryptographic settings to be used for creating the signature
(e.g., cryptographic algorithm, hashing algorithm, and/or
digital signature algorithm). Optional, defaults to None.
Returns:
signature (bytes): Bytes representing the signature of the data
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input arguments are invalid
"""
# Check input
if not isinstance(data, six.binary_type):
raise TypeError("Data to be signed must be bytes.")
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("Unique identifier must be a string.")
if cryptographic_parameters is not None:
if not isinstance(cryptographic_parameters, dict):
raise TypeError(
"Cryptographic parameters must be a dictionary."
)
# Verify that operations can be served at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
cryptographic_parameters = self._build_cryptographic_parameters(
cryptographic_parameters
)
# Sign the provided data and handle results
result = self.proxy.sign(
data,
uid,
cryptographic_parameters
)
status = result.get('result_status')
if status == enums.ResultStatus.SUCCESS:
return result.get('signature')
else:
raise exceptions.KmipOperationFailure(
status,
result.get('result_reason'),
result.get('result_message')
)
def mac(self, data, uid=None, algorithm=None): def mac(self, data, uid=None, algorithm=None):
""" """
Get the message authentication code for data. Get the message authentication code for data.

View File

@ -64,6 +64,7 @@ from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import revoke
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import signature_verify from kmip.core.messages.payloads import signature_verify
from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import mac
@ -747,6 +748,72 @@ class KMIPProxy(KMIP):
return result return result
def sign(self, data, unique_identifier=None,
cryptographic_parameters=None, credential=None):
"""
Sign specified data using a specified signing key.
Args:
data (bytes): Data to be signed. Required.
unique_identifier (string): The unique ID of the signing
key to be used. Optional, defaults to None.
cryptographic_parameters (CryptographicParameters): A structure
containing various cryptographic settings to be used for
creating the signature. Optional, defaults to None.
credential (Credential): A credential object containing a set of
authorization parameters for the operation. Optional, defaults
to None.
Returns:
dict: The results of the sign operation, containing the
following key/value pairs:
Key | Value
---------------------|-----------------------------------------
'unique_identifier' | (string) The unique ID of the signing
| key used to create the signature
'signature' | (bytes) The bytes of the signature
'result_status' | (ResultStatus) An enumeration indicating
| the status of the operation result
'result_reason' | (ResultReason) An enumeration providing
| context for the result status.
'result_message' | (string) A message providing additional
| context for the operation result.
"""
operation = Operation(OperationEnum.SIGN)
request_payload = sign.SignRequestPayload(
unique_identifier=unique_identifier,
cryptographic_parameters=cryptographic_parameters,
data=data
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=request_payload
)
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
batch_item = response.batch_items[0]
payload = batch_item.response_payload
result = {}
if payload:
result['unique_identifier'] = payload.unique_identifier
result['signature'] = payload.signature_data
result['result_status'] = batch_item.result_status.value
try:
result['result_reason'] = batch_item.result_reason.value
except:
result['result_reason'] = batch_item.result_reason
try:
result['result_message'] = batch_item.result_message.value
except:
result['result_message'] = batch_item.result_message
return result
def mac(self, data, unique_identifier=None, def mac(self, data, unique_identifier=None,
cryptographic_parameters=None, credential=None): cryptographic_parameters=None, credential=None):
return self._mac( return self._mac(

View File

@ -1982,6 +1982,85 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertEqual(enums.ValidityIndicator.VALID, validity) self.assertEqual(enums.ValidityIndicator.VALID, validity)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_sign(self):
"""
Test that the client can sign data.
"""
mock_signature = b'aaaaaaaaaaaaaaaaaaaaaaaaaa'
result = {
'result_status': enums.ResultStatus.SUCCESS,
'unique_identifier': '1',
'signature': mock_signature
}
client = ProxyKmipClient()
client.open()
client.proxy.sign.return_value = result
actual_signature = client.sign(
b'\x01\x02\x03\x04\x05\x06\x07\x08',
uid='1',
cryptographic_parameters={
'padding_method': enums.PaddingMethod.PSS,
'cryptographic_algorithm':
enums.CryptographicAlgorithm.RSA
}
)
self.assertEqual(mock_signature, actual_signature)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_sign_on_invalid_inputs(self):
"""
Test that TypeError exceptions are raised when trying to sign
data with invalid parameters.
"""
client = ProxyKmipClient()
client.open()
client.proxy.sign.return_value = {}
args = [1234]
kwargs = {
'uid': '1',
'cryptographic_parameters': {}
}
self.assertRaisesRegexp(
TypeError,
"Data to be signed must be bytes.",
client.sign,
*args,
**kwargs
)
args = [
b'\x01\x02\x03\x04'
]
kwargs = {
'uid': 0,
'cryptographic_parameters': {}
}
self.assertRaisesRegexp(
TypeError,
"Unique identifier must be a string.",
client.sign,
*args,
**kwargs
)
kwargs = {
'uid': '1',
'cryptographic_parameters': 'invalid'
}
self.assertRaisesRegexp(
TypeError,
"Cryptographic parameters must be a dictionary.",
client.sign,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy', @mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy)) mock.MagicMock(spec_set=KMIPProxy))
def test_signature_verify_on_invalid_inputs(self): def test_signature_verify_on_invalid_inputs(self):

View File

@ -55,6 +55,7 @@ from kmip.core.messages.payloads.query import \
QueryRequestPayload, QueryResponsePayload QueryRequestPayload, QueryResponsePayload
from kmip.core.messages.payloads.rekey_key_pair import \ from kmip.core.messages.payloads.rekey_key_pair import \
RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload
from kmip.core.messages.payloads import sign
from kmip.core.messages.payloads import signature_verify from kmip.core.messages.payloads import signature_verify
from kmip.core.misc import Offset from kmip.core.misc import Offset
@ -961,6 +962,52 @@ class TestKMIPClient(TestCase):
self.assertEqual(None, result.get('result_reason')) self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message')) self.assertEqual(None, result.get('result_message'))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
)
def test_sign(self, send_mock, build_mock):
"""
Test that the client can sign data
"""
payload = sign.SignResponsePayload(
unique_identifier='1',
signature_data=b'aaaaaaaaaaaaaaaa'
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.SIGN),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=payload
)
response = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response
result = self.client.sign(
b'\x11\x11\x11\x11\x11\x11\x11\x11',
unique_identifier='1',
cryptographic_parameters=CryptographicParameters(
padding_method=enums.PaddingMethod.PKCS1v15,
cryptographic_algorithm=enums.CryptographicAlgorithm.RSA,
hashing_algorithm=enums.HashingAlgorithm.SHA_224
)
)
self.assertEqual('1', result.get('unique_identifier'))
self.assertEqual(
b'aaaaaaaaaaaaaaaa',
result.get('signature')
)
self.assertEqual(
ResultStatusEnum.SUCCESS,
result.get('result_status')
)
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch('kmip.services.kmip_client.KMIPProxy._send_message', @mock.patch('kmip.services.kmip_client.KMIPProxy._send_message',
mock.MagicMock()) mock.MagicMock())
@mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message', @mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message',