mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #289 from OpenKMIP/feat/add-encrypt
Add encryption support to the server cryptography engine
This commit is contained in:
commit
920dce73f4
|
@ -421,16 +421,16 @@ class Operation(enum.Enum):
|
|||
|
||||
|
||||
class PaddingMethod(enum.Enum):
|
||||
NONE = 0x00000001
|
||||
OAEP = 0x00000002
|
||||
PKCS5 = 0x00000003
|
||||
SSL3 = 0x00000004
|
||||
ZEROS = 0x00000005
|
||||
ANSI_X9_23 = 0x00000006
|
||||
ISO_10126 = 0x00000007
|
||||
PKCS1_V_1_5 = 0x00000008
|
||||
X9_31 = 0x00000009
|
||||
PSS = 0x0000000A
|
||||
NONE = 0x00000001
|
||||
OAEP = 0x00000002
|
||||
PKCS5 = 0x00000003
|
||||
SSL3 = 0x00000004
|
||||
ZEROS = 0x00000005
|
||||
ANSI_X923 = 0x00000006
|
||||
ISO_10126 = 0x00000007
|
||||
PKCS1v15 = 0x00000008
|
||||
X931 = 0x00000009
|
||||
PSS = 0x0000000A
|
||||
|
||||
|
||||
class Policy(enum.Enum):
|
||||
|
|
|
@ -81,3 +81,43 @@ class CryptographicEngine(object):
|
|||
Returns:
|
||||
bytes: The MAC data
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def encrypt(self,
|
||||
encryption_algorithm,
|
||||
encryption_key,
|
||||
plain_text,
|
||||
cipher_mode=None,
|
||||
padding_method=None,
|
||||
iv_nonce=None):
|
||||
"""
|
||||
Encrypt data using symmetric encryption.
|
||||
|
||||
Args:
|
||||
encryption_algorithm (CryptographicAlgorithm): An enumeration
|
||||
specifying the symmetric encryption algorithm to use for
|
||||
encryption.
|
||||
encryption_key (bytes): The bytes of the symmetric key to use for
|
||||
encryption.
|
||||
plain_text (bytes): The bytes to be encrypted.
|
||||
cipher_mode (BlockCipherMode): An enumeration specifying the
|
||||
block cipher mode to use with the encryption algorithm.
|
||||
Required in the general case. Optional if the encryption
|
||||
algorithm is RC4 (aka ARC4). If optional, defaults to None.
|
||||
padding_method (PaddingMethod): An enumeration specifying the
|
||||
padding method to use on the data before encryption. Required
|
||||
if the cipher mode is for block ciphers (e.g., CBC, ECB).
|
||||
Optional otherwise, defaults to None.
|
||||
iv_nonce (bytes): The IV/nonce value to use to initialize the mode
|
||||
of the encryption algorithm. Optional, defaults to None. If
|
||||
required and not provided, it will be autogenerated and
|
||||
returned with the cipher text.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the encrypted data, with at least
|
||||
the following key/value fields:
|
||||
* cipher_text - the bytes of the encrypted data
|
||||
* iv_nonce - the bytes of the IV/counter/nonce used if it
|
||||
was needed by the encryption scheme and if it was
|
||||
automatically generated for the encryption
|
||||
"""
|
||||
|
|
|
@ -18,8 +18,12 @@ import os
|
|||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization, hashes, hmac, cmac
|
||||
from cryptography.hazmat.primitives import padding as symmetric_padding
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms
|
||||
from cryptography.hazmat.primitives.asymmetric import padding as \
|
||||
asymmetric_padding
|
||||
from cryptography.hazmat.primitives import ciphers
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms, modes
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.core import exceptions
|
||||
|
@ -38,16 +42,18 @@ class CryptographyEngine(api.CryptographicEngine):
|
|||
"""
|
||||
self.logger = logging.getLogger('kmip.server.engine.cryptography')
|
||||
|
||||
# The IDEA algorithm is supported by cryptography but may not be
|
||||
# supported by certain backends, like OpenSSL.
|
||||
self._symmetric_key_algorithms = {
|
||||
enums.CryptographicAlgorithm.TRIPLE_DES: algorithms.TripleDES,
|
||||
enums.CryptographicAlgorithm.AES: algorithms.AES,
|
||||
enums.CryptographicAlgorithm.BLOWFISH: algorithms.Blowfish,
|
||||
enums.CryptographicAlgorithm.CAMELLIA: algorithms.Camellia,
|
||||
enums.CryptographicAlgorithm.CAST5: algorithms.CAST5,
|
||||
enums.CryptographicAlgorithm.IDEA: algorithms.IDEA,
|
||||
enums.CryptographicAlgorithm.RC4: algorithms.ARC4
|
||||
enums.CryptographicAlgorithm.AES: algorithms.AES,
|
||||
enums.CryptographicAlgorithm.BLOWFISH: algorithms.Blowfish,
|
||||
enums.CryptographicAlgorithm.CAMELLIA: algorithms.Camellia,
|
||||
enums.CryptographicAlgorithm.CAST5: algorithms.CAST5,
|
||||
enums.CryptographicAlgorithm.IDEA: algorithms.IDEA,
|
||||
enums.CryptographicAlgorithm.RC4: algorithms.ARC4
|
||||
}
|
||||
self._asymetric_key_algorithms = {
|
||||
self._asymmetric_key_algorithms = {
|
||||
enums.CryptographicAlgorithm.RSA: self._create_rsa_key_pair
|
||||
}
|
||||
self._hash_algorithms = {
|
||||
|
@ -59,6 +65,43 @@ class CryptographyEngine(api.CryptographicEngine):
|
|||
enums.CryptographicAlgorithm.HMAC_MD5: hashes.MD5
|
||||
}
|
||||
|
||||
# TODO(peter-hamilton): Consider merging above hash dict and this one
|
||||
self._encryption_hash_algorithms = {
|
||||
enums.HashingAlgorithm.MD5: hashes.MD5,
|
||||
enums.HashingAlgorithm.SHA_1: hashes.SHA1,
|
||||
enums.HashingAlgorithm.SHA_224: hashes.SHA224,
|
||||
enums.HashingAlgorithm.SHA_256: hashes.SHA256,
|
||||
enums.HashingAlgorithm.SHA_384: hashes.SHA384,
|
||||
enums.HashingAlgorithm.SHA_512: hashes.SHA512
|
||||
}
|
||||
|
||||
# GCM is supported by cryptography but requires inputs that are not
|
||||
# supported by the KMIP spec. It is excluded for now.
|
||||
self._modes = {
|
||||
enums.BlockCipherMode.CBC: modes.CBC,
|
||||
enums.BlockCipherMode.ECB: modes.ECB,
|
||||
enums.BlockCipherMode.OFB: modes.OFB,
|
||||
enums.BlockCipherMode.CFB: modes.CFB,
|
||||
enums.BlockCipherMode.CTR: modes.CTR
|
||||
}
|
||||
self._asymmetric_padding_methods = {
|
||||
enums.PaddingMethod.OAEP: asymmetric_padding.OAEP,
|
||||
enums.PaddingMethod.PKCS1v15: asymmetric_padding.PKCS1v15
|
||||
}
|
||||
self._symmetric_padding_methods = {
|
||||
enums.PaddingMethod.ANSI_X923: symmetric_padding.ANSIX923,
|
||||
enums.PaddingMethod.PKCS5: symmetric_padding.PKCS7
|
||||
}
|
||||
self._no_mode_needed = [
|
||||
enums.CryptographicAlgorithm.RC4
|
||||
]
|
||||
self._no_padding_needed = [
|
||||
enums.BlockCipherMode.CTR,
|
||||
enums.BlockCipherMode.OFB,
|
||||
enums.BlockCipherMode.CFB,
|
||||
enums.BlockCipherMode.GCM
|
||||
]
|
||||
|
||||
def create_symmetric_key(self, algorithm, length):
|
||||
"""
|
||||
Create a symmetric key.
|
||||
|
@ -147,13 +190,13 @@ class CryptographyEngine(api.CryptographicEngine):
|
|||
>>> key = engine.create_asymmetric_key(
|
||||
... CryptographicAlgorithm.RSA, 2048)
|
||||
"""
|
||||
if algorithm not in self._asymetric_key_algorithms.keys():
|
||||
if algorithm not in self._asymmetric_key_algorithms.keys():
|
||||
raise exceptions.InvalidField(
|
||||
"The cryptographic algorithm ({0}) is not a supported "
|
||||
"asymmetric key algorithm.".format(algorithm)
|
||||
)
|
||||
|
||||
engine_method = self._asymetric_key_algorithms.get(algorithm)
|
||||
engine_method = self._asymmetric_key_algorithms.get(algorithm)
|
||||
return engine_method(length)
|
||||
|
||||
def mac(self, algorithm, key, data):
|
||||
|
@ -224,6 +267,169 @@ class CryptographyEngine(api.CryptographicEngine):
|
|||
)
|
||||
return mac_data
|
||||
|
||||
def encrypt(self,
|
||||
encryption_algorithm,
|
||||
encryption_key,
|
||||
plain_text,
|
||||
cipher_mode=None,
|
||||
padding_method=None,
|
||||
iv_nonce=None):
|
||||
"""
|
||||
Encrypt data using symmetric encryption.
|
||||
|
||||
Args:
|
||||
encryption_algorithm (CryptographicAlgorithm): An enumeration
|
||||
specifying the symmetric encryption algorithm to use for
|
||||
encryption.
|
||||
encryption_key (bytes): The bytes of the symmetric key to use for
|
||||
encryption.
|
||||
plain_text (bytes): The bytes to be encrypted.
|
||||
cipher_mode (BlockCipherMode): An enumeration specifying the
|
||||
block cipher mode to use with the encryption algorithm.
|
||||
Required in the general case. Optional if the encryption
|
||||
algorithm is RC4 (aka ARC4). If optional, defaults to None.
|
||||
padding_method (PaddingMethod): An enumeration specifying the
|
||||
padding method to use on the data before encryption. Required
|
||||
if the cipher mode is for block ciphers (e.g., CBC, ECB).
|
||||
Optional otherwise, defaults to None.
|
||||
iv_nonce (bytes): The IV/nonce value to use to initialize the mode
|
||||
of the encryption algorithm. Optional, defaults to None. If
|
||||
required and not provided, it will be autogenerated and
|
||||
returned with the cipher text.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary containing the encrypted data, with at least
|
||||
the following key/value fields:
|
||||
* cipher_text - the bytes of the encrypted data
|
||||
* iv_nonce - the bytes of the IV/counter/nonce used if it
|
||||
was needed by the encryption scheme and if it was
|
||||
automatically generated for the encryption
|
||||
|
||||
Raises:
|
||||
InvalidField: Raised when the algorithm is unsupported or the
|
||||
length is incompatible with the algorithm.
|
||||
CryptographicFailure: Raised when the key generation process
|
||||
fails.
|
||||
|
||||
Example:
|
||||
>>> engine = CryptographyEngine()
|
||||
>>> result = engine.encrypt(
|
||||
... encryption_algorithm=CryptographicAlgorithm.AES,
|
||||
... encryption_key=(
|
||||
... b'\xF3\x96\xE7\x1C\xCF\xCD\xEC\x1F'
|
||||
... b'\xFC\xE2\x8E\xA6\xF8\x74\x28\xB0'
|
||||
... ),
|
||||
... plain_text=(
|
||||
... b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
... b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
|
||||
... ),
|
||||
... cipher_mode=BlockCipherMode.CBC,
|
||||
... padding_method=PaddingMethod.ANSI_X923,
|
||||
... )
|
||||
>>> result.get('cipher_text')
|
||||
b'\x18[\xb9y\x1bL\xd1\x8f\x9a\xa0e\x02b\xa3=c'
|
||||
>>> result.iv_counter_nonce
|
||||
b'8qA\x05\xc4\x86\x03\xd9=\xef\xdf\xb8ke\x9a\xa2'
|
||||
"""
|
||||
|
||||
# Set up the algorithm
|
||||
if encryption_algorithm is None:
|
||||
raise exceptions.InvalidField("Encryption algorithm is required.")
|
||||
algorithm = self._symmetric_key_algorithms.get(
|
||||
encryption_algorithm,
|
||||
None
|
||||
)
|
||||
if algorithm is None:
|
||||
raise exceptions.InvalidField(
|
||||
"Encryption algorithm '{0}' is not a supported symmetric "
|
||||
"encryption algorithm.".format(encryption_algorithm)
|
||||
)
|
||||
try:
|
||||
algorithm = algorithm(encryption_key)
|
||||
except Exception as e:
|
||||
self.logger.exception(e)
|
||||
raise exceptions.CryptographicFailure(
|
||||
"Invalid key bytes for the specified encryption algorithm."
|
||||
)
|
||||
|
||||
# Set up the cipher mode if needed
|
||||
return_iv_nonce = False
|
||||
if encryption_algorithm == enums.CryptographicAlgorithm.RC4:
|
||||
mode = None
|
||||
else:
|
||||
if cipher_mode is None:
|
||||
raise exceptions.InvalidField("Cipher mode is required.")
|
||||
mode = self._modes.get(cipher_mode, None)
|
||||
if mode is None:
|
||||
raise exceptions.InvalidField(
|
||||
"Cipher mode '{0}' is not a supported mode.".format(
|
||||
cipher_mode
|
||||
)
|
||||
)
|
||||
if hasattr(mode, 'initialization_vector') or \
|
||||
hasattr(mode, 'nonce'):
|
||||
if iv_nonce is None:
|
||||
iv_nonce = os.urandom(algorithm.block_size // 8)
|
||||
return_iv_nonce = True
|
||||
mode = mode(iv_nonce)
|
||||
else:
|
||||
mode = mode()
|
||||
|
||||
# Pad the plain text if needed (separate methods for testing purposes)
|
||||
if cipher_mode in [
|
||||
enums.BlockCipherMode.CBC,
|
||||
enums.BlockCipherMode.ECB
|
||||
]:
|
||||
plain_text = self._handle_symmetric_padding(
|
||||
self._symmetric_key_algorithms.get(encryption_algorithm),
|
||||
plain_text,
|
||||
padding_method
|
||||
)
|
||||
|
||||
# Encrypt the plain text
|
||||
cipher = ciphers.Cipher(algorithm, mode, backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
cipher_text = encryptor.update(plain_text) + encryptor.finalize()
|
||||
|
||||
if return_iv_nonce:
|
||||
return {
|
||||
'cipher_text': cipher_text,
|
||||
'iv_nonce': iv_nonce
|
||||
}
|
||||
else:
|
||||
return {'cipher_text': cipher_text}
|
||||
|
||||
def _handle_symmetric_padding(self,
|
||||
algorithm,
|
||||
plain_text,
|
||||
padding_method):
|
||||
# KMIP 1.3 test TC-STREAM-ENC-2-13.xml demonstrates a case
|
||||
# where an encrypt call for 3DES-ECB does not use padding if
|
||||
# the plaintext fits the blocksize of the algorithm. This does
|
||||
# not appear to be documented explicitly in the KMIP spec. It
|
||||
# also makes failures during unpadding after decryption
|
||||
# impossible to differentiate from cipher text/key mismatches.
|
||||
# For now, ALWAYS apply padding regardless of plain text length.
|
||||
if padding_method in self._symmetric_padding_methods.keys():
|
||||
padding_method = self._symmetric_padding_methods.get(
|
||||
padding_method
|
||||
)
|
||||
padder = padding_method(algorithm.block_size).padder()
|
||||
plain_text = padder.update(plain_text)
|
||||
plain_text += padder.finalize()
|
||||
else:
|
||||
if padding_method is None:
|
||||
raise exceptions.InvalidField(
|
||||
"Padding method is required."
|
||||
)
|
||||
else:
|
||||
raise exceptions.InvalidField(
|
||||
"Padding method '{0}' is not supported.".format(
|
||||
padding_method
|
||||
)
|
||||
)
|
||||
return plain_text
|
||||
|
||||
def _create_rsa_key_pair(self, length, public_exponent=65537):
|
||||
"""
|
||||
Create an RSA key pair.
|
||||
|
|
|
@ -87,7 +87,7 @@ class TestAttributeValueFactory(testtools.TestCase):
|
|||
"""
|
||||
value = {
|
||||
'block_cipher_mode': enums.BlockCipherMode.NIST_KEY_WRAP,
|
||||
'padding_method': enums.PaddingMethod.ANSI_X9_23,
|
||||
'padding_method': enums.PaddingMethod.ANSI_X923,
|
||||
'key_role_type': enums.KeyRoleType.KEK,
|
||||
'hashing_algorithm': enums.HashingAlgorithm.SHA_512,
|
||||
'digital_signature_algorithm':
|
||||
|
@ -116,7 +116,7 @@ class TestAttributeValueFactory(testtools.TestCase):
|
|||
cryptographic_parameters.block_cipher_mode
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.PaddingMethod.ANSI_X9_23,
|
||||
enums.PaddingMethod.ANSI_X923,
|
||||
cryptographic_parameters.padding_method
|
||||
)
|
||||
self.assertEqual(
|
||||
|
|
|
@ -13,8 +13,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
import testtools
|
||||
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.core import exceptions
|
||||
from kmip.services.server import crypto
|
||||
|
@ -219,7 +223,6 @@ class TestCryptographyEngine(testtools.TestCase):
|
|||
)
|
||||
|
||||
def test_mac_with_cryptographic_failure(self):
|
||||
pass
|
||||
"""
|
||||
Test that an CryptographicFailure error is raised when the MAC
|
||||
process fails.
|
||||
|
@ -257,3 +260,388 @@ class TestCryptographyEngine(testtools.TestCase):
|
|||
engine.mac,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_encrypt_invalid_algorithm(self):
|
||||
"""
|
||||
Test that the right errors are raised when invalid encryption
|
||||
algorithms are used.
|
||||
"""
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
args = (None, b'', b'')
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
"Encryption algorithm is required.",
|
||||
engine.encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
args = ('invalid', b'', b'')
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
"Encryption algorithm 'invalid' is not a supported symmetric "
|
||||
"encryption algorithm.",
|
||||
engine.encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_encrypt_invalid_algorithm_key(self):
|
||||
"""
|
||||
Test that the right error is raised when an invalid key is used with
|
||||
an encryption algorithm.
|
||||
"""
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
args = (enums.CryptographicAlgorithm.AES, b'', b'')
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.CryptographicFailure,
|
||||
"Invalid key bytes for the specified encryption algorithm.",
|
||||
engine.encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_encrypt_no_mode_needed(self):
|
||||
"""
|
||||
Test that data can be encrypted for certain inputs without a cipher
|
||||
mode.
|
||||
"""
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
engine.encrypt(
|
||||
enums.CryptographicAlgorithm.RC4,
|
||||
b'\x00\x01\x02\x03\x04\x05\x06\x07',
|
||||
b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08'
|
||||
)
|
||||
|
||||
def test_encrypt_invalid_cipher_mode(self):
|
||||
"""
|
||||
Test that the right errors are raised when invalid cipher modes are
|
||||
used.
|
||||
"""
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
args = (
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F',
|
||||
b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08'
|
||||
b'\x07\x06\x05\x04\x03\x02\x01\x00'
|
||||
)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
"Cipher mode is required.",
|
||||
engine.encrypt,
|
||||
*args
|
||||
)
|
||||
|
||||
kwargs = {'cipher_mode': 'invalid'}
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
"Cipher mode 'invalid' is not a supported mode.",
|
||||
engine.encrypt,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def test_encrypt_generate_iv(self):
|
||||
"""
|
||||
Test that the initialization vector is correctly generated and
|
||||
returned for an appropriate set of encryption inputs.
|
||||
"""
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
result = engine.encrypt(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F',
|
||||
b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08'
|
||||
b'\x07\x06\x05\x04\x03\x02\x01\x00',
|
||||
cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS5
|
||||
)
|
||||
|
||||
self.assertIn('iv_nonce', result.keys())
|
||||
self.assertIsNotNone(result.get('iv_nonce'))
|
||||
|
||||
result = engine.encrypt(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F',
|
||||
b'\x0F\x0E\x0D\x0C\x0B\x0A\x09\x08'
|
||||
b'\x07\x06\x05\x04\x03\x02\x01\x00',
|
||||
cipher_mode=enums.BlockCipherMode.CBC,
|
||||
padding_method=enums.PaddingMethod.PKCS5,
|
||||
iv_nonce=(
|
||||
b'\x00\x10\x20\x30\x40\x50\x60\x70'
|
||||
b'\x80\x90\xA0\xB0\xC0\xD0\xE0\xF0'
|
||||
)
|
||||
)
|
||||
|
||||
self.assertNotIn('iv_nonce', result.keys())
|
||||
|
||||
def test_handle_symmetric_padding_invalid(self):
|
||||
"""
|
||||
Test that the right errors are raised when invalid padding methods
|
||||
are used.
|
||||
"""
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
args = (
|
||||
algorithms.AES,
|
||||
b'\x01\x02\x03\x04',
|
||||
None
|
||||
)
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
"Padding method is required.",
|
||||
engine._handle_symmetric_padding,
|
||||
*args
|
||||
)
|
||||
|
||||
args = (
|
||||
algorithms.AES,
|
||||
b'\x01\x02\x03\x04',
|
||||
'invalid'
|
||||
)
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
"Padding method 'invalid' is not supported.",
|
||||
engine._handle_symmetric_padding,
|
||||
*args
|
||||
)
|
||||
|
||||
|
||||
# TODO(peter-hamilton): Replace this with actual fixture files from NIST CAPV.
|
||||
# Most of these test vectors were obtained from the pyca/cryptography test
|
||||
# suite.
|
||||
@pytest.fixture(
|
||||
scope='function',
|
||||
params=[
|
||||
{'algorithm': enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||
'cipher_mode': enums.BlockCipherMode.ECB,
|
||||
'key': (
|
||||
b'\x01\x01\x01\x01\x01\x01\x01\x01'
|
||||
b'\x01\x01\x01\x01\x01\x01\x01\x01'
|
||||
b'\x01\x01\x01\x01\x01\x01\x01\x01'
|
||||
),
|
||||
'plain_text': (
|
||||
b'\x01\x02\x03\x04\x05\x06\x07\x08'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\xCE\xAD\x37\x3D\xB8\x0E\xAB\xF8'
|
||||
),
|
||||
'iv_nonce': None},
|
||||
{'algorithm': enums.CryptographicAlgorithm.AES,
|
||||
'cipher_mode': enums.BlockCipherMode.ECB,
|
||||
'key': (
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
),
|
||||
'plain_text': (
|
||||
b'\xf3\x44\x81\xec\x3c\xc6\x27\xba'
|
||||
b'\xcd\x5d\xc3\xfb\x08\xf2\x73\xe6'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\x03\x36\x76\x3e\x96\x6d\x92\x59'
|
||||
b'\x5a\x56\x7c\xc9\xce\x53\x7f\x5e'
|
||||
),
|
||||
'iv_nonce': None},
|
||||
{'algorithm': enums.CryptographicAlgorithm.AES,
|
||||
'cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'key': (
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
),
|
||||
'iv_nonce': (
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
),
|
||||
'plain_text': (
|
||||
b'\xf3\x44\x81\xec\x3c\xc6\x27\xba'
|
||||
b'\xcd\x5d\xc3\xfb\x08\xf2\x73\xe6'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\x03\x36\x76\x3e\x96\x6d\x92\x59'
|
||||
b'\x5a\x56\x7c\xc9\xce\x53\x7f\x5e'
|
||||
)},
|
||||
{'algorithm': enums.CryptographicAlgorithm.AES,
|
||||
'cipher_mode': enums.BlockCipherMode.CBC,
|
||||
'key': (
|
||||
b'\x6e\xd7\x6d\x2d\x97\xc6\x9f\xd1'
|
||||
b'\x33\x95\x89\x52\x39\x31\xf2\xa6'
|
||||
b'\xcf\xf5\x54\xb1\x5f\x73\x8f\x21'
|
||||
b'\xec\x72\xdd\x97\xa7\x33\x09\x07'
|
||||
),
|
||||
'iv_nonce': (
|
||||
b'\x85\x1e\x87\x64\x77\x6e\x67\x96'
|
||||
b'\xaa\xb7\x22\xdb\xb6\x44\xac\xe8'
|
||||
),
|
||||
'plain_text': (
|
||||
b'\x62\x82\xb8\xc0\x5c\x5c\x15\x30'
|
||||
b'\xb9\x7d\x48\x16\xca\x43\x47\x62'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\x6a\xcc\x04\x14\x2e\x10\x0a\x65'
|
||||
b'\xf5\x1b\x97\xad\xf5\x17\x2c\x41'
|
||||
)},
|
||||
{'algorithm': enums.CryptographicAlgorithm.BLOWFISH,
|
||||
'cipher_mode': enums.BlockCipherMode.OFB,
|
||||
'key': (
|
||||
b'\x01\x23\x45\x67\x89\xAB\xCD\xEF'
|
||||
b'\xF0\xE1\xD2\xC3\xB4\xA5\x96\x87'
|
||||
),
|
||||
'iv_nonce': b'\xFE\xDC\xBA\x98\x76\x54\x32\x10',
|
||||
'plain_text': (
|
||||
b'\x37\x36\x35\x34\x33\x32\x31\x20'
|
||||
b'\x4E\x6F\x77\x20\x69\x73\x20\x74'
|
||||
b'\x68\x65\x20\x74\x69\x6D\x65\x20'
|
||||
b'\x66\x6F\x72\x20\x00'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\xE7\x32\x14\xA2\x82\x21\x39\xCA'
|
||||
b'\x62\xB3\x43\xCC\x5B\x65\x58\x73'
|
||||
b'\x10\xDD\x90\x8D\x0C\x24\x1B\x22'
|
||||
b'\x63\xC2\xCF\x80\xDA'
|
||||
)},
|
||||
{'algorithm': enums.CryptographicAlgorithm.CAST5,
|
||||
'cipher_mode': enums.BlockCipherMode.CFB,
|
||||
'key': (
|
||||
b'\xb9\xba\x9f\xa3\x2c\xc4\x91\xd8'
|
||||
b'\xac\x2b\xeb\x5f\x99\x19\x3d\x57'
|
||||
),
|
||||
'iv_nonce': b'\x95\x51\x14\x52\xb7\x1e\x53\xe9',
|
||||
'plain_text': (
|
||||
b'\xb4\x03\x82\x70\x5a\xae\xea\x41'
|
||||
b'\x09\x7c\x30\x9d\xa6\xcd\x06\x01'
|
||||
b'\x0f\x15\xe0\x9c\x01\x30\xfa\x4b'
|
||||
b'\x3a\xf6\x9c\xc8\xda\x10\x9d\x1f'
|
||||
b'\x0f\x0a\x26\x61\xf1\xa8\xb8\x9b'
|
||||
b'\xab\x7e\x70\x09\xdc\xbb\x8a\x88'
|
||||
b'\x3d\x46\x25\x4a\x83\x0c\x45\xcd'
|
||||
b'\x87\x98\x1e\x0e\xa4\xe4\x90\xfa'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\x67\x74\xad\xe6\x98\x43\x92\xea'
|
||||
b'\xf6\x70\xdc\x2f\x8c\x23\x97\xe8'
|
||||
b'\x7a\xf5\xc8\x50\x32\x53\x76\xd9'
|
||||
b'\x23\x0c\xf6\x22\xd7\xf0\xa0\xfd'
|
||||
b'\x0a\x4a\x0c\x68\x56\x5c\x9e\xfd'
|
||||
b'\xaf\x58\xc2\xae\xc1\x8e\x35\x2a'
|
||||
b'\x31\x5a\x0f\x9c\xa6\xbe\xeb\x8e'
|
||||
b'\x1b\xf4\xdf\xb6\x73\x76\x8f\x0e'
|
||||
)},
|
||||
{'algorithm': enums.CryptographicAlgorithm.CAMELLIA,
|
||||
'cipher_mode': enums.BlockCipherMode.OFB,
|
||||
'key': (
|
||||
b'\x2B\x7E\x15\x16\x28\xAE\xD2\xA6'
|
||||
b'\xAB\xF7\x15\x88\x09\xCF\x4F\x3C'
|
||||
),
|
||||
'iv_nonce': (
|
||||
b'\x00\x01\x02\x03\x04\x05\x06\x07'
|
||||
b'\x08\x09\x0A\x0B\x0C\x0D\x0E\x0F'
|
||||
),
|
||||
'plain_text': (
|
||||
b'\x6B\xC1\xBE\xE2\x2E\x40\x9F\x96'
|
||||
b'\xE9\x3D\x7E\x11\x73\x93\x17\x2A'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\x14\xF7\x64\x61\x87\x81\x7E\xB5'
|
||||
b'\x86\x59\x91\x46\xB8\x2B\xD7\x19'
|
||||
)},
|
||||
{'algorithm': enums.CryptographicAlgorithm.RC4,
|
||||
'cipher_mode': None,
|
||||
'key': (
|
||||
b'\x01\x02\x03\x04\x05\x06\x07\x08'
|
||||
b'\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10'
|
||||
b'\x11\x12\x13\x14\x15\x16\x17\x18'
|
||||
b'\x19\x1a\x1b\x1c\x1d\x1e\x1f\x20'
|
||||
),
|
||||
'iv_nonce': None,
|
||||
'plain_text': (
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
),
|
||||
'cipher_text': (
|
||||
b'\xea\xa6\xbd\x25\x88\x0b\xf9\x3d'
|
||||
b'\x3f\x5d\x1e\x4c\xa2\x61\x1d\x91'
|
||||
)},
|
||||
{'algorithm': enums.CryptographicAlgorithm.TRIPLE_DES,
|
||||
'cipher_mode': enums.BlockCipherMode.ECB,
|
||||
'key': b'\x01\x01\x01\x01\x01\x01\x01\x01',
|
||||
'plain_text': b'\x80\x00\x00\x00\x00\x00\x00\x00',
|
||||
'cipher_text': b'\x95\xF8\xA5\xE5\xDD\x31\xD9\x00',
|
||||
'iv_nonce': None}
|
||||
]
|
||||
)
|
||||
def encrypt_parameters(request):
|
||||
return request.param
|
||||
|
||||
|
||||
def test_encrypt(encrypt_parameters):
|
||||
"""
|
||||
Test that various encryption algorithms and block cipher modes can be
|
||||
used to correctly encrypt data.
|
||||
"""
|
||||
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
engine._handle_symmetric_padding = mock.MagicMock(
|
||||
return_value=encrypt_parameters.get('plain_text')
|
||||
)
|
||||
|
||||
result = engine.encrypt(
|
||||
encrypt_parameters.get('algorithm'),
|
||||
encrypt_parameters.get('key'),
|
||||
encrypt_parameters.get('plain_text'),
|
||||
cipher_mode=encrypt_parameters.get('cipher_mode'),
|
||||
iv_nonce=encrypt_parameters.get('iv_nonce')
|
||||
)
|
||||
|
||||
if engine._handle_symmetric_padding.called:
|
||||
engine._handle_symmetric_padding.assert_called_once_with(
|
||||
engine._symmetric_key_algorithms.get(
|
||||
encrypt_parameters.get('algorithm')
|
||||
),
|
||||
encrypt_parameters.get('plain_text'),
|
||||
None
|
||||
)
|
||||
|
||||
assert encrypt_parameters.get('cipher_text') == result.get('cipher_text')
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope='function',
|
||||
params=[
|
||||
{'algorithm': algorithms.AES,
|
||||
'plain_text': b'\x48\x65\x6C\x6C\x6F',
|
||||
'padding_method': enums.PaddingMethod.PKCS5,
|
||||
'result': (
|
||||
b'\x48\x65\x6C\x6C\x6F\x0B\x0B\x0B'
|
||||
b'\x0B\x0B\x0B\x0B\x0B\x0B\x0B\x0B'
|
||||
)},
|
||||
{'algorithm': algorithms.TripleDES,
|
||||
'plain_text': b'\x48\x65\x6C\x6C\x6F',
|
||||
'padding_method': enums.PaddingMethod.ANSI_X923,
|
||||
'result': b'\x48\x65\x6C\x6C\x6F\x00\x00\x03'}
|
||||
]
|
||||
)
|
||||
def symmetric_padding_parameters(request):
|
||||
return request.param
|
||||
|
||||
|
||||
def test_handle_symmetric_padding(symmetric_padding_parameters):
|
||||
"""
|
||||
Test that data of various lengths can be padded correctly using different
|
||||
padding schemes.
|
||||
"""
|
||||
engine = crypto.CryptographyEngine()
|
||||
|
||||
result = engine._handle_symmetric_padding(
|
||||
symmetric_padding_parameters.get('algorithm'),
|
||||
symmetric_padding_parameters.get('plain_text'),
|
||||
symmetric_padding_parameters.get('padding_method')
|
||||
)
|
||||
|
||||
assert result == symmetric_padding_parameters.get('result')
|
||||
|
|
Loading…
Reference in New Issue