From 60bd56066e194f80b05f3cb27cb0fc21d83e568f Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Tue, 27 Jun 2017 14:05:28 -0400 Subject: [PATCH] Add decryption support to the server cryptography engine This change adds decryption functionality to the cryptographic engine used by the server. It supports a variety of symmetric decryption algorithms and block cipher modes. Asymmetric decryption support will be added in a future patch. Unit tests and minor updates to surrounding code are included. --- kmip/services/server/crypto/api.py | 33 ++++ kmip/services/server/crypto/engine.py | 130 ++++++++++++++- .../services/server/crypto/test_engine.py | 149 +++++++++++++++++- 3 files changed, 307 insertions(+), 5 deletions(-) diff --git a/kmip/services/server/crypto/api.py b/kmip/services/server/crypto/api.py index 5612dde..333b8e5 100644 --- a/kmip/services/server/crypto/api.py +++ b/kmip/services/server/crypto/api.py @@ -121,3 +121,36 @@ class CryptographicEngine(object): was needed by the encryption scheme and if it was automatically generated for the encryption """ + + @abstractmethod + def decrypt(self, + decryption_algorithm, + decryption_key, + cipher_text, + cipher_mode=None, + padding_method=None, + iv_nonce=None): + """ + Decrypt data using symmetric decryption. + + Args: + decryption_algorithm (CryptographicAlgorithm): An enumeration + specifying the symmetric decryption algorithm to use for + decryption. + decryption_key (bytes): The bytes of the symmetric key to use for + decryption. + cipher_text (bytes): The bytes to be decrypted. + cipher_mode (BlockCipherMode): An enumeration specifying the + block cipher mode to use with the decryption algorithm. + Required in the general case. Optional if the decryption + algorithm is RC4 (aka ARC4). If optional, defaults to None. + padding_method (PaddingMethod): An enumeration specifying the + padding method to use on the data after decryption. Required + if the cipher mode is for block ciphers (e.g., CBC, ECB). + Optional otherwise, defaults to None. + iv_nonce (bytes): The IV/nonce value to use to initialize the mode + of the decryption algorithm. Optional, defaults to None. + + Returns: + bytes: the bytes of the decrypted data + """ diff --git a/kmip/services/server/crypto/engine.py b/kmip/services/server/crypto/engine.py index a91b44e..d516e08 100644 --- a/kmip/services/server/crypto/engine.py +++ b/kmip/services/server/crypto/engine.py @@ -402,7 +402,8 @@ class CryptographyEngine(api.CryptographicEngine): def _handle_symmetric_padding(self, algorithm, plain_text, - padding_method): + padding_method, + undo_padding=False): # KMIP 1.3 test TC-STREAM-ENC-2-13.xml demonstrates a case # where an encrypt call for 3DES-ECB does not use padding if # the plaintext fits the blocksize of the algorithm. This does @@ -414,7 +415,10 @@ class CryptographyEngine(api.CryptographicEngine): padding_method = self._symmetric_padding_methods.get( padding_method ) - padder = padding_method(algorithm.block_size).padder() + if undo_padding: + padder = padding_method(algorithm.block_size).unpadder() + else: + padder = padding_method(algorithm.block_size).padder() plain_text = padder.update(plain_text) plain_text += padder.finalize() else: @@ -430,6 +434,128 @@ class CryptographyEngine(api.CryptographicEngine): ) return plain_text + def decrypt(self, + decryption_algorithm, + decryption_key, + cipher_text, + cipher_mode=None, + padding_method=None, + iv_nonce=None): + """ + Decrypt data using symmetric decryption. + + Args: + decryption_algorithm (CryptographicAlgorithm): An enumeration + specifying the symmetric decryption algorithm to use for + decryption. + decryption_key (bytes): The bytes of the symmetric key to use for + decryption. + cipher_text (bytes): The bytes to be decrypted. + cipher_mode (BlockCipherMode): An enumeration specifying the + block cipher mode to use with the decryption algorithm. + Required in the general case. Optional if the decryption + algorithm is RC4 (aka ARC4). If optional, defaults to None. + padding_method (PaddingMethod): An enumeration specifying the + padding method to use on the data after decryption. Required + if the cipher mode is for block ciphers (e.g., CBC, ECB). + Optional otherwise, defaults to None. + iv_nonce (bytes): The IV/nonce value to use to initialize the mode + of the decryption algorithm. Optional, defaults to None. + + Returns: + bytes: the bytes of the decrypted data + + Raises: + InvalidField: Raised when the algorithm is unsupported or the + length is incompatible with the algorithm. + CryptographicFailure: Raised when the key generation process + fails. + + Example: + >>> engine = CryptographyEngine() + >>> result = engine.decrypt( + ... decryption_algorithm=CryptographicAlgorithm.AES, + ... decryption_key=( + ... b'\xF3\x96\xE7\x1C\xCF\xCD\xEC\x1F' + ... b'\xFC\xE2\x8E\xA6\xF8\x74\x28\xB0' + ... ), + ... cipher_text=( + ... b'\x18\x5B\xB9\x79\x1B\x4C\xD1\x8F' + ... b'\x9A\xA0\x65\x02\x62\xA3\x3D\x63' + ... ), + ... cipher_mode=BlockCipherMode.CBC, + ... padding_method=PaddingMethod.ANSI_X923, + ... iv_nonce=( + ... b'\x38\x71\x41\x05\xC4\x86\x03\xD9' + ... b'\x3D\xEF\xDF\xB8\x6B\x65\x9A\xA2' + ... ) + ... ) + >>> result + b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f' + """ + + # Set up the algorithm + if decryption_algorithm is None: + raise exceptions.InvalidField("Decryption algorithm is required.") + algorithm = self._symmetric_key_algorithms.get( + decryption_algorithm, + None + ) + if algorithm is None: + raise exceptions.InvalidField( + "Decryption algorithm '{0}' is not a supported symmetric " + "decryption algorithm.".format(decryption_algorithm) + ) + try: + algorithm = algorithm(decryption_key) + except Exception as e: + self.logger.exception(e) + raise exceptions.CryptographicFailure( + "Invalid key bytes for the specified decryption algorithm." + ) + + # Set up the cipher mode if needed + if decryption_algorithm == enums.CryptographicAlgorithm.RC4: + mode = None + else: + if cipher_mode is None: + raise exceptions.InvalidField("Cipher mode is required.") + mode = self._modes.get(cipher_mode, None) + if mode is None: + raise exceptions.InvalidField( + "Cipher mode '{0}' is not a supported mode.".format( + cipher_mode + ) + ) + if hasattr(mode, 'initialization_vector') or \ + hasattr(mode, 'nonce'): + if iv_nonce is None: + raise exceptions.InvalidField( + "IV/nonce is required." + ) + mode = mode(iv_nonce) + else: + mode = mode() + + # Decrypt the plain text + cipher = ciphers.Cipher(algorithm, mode, backend=default_backend()) + decryptor = cipher.decryptor() + plain_text = decryptor.update(cipher_text) + decryptor.finalize() + + # Unpad the plain text if needed (separate methods for testing + # purposes) + if cipher_mode in [ + enums.BlockCipherMode.CBC, + enums.BlockCipherMode.ECB + ]: + plain_text = self._handle_symmetric_padding( + self._symmetric_key_algorithms.get(decryption_algorithm), + plain_text, + padding_method + ) + + return plain_text + def _create_rsa_key_pair(self, length, public_exponent=65537): """ Create an RSA key pair. diff --git a/kmip/tests/unit/services/server/crypto/test_engine.py b/kmip/tests/unit/services/server/crypto/test_engine.py index a7072e3..c61cd8d 100644 --- a/kmip/tests/unit/services/server/crypto/test_engine.py +++ b/kmip/tests/unit/services/server/crypto/test_engine.py @@ -379,6 +379,101 @@ class TestCryptographyEngine(testtools.TestCase): self.assertNotIn('iv_nonce', result.keys()) + def test_decrypt_invalid_algorithm(self): + """ + Test that the right errors are raised when invalid decryption + algorithms are used. + """ + engine = crypto.CryptographyEngine() + + args = (None, b'', b'') + self.assertRaisesRegexp( + exceptions.InvalidField, + "Decryption algorithm is required.", + engine.decrypt, + *args + ) + + args = ('invalid', b'', b'') + self.assertRaisesRegexp( + exceptions.InvalidField, + "Decryption algorithm 'invalid' is not a supported symmetric " + "decryption algorithm.", + engine.decrypt, + *args + ) + + def test_decrypt_invalid_algorithm_key(self): + """ + Test that the right error is raised when an invalid key is used with + a decryption algorithm. + """ + engine = crypto.CryptographyEngine() + + args = (enums.CryptographicAlgorithm.AES, b'', b'') + self.assertRaisesRegexp( + exceptions.CryptographicFailure, + "Invalid key bytes for the specified decryption algorithm.", + engine.decrypt, + *args + ) + + def test_decrypt_invalid_cipher_mode(self): + """ + Test that the right errors are raised when invalid cipher modes are + used. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.AES, + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F', + b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08' + b'\x07\x06\x05\x04\x03\x02\x01\x00' + ) + self.assertRaisesRegexp( + exceptions.InvalidField, + "Cipher mode is required.", + engine.decrypt, + *args + ) + + kwargs = {'cipher_mode': 'invalid'} + self.assertRaisesRegexp( + exceptions.InvalidField, + "Cipher mode 'invalid' is not a supported mode.", + engine.decrypt, + *args, + **kwargs + ) + + def test_decrypt_missing_iv_nonce(self): + """ + Test that the right error is raised when an IV/nonce is not provided + for the decryption algorithm. + """ + engine = crypto.CryptographyEngine() + + args = ( + enums.CryptographicAlgorithm.AES, + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F', + b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08' + b'\x07\x06\x05\x04\x03\x02\x01\x00' + ) + kwargs = { + 'cipher_mode': enums.BlockCipherMode.CBC, + 'padding_method': enums.PaddingMethod.PKCS5 + } + self.assertRaisesRegexp( + exceptions.InvalidField, + "IV/nonce is required.", + engine.decrypt, + *args, + **kwargs + ) + def test_handle_symmetric_padding_invalid(self): """ Test that the right errors are raised when invalid padding methods @@ -611,20 +706,51 @@ def test_encrypt(encrypt_parameters): assert encrypt_parameters.get('cipher_text') == result.get('cipher_text') +def test_decrypt(encrypt_parameters): + """ + Test that various decryption algorithms and block cipher modes can be + used to correctly decrypt data. + """ + engine = crypto.CryptographyEngine() + + engine._handle_symmetric_padding = mock.MagicMock( + return_value=encrypt_parameters.get('plain_text') + ) + + result = engine.decrypt( + encrypt_parameters.get('algorithm'), + encrypt_parameters.get('key'), + encrypt_parameters.get('cipher_text'), + cipher_mode=encrypt_parameters.get('cipher_mode'), + iv_nonce=encrypt_parameters.get('iv_nonce') + ) + + if engine._handle_symmetric_padding.called: + engine._handle_symmetric_padding.assert_called_once_with( + engine._symmetric_key_algorithms.get( + encrypt_parameters.get('algorithm') + ), + encrypt_parameters.get('plain_text'), + None + ) + + assert encrypt_parameters.get('plain_text') == result + + @pytest.fixture( scope='function', params=[ {'algorithm': algorithms.AES, 'plain_text': b'\x48\x65\x6C\x6C\x6F', 'padding_method': enums.PaddingMethod.PKCS5, - 'result': ( + 'padded_text': ( b'\x48\x65\x6C\x6C\x6F\x0B\x0B\x0B' b'\x0B\x0B\x0B\x0B\x0B\x0B\x0B\x0B' )}, {'algorithm': algorithms.TripleDES, 'plain_text': b'\x48\x65\x6C\x6C\x6F', 'padding_method': enums.PaddingMethod.ANSI_X923, - 'result': b'\x48\x65\x6C\x6C\x6F\x00\x00\x03'} + 'padded_text': b'\x48\x65\x6C\x6C\x6F\x00\x00\x03'} ] ) def symmetric_padding_parameters(request): @@ -644,4 +770,21 @@ def test_handle_symmetric_padding(symmetric_padding_parameters): symmetric_padding_parameters.get('padding_method') ) - assert result == symmetric_padding_parameters.get('result') + assert result == symmetric_padding_parameters.get('padded_text') + + +def test_handle_symmetric_padding_undo(symmetric_padding_parameters): + """ + Test that data of various lengths can be unpadded correctly using + different padding schemes. + """ + engine = crypto.CryptographyEngine() + + result = engine._handle_symmetric_padding( + symmetric_padding_parameters.get('algorithm'), + symmetric_padding_parameters.get('padded_text'), + symmetric_padding_parameters.get('padding_method'), + undo_padding=True + ) + + assert result == symmetric_padding_parameters.get('plain_text')