Add support for AES GCM mode.

This commit is contained in:
oleksiys 2019-08-06 17:53:51 -07:00 committed by Peter Hamilton
parent 44d55f2550
commit 26c1c71fff
7 changed files with 596 additions and 64 deletions

View File

@ -33,13 +33,22 @@ class DecryptRequestPayload(primitives.Struct):
data: The data to be decrypted in the form of a binary string.
iv_counter_nonce: An IV/counter/nonce to be used with the decryption
algorithm. Comes in the form of a binary string.
auth_additional_data: Any additional data to be authenticated via the
Authenticated Encryption Tag. Added in KMIP 1.4.
auth_tag: Specifies the tag that will be needed to
authenticate the decrypted data. Only returned on completion
of the encryption of the last of the plaintext by an
authenticated encryption cipher. Optional, defaults to None.
Added in KMIP 1.4.
"""
def __init__(self,
unique_identifier=None,
cryptographic_parameters=None,
data=None,
iv_counter_nonce=None):
iv_counter_nonce=None,
auth_additional_data=None,
auth_tag=None):
"""
Construct a Decrypt request payload struct.
@ -56,6 +65,14 @@ class DecryptRequestPayload(primitives.Struct):
encoding and decoding.
iv_counter_nonce (bytes): The IV/counter/nonce value to be used
with the decryption algorithm. Optional, defaults to None.
auth_additional_data (bytes): Any additional data to be
authenticated via the Authenticated Encryption Tag.
Added in KMIP 1.4.
auth_tag (bytes): Specifies the tag that will be needed to
authenticate the decrypted data. Only returned on completion
of the encryption of the last of the plaintext by an
authenticated encryption cipher. Optional, defaults to None.
Added in KMIP 1.4.
"""
super(DecryptRequestPayload, self).__init__(
enums.Tags.REQUEST_PAYLOAD
@ -65,11 +82,15 @@ class DecryptRequestPayload(primitives.Struct):
self._cryptographic_parameters = None
self._data = None
self._iv_counter_nonce = None
self._auth_additional_data = None
self._auth_tag = None
self.unique_identifier = unique_identifier
self.cryptographic_parameters = cryptographic_parameters
self.data = data
self.iv_counter_nonce = iv_counter_nonce
self.auth_additional_data = auth_additional_data
self.auth_tag = auth_tag
@property
def unique_identifier(self):
@ -144,6 +165,44 @@ class DecryptRequestPayload(primitives.Struct):
else:
raise TypeError("IV/counter/nonce must be bytes")
@property
def auth_additional_data(self):
if self._auth_additional_data:
return self._auth_additional_data.value
else:
return None
@auth_additional_data.setter
def auth_additional_data(self, value):
if value is None:
self._auth_additional_data = None
elif isinstance(value, six.binary_type):
self._auth_additional_data = primitives.ByteString(
value=value,
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA
)
else:
raise TypeError("authenticated additional data must be bytes")
@property
def auth_tag(self):
if self._auth_tag:
return self._auth_tag.value
else:
return None
@auth_tag.setter
def auth_tag(self, value):
if value is None:
self._auth_tag = None
elif isinstance(value, six.binary_type):
self._auth_tag = primitives.ByteString(
value=value,
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG
)
else:
raise TypeError("authenticated encryption tag must be bytes")
def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_1_0):
"""
Read the data encoding the Decrypt request payload and decode it
@ -202,6 +261,29 @@ class DecryptRequestPayload(primitives.Struct):
kmip_version=kmip_version
)
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self.is_tag_next(
enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA,
local_stream
):
self._auth_additional_data = primitives.ByteString(
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA
)
self._auth_additional_data.read(
local_stream,
kmip_version=kmip_version
)
if self.is_tag_next(
enums.Tags.AUTHENTICATED_ENCRYPTION_TAG, local_stream
):
self._auth_tag = primitives.ByteString(
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG
)
self._auth_tag.read(
local_stream,
kmip_version=kmip_version
)
self.is_oversized(local_stream)
def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_1_0):
@ -243,6 +325,18 @@ class DecryptRequestPayload(primitives.Struct):
kmip_version=kmip_version
)
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self._auth_additional_data:
self._auth_additional_data.write(
local_stream,
kmip_version=kmip_version
)
if self._auth_tag:
self._auth_tag.write(
local_stream,
kmip_version=kmip_version
)
self.length = local_stream.length()
super(DecryptRequestPayload, self).write(
output_stream,
@ -261,6 +355,10 @@ class DecryptRequestPayload(primitives.Struct):
return False
elif self.iv_counter_nonce != other.iv_counter_nonce:
return False
elif self.auth_additional_data != other.auth_additional_data:
return False
elif self.auth_tag != other.auth_tag:
return False
else:
return True
else:
@ -279,7 +377,9 @@ class DecryptRequestPayload(primitives.Struct):
repr(self.cryptographic_parameters)
),
"data={0}".format(self.data),
"iv_counter_nonce={0}".format(self.iv_counter_nonce)
"iv_counter_nonce={0}".format(self.iv_counter_nonce),
"auth_additional_data={0}".format(self.auth_additional_data),
"auth_tag={0}".format(self.auth_tag)
])
return "DecryptRequestPayload({0})".format(args)
@ -288,7 +388,9 @@ class DecryptRequestPayload(primitives.Struct):
'unique_identifier': self.unique_identifier,
'cryptographic_parameters': self.cryptographic_parameters,
'data': self.data,
'iv_counter_nonce': self.iv_counter_nonce
'iv_counter_nonce': self.iv_counter_nonce,
'auth_additional_data': self.auth_additional_data,
'auth_tag': self.auth_tag
})

View File

@ -33,13 +33,16 @@ class EncryptRequestPayload(primitives.Struct):
data: The data to be encrypted in the form of a binary string.
iv_counter_nonce: An IV/counter/nonce to be used with the encryption
algorithm. Comes in the form of a binary string.
auth_additional_data: Any additional data to be authenticated via the
Authenticated Encryption Tag. Added in KMIP 1.4.
"""
def __init__(self,
unique_identifier=None,
cryptographic_parameters=None,
data=None,
iv_counter_nonce=None):
iv_counter_nonce=None,
auth_additional_data=None):
"""
Construct an Encrypt request payload struct.
@ -56,6 +59,9 @@ class EncryptRequestPayload(primitives.Struct):
encoding and decoding.
iv_counter_nonce (bytes): The IV/counter/nonce value to be used
with the encryption algorithm. Optional, defaults to None.
auth_additional_data (bytes): Any additional data to be
authenticated via the Authenticated Encryption Tag.
Added in KMIP 1.4.
"""
super(EncryptRequestPayload, self).__init__(
enums.Tags.REQUEST_PAYLOAD
@ -65,11 +71,13 @@ class EncryptRequestPayload(primitives.Struct):
self._cryptographic_parameters = None
self._data = None
self._iv_counter_nonce = None
self._auth_additional_data = None
self.unique_identifier = unique_identifier
self.cryptographic_parameters = cryptographic_parameters
self.data = data
self.iv_counter_nonce = iv_counter_nonce
self.auth_additional_data = auth_additional_data
@property
def unique_identifier(self):
@ -144,6 +152,25 @@ class EncryptRequestPayload(primitives.Struct):
else:
raise TypeError("IV/counter/nonce must be bytes")
@property
def auth_additional_data(self):
if self._auth_additional_data:
return self._auth_additional_data.value
else:
return None
@auth_additional_data.setter
def auth_additional_data(self, value):
if value is None:
self._auth_additional_data = None
elif isinstance(value, six.binary_type):
self._auth_additional_data = primitives.ByteString(
value=value,
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA
)
else:
raise TypeError("authenticated additional data must be bytes")
def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_1_0):
"""
Read the data encoding the Encrypt request payload and decode it
@ -202,6 +229,19 @@ class EncryptRequestPayload(primitives.Struct):
kmip_version=kmip_version
)
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self.is_tag_next(
enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA,
local_stream
):
self._auth_additional_data = primitives.ByteString(
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA
)
self._auth_additional_data.read(
local_stream,
kmip_version=kmip_version
)
self.is_oversized(local_stream)
def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_1_0):
@ -243,6 +283,13 @@ class EncryptRequestPayload(primitives.Struct):
kmip_version=kmip_version
)
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self._auth_additional_data:
self._auth_additional_data.write(
local_stream,
kmip_version=kmip_version
)
self.length = local_stream.length()
super(EncryptRequestPayload, self).write(
output_stream,
@ -261,6 +308,8 @@ class EncryptRequestPayload(primitives.Struct):
return False
elif self.iv_counter_nonce != other.iv_counter_nonce:
return False
elif self.auth_additional_data != other.auth_additional_data:
return False
else:
return True
else:
@ -279,7 +328,8 @@ class EncryptRequestPayload(primitives.Struct):
repr(self.cryptographic_parameters)
),
"data={0}".format(self.data),
"iv_counter_nonce={0}".format(self.iv_counter_nonce)
"iv_counter_nonce={0}".format(self.iv_counter_nonce),
"auth_additional_data={0}".format(self.auth_additional_data)
])
return "EncryptRequestPayload({0})".format(args)
@ -288,7 +338,8 @@ class EncryptRequestPayload(primitives.Struct):
'unique_identifier': self.unique_identifier,
'cryptographic_parameters': self.cryptographic_parameters,
'data': self.data,
'iv_counter_nonce': self.iv_counter_nonce
'iv_counter_nonce': self.iv_counter_nonce,
'auth_additional_data': self.auth_additional_data
})
@ -302,12 +353,18 @@ class EncryptResponsePayload(primitives.Struct):
data: The encrypted data in the form of a binary string.
iv_counter_nonce: The IV/counter/nonce used with the encryption
algorithm. Comes in the form of a binary string.
auth_tag: Specifies the tag that will be needed to
authenticate the decrypted data. Only returned on completion
of the encryption of the last of the plaintext by an
authenticated encryption cipher. Optional, defaults to None.
Added in KMIP 1.4.
"""
def __init__(self,
unique_identifier=None,
data=None,
iv_counter_nonce=None):
iv_counter_nonce=None,
auth_tag=None):
"""
Construct an Encrypt response payload struct.
@ -321,6 +378,11 @@ class EncryptResponsePayload(primitives.Struct):
the encryption algorithm if it was required and if this
value was not originally specified by the client. Optional,
defaults to None.
auth_tag (bytes): Specifies the tag that will be needed to
authenticate the decrypted data. Only returned on completion
of the encryption of the last of the plaintext by an
authenticated encryption cipher. Optional, defaults to None.
Added in KMIP 1.4.
"""
super(EncryptResponsePayload, self).__init__(
enums.Tags.RESPONSE_PAYLOAD
@ -329,10 +391,12 @@ class EncryptResponsePayload(primitives.Struct):
self._unique_identifier = None
self._data = None
self._iv_counter_nonce = None
self._auth_tag = None
self.unique_identifier = unique_identifier
self.data = data
self.iv_counter_nonce = iv_counter_nonce
self.auth_tag = auth_tag
@property
def unique_identifier(self):
@ -391,6 +455,25 @@ class EncryptResponsePayload(primitives.Struct):
else:
raise TypeError("IV/counter/nonce must be bytes")
@property
def auth_tag(self):
if self._auth_tag:
return self._auth_tag.value
else:
return None
@auth_tag.setter
def auth_tag(self, value):
if value is None:
self._auth_tag = None
elif isinstance(value, six.binary_type):
self._auth_tag = primitives.ByteString(
value=value,
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG
)
else:
raise TypeError("authenticated encryption tag must be bytes")
def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_1_0):
"""
Read the data encoding the Encrypt response payload and decode it
@ -445,6 +528,18 @@ class EncryptResponsePayload(primitives.Struct):
kmip_version=kmip_version
)
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self.is_tag_next(
enums.Tags.AUTHENTICATED_ENCRYPTION_TAG, local_stream
):
self._auth_tag = primitives.ByteString(
tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG
)
self._auth_tag.read(
local_stream,
kmip_version=kmip_version
)
self.is_oversized(local_stream)
def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_1_0):
@ -486,6 +581,13 @@ class EncryptResponsePayload(primitives.Struct):
kmip_version=kmip_version
)
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self._auth_tag:
self._auth_tag.write(
local_stream,
kmip_version=kmip_version
)
self.length = local_stream.length()
super(EncryptResponsePayload, self).write(
output_stream,
@ -501,6 +603,8 @@ class EncryptResponsePayload(primitives.Struct):
return False
elif self.iv_counter_nonce != other.iv_counter_nonce:
return False
elif self.auth_tag != other.auth_tag:
return False
else:
return True
else:
@ -516,7 +620,8 @@ class EncryptResponsePayload(primitives.Struct):
args = ", ".join([
"unique_identifier='{0}'".format(self.unique_identifier),
"data={0}".format(self.data),
"iv_counter_nonce={0}".format(self.iv_counter_nonce)
"iv_counter_nonce={0}".format(self.iv_counter_nonce),
"auth_tag={0}".format(self.auth_tag)
])
return "EncryptResponsePayload({0})".format(args)
@ -524,5 +629,6 @@ class EncryptResponsePayload(primitives.Struct):
return str({
'unique_identifier': self.unique_identifier,
'data': self.data,
'iv_counter_nonce': self.iv_counter_nonce
'iv_counter_nonce': self.iv_counter_nonce,
'auth_tag': self.auth_tag
})

View File

@ -79,14 +79,13 @@ class CryptographyEngine(api.CryptographicEngine):
enums.HashingAlgorithm.SHA_512: hashes.SHA512
}
# GCM is supported by cryptography but requires inputs that are not
# supported by the KMIP spec. It is excluded for now.
self._modes = {
enums.BlockCipherMode.CBC: modes.CBC,
enums.BlockCipherMode.ECB: modes.ECB,
enums.BlockCipherMode.OFB: modes.OFB,
enums.BlockCipherMode.CFB: modes.CFB,
enums.BlockCipherMode.CTR: modes.CTR
enums.BlockCipherMode.CTR: modes.CTR,
enums.BlockCipherMode.GCM: modes.GCM
}
self._asymmetric_padding_methods = {
enums.PaddingMethod.OAEP: asymmetric_padding.OAEP,
@ -294,6 +293,8 @@ class CryptographyEngine(api.CryptographicEngine):
cipher_mode=None,
padding_method=None,
iv_nonce=None,
auth_additional_data=None,
auth_tag_length=None,
hashing_algorithm=None):
"""
Encrypt data using symmetric or asymmetric encryption.
@ -316,6 +317,12 @@ class CryptographyEngine(api.CryptographicEngine):
of the encryption algorithm. Optional, defaults to None. If
required and not provided, it will be autogenerated and
returned with the cipher text.
auth_additional_data (bytes): Any additional data to be
authenticated via the Authenticated Encryption Tag.
Optional, defaults to None.
auth_tag_length (int): The length of the authentication tag in
bytes. This parameter SHALL be provided when the Block
Cipher Mode is GCM.
hashing_algorithm (HashingAlgorithm): An enumeration specifying
the hashing algorithm to use with the encryption algorithm,
if needed. Required for OAEP-based asymmetric encryption.
@ -328,6 +335,8 @@ class CryptographyEngine(api.CryptographicEngine):
* iv_nonce - the bytes of the IV/counter/nonce used if it
was needed by the encryption scheme and if it was
automatically generated for the encryption
* auth_tag - the bytes of the authentication tag used in GCM or
CCM mode
Raises:
InvalidField: Raised when the algorithm is unsupported or the
@ -373,7 +382,9 @@ class CryptographyEngine(api.CryptographicEngine):
plain_text,
cipher_mode=cipher_mode,
padding_method=padding_method,
iv_nonce=iv_nonce
iv_nonce=iv_nonce,
auth_additional_data=auth_additional_data,
auth_tag_length=auth_tag_length
)
def _encrypt_symmetric(
@ -383,7 +394,9 @@ class CryptographyEngine(api.CryptographicEngine):
plain_text,
cipher_mode=None,
padding_method=None,
iv_nonce=None):
iv_nonce=None,
auth_additional_data=None,
auth_tag_length=None):
"""
Encrypt data using symmetric encryption.
@ -406,6 +419,12 @@ class CryptographyEngine(api.CryptographicEngine):
of the encryption algorithm. Optional, defaults to None. If
required and not provided, it will be autogenerated and
returned with the cipher text.
auth_additional_data (bytes): Any additional data to be
authenticated via the Authenticated Encryption Tag.
Optional, defaults to None.
auth_tag_length (int): The length of the authentication tag in
bytes. This parameter SHALL be provided when the Block Cipher
Mode is GCM.
Returns:
dict: A dictionary containing the encrypted data, with at least
@ -414,6 +433,8 @@ class CryptographyEngine(api.CryptographicEngine):
* iv_nonce - the bytes of the IV/counter/nonce used if it
was needed by the encryption scheme and if it was
automatically generated for the encryption
* auth_tag - the bytes of the authentication tag used in
GCM mode
Raises:
InvalidField: Raised when the algorithm is unsupported or the
@ -440,6 +461,18 @@ class CryptographyEngine(api.CryptographicEngine):
"Invalid key bytes for the specified encryption algorithm."
)
is_gcm_mode = cipher_mode == enums.BlockCipherMode.GCM
if not is_gcm_mode and auth_additional_data is not None:
raise exceptions.InvalidField(
'Authenticated encryption additional data is supported '
'in GCM mode only.'
)
if is_gcm_mode and auth_tag_length is None:
raise exceptions.InvalidField(
'Authenticated encryption tag length must be provided '
'in GCM mode only.'
)
# Set up the cipher mode if needed
return_iv_nonce = False
if encryption_algorithm == enums.CryptographicAlgorithm.RC4:
@ -459,7 +492,10 @@ class CryptographyEngine(api.CryptographicEngine):
if iv_nonce is None:
iv_nonce = os.urandom(algorithm.block_size // 8)
return_iv_nonce = True
mode = mode(iv_nonce)
if is_gcm_mode:
mode = mode(iv_nonce, None, auth_tag_length)
else:
mode = mode(iv_nonce)
else:
mode = mode()
@ -477,15 +513,16 @@ class CryptographyEngine(api.CryptographicEngine):
# Encrypt the plain text
cipher = ciphers.Cipher(algorithm, mode, backend=default_backend())
encryptor = cipher.encryptor()
if auth_additional_data is not None:
encryptor.authenticate_additional_data(auth_additional_data)
cipher_text = encryptor.update(plain_text) + encryptor.finalize()
result = {'cipher_text': cipher_text}
if return_iv_nonce:
return {
'cipher_text': cipher_text,
'iv_nonce': iv_nonce
}
else:
return {'cipher_text': cipher_text}
result['iv_nonce'] = iv_nonce
if is_gcm_mode:
result['auth_tag'] = encryptor.tag
return result
def _encrypt_asymmetric(self,
encryption_algorithm,
@ -612,6 +649,8 @@ class CryptographyEngine(api.CryptographicEngine):
cipher_mode=None,
padding_method=None,
iv_nonce=None,
auth_additional_data=None,
auth_tag=None,
hashing_algorithm=None):
"""
Decrypt data using symmetric decryption.
@ -633,6 +672,14 @@ class CryptographyEngine(api.CryptographicEngine):
Optional otherwise, defaults to None.
iv_nonce (bytes): The IV/nonce value to use to initialize the mode
of the decryption algorithm. Optional, defaults to None.
auth_additional_data (bytes): Any additional data to be
authenticated via the Authenticated Encryption Tag.
Added in KMIP 1.4.
auth_tag (bytes): Specifies the tag that will be needed to
authenticate the decrypted data. Only returned on completion
of the encryption of the last of the plaintext by an
authenticated encryption cipher. Optional, defaults to None.
Added in KMIP 1.4.
hashing_algorithm (HashingAlgorithm): An enumeration specifying
the hashing algorithm to use with the decryption algorithm,
if needed. Required for OAEP-based asymmetric decryption.
@ -687,7 +734,9 @@ class CryptographyEngine(api.CryptographicEngine):
cipher_text,
cipher_mode=cipher_mode,
padding_method=padding_method,
iv_nonce=iv_nonce
iv_nonce=iv_nonce,
auth_additional_data=auth_additional_data,
auth_tag=auth_tag
)
def _decrypt_symmetric(
@ -697,7 +746,9 @@ class CryptographyEngine(api.CryptographicEngine):
cipher_text,
cipher_mode=None,
padding_method=None,
iv_nonce=None):
iv_nonce=None,
auth_additional_data=None,
auth_tag=None):
"""
Decrypt data using symmetric decryption.
@ -718,6 +769,14 @@ class CryptographyEngine(api.CryptographicEngine):
Optional otherwise, defaults to None.
iv_nonce (bytes): The IV/nonce value to use to initialize the mode
of the decryption algorithm. Optional, defaults to None.
auth_additional_data (bytes): Any additional data to be
authenticated via the Authenticated Encryption Tag.
Added in KMIP 1.4.
auth_tag (bytes): Specifies the tag that will be needed to
authenticate the decrypted data. Only returned on completion
of the encryption of the last of the plaintext by an
authenticated encryption cipher. Optional, defaults to None.
Added in KMIP 1.4.
Returns:
bytes: the bytes of the decrypted data
@ -746,6 +805,16 @@ class CryptographyEngine(api.CryptographicEngine):
"Invalid key bytes for the specified decryption algorithm."
)
is_gcm_mode = cipher_mode == enums.BlockCipherMode.GCM
if auth_additional_data is not None and not is_gcm_mode:
raise exceptions.InvalidField(
'Additional data is supported in GCM mode only.'
)
if is_gcm_mode and auth_tag is None:
raise exceptions.InvalidField(
'Authenticated tag must be provided in GCM mode.'
)
# Set up the cipher mode if needed
if decryption_algorithm == enums.CryptographicAlgorithm.RC4:
mode = None
@ -765,13 +834,18 @@ class CryptographyEngine(api.CryptographicEngine):
raise exceptions.InvalidField(
"IV/nonce is required."
)
mode = mode(iv_nonce)
if is_gcm_mode:
mode = mode(iv_nonce, auth_tag)
else:
mode = mode(iv_nonce)
else:
mode = mode()
# Decrypt the plain text
cipher = ciphers.Cipher(algorithm, mode, backend=default_backend())
decryptor = cipher.decryptor()
if auth_additional_data is not None:
decryptor.authenticate_additional_data(auth_additional_data)
plain_text = decryptor.update(cipher_text) + decryptor.finalize()
# Unpad the plain text if needed (separate methods for testing

View File

@ -2321,13 +2321,16 @@ class KmipEngine(object):
payload.data,
cipher_mode=cryptographic_parameters.block_cipher_mode,
padding_method=cryptographic_parameters.padding_method,
iv_nonce=payload.iv_counter_nonce
iv_nonce=payload.iv_counter_nonce,
auth_additional_data=payload.auth_additional_data,
auth_tag_length=cryptographic_parameters.tag_length
)
response_payload = payloads.EncryptResponsePayload(
unique_identifier,
result.get('cipher_text'),
result.get('iv_nonce')
result.get('iv_nonce'),
result.get('auth_tag')
)
return response_payload
@ -2384,7 +2387,9 @@ class KmipEngine(object):
payload.data,
cipher_mode=cryptographic_parameters.block_cipher_mode,
padding_method=cryptographic_parameters.padding_method,
iv_nonce=payload.iv_counter_nonce
iv_nonce=payload.iv_counter_nonce,
auth_additional_data=payload.auth_additional_data,
auth_tag=payload.auth_tag
)
response_payload = payloads.DecryptResponsePayload(

View File

@ -53,9 +53,11 @@ class TestDecryptRequestPayload(testtools.TestCase):
# Initial Counter Value - 1
# Data - 0x0123456789ABCDEF
# IV/Counter/Nonce - 0x01
# Authenticated Encryption Additional Data - 0x011080ff
# Authenticated Encryption Tag - 0x0190FE
self.full_encoding = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x01\x28'
b'\x42\x00\x79\x01\x00\x00\x01\x48'
b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
@ -75,6 +77,8 @@ class TestDecryptRequestPayload(testtools.TestCase):
b'\x42\x00\xD1\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF'
b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\xFE\x08\x00\x00\x00\x04\x01\x10\x80\xFF\x00\x00\x00\x00'
b'\x42\x00\xFF\x08\x00\x00\x00\x03\x01\x90\xFE\x00\x00\x00\x00\x00'
)
# Adapted from the full encoding above. This encoding matches the
@ -104,6 +108,8 @@ class TestDecryptRequestPayload(testtools.TestCase):
self.assertEqual(None, payload.cryptographic_parameters)
self.assertEqual(None, payload.data)
self.assertEqual(None, payload.iv_counter_nonce)
self.assertEqual(None, payload.auth_additional_data)
self.assertEqual(None, payload.auth_tag)
def test_init_with_args(self):
"""
@ -114,7 +120,9 @@ class TestDecryptRequestPayload(testtools.TestCase):
unique_identifier='00000000-1111-2222-3333-444444444444',
cryptographic_parameters=attributes.CryptographicParameters(),
data=b'\x01\x02\x03',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
self.assertEqual(
@ -127,6 +135,8 @@ class TestDecryptRequestPayload(testtools.TestCase):
)
self.assertEqual(b'\x01\x02\x03', payload.data)
self.assertEqual(b'\x01', payload.iv_counter_nonce)
self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data)
self.assertEqual(b'\x01\x90\xFE', payload.auth_tag)
def test_invalid_unique_identifier(self):
"""
@ -185,6 +195,34 @@ class TestDecryptRequestPayload(testtools.TestCase):
*args
)
def test_invalid_auth_additional_data(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the authenticated additional data of an Decrypt request payload.
"""
payload = payloads.DecryptRequestPayload()
args = (payload, 'auth_additional_data', 0)
self.assertRaisesRegex(
TypeError,
"authenticated additional data must be bytes",
setattr,
*args
)
def test_invalid_auth_tag(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the authenticated encryption tag of an Decrypt request payload.
"""
payload = payloads.DecryptRequestPayload()
args = (payload, 'auth_tag', 0)
self.assertRaisesRegex(
TypeError,
"authenticated encryption tag must be bytes",
setattr,
*args
)
def test_read(self):
"""
Test that a Decrypt request payload can be read from a data stream.
@ -195,8 +233,10 @@ class TestDecryptRequestPayload(testtools.TestCase):
self.assertEqual(None, payload.cryptographic_parameters)
self.assertEqual(None, payload.data)
self.assertEqual(None, payload.iv_counter_nonce)
self.assertEqual(None, payload.auth_additional_data)
self.assertEqual(None, payload.auth_tag)
payload.read(self.full_encoding)
payload.read(self.full_encoding, enums.KMIPVersion.KMIP_1_4)
self.assertEqual(
'b4faee10-aa2a-4446-8ad4-0881f3422959',
@ -245,6 +285,8 @@ class TestDecryptRequestPayload(testtools.TestCase):
)
self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data)
self.assertEqual(b'\x01', payload.iv_counter_nonce)
self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data)
self.assertEqual(b'\x01\x90\xFE', payload.auth_tag)
def test_read_partial(self):
"""
@ -302,10 +344,12 @@ class TestDecryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
stream = utils.BytearrayStream()
payload.write(stream)
payload.write(stream, enums.KMIPVersion.KMIP_1_4)
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
@ -369,7 +413,9 @@ class TestDecryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
b = payloads.DecryptRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
@ -390,7 +436,9 @@ class TestDecryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
self.assertTrue(a == b)
@ -452,6 +500,30 @@ class TestDecryptRequestPayload(testtools.TestCase):
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_auth_additional_data(self):
"""
Test that the equality operator returns False when comparing two
Decrypt request payloads with different authenticated additional data
values.
"""
a = payloads.DecryptRequestPayload(auth_additional_data=b'\x22')
b = payloads.DecryptRequestPayload(auth_additional_data=b'\xAA')
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_auth_tag(self):
"""
Test that the equality operator returns False when comparing two
Decrypt request payloads with different authenticated encryption tag
values.
"""
a = payloads.DecryptRequestPayload(auth_tag=b'\x22')
b = payloads.DecryptRequestPayload(auth_tag=b'\xAA')
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing two
@ -493,7 +565,9 @@ class TestDecryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
b = payloads.DecryptRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
@ -514,7 +588,9 @@ class TestDecryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
self.assertFalse(a != b)
@ -576,6 +652,30 @@ class TestDecryptRequestPayload(testtools.TestCase):
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_auth_additional_data(self):
"""
Test that the inequality operator returns True when comparing two
Decrypt request payloads with different authenticated additional data
values.
"""
a = payloads.DecryptRequestPayload(auth_additional_data=b'\x22')
b = payloads.DecryptRequestPayload(auth_additional_data=b'\xAA')
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_auth_tag(self):
"""
Test that the inequality operator returns True when comparing two
Decrypt request payloads with different authenticated encryption tag
values.
"""
a = payloads.DecryptRequestPayload(auth_tag=b'\x22')
b = payloads.DecryptRequestPayload(auth_tag=b'\xAA')
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing two
@ -610,7 +710,9 @@ class TestDecryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
expected = (
"DecryptRequestPayload("
@ -631,7 +733,9 @@ class TestDecryptRequestPayload(testtools.TestCase):
"counter_length=0, "
"initial_counter_value=1), "
"data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", "
"iv_counter_nonce=" + str(b'\x01') + ")"
"iv_counter_nonce=" + str(b'\x01') + ", "
"auth_additional_data=" + str(b'\x01\x10\x80\xFF') + ", "
"auth_tag=" + str(b'\x01\x90\xFE') + ")"
)
observed = repr(payload)
@ -661,14 +765,18 @@ class TestDecryptRequestPayload(testtools.TestCase):
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=cryptographic_parameters,
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF',
auth_tag=b'\x01\x90\xFE'
)
expected = str({
'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959',
'cryptographic_parameters': cryptographic_parameters,
'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
'iv_counter_nonce': b'\x01'
'iv_counter_nonce': b'\x01',
'auth_additional_data': b'\x01\x10\x80\xFF',
'auth_tag': b'\x01\x90\xFE'
})
observed = str(payload)

View File

@ -53,9 +53,10 @@ class TestEncryptRequestPayload(testtools.TestCase):
# Initial Counter Value - 1
# Data - 0x0123456789ABCDEF
# IV/Counter/Nonce - 0x01
# Authenticated Encryption Additional Data - 0x011080ff
self.full_encoding = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x01\x28'
b'\x42\x00\x79\x01\x00\x00\x01\x38'
b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
@ -75,6 +76,7 @@ class TestEncryptRequestPayload(testtools.TestCase):
b'\x42\x00\xD1\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF'
b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\xFE\x08\x00\x00\x00\x04\x01\x10\x80\xFF\x00\x00\x00\x00'
)
# Adapted from the full encoding above. This encoding matches the
@ -104,6 +106,7 @@ class TestEncryptRequestPayload(testtools.TestCase):
self.assertEqual(None, payload.cryptographic_parameters)
self.assertEqual(None, payload.data)
self.assertEqual(None, payload.iv_counter_nonce)
self.assertEqual(None, payload.auth_additional_data)
def test_init_with_args(self):
"""
@ -114,7 +117,8 @@ class TestEncryptRequestPayload(testtools.TestCase):
unique_identifier='00000000-1111-2222-3333-444444444444',
cryptographic_parameters=attributes.CryptographicParameters(),
data=b'\x01\x02\x03',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF'
)
self.assertEqual(
@ -127,6 +131,7 @@ class TestEncryptRequestPayload(testtools.TestCase):
)
self.assertEqual(b'\x01\x02\x03', payload.data)
self.assertEqual(b'\x01', payload.iv_counter_nonce)
self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data)
def test_invalid_unique_identifier(self):
"""
@ -185,6 +190,20 @@ class TestEncryptRequestPayload(testtools.TestCase):
*args
)
def test_invalid_auth_additional_data(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the authenticated additional data of an Encrypt request payload.
"""
payload = payloads.EncryptRequestPayload()
args = (payload, 'auth_additional_data', 0)
self.assertRaisesRegex(
TypeError,
"authenticated additional data must be bytes",
setattr,
*args
)
def test_read(self):
"""
Test that an Encrypt request payload can be read from a data stream.
@ -195,8 +214,9 @@ class TestEncryptRequestPayload(testtools.TestCase):
self.assertEqual(None, payload.cryptographic_parameters)
self.assertEqual(None, payload.data)
self.assertEqual(None, payload.iv_counter_nonce)
self.assertEqual(None, payload.auth_additional_data)
payload.read(self.full_encoding)
payload.read(self.full_encoding, enums.KMIPVersion.KMIP_1_4)
self.assertEqual(
'b4faee10-aa2a-4446-8ad4-0881f3422959',
@ -245,6 +265,7 @@ class TestEncryptRequestPayload(testtools.TestCase):
)
self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data)
self.assertEqual(b'\x01', payload.iv_counter_nonce)
self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data)
def test_read_partial(self):
"""
@ -302,10 +323,11 @@ class TestEncryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF'
)
stream = utils.BytearrayStream()
payload.write(stream)
payload.write(stream, enums.KMIPVersion.KMIP_1_4)
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
@ -369,7 +391,8 @@ class TestEncryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF'
)
b = payloads.EncryptRequestPayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
@ -390,7 +413,8 @@ class TestEncryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF'
)
self.assertTrue(a == b)
@ -452,6 +476,18 @@ class TestEncryptRequestPayload(testtools.TestCase):
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_auth_additional_data(self):
"""
Test that the equality operator returns False when comparing two
Encrypt request payloads with different authenticated additional data
values.
"""
a = payloads.EncryptRequestPayload(auth_additional_data=b'\x22')
b = payloads.EncryptRequestPayload(auth_additional_data=b'\xAA')
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing two
@ -576,6 +612,18 @@ class TestEncryptRequestPayload(testtools.TestCase):
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_auth_additional_data(self):
"""
Test that the inequality operator returns True when comparing two
Encrypt request payloads with different authenticated additional data
values.
"""
a = payloads.EncryptRequestPayload(auth_additional_data=b'\x22')
b = payloads.EncryptRequestPayload(auth_additional_data=b'\xAA')
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing two
@ -610,7 +658,8 @@ class TestEncryptRequestPayload(testtools.TestCase):
initial_counter_value=1
),
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF'
)
expected = (
"EncryptRequestPayload("
@ -631,7 +680,8 @@ class TestEncryptRequestPayload(testtools.TestCase):
"counter_length=0, "
"initial_counter_value=1), "
"data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", "
"iv_counter_nonce=" + str(b'\x01') + ")"
"iv_counter_nonce=" + str(b'\x01') + ", "
"auth_additional_data=" + str(b'\x01\x10\x80\xFF') + ")"
)
observed = repr(payload)
@ -661,14 +711,16 @@ class TestEncryptRequestPayload(testtools.TestCase):
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
cryptographic_parameters=cryptographic_parameters,
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_additional_data=b'\x01\x10\x80\xFF'
)
expected = str({
'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959',
'cryptographic_parameters': cryptographic_parameters,
'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
'iv_counter_nonce': b'\x01'
'iv_counter_nonce': b'\x01',
'auth_additional_data': b'\x01\x10\x80\xFF'
})
observed = str(payload)
@ -692,14 +744,16 @@ class TestEncryptResponsePayload(testtools.TestCase):
# Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959
# Data - 0x0123456789ABCDEF
# IV/Counter/Nonce - 0x01
# Authenticated Encryption Tag - 0x0190FE
self.full_encoding = utils.BytearrayStream(
b'\x42\x00\x7C\x01\x00\x00\x00\x50'
b'\x42\x00\x7C\x01\x00\x00\x00\x60'
b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF'
b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\xFF\x08\x00\x00\x00\x03\x01\x90\xFE\x00\x00\x00\x00\x00'
)
# Adapted from the full encoding above. This encoding matches the
@ -743,6 +797,7 @@ class TestEncryptResponsePayload(testtools.TestCase):
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload.data)
self.assertEqual(None, payload.iv_counter_nonce)
self.assertEqual(None, payload.auth_tag)
def test_init_with_args(self):
"""
@ -752,7 +807,8 @@ class TestEncryptResponsePayload(testtools.TestCase):
payload = payloads.EncryptResponsePayload(
unique_identifier='00000000-1111-2222-3333-444444444444',
data=b'\x01\x02\x03',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_tag=b'\x01\x90\xFE'
)
self.assertEqual(
@ -761,6 +817,7 @@ class TestEncryptResponsePayload(testtools.TestCase):
)
self.assertEqual(b'\x01\x02\x03', payload.data)
self.assertEqual(b'\x01', payload.iv_counter_nonce)
self.assertEqual(b'\x01\x90\xFE', payload.auth_tag)
def test_invalid_unique_identifier(self):
"""
@ -804,6 +861,20 @@ class TestEncryptResponsePayload(testtools.TestCase):
*args
)
def test_invalid_auth_tag(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the authenticated encryption tag of an Encrypt response payload.
"""
payload = payloads.EncryptResponsePayload()
args = (payload, 'auth_tag', 0)
self.assertRaisesRegex(
TypeError,
"authenticated encryption tag must be bytes",
setattr,
*args
)
def test_read(self):
"""
Test that an Encrypt response payload can be read from a data stream.
@ -813,8 +884,9 @@ class TestEncryptResponsePayload(testtools.TestCase):
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload.data)
self.assertEqual(None, payload.iv_counter_nonce)
self.assertEqual(None, payload.auth_tag)
payload.read(self.full_encoding)
payload.read(self.full_encoding, enums.KMIPVersion.KMIP_1_4)
self.assertEqual(
'b4faee10-aa2a-4446-8ad4-0881f3422959',
@ -822,6 +894,7 @@ class TestEncryptResponsePayload(testtools.TestCase):
)
self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data)
self.assertEqual(b'\x01', payload.iv_counter_nonce)
self.assertEqual(b'\x01\x90\xFE', payload.auth_tag)
def test_read_partial(self):
"""
@ -873,10 +946,11 @@ class TestEncryptResponsePayload(testtools.TestCase):
payload = payloads.EncryptResponsePayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_tag=b'\x01\x90\xFE'
)
stream = utils.BytearrayStream()
payload.write(stream)
payload.write(stream, enums.KMIPVersion.KMIP_1_4)
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
@ -938,12 +1012,14 @@ class TestEncryptResponsePayload(testtools.TestCase):
a = payloads.EncryptResponsePayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_tag=b'\x01\x90\xFE'
)
b = payloads.EncryptResponsePayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_tag=b'\x01\x90\xFE'
)
self.assertTrue(a == b)
@ -986,6 +1062,18 @@ class TestEncryptResponsePayload(testtools.TestCase):
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_auth_tag(self):
"""
Test that the equality operator returns False when comparing two
Encrypt response payloads with different authenticated encryption tag
values.
"""
a = payloads.EncryptResponsePayload(auth_tag=b'\x22')
b = payloads.EncryptResponsePayload(auth_tag=b'\xAA')
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing two
@ -1059,6 +1147,18 @@ class TestEncryptResponsePayload(testtools.TestCase):
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_auth_tag(self):
"""
Test that the inequality operator returns True when comparing two
Encrypt response payloads with different authenticated encryption tag
values.
"""
a = payloads.EncryptResponsePayload(auth_tag=b'\x22')
b = payloads.EncryptResponsePayload(auth_tag=b'\xAA')
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing two
@ -1077,13 +1177,15 @@ class TestEncryptResponsePayload(testtools.TestCase):
payload = payloads.EncryptResponsePayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_tag=b'\x01\x90\xFE'
)
expected = (
"EncryptResponsePayload("
"unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', "
"data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", "
"iv_counter_nonce=" + str(b'\x01') + ")"
"iv_counter_nonce=" + str(b'\x01') + ", "
"auth_tag=" + str(b'\x01\x90\xFE') + ")"
)
observed = repr(payload)
@ -1096,13 +1198,15 @@ class TestEncryptResponsePayload(testtools.TestCase):
payload = payloads.EncryptResponsePayload(
unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959',
data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
iv_counter_nonce=b'\x01'
iv_counter_nonce=b'\x01',
auth_tag=b'\x01\x90\xFE'
)
expected = str({
'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959',
'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF',
'iv_counter_nonce': b'\x01'
'iv_counter_nonce': b'\x01',
'auth_tag': b'\x01\x90\xFE'
})
observed = str(payload)

View File

@ -1331,6 +1331,34 @@ class TestCryptographyEngine(testtools.TestCase):
b'\x6a\xcc\x04\x14\x2e\x10\x0a\x65'
b'\xf5\x1b\x97\xad\xf5\x17\x2c\x41'
)},
{'algorithm': enums.CryptographicAlgorithm.AES,
'cipher_mode': enums.BlockCipherMode.GCM,
'key': (
b'\x6e\xd7\x6d\x2d\x97\xc6\x9f\xd1'
b'\x33\x95\x89\x52\x39\x31\xf2\xa6'
b'\xcf\xf5\x54\xb1\x5f\x73\x8f\x21'
b'\xec\x72\xdd\x97\xa7\x33\x09\x07'
),
'iv_nonce': (
b'\x85\x1e\x87\x64\x77\x6e\x67\x96'
b'\xaa\xb7\x22\xdb\xb6\x44\xac\xe8'
),
'plain_text': (
b'\x62\x82\xb8\xc0\x5c\x5c\x15\x30'
b'\xb9\x7d\x48\x16\xca\x43\x47\x62'
),
'auth_additional_data': (
b'\x61\x75\x74\x68\x2d\x74\x65\x73\x74'
),
'auth_tag_length': 16,
'cipher_text': (
b'\x09\x45\x1b\x14\xf9\x25\x68\x24'
b'\x55\x50\x13\xb5\x09\xe9\xd6\x63'
),
'auth_tag': (
b'\xbe\x9f\xef\x78\x5f\x9e\xe6\x16'
b'\x90\xe9\x44\x59\xc1\x84\x44\x7f'
)},
{'algorithm': enums.CryptographicAlgorithm.BLOWFISH,
'cipher_mode': enums.BlockCipherMode.OFB,
'key': (
@ -1441,7 +1469,9 @@ def test_encrypt_symmetric(symmetric_parameters):
symmetric_parameters.get('key'),
symmetric_parameters.get('plain_text'),
cipher_mode=symmetric_parameters.get('cipher_mode'),
iv_nonce=symmetric_parameters.get('iv_nonce')
iv_nonce=symmetric_parameters.get('iv_nonce'),
auth_additional_data=symmetric_parameters.get('auth_additional_data'),
auth_tag_length=symmetric_parameters.get('auth_tag_length')
)
if engine._handle_symmetric_padding.called:
@ -1454,6 +1484,7 @@ def test_encrypt_symmetric(symmetric_parameters):
)
assert symmetric_parameters.get('cipher_text') == result.get('cipher_text')
assert symmetric_parameters.get('auth_tag') == result.get('auth_tag')
def test_decrypt_symmetric(symmetric_parameters):
@ -1472,7 +1503,9 @@ def test_decrypt_symmetric(symmetric_parameters):
symmetric_parameters.get('key'),
symmetric_parameters.get('cipher_text'),
cipher_mode=symmetric_parameters.get('cipher_mode'),
iv_nonce=symmetric_parameters.get('iv_nonce')
iv_nonce=symmetric_parameters.get('iv_nonce'),
auth_additional_data=symmetric_parameters.get('auth_additional_data'),
auth_tag=symmetric_parameters.get('auth_tag')
)
if engine._handle_symmetric_padding.called: