mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #297 from OpenKMIP/feat/add-decrypt-to-clients
Add Decrypt support to the clients
This commit is contained in:
commit
278a54320c
|
@ -158,6 +158,25 @@ class KmipClient:
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def decrypt(self, data, uid=None, cryptographic_parameters=None,
|
||||||
|
iv_counter_nonce=None):
|
||||||
|
"""
|
||||||
|
Decrypt data using the specified decryption key and parameters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (bytes): The bytes to decrypt. Required.
|
||||||
|
uid (string): The unique ID of the decryption key to use.
|
||||||
|
Optional, defaults to None.
|
||||||
|
cryptographic_parameters (dict): A dictionary containing various
|
||||||
|
cryptographic settings to be used for the decryption.
|
||||||
|
Optional, defaults to None.
|
||||||
|
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
|
||||||
|
nonce, if needed by the decryption algorithm and/or cipher
|
||||||
|
mode. Optional, defaults to None.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def mac(self, data, uid, algorithm):
|
def mac(self, data, uid, algorithm):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -785,6 +785,140 @@ class ProxyKmipClient(api.KmipClient):
|
||||||
result.get('result_message')
|
result.get('result_message')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def decrypt(self, data, uid=None, cryptographic_parameters=None,
|
||||||
|
iv_counter_nonce=None):
|
||||||
|
"""
|
||||||
|
Decrypt data using the specified decryption key and parameters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (bytes): The bytes to decrypt. Required.
|
||||||
|
uid (string): The unique ID of the decryption key to use.
|
||||||
|
Optional, defaults to None.
|
||||||
|
cryptographic_parameters (dict): A dictionary containing various
|
||||||
|
cryptographic settings to be used for the decryption.
|
||||||
|
Optional, defaults to None.
|
||||||
|
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
|
||||||
|
nonce, if needed by the decryption algorithm and/or cipher
|
||||||
|
mode. Optional, defaults to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The decrypted data.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionNotOpen: if the client connection is unusable
|
||||||
|
KmipOperationFailure: if the operation result is a failure
|
||||||
|
TypeError: if the input arguments are invalid
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
The cryptographic_parameters argument is a dictionary that can
|
||||||
|
contain the following key/value pairs:
|
||||||
|
|
||||||
|
Keys | Value
|
||||||
|
------------------------------|-----------------------------------
|
||||||
|
'block_cipher_mode' | A BlockCipherMode enumeration
|
||||||
|
| indicating the cipher mode to use
|
||||||
|
| with the decryption algorithm.
|
||||||
|
'padding_method' | A PaddingMethod enumeration
|
||||||
|
| indicating which padding method to
|
||||||
|
| use with the decryption algorithm.
|
||||||
|
'hashing_algorithm' | A HashingAlgorithm enumeration
|
||||||
|
| indicating which hashing algorithm
|
||||||
|
| to use.
|
||||||
|
'key_role_type' | A KeyRoleType enumeration
|
||||||
|
| indicating the intended use of the
|
||||||
|
| associated cryptographic key.
|
||||||
|
'digital_signature_algorithm' | A DigitalSignatureAlgorithm
|
||||||
|
| enumeration indicating which
|
||||||
|
| digital signature algorithm to
|
||||||
|
| use.
|
||||||
|
'cryptographic_algorithm' | A CryptographicAlgorithm
|
||||||
|
| enumeration indicating which
|
||||||
|
| decryption algorithm to use.
|
||||||
|
'random_iv' | A boolean indicating whether the
|
||||||
|
| server should autogenerate an IV.
|
||||||
|
'iv_length' | An integer representing the length
|
||||||
|
| of the initialization vector (IV)
|
||||||
|
| in bits.
|
||||||
|
'tag_length' | An integer representing the length
|
||||||
|
| of the authenticator tag in bytes.
|
||||||
|
'fixed_field_length' | An integer representing the length
|
||||||
|
| of the fixed field portion of the
|
||||||
|
| IV in bits.
|
||||||
|
'invocation_field_length' | An integer representing the length
|
||||||
|
| of the invocation field portion of
|
||||||
|
| the IV in bits.
|
||||||
|
'counter_length' | An integer representing the length
|
||||||
|
| of the counter portion of the IV
|
||||||
|
| in bits.
|
||||||
|
'initial_counter_value' | An integer representing the
|
||||||
|
| starting counter value for CTR
|
||||||
|
| mode (typically 1).
|
||||||
|
"""
|
||||||
|
# Check input
|
||||||
|
if not isinstance(data, six.binary_type):
|
||||||
|
raise TypeError("data must be bytes")
|
||||||
|
if uid is not None:
|
||||||
|
if not isinstance(uid, six.string_types):
|
||||||
|
raise TypeError("uid must be a string")
|
||||||
|
if cryptographic_parameters is not None:
|
||||||
|
if not isinstance(cryptographic_parameters, dict):
|
||||||
|
raise TypeError("cryptographic_parameters must be a dict")
|
||||||
|
if iv_counter_nonce is not None:
|
||||||
|
if not isinstance(iv_counter_nonce, six.binary_type):
|
||||||
|
raise TypeError("iv_counter_nonce must be bytes")
|
||||||
|
|
||||||
|
# Verify that operations can be given at this time
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
|
cryptographic_parameters = CryptographicParameters(
|
||||||
|
block_cipher_mode=cryptographic_parameters.get(
|
||||||
|
'block_cipher_mode'
|
||||||
|
),
|
||||||
|
padding_method=cryptographic_parameters.get('padding_method'),
|
||||||
|
hashing_algorithm=cryptographic_parameters.get(
|
||||||
|
'hashing_algorithm'
|
||||||
|
),
|
||||||
|
key_role_type=cryptographic_parameters.get('key_role_type'),
|
||||||
|
digital_signature_algorithm=cryptographic_parameters.get(
|
||||||
|
'digital_signature_algorithm'
|
||||||
|
),
|
||||||
|
cryptographic_algorithm=cryptographic_parameters.get(
|
||||||
|
'cryptographic_algorithm'
|
||||||
|
),
|
||||||
|
random_iv=cryptographic_parameters.get('random_iv'),
|
||||||
|
iv_length=cryptographic_parameters.get('iv_length'),
|
||||||
|
tag_length=cryptographic_parameters.get('tag_length'),
|
||||||
|
fixed_field_length=cryptographic_parameters.get(
|
||||||
|
'fixed_field_length'
|
||||||
|
),
|
||||||
|
invocation_field_length=cryptographic_parameters.get(
|
||||||
|
'invocation_field_length'
|
||||||
|
),
|
||||||
|
counter_length=cryptographic_parameters.get('counter_length'),
|
||||||
|
initial_counter_value=cryptographic_parameters.get(
|
||||||
|
'initial_counter_value'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Decrypt the provided data and handle the results
|
||||||
|
result = self.proxy.decrypt(
|
||||||
|
data,
|
||||||
|
uid,
|
||||||
|
cryptographic_parameters,
|
||||||
|
iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
|
status = result.get('result_status')
|
||||||
|
if status == enums.ResultStatus.SUCCESS:
|
||||||
|
return result.get('data')
|
||||||
|
else:
|
||||||
|
raise exceptions.KmipOperationFailure(
|
||||||
|
status,
|
||||||
|
result.get('result_reason'),
|
||||||
|
result.get('result_message')
|
||||||
|
)
|
||||||
|
|
||||||
def mac(self, data, uid=None, algorithm=None):
|
def mac(self, data, uid=None, algorithm=None):
|
||||||
"""
|
"""
|
||||||
Get the message authentication code for data.
|
Get the message authentication code for data.
|
||||||
|
|
|
@ -51,6 +51,7 @@ from kmip.core.messages import messages
|
||||||
from kmip.core.messages.payloads import activate
|
from kmip.core.messages.payloads import activate
|
||||||
from kmip.core.messages.payloads import create
|
from kmip.core.messages.payloads import create
|
||||||
from kmip.core.messages.payloads import create_key_pair
|
from kmip.core.messages.payloads import create_key_pair
|
||||||
|
from kmip.core.messages.payloads import decrypt
|
||||||
from kmip.core.messages.payloads import destroy
|
from kmip.core.messages.payloads import destroy
|
||||||
from kmip.core.messages.payloads import discover_versions
|
from kmip.core.messages.payloads import discover_versions
|
||||||
from kmip.core.messages.payloads import encrypt
|
from kmip.core.messages.payloads import encrypt
|
||||||
|
@ -506,6 +507,75 @@ class KMIPProxy(KMIP):
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def decrypt(self,
|
||||||
|
data,
|
||||||
|
unique_identifier=None,
|
||||||
|
cryptographic_parameters=None,
|
||||||
|
iv_counter_nonce=None,
|
||||||
|
credential=None):
|
||||||
|
"""
|
||||||
|
Decrypt data using the specified decryption key and parameters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data (bytes): The bytes to decrypt. Required.
|
||||||
|
unique_identifier (string): The unique ID of the decryption key
|
||||||
|
to use. Optional, defaults to None.
|
||||||
|
cryptographic_parameters (CryptographicParameters): A structure
|
||||||
|
containing various cryptographic settings to be used for the
|
||||||
|
decryption. Optional, defaults to None.
|
||||||
|
iv_counter_nonce (bytes): The bytes to use for the IV/counter/
|
||||||
|
nonce, if needed by the decryption algorithm and/or cipher
|
||||||
|
mode. Optional, defaults to None.
|
||||||
|
credential (Credential): A credential object containing a set of
|
||||||
|
authorization parameters for the operation. Optional, defaults
|
||||||
|
to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: The results of the decrypt operation, containing the
|
||||||
|
following key/value pairs:
|
||||||
|
|
||||||
|
Key | Value
|
||||||
|
--------------------|-----------------------------------------
|
||||||
|
'unique_identifier' | (string) The unique ID of the decryption
|
||||||
|
| key used to decrypt the data.
|
||||||
|
'data' | (bytes) The decrypted data.
|
||||||
|
'result_status' | (ResultStatus) An enumeration indicating
|
||||||
|
| the status of the operation result.
|
||||||
|
'result_reason' | (ResultReason) An enumeration providing
|
||||||
|
| context for the result status.
|
||||||
|
'result_message' | (string) A message providing additional
|
||||||
|
| context for the operation result.
|
||||||
|
"""
|
||||||
|
operation = Operation(OperationEnum.DECRYPT)
|
||||||
|
|
||||||
|
request_payload = decrypt.DecryptRequestPayload(
|
||||||
|
unique_identifier=unique_identifier,
|
||||||
|
data=data,
|
||||||
|
cryptographic_parameters=cryptographic_parameters,
|
||||||
|
iv_counter_nonce=iv_counter_nonce
|
||||||
|
)
|
||||||
|
batch_item = messages.RequestBatchItem(
|
||||||
|
operation=operation,
|
||||||
|
request_payload=request_payload
|
||||||
|
)
|
||||||
|
|
||||||
|
request = self._build_request_message(credential, [batch_item])
|
||||||
|
response = self._send_and_receive_message(request)
|
||||||
|
batch_item = response.batch_items[0]
|
||||||
|
payload = batch_item.response_payload
|
||||||
|
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
if payload:
|
||||||
|
result['unique_identifier'] = payload.unique_identifier
|
||||||
|
result['data'] = payload.data
|
||||||
|
|
||||||
|
result['result_status'] = batch_item.result_status
|
||||||
|
result['result_reason'] = batch_item.result_reason
|
||||||
|
result['result_message'] = batch_item.result_message
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
def mac(self, data, unique_identifier=None,
|
def mac(self, data, unique_identifier=None,
|
||||||
cryptographic_parameters=None, credential=None):
|
cryptographic_parameters=None, credential=None):
|
||||||
return self._mac(
|
return self._mac(
|
||||||
|
|
|
@ -71,6 +71,18 @@ class DummyKmipClient(api.KmipClient):
|
||||||
iv_counter_nonce
|
iv_counter_nonce
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def decrypt(self,
|
||||||
|
data,
|
||||||
|
uid=None,
|
||||||
|
cryptographic_parameters=None,
|
||||||
|
iv_counter_nonce=None):
|
||||||
|
super(DummyKmipClient, self).decrypt(
|
||||||
|
data,
|
||||||
|
uid,
|
||||||
|
cryptographic_parameters,
|
||||||
|
iv_counter_nonce
|
||||||
|
)
|
||||||
|
|
||||||
def mac(self, data, uid, algorithm):
|
def mac(self, data, uid, algorithm):
|
||||||
super(DummyKmipClient, self).mac(data, uid, algorithm)
|
super(DummyKmipClient, self).mac(data, uid, algorithm)
|
||||||
|
|
||||||
|
@ -162,11 +174,17 @@ class TestKmipClient(testtools.TestCase):
|
||||||
def test_encrypt(self):
|
def test_encrypt(self):
|
||||||
"""
|
"""
|
||||||
Test that the encrypt method can be called without error.
|
Test that the encrypt method can be called without error.
|
||||||
:return:
|
|
||||||
"""
|
"""
|
||||||
dummy = DummyKmipClient()
|
dummy = DummyKmipClient()
|
||||||
dummy.encrypt('data', 'uid', 'crypto_params', 'iv')
|
dummy.encrypt('data', 'uid', 'crypto_params', 'iv')
|
||||||
|
|
||||||
|
def test_decrypt(self):
|
||||||
|
"""
|
||||||
|
Test that the decrypt method can be called without error.
|
||||||
|
"""
|
||||||
|
dummy = DummyKmipClient()
|
||||||
|
dummy.decrypt('data', 'uid', 'crypto_params', 'iv')
|
||||||
|
|
||||||
def test_mac(self):
|
def test_mac(self):
|
||||||
"""
|
"""
|
||||||
Test that the mac method can be called without error.
|
Test that the mac method can be called without error.
|
||||||
|
|
|
@ -1487,6 +1487,173 @@ class TestProxyKmipClient(testtools.TestCase):
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_decrypt(self):
|
||||||
|
"""
|
||||||
|
Test that the client can decrypt data.
|
||||||
|
"""
|
||||||
|
result = {
|
||||||
|
'data': (
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
),
|
||||||
|
'result_status': enums.ResultStatus.SUCCESS
|
||||||
|
}
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.decrypt.return_value = result
|
||||||
|
|
||||||
|
decrypted_data = client.decrypt(
|
||||||
|
(
|
||||||
|
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
|
||||||
|
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
|
||||||
|
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
|
||||||
|
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
|
||||||
|
),
|
||||||
|
uid='1',
|
||||||
|
cryptographic_parameters={
|
||||||
|
'block_cipher_mode': enums.BlockCipherMode.CBC,
|
||||||
|
'padding_method': enums.PaddingMethod.PKCS5,
|
||||||
|
'cryptographic_algorithm':
|
||||||
|
enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
},
|
||||||
|
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(result.get('data'), decrypted_data)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_decrypt_on_invalid_inputs(self):
|
||||||
|
"""
|
||||||
|
Test that TypeError exception are raised when trying to decrypt with
|
||||||
|
invalid parameters.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.decrypt.return_value = {}
|
||||||
|
args = [None]
|
||||||
|
kwargs = {
|
||||||
|
'uid': '1',
|
||||||
|
'cryptographic_parameters': {},
|
||||||
|
'iv_counter_nonce': b'\x00\x00\x00\x00'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"data must be bytes",
|
||||||
|
client.decrypt,
|
||||||
|
*args,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
args = [b'\x01\x02\x03\x04']
|
||||||
|
kwargs = {
|
||||||
|
'uid': 1,
|
||||||
|
'cryptographic_parameters': {},
|
||||||
|
'iv_counter_nonce': b'\x00\x00\x00\x00'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"uid must be a string",
|
||||||
|
client.decrypt,
|
||||||
|
*args,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
args = [b'\x01\x02\x03\x04']
|
||||||
|
kwargs = {
|
||||||
|
'uid': '1',
|
||||||
|
'cryptographic_parameters': 'invalid',
|
||||||
|
'iv_counter_nonce': b'\x00\x00\x00\x00'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"cryptographic_parameters must be a dict",
|
||||||
|
client.decrypt,
|
||||||
|
*args,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
args = [b'\x01\x02\x03\x04']
|
||||||
|
kwargs = {
|
||||||
|
'uid': '1',
|
||||||
|
'cryptographic_parameters': {},
|
||||||
|
'iv_counter_nonce': {}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"iv_counter_nonce must be bytes",
|
||||||
|
client.decrypt,
|
||||||
|
*args,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_decrypt_on_closed(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to decrypt data on an unopened client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
args = [b'\x01\x02\x03\x04']
|
||||||
|
kwargs = {
|
||||||
|
'uid': '1',
|
||||||
|
'cryptographic_parameters': {},
|
||||||
|
'iv_counter_nonce': b'\x00\x00\x00\x00'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
ClientConnectionNotOpen,
|
||||||
|
client.decrypt,
|
||||||
|
*args,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_decrypt_on_operation_failure(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception is raised when the
|
||||||
|
backend fails to decrypt data.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.OPERATION_FAILED
|
||||||
|
reason = enums.ResultReason.GENERAL_FAILURE
|
||||||
|
message = "Test failure message"
|
||||||
|
|
||||||
|
result = {
|
||||||
|
'result_status': status,
|
||||||
|
'result_reason': reason,
|
||||||
|
'result_message': message
|
||||||
|
}
|
||||||
|
error_message = str(KmipOperationFailure(status, reason, message))
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.decrypt.return_value = result
|
||||||
|
args = [b'\x01\x02\x03\x04']
|
||||||
|
kwargs = {
|
||||||
|
'uid': '1',
|
||||||
|
'cryptographic_parameters': {},
|
||||||
|
'iv_counter_nonce': b'\x00\x00\x00\x00'
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
KmipOperationFailure,
|
||||||
|
error_message,
|
||||||
|
client.decrypt,
|
||||||
|
*args,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
mock.MagicMock(spec_set=KMIPProxy))
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
def test_mac(self):
|
def test_mac(self):
|
||||||
|
|
|
@ -44,6 +44,7 @@ from kmip.core.messages.contents import ResultMessage
|
||||||
from kmip.core.messages.contents import ProtocolVersion
|
from kmip.core.messages.contents import ProtocolVersion
|
||||||
from kmip.core.messages.payloads.create_key_pair import \
|
from kmip.core.messages.payloads.create_key_pair import \
|
||||||
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
|
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
|
||||||
|
from kmip.core.messages.payloads import decrypt
|
||||||
from kmip.core.messages.payloads.discover_versions import \
|
from kmip.core.messages.payloads.discover_versions import \
|
||||||
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
|
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
|
||||||
from kmip.core.messages.payloads import encrypt
|
from kmip.core.messages.payloads import encrypt
|
||||||
|
@ -783,6 +784,68 @@ class TestKMIPClient(TestCase):
|
||||||
self.assertEqual(None, result.get('result_reason'))
|
self.assertEqual(None, result.get('result_reason'))
|
||||||
self.assertEqual(None, result.get('result_message'))
|
self.assertEqual(None, result.get('result_message'))
|
||||||
|
|
||||||
|
@mock.patch(
|
||||||
|
'kmip.services.kmip_client.KMIPProxy._build_request_message'
|
||||||
|
)
|
||||||
|
@mock.patch(
|
||||||
|
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
|
||||||
|
)
|
||||||
|
def test_decrypt(self, send_mock, build_mock):
|
||||||
|
"""
|
||||||
|
Test that the client can decrypt data.
|
||||||
|
"""
|
||||||
|
payload = decrypt.DecryptResponsePayload(
|
||||||
|
unique_identifier='1',
|
||||||
|
data=(
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
batch_item = ResponseBatchItem(
|
||||||
|
operation=Operation(OperationEnum.DECRYPT),
|
||||||
|
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
|
||||||
|
response_payload=payload
|
||||||
|
)
|
||||||
|
response = ResponseMessage(batch_items=[batch_item])
|
||||||
|
|
||||||
|
build_mock.return_value = None
|
||||||
|
send_mock.return_value = response
|
||||||
|
|
||||||
|
result = self.client.decrypt(
|
||||||
|
(
|
||||||
|
b'\x6B\x77\xB4\xD6\x30\x06\xDE\xE6'
|
||||||
|
b'\x05\xB1\x56\xE2\x74\x03\x97\x93'
|
||||||
|
b'\x58\xDE\xB9\xE7\x15\x46\x16\xD9'
|
||||||
|
b'\x74\x9D\xEC\xBE\xC0\x5D\x26\x4B'
|
||||||
|
),
|
||||||
|
unique_identifier='1',
|
||||||
|
cryptographic_parameters=CryptographicParameters(
|
||||||
|
block_cipher_mode=enums.BlockCipherMode.CBC,
|
||||||
|
padding_method=enums.PaddingMethod.PKCS5,
|
||||||
|
cryptographic_algorithm=enums.CryptographicAlgorithm.BLOWFISH
|
||||||
|
),
|
||||||
|
iv_counter_nonce=b'\xFE\xDC\xBA\x98\x76\x54\x32\x10'
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual('1', result.get('unique_identifier'))
|
||||||
|
self.assertEqual(
|
||||||
|
(
|
||||||
|
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||||
|
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||||
|
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||||
|
b'\x66\x6F\x72\x20\x00'
|
||||||
|
),
|
||||||
|
result.get('data')
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
ResultStatusEnum.SUCCESS,
|
||||||
|
result.get('result_status').value
|
||||||
|
)
|
||||||
|
self.assertEqual(None, result.get('result_reason'))
|
||||||
|
self.assertEqual(None, result.get('result_message'))
|
||||||
|
|
||||||
@mock.patch('kmip.services.kmip_client.KMIPProxy._send_message',
|
@mock.patch('kmip.services.kmip_client.KMIPProxy._send_message',
|
||||||
mock.MagicMock())
|
mock.MagicMock())
|
||||||
@mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message',
|
@mock.patch('kmip.services.kmip_client.KMIPProxy._receive_message',
|
||||||
|
|
Loading…
Reference in New Issue