Merge pull request #291 from OpenKMIP/feat/add-encrypt-to-clients

Add Encrypt support to the clients
This commit is contained in:
Peter Hamilton 2017-06-28 10:59:30 -04:00 committed by GitHub
commit 4c244a1f6f
6 changed files with 482 additions and 0 deletions

View File

@ -139,6 +139,25 @@ class KmipClient:
"""
pass
@abc.abstractmethod
def encrypt(self, data, uid=None, cryptographic_parameters=None,
iv_counter_nonce=None):
"""
Encrypt data using the specified encryption key and parameters.
Args:
data (bytes): The bytes to encrypt. Required.
uid (string): The unique ID of the encryption key to use.
Optional, defaults to None.
cryptographic_parameters (dict): A dictionary containing various
cryptographic settings to be used for the encryption.
Optional, defaults to None.
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
nonce, if needed by the encryption algorithm and/or cipher
mode. Optional, defaults to None.
"""
pass
@abc.abstractmethod
def mac(self, data, uid, algorithm):
"""

View File

@ -649,6 +649,142 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def encrypt(self, data, uid=None, cryptographic_parameters=None,
iv_counter_nonce=None):
"""
Encrypt data using the specified encryption key and parameters.
Args:
data (bytes): The bytes to encrypt. Required.
uid (string): The unique ID of the encryption key to use.
Optional, defaults to None.
cryptographic_parameters (dict): A dictionary containing various
cryptographic settings to be used for the encryption.
Optional, defaults to None.
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
nonce, if needed by the encryption algorithm and/or cipher
mode. Optional, defaults to None.
Returns:
bytes: The encrypted data.
bytes: The IV/counter/nonce used with the encryption algorithm,
only if it was autogenerated by the server.
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input arguments are invalid
Notes:
The cryptographic_parameters argument is a dictionary that can
contain the following key/value pairs:
Keys | Value
------------------------------|-----------------------------------
'block_cipher_mode' | A BlockCipherMode enumeration
| indicating the cipher mode to use
| with the encryption algorithm.
'padding_method' | A PaddingMethod enumeration
| indicating which padding method to
| use with the encryption algorithm.
'hashing_algorithm' | A HashingAlgorithm enumeration
| indicating which hashing algorithm
| to use.
'key_role_type' | A KeyRoleType enumeration
| indicating the intended use of the
| associated cryptographic key.
'digital_signature_algorithm' | A DigitalSignatureAlgorithm
| enumeration indicating which
| digital signature algorithm to
| use.
'cryptographic_algorithm' | A CryptographicAlgorithm
| enumeration indicating which
| encryption algorithm to use.
'random_iv' | A boolean indicating whether the
| server should autogenerate an IV.
'iv_length' | An integer representing the length
| of the initialization vector (IV)
| in bits.
'tag_length' | An integer representing the length
| of the authenticator tag in bytes.
'fixed_field_length' | An integer representing the length
| of the fixed field portion of the
| IV in bits.
'invocation_field_length' | An integer representing the length
| of the invocation field portion of
| the IV in bits.
'counter_length' | An integer representing the length
| of the coutner portion of the IV
| in bits.
'initial_counter_value' | An integer representing the
| starting counter value for CTR
| mode (typically 1).
"""
# Check input
if not isinstance(data, six.binary_type):
raise TypeError("data must be bytes")
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("uid must be a string")
if cryptographic_parameters is not None:
if not isinstance(cryptographic_parameters, dict):
raise TypeError("cryptographic_parameters must be a dict")
if iv_counter_nonce is not None:
if not isinstance(iv_counter_nonce, six.binary_type):
raise TypeError("iv_counter_nonce must be bytes")
# Verify that operations can be given at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
cryptographic_parameters = CryptographicParameters(
block_cipher_mode=cryptographic_parameters.get(
'block_cipher_mode'
),
padding_method=cryptographic_parameters.get('padding_method'),
hashing_algorithm=cryptographic_parameters.get(
'hashing_algorithm'
),
key_role_type=cryptographic_parameters.get('key_role_type'),
digital_signature_algorithm=cryptographic_parameters.get(
'digital_signature_algorithm'
),
cryptographic_algorithm=cryptographic_parameters.get(
'cryptographic_algorithm'
),
random_iv=cryptographic_parameters.get('random_iv'),
iv_length=cryptographic_parameters.get('iv_length'),
tag_length=cryptographic_parameters.get('tag_length'),
fixed_field_length=cryptographic_parameters.get(
'fixed_field_length'
),
invocation_field_length=cryptographic_parameters.get(
'invocation_field_length'
),
counter_length=cryptographic_parameters.get('counter_length'),
initial_counter_value=cryptographic_parameters.get(
'initial_counter_value'
)
)
# Encrypt the provided data and handle the results
result = self.proxy.encrypt(
data,
uid,
cryptographic_parameters,
iv_counter_nonce
)
status = result.get('result_status')
if status == enums.ResultStatus.SUCCESS:
return result.get('data'), result.get('iv_counter_nonce')
else:
raise exceptions.KmipOperationFailure(
status,
result.get('result_reason'),
result.get('result_message')
)
def mac(self, data, uid=None, algorithm=None):
"""
Get the message authentication code for data.

View File

@ -53,6 +53,7 @@ from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import encrypt
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import get_attribute_list
@ -433,6 +434,78 @@ class KMIPProxy(KMIP):
results = self._process_batch_items(response)
return results[0]
def encrypt(self,
data,
unique_identifier=None,
cryptographic_parameters=None,
iv_counter_nonce=None,
credential=None):
"""
Encrypt data using the specified encryption key and parameters.
Args:
data (bytes): The bytes to encrypt. Required.
unique_identifier (string): The unique ID of the encryption key
to use. Optional, defaults to None.
cryptographic_parameters (CryptographicParameters): A structure
containing various cryptographic settings to be used for the
encryption. Optional, defaults to None.
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
nonce, if needed by the encryption algorithm and/or cipher
mode. Optional, defaults to None.
credential (Credential): A credential object containing a set of
authorization parameters for the operation. Optional, defaults
to None.
Returns:
dict: The results of the encrypt operation, containing the
following key/value pairs:
Key | Value
--------------------|-----------------------------------------
'unique_identifier' | (string) The unique ID of the encryption
| key used to encrypt the data.
'data' | (bytes) The encrypted data.
'iv_counter_nonce' | (bytes) The IV/counter/nonce used for
| the encryption, if autogenerated.
'result_status' | (ResultStatus) An enumeration indicating
| the status of the operation result.
'result_reason' | (ResultReason) An enumeration providing
| context for the result status.
'result_message' | (string) A message providing additional
| context for the operation result.
"""
operation = Operation(OperationEnum.ENCRYPT)
request_payload = encrypt.EncryptRequestPayload(
unique_identifier=unique_identifier,
data=data,
cryptographic_parameters=cryptographic_parameters,
iv_counter_nonce=iv_counter_nonce
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=request_payload
)
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
batch_item = response.batch_items[0]
payload = batch_item.response_payload
result = {}
if payload:
result['unique_identifier'] = payload.unique_identifier
result['data'] = payload.data
result['iv_counter_nonce'] = payload.iv_counter_nonce
result['result_status'] = batch_item.result_status
result['result_reason'] = batch_item.result_reason
result['result_message'] = batch_item.result_message
return result
def mac(self, data, unique_identifier=None,
cryptographic_parameters=None, credential=None):
return self._mac(

View File

@ -59,6 +59,18 @@ class DummyKmipClient(api.KmipClient):
def destroy(self, uid):
super(DummyKmipClient, self).destroy(uid)
def encrypt(self,
data,
uid=None,
cryptographic_parameters=None,
iv_counter_nonce=None):
super(DummyKmipClient, self).encrypt(
data,
uid,
cryptographic_parameters,
iv_counter_nonce
)
def mac(self, data, uid, algorithm):
super(DummyKmipClient, self).mac(data, uid, algorithm)
@ -147,6 +159,14 @@ class TestKmipClient(testtools.TestCase):
dummy = DummyKmipClient()
dummy.destroy('uid')
def test_encrypt(self):
"""
Test that the encrypt method can be called without error.
:return:
"""
dummy = DummyKmipClient()
dummy.encrypt('data', 'uid', 'crypto_params', 'iv')
def test_mac(self):
"""
Test that the mac method can be called without error.

View File

@ -1318,6 +1318,175 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertEqual(opn.attribute_name.value, 'Operation Policy Name')
self.assertEqual(opn.attribute_value.value, 'test')
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt(self):
"""
Test that the client can encrypt data.
"""
result = {
'data': (
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
'iv_counter_nonce': None,
'result_status': enums.ResultStatus.SUCCESS
}
client = ProxyKmipClient()
client.open()
client.proxy.encrypt.return_value = result
encrypted_data, iv_counter_nonce = client.encrypt(
(
b'\x37\x36\x35\x34\x33\x32\x31\x20'
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
b'\x66\x6F\x72\x20\x00'
),
uid='1',
cryptographic_parameters={
'block_cipher_mode': enums.BlockCipherMode.CBC,
'padding_method': enums.PaddingMethod.PKCS5,
'cryptographic_algorithm':
enums.CryptographicAlgorithm.BLOWFISH
},
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
)
self.assertEqual(result.get('data'), encrypted_data)
self.assertEqual(result.get('iv_counter_nonce'), iv_counter_nonce)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt_on_invalid_inputs(self):
"""
Test that TypeError exception are raised when trying to encrypt with
invalid parameters.
"""
client = ProxyKmipClient()
client.open()
client.proxy.encrypt.return_value = {}
args = [None]
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
TypeError,
"data must be bytes",
client.encrypt,
*args,
**kwargs
)
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': 1,
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
TypeError,
"uid must be a string",
client.encrypt,
*args,
**kwargs
)
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': 'invalid',
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
TypeError,
"cryptographic_parameters must be a dict",
client.encrypt,
*args,
**kwargs
)
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': {}
}
self.assertRaisesRegexp(
TypeError,
"iv_counter_nonce must be bytes",
client.encrypt,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to encrypt data on an unopened client connection.
"""
client = ProxyKmipClient()
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaises(
ClientConnectionNotOpen,
client.encrypt,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_encrypt_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to encrypt data.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = {
'result_status': status,
'result_reason': reason,
'result_message': message
}
error_message = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.encrypt.return_value = result
args = [b'\x01\x02\x03\x04']
kwargs = {
'uid': '1',
'cryptographic_parameters': {},
'iv_counter_nonce': b'\x00\x00\x00\x00'
}
self.assertRaisesRegexp(
KmipOperationFailure,
error_message,
client.encrypt,
*args,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_mac(self):

View File

@ -19,6 +19,7 @@ from kmip.core.attributes import PrivateKeyUniqueIdentifier
from kmip.core.attributes import CryptographicParameters
from kmip.core import enums
from kmip.core.enums import AuthenticationSuite
from kmip.core.enums import ConformanceClause
from kmip.core.enums import CredentialType
@ -45,6 +46,7 @@ from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
from kmip.core.messages.payloads.discover_versions import \
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
from kmip.core.messages.payloads import encrypt
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads.query import \
@ -718,6 +720,69 @@ class TestKMIPClient(TestCase):
self.client._create_socket(sock)
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._build_request_message'
)
@mock.patch(
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
)
def test_encrypt(self, send_mock, build_mock):
"""
Test that the client can encrypt data.
"""
payload = encrypt.EncryptResponsePayload(
unique_identifier='1',
data=(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
)
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.ENCRYPT),
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
response_payload=payload
)
response = ResponseMessage(batch_items=[batch_item])
build_mock.return_value = None
send_mock.return_value = response
result = self.client.encrypt(
(
b'\x37\x36\x35\x34\x33\x32\x31\x20'
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
b'\x66\x6F\x72\x20\x00'
),
unique_identifier='1',
cryptographic_parameters=CryptographicParameters(
block_cipher_mode=enums.BlockCipherMode.CBC,
padding_method=enums.PaddingMethod.PKCS5,
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
),
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
)
self.assertEqual('1', result.get('unique_identifier'))
self.assertEqual(
(
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
),
result.get('data')
)
self.assertEqual(None, result.get('iv_counter_nonce'))
self.assertEqual(
ResultStatusEnum.SUCCESS,
result.get('result_status').value
)
self.assertEqual(None, result.get('result_reason'))
self.assertEqual(None, result.get('result_message'))
@mock.patch('kmip.services.kmip_client.KMIPProxy._send_message',
mock.MagicMock())
@mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message',