From 26c1c71fffdf93d4770b890546f8bace5250fb75 Mon Sep 17 00:00:00 2001 From: oleksiys Date: Tue, 6 Aug 2019 17:53:51 -0700 Subject: [PATCH] Add support for AES GCM mode. --- kmip/core/messages/payloads/decrypt.py | 108 ++++++++++++- kmip/core/messages/payloads/encrypt.py | 118 +++++++++++++- kmip/services/server/crypto/engine.py | 104 ++++++++++-- kmip/services/server/engine.py | 11 +- .../core/messages/payloads/test_decrypt.py | 134 ++++++++++++++-- .../core/messages/payloads/test_encrypt.py | 148 +++++++++++++++--- .../services/server/crypto/test_engine.py | 37 ++++- 7 files changed, 596 insertions(+), 64 deletions(-) diff --git a/kmip/core/messages/payloads/decrypt.py b/kmip/core/messages/payloads/decrypt.py index cfe947a..e748170 100644 --- a/kmip/core/messages/payloads/decrypt.py +++ b/kmip/core/messages/payloads/decrypt.py @@ -33,13 +33,22 @@ class DecryptRequestPayload(primitives.Struct): data: The data to be decrypted in the form of a binary string. iv_counter_nonce: An IV/counter/nonce to be used with the decryption algorithm. Comes in the form of a binary string. + auth_additional_data: Any additional data to be authenticated via the + Authenticated Encryption Tag. Added in KMIP 1.4. + auth_tag: Specifies the tag that will be needed to + authenticate the decrypted data. Only returned on completion + of the encryption of the last of the plaintext by an + authenticated encryption cipher. Optional, defaults to None. + Added in KMIP 1.4. """ def __init__(self, unique_identifier=None, cryptographic_parameters=None, data=None, - iv_counter_nonce=None): + iv_counter_nonce=None, + auth_additional_data=None, + auth_tag=None): """ Construct a Decrypt request payload struct. @@ -56,6 +65,14 @@ class DecryptRequestPayload(primitives.Struct): encoding and decoding. iv_counter_nonce (bytes): The IV/counter/nonce value to be used with the decryption algorithm. Optional, defaults to None. + auth_additional_data (bytes): Any additional data to be + authenticated via the Authenticated Encryption Tag. + Added in KMIP 1.4. + auth_tag (bytes): Specifies the tag that will be needed to + authenticate the decrypted data. Only returned on completion + of the encryption of the last of the plaintext by an + authenticated encryption cipher. Optional, defaults to None. + Added in KMIP 1.4. """ super(DecryptRequestPayload, self).__init__( enums.Tags.REQUEST_PAYLOAD @@ -65,11 +82,15 @@ class DecryptRequestPayload(primitives.Struct): self._cryptographic_parameters = None self._data = None self._iv_counter_nonce = None + self._auth_additional_data = None + self._auth_tag = None self.unique_identifier = unique_identifier self.cryptographic_parameters = cryptographic_parameters self.data = data self.iv_counter_nonce = iv_counter_nonce + self.auth_additional_data = auth_additional_data + self.auth_tag = auth_tag @property def unique_identifier(self): @@ -144,6 +165,44 @@ class DecryptRequestPayload(primitives.Struct): else: raise TypeError("IV/counter/nonce must be bytes") + @property + def auth_additional_data(self): + if self._auth_additional_data: + return self._auth_additional_data.value + else: + return None + + @auth_additional_data.setter + def auth_additional_data(self, value): + if value is None: + self._auth_additional_data = None + elif isinstance(value, six.binary_type): + self._auth_additional_data = primitives.ByteString( + value=value, + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA + ) + else: + raise TypeError("authenticated additional data must be bytes") + + @property + def auth_tag(self): + if self._auth_tag: + return self._auth_tag.value + else: + return None + + @auth_tag.setter + def auth_tag(self, value): + if value is None: + self._auth_tag = None + elif isinstance(value, six.binary_type): + self._auth_tag = primitives.ByteString( + value=value, + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG + ) + else: + raise TypeError("authenticated encryption tag must be bytes") + def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_1_0): """ Read the data encoding the Decrypt request payload and decode it @@ -202,6 +261,29 @@ class DecryptRequestPayload(primitives.Struct): kmip_version=kmip_version ) + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self.is_tag_next( + enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA, + local_stream + ): + self._auth_additional_data = primitives.ByteString( + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA + ) + self._auth_additional_data.read( + local_stream, + kmip_version=kmip_version + ) + if self.is_tag_next( + enums.Tags.AUTHENTICATED_ENCRYPTION_TAG, local_stream + ): + self._auth_tag = primitives.ByteString( + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG + ) + self._auth_tag.read( + local_stream, + kmip_version=kmip_version + ) + self.is_oversized(local_stream) def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_1_0): @@ -243,6 +325,18 @@ class DecryptRequestPayload(primitives.Struct): kmip_version=kmip_version ) + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self._auth_additional_data: + self._auth_additional_data.write( + local_stream, + kmip_version=kmip_version + ) + if self._auth_tag: + self._auth_tag.write( + local_stream, + kmip_version=kmip_version + ) + self.length = local_stream.length() super(DecryptRequestPayload, self).write( output_stream, @@ -261,6 +355,10 @@ class DecryptRequestPayload(primitives.Struct): return False elif self.iv_counter_nonce != other.iv_counter_nonce: return False + elif self.auth_additional_data != other.auth_additional_data: + return False + elif self.auth_tag != other.auth_tag: + return False else: return True else: @@ -279,7 +377,9 @@ class DecryptRequestPayload(primitives.Struct): repr(self.cryptographic_parameters) ), "data={0}".format(self.data), - "iv_counter_nonce={0}".format(self.iv_counter_nonce) + "iv_counter_nonce={0}".format(self.iv_counter_nonce), + "auth_additional_data={0}".format(self.auth_additional_data), + "auth_tag={0}".format(self.auth_tag) ]) return "DecryptRequestPayload({0})".format(args) @@ -288,7 +388,9 @@ class DecryptRequestPayload(primitives.Struct): 'unique_identifier': self.unique_identifier, 'cryptographic_parameters': self.cryptographic_parameters, 'data': self.data, - 'iv_counter_nonce': self.iv_counter_nonce + 'iv_counter_nonce': self.iv_counter_nonce, + 'auth_additional_data': self.auth_additional_data, + 'auth_tag': self.auth_tag }) diff --git a/kmip/core/messages/payloads/encrypt.py b/kmip/core/messages/payloads/encrypt.py index 060aba5..4682705 100644 --- a/kmip/core/messages/payloads/encrypt.py +++ b/kmip/core/messages/payloads/encrypt.py @@ -33,13 +33,16 @@ class EncryptRequestPayload(primitives.Struct): data: The data to be encrypted in the form of a binary string. iv_counter_nonce: An IV/counter/nonce to be used with the encryption algorithm. Comes in the form of a binary string. + auth_additional_data: Any additional data to be authenticated via the + Authenticated Encryption Tag. Added in KMIP 1.4. """ def __init__(self, unique_identifier=None, cryptographic_parameters=None, data=None, - iv_counter_nonce=None): + iv_counter_nonce=None, + auth_additional_data=None): """ Construct an Encrypt request payload struct. @@ -56,6 +59,9 @@ class EncryptRequestPayload(primitives.Struct): encoding and decoding. iv_counter_nonce (bytes): The IV/counter/nonce value to be used with the encryption algorithm. Optional, defaults to None. + auth_additional_data (bytes): Any additional data to be + authenticated via the Authenticated Encryption Tag. + Added in KMIP 1.4. """ super(EncryptRequestPayload, self).__init__( enums.Tags.REQUEST_PAYLOAD @@ -65,11 +71,13 @@ class EncryptRequestPayload(primitives.Struct): self._cryptographic_parameters = None self._data = None self._iv_counter_nonce = None + self._auth_additional_data = None self.unique_identifier = unique_identifier self.cryptographic_parameters = cryptographic_parameters self.data = data self.iv_counter_nonce = iv_counter_nonce + self.auth_additional_data = auth_additional_data @property def unique_identifier(self): @@ -144,6 +152,25 @@ class EncryptRequestPayload(primitives.Struct): else: raise TypeError("IV/counter/nonce must be bytes") + @property + def auth_additional_data(self): + if self._auth_additional_data: + return self._auth_additional_data.value + else: + return None + + @auth_additional_data.setter + def auth_additional_data(self, value): + if value is None: + self._auth_additional_data = None + elif isinstance(value, six.binary_type): + self._auth_additional_data = primitives.ByteString( + value=value, + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA + ) + else: + raise TypeError("authenticated additional data must be bytes") + def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_1_0): """ Read the data encoding the Encrypt request payload and decode it @@ -202,6 +229,19 @@ class EncryptRequestPayload(primitives.Struct): kmip_version=kmip_version ) + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self.is_tag_next( + enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA, + local_stream + ): + self._auth_additional_data = primitives.ByteString( + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_ADDITIONAL_DATA + ) + self._auth_additional_data.read( + local_stream, + kmip_version=kmip_version + ) + self.is_oversized(local_stream) def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_1_0): @@ -243,6 +283,13 @@ class EncryptRequestPayload(primitives.Struct): kmip_version=kmip_version ) + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self._auth_additional_data: + self._auth_additional_data.write( + local_stream, + kmip_version=kmip_version + ) + self.length = local_stream.length() super(EncryptRequestPayload, self).write( output_stream, @@ -261,6 +308,8 @@ class EncryptRequestPayload(primitives.Struct): return False elif self.iv_counter_nonce != other.iv_counter_nonce: return False + elif self.auth_additional_data != other.auth_additional_data: + return False else: return True else: @@ -279,7 +328,8 @@ class EncryptRequestPayload(primitives.Struct): repr(self.cryptographic_parameters) ), "data={0}".format(self.data), - "iv_counter_nonce={0}".format(self.iv_counter_nonce) + "iv_counter_nonce={0}".format(self.iv_counter_nonce), + "auth_additional_data={0}".format(self.auth_additional_data) ]) return "EncryptRequestPayload({0})".format(args) @@ -288,7 +338,8 @@ class EncryptRequestPayload(primitives.Struct): 'unique_identifier': self.unique_identifier, 'cryptographic_parameters': self.cryptographic_parameters, 'data': self.data, - 'iv_counter_nonce': self.iv_counter_nonce + 'iv_counter_nonce': self.iv_counter_nonce, + 'auth_additional_data': self.auth_additional_data }) @@ -302,12 +353,18 @@ class EncryptResponsePayload(primitives.Struct): data: The encrypted data in the form of a binary string. iv_counter_nonce: The IV/counter/nonce used with the encryption algorithm. Comes in the form of a binary string. + auth_tag: Specifies the tag that will be needed to + authenticate the decrypted data. Only returned on completion + of the encryption of the last of the plaintext by an + authenticated encryption cipher. Optional, defaults to None. + Added in KMIP 1.4. """ def __init__(self, unique_identifier=None, data=None, - iv_counter_nonce=None): + iv_counter_nonce=None, + auth_tag=None): """ Construct an Encrypt response payload struct. @@ -321,6 +378,11 @@ class EncryptResponsePayload(primitives.Struct): the encryption algorithm if it was required and if this value was not originally specified by the client. Optional, defaults to None. + auth_tag (bytes): Specifies the tag that will be needed to + authenticate the decrypted data. Only returned on completion + of the encryption of the last of the plaintext by an + authenticated encryption cipher. Optional, defaults to None. + Added in KMIP 1.4. """ super(EncryptResponsePayload, self).__init__( enums.Tags.RESPONSE_PAYLOAD @@ -329,10 +391,12 @@ class EncryptResponsePayload(primitives.Struct): self._unique_identifier = None self._data = None self._iv_counter_nonce = None + self._auth_tag = None self.unique_identifier = unique_identifier self.data = data self.iv_counter_nonce = iv_counter_nonce + self.auth_tag = auth_tag @property def unique_identifier(self): @@ -391,6 +455,25 @@ class EncryptResponsePayload(primitives.Struct): else: raise TypeError("IV/counter/nonce must be bytes") + @property + def auth_tag(self): + if self._auth_tag: + return self._auth_tag.value + else: + return None + + @auth_tag.setter + def auth_tag(self, value): + if value is None: + self._auth_tag = None + elif isinstance(value, six.binary_type): + self._auth_tag = primitives.ByteString( + value=value, + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG + ) + else: + raise TypeError("authenticated encryption tag must be bytes") + def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_1_0): """ Read the data encoding the Encrypt response payload and decode it @@ -445,6 +528,18 @@ class EncryptResponsePayload(primitives.Struct): kmip_version=kmip_version ) + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self.is_tag_next( + enums.Tags.AUTHENTICATED_ENCRYPTION_TAG, local_stream + ): + self._auth_tag = primitives.ByteString( + tag=enums.Tags.AUTHENTICATED_ENCRYPTION_TAG + ) + self._auth_tag.read( + local_stream, + kmip_version=kmip_version + ) + self.is_oversized(local_stream) def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_1_0): @@ -486,6 +581,13 @@ class EncryptResponsePayload(primitives.Struct): kmip_version=kmip_version ) + if kmip_version >= enums.KMIPVersion.KMIP_1_4: + if self._auth_tag: + self._auth_tag.write( + local_stream, + kmip_version=kmip_version + ) + self.length = local_stream.length() super(EncryptResponsePayload, self).write( output_stream, @@ -501,6 +603,8 @@ class EncryptResponsePayload(primitives.Struct): return False elif self.iv_counter_nonce != other.iv_counter_nonce: return False + elif self.auth_tag != other.auth_tag: + return False else: return True else: @@ -516,7 +620,8 @@ class EncryptResponsePayload(primitives.Struct): args = ", ".join([ "unique_identifier='{0}'".format(self.unique_identifier), "data={0}".format(self.data), - "iv_counter_nonce={0}".format(self.iv_counter_nonce) + "iv_counter_nonce={0}".format(self.iv_counter_nonce), + "auth_tag={0}".format(self.auth_tag) ]) return "EncryptResponsePayload({0})".format(args) @@ -524,5 +629,6 @@ class EncryptResponsePayload(primitives.Struct): return str({ 'unique_identifier': self.unique_identifier, 'data': self.data, - 'iv_counter_nonce': self.iv_counter_nonce + 'iv_counter_nonce': self.iv_counter_nonce, + 'auth_tag': self.auth_tag }) diff --git a/kmip/services/server/crypto/engine.py b/kmip/services/server/crypto/engine.py index 232f384..592f575 100644 --- a/kmip/services/server/crypto/engine.py +++ b/kmip/services/server/crypto/engine.py @@ -79,14 +79,13 @@ class CryptographyEngine(api.CryptographicEngine): enums.HashingAlgorithm.SHA_512: hashes.SHA512 } - # GCM is supported by cryptography but requires inputs that are not - # supported by the KMIP spec. It is excluded for now. self._modes = { enums.BlockCipherMode.CBC: modes.CBC, enums.BlockCipherMode.ECB: modes.ECB, enums.BlockCipherMode.OFB: modes.OFB, enums.BlockCipherMode.CFB: modes.CFB, - enums.BlockCipherMode.CTR: modes.CTR + enums.BlockCipherMode.CTR: modes.CTR, + enums.BlockCipherMode.GCM: modes.GCM } self._asymmetric_padding_methods = { enums.PaddingMethod.OAEP: asymmetric_padding.OAEP, @@ -294,6 +293,8 @@ class CryptographyEngine(api.CryptographicEngine): cipher_mode=None, padding_method=None, iv_nonce=None, + auth_additional_data=None, + auth_tag_length=None, hashing_algorithm=None): """ Encrypt data using symmetric or asymmetric encryption. @@ -316,6 +317,12 @@ class CryptographyEngine(api.CryptographicEngine): of the encryption algorithm. Optional, defaults to None. If required and not provided, it will be autogenerated and returned with the cipher text. + auth_additional_data (bytes): Any additional data to be + authenticated via the Authenticated Encryption Tag. + Optional, defaults to None. + auth_tag_length (int): The length of the authentication tag in + bytes. This parameter SHALL be provided when the Block + Cipher Mode is GCM. hashing_algorithm (HashingAlgorithm): An enumeration specifying the hashing algorithm to use with the encryption algorithm, if needed. Required for OAEP-based asymmetric encryption. @@ -328,6 +335,8 @@ class CryptographyEngine(api.CryptographicEngine): * iv_nonce - the bytes of the IV/counter/nonce used if it was needed by the encryption scheme and if it was automatically generated for the encryption + * auth_tag - the bytes of the authentication tag used in GCM or + CCM mode Raises: InvalidField: Raised when the algorithm is unsupported or the @@ -373,7 +382,9 @@ class CryptographyEngine(api.CryptographicEngine): plain_text, cipher_mode=cipher_mode, padding_method=padding_method, - iv_nonce=iv_nonce + iv_nonce=iv_nonce, + auth_additional_data=auth_additional_data, + auth_tag_length=auth_tag_length ) def _encrypt_symmetric( @@ -383,7 +394,9 @@ class CryptographyEngine(api.CryptographicEngine): plain_text, cipher_mode=None, padding_method=None, - iv_nonce=None): + iv_nonce=None, + auth_additional_data=None, + auth_tag_length=None): """ Encrypt data using symmetric encryption. @@ -406,6 +419,12 @@ class CryptographyEngine(api.CryptographicEngine): of the encryption algorithm. Optional, defaults to None. If required and not provided, it will be autogenerated and returned with the cipher text. + auth_additional_data (bytes): Any additional data to be + authenticated via the Authenticated Encryption Tag. + Optional, defaults to None. + auth_tag_length (int): The length of the authentication tag in + bytes. This parameter SHALL be provided when the Block Cipher + Mode is GCM. Returns: dict: A dictionary containing the encrypted data, with at least @@ -414,6 +433,8 @@ class CryptographyEngine(api.CryptographicEngine): * iv_nonce - the bytes of the IV/counter/nonce used if it was needed by the encryption scheme and if it was automatically generated for the encryption + * auth_tag - the bytes of the authentication tag used in + GCM mode Raises: InvalidField: Raised when the algorithm is unsupported or the @@ -440,6 +461,18 @@ class CryptographyEngine(api.CryptographicEngine): "Invalid key bytes for the specified encryption algorithm." ) + is_gcm_mode = cipher_mode == enums.BlockCipherMode.GCM + if not is_gcm_mode and auth_additional_data is not None: + raise exceptions.InvalidField( + 'Authenticated encryption additional data is supported ' + 'in GCM mode only.' + ) + if is_gcm_mode and auth_tag_length is None: + raise exceptions.InvalidField( + 'Authenticated encryption tag length must be provided ' + 'in GCM mode only.' + ) + # Set up the cipher mode if needed return_iv_nonce = False if encryption_algorithm == enums.CryptographicAlgorithm.RC4: @@ -459,7 +492,10 @@ class CryptographyEngine(api.CryptographicEngine): if iv_nonce is None: iv_nonce = os.urandom(algorithm.block_size // 8) return_iv_nonce = True - mode = mode(iv_nonce) + if is_gcm_mode: + mode = mode(iv_nonce, None, auth_tag_length) + else: + mode = mode(iv_nonce) else: mode = mode() @@ -477,15 +513,16 @@ class CryptographyEngine(api.CryptographicEngine): # Encrypt the plain text cipher = ciphers.Cipher(algorithm, mode, backend=default_backend()) encryptor = cipher.encryptor() + if auth_additional_data is not None: + encryptor.authenticate_additional_data(auth_additional_data) cipher_text = encryptor.update(plain_text) + encryptor.finalize() + result = {'cipher_text': cipher_text} if return_iv_nonce: - return { - 'cipher_text': cipher_text, - 'iv_nonce': iv_nonce - } - else: - return {'cipher_text': cipher_text} + result['iv_nonce'] = iv_nonce + if is_gcm_mode: + result['auth_tag'] = encryptor.tag + return result def _encrypt_asymmetric(self, encryption_algorithm, @@ -612,6 +649,8 @@ class CryptographyEngine(api.CryptographicEngine): cipher_mode=None, padding_method=None, iv_nonce=None, + auth_additional_data=None, + auth_tag=None, hashing_algorithm=None): """ Decrypt data using symmetric decryption. @@ -633,6 +672,14 @@ class CryptographyEngine(api.CryptographicEngine): Optional otherwise, defaults to None. iv_nonce (bytes): The IV/nonce value to use to initialize the mode of the decryption algorithm. Optional, defaults to None. + auth_additional_data (bytes): Any additional data to be + authenticated via the Authenticated Encryption Tag. + Added in KMIP 1.4. + auth_tag (bytes): Specifies the tag that will be needed to + authenticate the decrypted data. Only returned on completion + of the encryption of the last of the plaintext by an + authenticated encryption cipher. Optional, defaults to None. + Added in KMIP 1.4. hashing_algorithm (HashingAlgorithm): An enumeration specifying the hashing algorithm to use with the decryption algorithm, if needed. Required for OAEP-based asymmetric decryption. @@ -687,7 +734,9 @@ class CryptographyEngine(api.CryptographicEngine): cipher_text, cipher_mode=cipher_mode, padding_method=padding_method, - iv_nonce=iv_nonce + iv_nonce=iv_nonce, + auth_additional_data=auth_additional_data, + auth_tag=auth_tag ) def _decrypt_symmetric( @@ -697,7 +746,9 @@ class CryptographyEngine(api.CryptographicEngine): cipher_text, cipher_mode=None, padding_method=None, - iv_nonce=None): + iv_nonce=None, + auth_additional_data=None, + auth_tag=None): """ Decrypt data using symmetric decryption. @@ -718,6 +769,14 @@ class CryptographyEngine(api.CryptographicEngine): Optional otherwise, defaults to None. iv_nonce (bytes): The IV/nonce value to use to initialize the mode of the decryption algorithm. Optional, defaults to None. + auth_additional_data (bytes): Any additional data to be + authenticated via the Authenticated Encryption Tag. + Added in KMIP 1.4. + auth_tag (bytes): Specifies the tag that will be needed to + authenticate the decrypted data. Only returned on completion + of the encryption of the last of the plaintext by an + authenticated encryption cipher. Optional, defaults to None. + Added in KMIP 1.4. Returns: bytes: the bytes of the decrypted data @@ -746,6 +805,16 @@ class CryptographyEngine(api.CryptographicEngine): "Invalid key bytes for the specified decryption algorithm." ) + is_gcm_mode = cipher_mode == enums.BlockCipherMode.GCM + if auth_additional_data is not None and not is_gcm_mode: + raise exceptions.InvalidField( + 'Additional data is supported in GCM mode only.' + ) + if is_gcm_mode and auth_tag is None: + raise exceptions.InvalidField( + 'Authenticated tag must be provided in GCM mode.' + ) + # Set up the cipher mode if needed if decryption_algorithm == enums.CryptographicAlgorithm.RC4: mode = None @@ -765,13 +834,18 @@ class CryptographyEngine(api.CryptographicEngine): raise exceptions.InvalidField( "IV/nonce is required." ) - mode = mode(iv_nonce) + if is_gcm_mode: + mode = mode(iv_nonce, auth_tag) + else: + mode = mode(iv_nonce) else: mode = mode() # Decrypt the plain text cipher = ciphers.Cipher(algorithm, mode, backend=default_backend()) decryptor = cipher.decryptor() + if auth_additional_data is not None: + decryptor.authenticate_additional_data(auth_additional_data) plain_text = decryptor.update(cipher_text) + decryptor.finalize() # Unpad the plain text if needed (separate methods for testing diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 4fd3f5c..22607f4 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -2321,13 +2321,16 @@ class KmipEngine(object): payload.data, cipher_mode=cryptographic_parameters.block_cipher_mode, padding_method=cryptographic_parameters.padding_method, - iv_nonce=payload.iv_counter_nonce + iv_nonce=payload.iv_counter_nonce, + auth_additional_data=payload.auth_additional_data, + auth_tag_length=cryptographic_parameters.tag_length ) response_payload = payloads.EncryptResponsePayload( unique_identifier, result.get('cipher_text'), - result.get('iv_nonce') + result.get('iv_nonce'), + result.get('auth_tag') ) return response_payload @@ -2384,7 +2387,9 @@ class KmipEngine(object): payload.data, cipher_mode=cryptographic_parameters.block_cipher_mode, padding_method=cryptographic_parameters.padding_method, - iv_nonce=payload.iv_counter_nonce + iv_nonce=payload.iv_counter_nonce, + auth_additional_data=payload.auth_additional_data, + auth_tag=payload.auth_tag ) response_payload = payloads.DecryptResponsePayload( diff --git a/kmip/tests/unit/core/messages/payloads/test_decrypt.py b/kmip/tests/unit/core/messages/payloads/test_decrypt.py index 0b85286..20b9747 100644 --- a/kmip/tests/unit/core/messages/payloads/test_decrypt.py +++ b/kmip/tests/unit/core/messages/payloads/test_decrypt.py @@ -53,9 +53,11 @@ class TestDecryptRequestPayload(testtools.TestCase): # Initial Counter Value - 1 # Data - 0x0123456789ABCDEF # IV/Counter/Nonce - 0x01 + # Authenticated Encryption Additional Data - 0x011080ff + # Authenticated Encryption Tag - 0x0190FE self.full_encoding = utils.BytearrayStream( - b'\x42\x00\x79\x01\x00\x00\x01\x28' + b'\x42\x00\x79\x01\x00\x00\x01\x48' b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' @@ -75,6 +77,8 @@ class TestDecryptRequestPayload(testtools.TestCase): b'\x42\x00\xD1\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF' b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xFE\x08\x00\x00\x00\x04\x01\x10\x80\xFF\x00\x00\x00\x00' + b'\x42\x00\xFF\x08\x00\x00\x00\x03\x01\x90\xFE\x00\x00\x00\x00\x00' ) # Adapted from the full encoding above. This encoding matches the @@ -104,6 +108,8 @@ class TestDecryptRequestPayload(testtools.TestCase): self.assertEqual(None, payload.cryptographic_parameters) self.assertEqual(None, payload.data) self.assertEqual(None, payload.iv_counter_nonce) + self.assertEqual(None, payload.auth_additional_data) + self.assertEqual(None, payload.auth_tag) def test_init_with_args(self): """ @@ -114,7 +120,9 @@ class TestDecryptRequestPayload(testtools.TestCase): unique_identifier='00000000-1111-2222-3333-444444444444', cryptographic_parameters=attributes.CryptographicParameters(), data=b'\x01\x02\x03', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) self.assertEqual( @@ -127,6 +135,8 @@ class TestDecryptRequestPayload(testtools.TestCase): ) self.assertEqual(b'\x01\x02\x03', payload.data) self.assertEqual(b'\x01', payload.iv_counter_nonce) + self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data) + self.assertEqual(b'\x01\x90\xFE', payload.auth_tag) def test_invalid_unique_identifier(self): """ @@ -185,6 +195,34 @@ class TestDecryptRequestPayload(testtools.TestCase): *args ) + def test_invalid_auth_additional_data(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the authenticated additional data of an Decrypt request payload. + """ + payload = payloads.DecryptRequestPayload() + args = (payload, 'auth_additional_data', 0) + self.assertRaisesRegex( + TypeError, + "authenticated additional data must be bytes", + setattr, + *args + ) + + def test_invalid_auth_tag(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the authenticated encryption tag of an Decrypt request payload. + """ + payload = payloads.DecryptRequestPayload() + args = (payload, 'auth_tag', 0) + self.assertRaisesRegex( + TypeError, + "authenticated encryption tag must be bytes", + setattr, + *args + ) + def test_read(self): """ Test that a Decrypt request payload can be read from a data stream. @@ -195,8 +233,10 @@ class TestDecryptRequestPayload(testtools.TestCase): self.assertEqual(None, payload.cryptographic_parameters) self.assertEqual(None, payload.data) self.assertEqual(None, payload.iv_counter_nonce) + self.assertEqual(None, payload.auth_additional_data) + self.assertEqual(None, payload.auth_tag) - payload.read(self.full_encoding) + payload.read(self.full_encoding, enums.KMIPVersion.KMIP_1_4) self.assertEqual( 'b4faee10-aa2a-4446-8ad4-0881f3422959', @@ -245,6 +285,8 @@ class TestDecryptRequestPayload(testtools.TestCase): ) self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data) self.assertEqual(b'\x01', payload.iv_counter_nonce) + self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data) + self.assertEqual(b'\x01\x90\xFE', payload.auth_tag) def test_read_partial(self): """ @@ -302,10 +344,12 @@ class TestDecryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) stream = utils.BytearrayStream() - payload.write(stream) + payload.write(stream, enums.KMIPVersion.KMIP_1_4) self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) @@ -369,7 +413,9 @@ class TestDecryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) b = payloads.DecryptRequestPayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', @@ -390,7 +436,9 @@ class TestDecryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) self.assertTrue(a == b) @@ -452,6 +500,30 @@ class TestDecryptRequestPayload(testtools.TestCase): self.assertFalse(a == b) self.assertFalse(b == a) + def test_equal_on_not_equal_auth_additional_data(self): + """ + Test that the equality operator returns False when comparing two + Decrypt request payloads with different authenticated additional data + values. + """ + a = payloads.DecryptRequestPayload(auth_additional_data=b'\x22') + b = payloads.DecryptRequestPayload(auth_additional_data=b'\xAA') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_auth_tag(self): + """ + Test that the equality operator returns False when comparing two + Decrypt request payloads with different authenticated encryption tag + values. + """ + a = payloads.DecryptRequestPayload(auth_tag=b'\x22') + b = payloads.DecryptRequestPayload(auth_tag=b'\xAA') + + self.assertFalse(a == b) + self.assertFalse(b == a) + def test_equal_on_type_mismatch(self): """ Test that the equality operator returns False when comparing two @@ -493,7 +565,9 @@ class TestDecryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) b = payloads.DecryptRequestPayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', @@ -514,7 +588,9 @@ class TestDecryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) self.assertFalse(a != b) @@ -576,6 +652,30 @@ class TestDecryptRequestPayload(testtools.TestCase): self.assertTrue(a != b) self.assertTrue(b != a) + def test_not_equal_on_not_equal_auth_additional_data(self): + """ + Test that the inequality operator returns True when comparing two + Decrypt request payloads with different authenticated additional data + values. + """ + a = payloads.DecryptRequestPayload(auth_additional_data=b'\x22') + b = payloads.DecryptRequestPayload(auth_additional_data=b'\xAA') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_auth_tag(self): + """ + Test that the inequality operator returns True when comparing two + Decrypt request payloads with different authenticated encryption tag + values. + """ + a = payloads.DecryptRequestPayload(auth_tag=b'\x22') + b = payloads.DecryptRequestPayload(auth_tag=b'\xAA') + + self.assertTrue(a != b) + self.assertTrue(b != a) + def test_not_equal_on_type_mismatch(self): """ Test that the inequality operator returns True when comparing two @@ -610,7 +710,9 @@ class TestDecryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) expected = ( "DecryptRequestPayload(" @@ -631,7 +733,9 @@ class TestDecryptRequestPayload(testtools.TestCase): "counter_length=0, " "initial_counter_value=1), " "data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", " - "iv_counter_nonce=" + str(b'\x01') + ")" + "iv_counter_nonce=" + str(b'\x01') + ", " + "auth_additional_data=" + str(b'\x01\x10\x80\xFF') + ", " + "auth_tag=" + str(b'\x01\x90\xFE') + ")" ) observed = repr(payload) @@ -661,14 +765,18 @@ class TestDecryptRequestPayload(testtools.TestCase): unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', cryptographic_parameters=cryptographic_parameters, data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF', + auth_tag=b'\x01\x90\xFE' ) expected = str({ 'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959', 'cryptographic_parameters': cryptographic_parameters, 'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - 'iv_counter_nonce': b'\x01' + 'iv_counter_nonce': b'\x01', + 'auth_additional_data': b'\x01\x10\x80\xFF', + 'auth_tag': b'\x01\x90\xFE' }) observed = str(payload) diff --git a/kmip/tests/unit/core/messages/payloads/test_encrypt.py b/kmip/tests/unit/core/messages/payloads/test_encrypt.py index fe275e5..82437a3 100644 --- a/kmip/tests/unit/core/messages/payloads/test_encrypt.py +++ b/kmip/tests/unit/core/messages/payloads/test_encrypt.py @@ -53,9 +53,10 @@ class TestEncryptRequestPayload(testtools.TestCase): # Initial Counter Value - 1 # Data - 0x0123456789ABCDEF # IV/Counter/Nonce - 0x01 + # Authenticated Encryption Additional Data - 0x011080ff self.full_encoding = utils.BytearrayStream( - b'\x42\x00\x79\x01\x00\x00\x01\x28' + b'\x42\x00\x79\x01\x00\x00\x01\x38' b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' @@ -75,6 +76,7 @@ class TestEncryptRequestPayload(testtools.TestCase): b'\x42\x00\xD1\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF' b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xFE\x08\x00\x00\x00\x04\x01\x10\x80\xFF\x00\x00\x00\x00' ) # Adapted from the full encoding above. This encoding matches the @@ -104,6 +106,7 @@ class TestEncryptRequestPayload(testtools.TestCase): self.assertEqual(None, payload.cryptographic_parameters) self.assertEqual(None, payload.data) self.assertEqual(None, payload.iv_counter_nonce) + self.assertEqual(None, payload.auth_additional_data) def test_init_with_args(self): """ @@ -114,7 +117,8 @@ class TestEncryptRequestPayload(testtools.TestCase): unique_identifier='00000000-1111-2222-3333-444444444444', cryptographic_parameters=attributes.CryptographicParameters(), data=b'\x01\x02\x03', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF' ) self.assertEqual( @@ -127,6 +131,7 @@ class TestEncryptRequestPayload(testtools.TestCase): ) self.assertEqual(b'\x01\x02\x03', payload.data) self.assertEqual(b'\x01', payload.iv_counter_nonce) + self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data) def test_invalid_unique_identifier(self): """ @@ -185,6 +190,20 @@ class TestEncryptRequestPayload(testtools.TestCase): *args ) + def test_invalid_auth_additional_data(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the authenticated additional data of an Encrypt request payload. + """ + payload = payloads.EncryptRequestPayload() + args = (payload, 'auth_additional_data', 0) + self.assertRaisesRegex( + TypeError, + "authenticated additional data must be bytes", + setattr, + *args + ) + def test_read(self): """ Test that an Encrypt request payload can be read from a data stream. @@ -195,8 +214,9 @@ class TestEncryptRequestPayload(testtools.TestCase): self.assertEqual(None, payload.cryptographic_parameters) self.assertEqual(None, payload.data) self.assertEqual(None, payload.iv_counter_nonce) + self.assertEqual(None, payload.auth_additional_data) - payload.read(self.full_encoding) + payload.read(self.full_encoding, enums.KMIPVersion.KMIP_1_4) self.assertEqual( 'b4faee10-aa2a-4446-8ad4-0881f3422959', @@ -245,6 +265,7 @@ class TestEncryptRequestPayload(testtools.TestCase): ) self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data) self.assertEqual(b'\x01', payload.iv_counter_nonce) + self.assertEqual(b'\x01\x10\x80\xFF', payload.auth_additional_data) def test_read_partial(self): """ @@ -302,10 +323,11 @@ class TestEncryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF' ) stream = utils.BytearrayStream() - payload.write(stream) + payload.write(stream, enums.KMIPVersion.KMIP_1_4) self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) @@ -369,7 +391,8 @@ class TestEncryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF' ) b = payloads.EncryptRequestPayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', @@ -390,7 +413,8 @@ class TestEncryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF' ) self.assertTrue(a == b) @@ -452,6 +476,18 @@ class TestEncryptRequestPayload(testtools.TestCase): self.assertFalse(a == b) self.assertFalse(b == a) + def test_equal_on_not_equal_auth_additional_data(self): + """ + Test that the equality operator returns False when comparing two + Encrypt request payloads with different authenticated additional data + values. + """ + a = payloads.EncryptRequestPayload(auth_additional_data=b'\x22') + b = payloads.EncryptRequestPayload(auth_additional_data=b'\xAA') + + self.assertFalse(a == b) + self.assertFalse(b == a) + def test_equal_on_type_mismatch(self): """ Test that the equality operator returns False when comparing two @@ -576,6 +612,18 @@ class TestEncryptRequestPayload(testtools.TestCase): self.assertTrue(a != b) self.assertTrue(b != a) + def test_not_equal_on_not_equal_auth_additional_data(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt request payloads with different authenticated additional data + values. + """ + a = payloads.EncryptRequestPayload(auth_additional_data=b'\x22') + b = payloads.EncryptRequestPayload(auth_additional_data=b'\xAA') + + self.assertTrue(a != b) + self.assertTrue(b != a) + def test_not_equal_on_type_mismatch(self): """ Test that the inequality operator returns True when comparing two @@ -610,7 +658,8 @@ class TestEncryptRequestPayload(testtools.TestCase): initial_counter_value=1 ), data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF' ) expected = ( "EncryptRequestPayload(" @@ -631,7 +680,8 @@ class TestEncryptRequestPayload(testtools.TestCase): "counter_length=0, " "initial_counter_value=1), " "data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", " - "iv_counter_nonce=" + str(b'\x01') + ")" + "iv_counter_nonce=" + str(b'\x01') + ", " + "auth_additional_data=" + str(b'\x01\x10\x80\xFF') + ")" ) observed = repr(payload) @@ -661,14 +711,16 @@ class TestEncryptRequestPayload(testtools.TestCase): unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', cryptographic_parameters=cryptographic_parameters, data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_additional_data=b'\x01\x10\x80\xFF' ) expected = str({ 'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959', 'cryptographic_parameters': cryptographic_parameters, 'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - 'iv_counter_nonce': b'\x01' + 'iv_counter_nonce': b'\x01', + 'auth_additional_data': b'\x01\x10\x80\xFF' }) observed = str(payload) @@ -692,14 +744,16 @@ class TestEncryptResponsePayload(testtools.TestCase): # Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959 # Data - 0x0123456789ABCDEF # IV/Counter/Nonce - 0x01 + # Authenticated Encryption Tag - 0x0190FE self.full_encoding = utils.BytearrayStream( - b'\x42\x00\x7C\x01\x00\x00\x00\x50' + b'\x42\x00\x7C\x01\x00\x00\x00\x60' b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF' b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xFF\x08\x00\x00\x00\x03\x01\x90\xFE\x00\x00\x00\x00\x00' ) # Adapted from the full encoding above. This encoding matches the @@ -743,6 +797,7 @@ class TestEncryptResponsePayload(testtools.TestCase): self.assertEqual(None, payload.unique_identifier) self.assertEqual(None, payload.data) self.assertEqual(None, payload.iv_counter_nonce) + self.assertEqual(None, payload.auth_tag) def test_init_with_args(self): """ @@ -752,7 +807,8 @@ class TestEncryptResponsePayload(testtools.TestCase): payload = payloads.EncryptResponsePayload( unique_identifier='00000000-1111-2222-3333-444444444444', data=b'\x01\x02\x03', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_tag=b'\x01\x90\xFE' ) self.assertEqual( @@ -761,6 +817,7 @@ class TestEncryptResponsePayload(testtools.TestCase): ) self.assertEqual(b'\x01\x02\x03', payload.data) self.assertEqual(b'\x01', payload.iv_counter_nonce) + self.assertEqual(b'\x01\x90\xFE', payload.auth_tag) def test_invalid_unique_identifier(self): """ @@ -804,6 +861,20 @@ class TestEncryptResponsePayload(testtools.TestCase): *args ) + def test_invalid_auth_tag(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the authenticated encryption tag of an Encrypt response payload. + """ + payload = payloads.EncryptResponsePayload() + args = (payload, 'auth_tag', 0) + self.assertRaisesRegex( + TypeError, + "authenticated encryption tag must be bytes", + setattr, + *args + ) + def test_read(self): """ Test that an Encrypt response payload can be read from a data stream. @@ -813,8 +884,9 @@ class TestEncryptResponsePayload(testtools.TestCase): self.assertEqual(None, payload.unique_identifier) self.assertEqual(None, payload.data) self.assertEqual(None, payload.iv_counter_nonce) + self.assertEqual(None, payload.auth_tag) - payload.read(self.full_encoding) + payload.read(self.full_encoding, enums.KMIPVersion.KMIP_1_4) self.assertEqual( 'b4faee10-aa2a-4446-8ad4-0881f3422959', @@ -822,6 +894,7 @@ class TestEncryptResponsePayload(testtools.TestCase): ) self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data) self.assertEqual(b'\x01', payload.iv_counter_nonce) + self.assertEqual(b'\x01\x90\xFE', payload.auth_tag) def test_read_partial(self): """ @@ -873,10 +946,11 @@ class TestEncryptResponsePayload(testtools.TestCase): payload = payloads.EncryptResponsePayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_tag=b'\x01\x90\xFE' ) stream = utils.BytearrayStream() - payload.write(stream) + payload.write(stream, enums.KMIPVersion.KMIP_1_4) self.assertEqual(len(self.full_encoding), len(stream)) self.assertEqual(str(self.full_encoding), str(stream)) @@ -938,12 +1012,14 @@ class TestEncryptResponsePayload(testtools.TestCase): a = payloads.EncryptResponsePayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_tag=b'\x01\x90\xFE' ) b = payloads.EncryptResponsePayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_tag=b'\x01\x90\xFE' ) self.assertTrue(a == b) @@ -986,6 +1062,18 @@ class TestEncryptResponsePayload(testtools.TestCase): self.assertFalse(a == b) self.assertFalse(b == a) + def test_equal_on_not_equal_auth_tag(self): + """ + Test that the equality operator returns False when comparing two + Encrypt response payloads with different authenticated encryption tag + values. + """ + a = payloads.EncryptResponsePayload(auth_tag=b'\x22') + b = payloads.EncryptResponsePayload(auth_tag=b'\xAA') + + self.assertFalse(a == b) + self.assertFalse(b == a) + def test_equal_on_type_mismatch(self): """ Test that the equality operator returns False when comparing two @@ -1059,6 +1147,18 @@ class TestEncryptResponsePayload(testtools.TestCase): self.assertTrue(a != b) self.assertTrue(b != a) + def test_not_equal_on_not_equal_auth_tag(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt response payloads with different authenticated encryption tag + values. + """ + a = payloads.EncryptResponsePayload(auth_tag=b'\x22') + b = payloads.EncryptResponsePayload(auth_tag=b'\xAA') + + self.assertTrue(a != b) + self.assertTrue(b != a) + def test_not_equal_on_type_mismatch(self): """ Test that the inequality operator returns True when comparing two @@ -1077,13 +1177,15 @@ class TestEncryptResponsePayload(testtools.TestCase): payload = payloads.EncryptResponsePayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_tag=b'\x01\x90\xFE' ) expected = ( "EncryptResponsePayload(" "unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', " "data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", " - "iv_counter_nonce=" + str(b'\x01') + ")" + "iv_counter_nonce=" + str(b'\x01') + ", " + "auth_tag=" + str(b'\x01\x90\xFE') + ")" ) observed = repr(payload) @@ -1096,13 +1198,15 @@ class TestEncryptResponsePayload(testtools.TestCase): payload = payloads.EncryptResponsePayload( unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - iv_counter_nonce=b'\x01' + iv_counter_nonce=b'\x01', + auth_tag=b'\x01\x90\xFE' ) expected = str({ 'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959', 'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', - 'iv_counter_nonce': b'\x01' + 'iv_counter_nonce': b'\x01', + 'auth_tag': b'\x01\x90\xFE' }) observed = str(payload) diff --git a/kmip/tests/unit/services/server/crypto/test_engine.py b/kmip/tests/unit/services/server/crypto/test_engine.py index af04543..9f85951 100644 --- a/kmip/tests/unit/services/server/crypto/test_engine.py +++ b/kmip/tests/unit/services/server/crypto/test_engine.py @@ -1331,6 +1331,34 @@ class TestCryptographyEngine(testtools.TestCase): b'\x6a\xcc\x04\x14\x2e\x10\x0a\x65' b'\xf5\x1b\x97\xad\xf5\x17\x2c\x41' )}, + {'algorithm': enums.CryptographicAlgorithm.AES, + 'cipher_mode': enums.BlockCipherMode.GCM, + 'key': ( + b'\x6e\xd7\x6d\x2d\x97\xc6\x9f\xd1' + b'\x33\x95\x89\x52\x39\x31\xf2\xa6' + b'\xcf\xf5\x54\xb1\x5f\x73\x8f\x21' + b'\xec\x72\xdd\x97\xa7\x33\x09\x07' + ), + 'iv_nonce': ( + b'\x85\x1e\x87\x64\x77\x6e\x67\x96' + b'\xaa\xb7\x22\xdb\xb6\x44\xac\xe8' + ), + 'plain_text': ( + b'\x62\x82\xb8\xc0\x5c\x5c\x15\x30' + b'\xb9\x7d\x48\x16\xca\x43\x47\x62' + ), + 'auth_additional_data': ( + b'\x61\x75\x74\x68\x2d\x74\x65\x73\x74' + ), + 'auth_tag_length': 16, + 'cipher_text': ( + b'\x09\x45\x1b\x14\xf9\x25\x68\x24' + b'\x55\x50\x13\xb5\x09\xe9\xd6\x63' + ), + 'auth_tag': ( + b'\xbe\x9f\xef\x78\x5f\x9e\xe6\x16' + b'\x90\xe9\x44\x59\xc1\x84\x44\x7f' + )}, {'algorithm': enums.CryptographicAlgorithm.BLOWFISH, 'cipher_mode': enums.BlockCipherMode.OFB, 'key': ( @@ -1441,7 +1469,9 @@ def test_encrypt_symmetric(symmetric_parameters): symmetric_parameters.get('key'), symmetric_parameters.get('plain_text'), cipher_mode=symmetric_parameters.get('cipher_mode'), - iv_nonce=symmetric_parameters.get('iv_nonce') + iv_nonce=symmetric_parameters.get('iv_nonce'), + auth_additional_data=symmetric_parameters.get('auth_additional_data'), + auth_tag_length=symmetric_parameters.get('auth_tag_length') ) if engine._handle_symmetric_padding.called: @@ -1454,6 +1484,7 @@ def test_encrypt_symmetric(symmetric_parameters): ) assert symmetric_parameters.get('cipher_text') == result.get('cipher_text') + assert symmetric_parameters.get('auth_tag') == result.get('auth_tag') def test_decrypt_symmetric(symmetric_parameters): @@ -1472,7 +1503,9 @@ def test_decrypt_symmetric(symmetric_parameters): symmetric_parameters.get('key'), symmetric_parameters.get('cipher_text'), cipher_mode=symmetric_parameters.get('cipher_mode'), - iv_nonce=symmetric_parameters.get('iv_nonce') + iv_nonce=symmetric_parameters.get('iv_nonce'), + auth_additional_data=symmetric_parameters.get('auth_additional_data'), + auth_tag=symmetric_parameters.get('auth_tag') ) if engine._handle_symmetric_padding.called: