diff --git a/kmip/core/factories/payloads/request.py b/kmip/core/factories/payloads/request.py index a76a9ed..4db2b20 100644 --- a/kmip/core/factories/payloads/request.py +++ b/kmip/core/factories/payloads/request.py @@ -20,6 +20,7 @@ from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes @@ -74,3 +75,6 @@ class RequestPayloadFactory(PayloadFactory): def _create_mac_payload(self): return mac.MACRequestPayload() + + def _create_encrypt_payload(self): + return encrypt.EncryptRequestPayload() diff --git a/kmip/core/factories/payloads/response.py b/kmip/core/factories/payloads/response.py index b1b8abc..c86acdc 100644 --- a/kmip/core/factories/payloads/response.py +++ b/kmip/core/factories/payloads/response.py @@ -20,6 +20,7 @@ from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes @@ -74,3 +75,6 @@ class ResponsePayloadFactory(PayloadFactory): def _create_mac_payload(self): return mac.MACResponsePayload() + + def _create_encrypt_payload(self): + return encrypt.EncryptResponsePayload() diff --git a/kmip/core/messages/payloads/encrypt.py b/kmip/core/messages/payloads/encrypt.py new file mode 100644 index 0000000..3ef4281 --- /dev/null +++ b/kmip/core/messages/payloads/encrypt.py @@ -0,0 +1,471 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from kmip.core import attributes +from kmip.core import enums +from kmip.core import primitives +from kmip.core import utils + + +class EncryptRequestPayload(primitives.Struct): + """ + A request payload for the Encrypt operation. + + Attributes: + unique_identifier: The unique ID of the managed object to be used for + encryption. + cryptographic_parameters: A collection of settings relevant for + the encryption operation. + data: The data to be encrypted in the form of a binary string. + iv_counter_nonce: An IV/counter/nonce to be used with the encryption + algorithm. Comes in the form of a binary string. + """ + + def __init__(self, + unique_identifier=None, + cryptographic_parameters=None, + data=None, + iv_counter_nonce=None): + """ + Construct an Encrypt request payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a symmetric key) to be used for encryption. Optional, defaults + to None. If not included, the ID placeholder will be used. + cryptographic_parameters (CryptographicParameters): A + CryptographicParameters struct containing the settings for + the encryption algorithm. Optional, defaults to None. If not + included, the CryptographicParameters associated with the + managed object will be used instead. + data (bytes): The data to encrypt in binary form. Required for + encoding and decoding. + iv_counter_nonce (bytes): The IV/counter/nonce value to be used + with the encryption algorithm. Optional, defaults to None. + """ + super(EncryptRequestPayload, self).__init__( + enums.Tags.REQUEST_PAYLOAD + ) + + self._unique_identifier = None + self._cryptographic_parameters = None + self._data = None + self._iv_counter_nonce = None + + self.unique_identifier = unique_identifier + self.cryptographic_parameters = cryptographic_parameters + self.data = data + self.iv_counter_nonce = iv_counter_nonce + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") + + @property + def cryptographic_parameters(self): + return self._cryptographic_parameters + + @cryptographic_parameters.setter + def cryptographic_parameters(self, value): + if value is None: + self._cryptographic_parameters = None + elif isinstance(value, attributes.CryptographicParameters): + self._cryptographic_parameters = value + else: + raise TypeError( + "cryptographic parameters must be a CryptographicParameters " + "struct" + ) + + @property + def data(self): + if self._data: + return self._data.value + else: + return None + + @data.setter + def data(self, value): + if value is None: + self._data = None + elif isinstance(value, six.binary_type): + self._data = primitives.ByteString( + value=value, + tag=enums.Tags.DATA + ) + else: + raise TypeError("data must be bytes") + + @property + def iv_counter_nonce(self): + if self._iv_counter_nonce: + return self._iv_counter_nonce.value + else: + return None + + @iv_counter_nonce.setter + def iv_counter_nonce(self, value): + if value is None: + self._iv_counter_nonce = None + elif isinstance(value, six.binary_type): + self._iv_counter_nonce = primitives.ByteString( + value=value, + tag=enums.Tags.IV_COUNTER_NONCE + ) + else: + raise TypeError("IV/counter/nonce must be bytes") + + def read(self, input_stream): + """ + Read the data encoding the Encrypt request payload and decode it + into its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(EncryptRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + + if self.is_tag_next( + enums.Tags.CRYPTOGRAPHIC_PARAMETERS, + local_stream + ): + self._cryptographic_parameters = \ + attributes.CryptographicParameters() + self._cryptographic_parameters.read(local_stream) + + if self.is_tag_next(enums.Tags.DATA, local_stream): + self._data = primitives.ByteString(tag=enums.Tags.DATA) + self._data.read(local_stream) + else: + raise ValueError("invalid payload missing the data attribute") + + if self.is_tag_next(enums.Tags.IV_COUNTER_NONCE, local_stream): + self._iv_counter_nonce = primitives.ByteString( + tag=enums.Tags.IV_COUNTER_NONCE + ) + self._iv_counter_nonce.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Encrypt request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + if self._cryptographic_parameters: + self._cryptographic_parameters.write(local_stream) + + if self._data: + self._data.write(local_stream) + else: + raise ValueError("invalid payload missing the data attribute") + + if self._iv_counter_nonce: + self._iv_counter_nonce.write(local_stream) + + self.length = local_stream.length() + super(EncryptRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, EncryptRequestPayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.cryptographic_parameters !=\ + other.cryptographic_parameters: + return False + elif self.data != other.data: + return False + elif self.iv_counter_nonce != other.iv_counter_nonce: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, EncryptRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "cryptographic_parameters={0}".format( + repr(self.cryptographic_parameters) + ), + "data={0}".format(self.data), + "iv_counter_nonce={0}".format(self.iv_counter_nonce) + ]) + return "EncryptRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'cryptographic_parameters': self.cryptographic_parameters, + 'data': self.data, + 'iv_counter_nonce': self.iv_counter_nonce + }) + + +class EncryptResponsePayload(primitives.Struct): + """ + A response payload for the Encrypt operation. + + Attributes: + unique_identifier: The unique ID of the managed object used for the + encryption. + data: The encrypted data in the form of a binary string. + iv_counter_nonce: The IV/counter/nonce used with the encryption + algorithm. Comes in the form of a binary string. + """ + + def __init__(self, + unique_identifier=None, + data=None, + iv_counter_nonce=None): + """ + Construct an Encrypt response payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a symmetric key) used for encryption. Required for encoding + and decoding. + data (bytes): The encrypted data in binary form. Required for + encoding and decoding. + iv_counter_nonce (bytes): The IV/counter/nonce value used with + the encryption algorithm if it was required and if this + value was not originally specified by the client. Optional, + defaults to None. + """ + super(EncryptResponsePayload, self).__init__( + enums.Tags.RESPONSE_PAYLOAD + ) + + self._unique_identifier = None + self._data = None + self._iv_counter_nonce = None + + self.unique_identifier = unique_identifier + self.data = data + self.iv_counter_nonce = iv_counter_nonce + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") + + @property + def data(self): + if self._data: + return self._data.value + else: + return None + + @data.setter + def data(self, value): + if value is None: + self._data = None + elif isinstance(value, six.binary_type): + self._data = primitives.ByteString( + value=value, + tag=enums.Tags.DATA + ) + else: + raise TypeError("data must be bytes") + + @property + def iv_counter_nonce(self): + if self._iv_counter_nonce: + return self._iv_counter_nonce.value + else: + return None + + @iv_counter_nonce.setter + def iv_counter_nonce(self, value): + if value is None: + self._iv_counter_nonce = None + elif isinstance(value, six.binary_type): + self._iv_counter_nonce = primitives.ByteString( + value=value, + tag=enums.Tags.IV_COUNTER_NONCE + ) + else: + raise TypeError("IV/counter/nonce must be bytes") + + def read(self, input_stream): + """ + Read the data encoding the Encrypt response payload and decode it + into its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the unique_identifier or data attributes + are missing from the encoded payload. + """ + super(EncryptResponsePayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + else: + raise ValueError( + "invalid payload missing the unique identifier attribute" + ) + + if self.is_tag_next(enums.Tags.DATA, local_stream): + self._data = primitives.ByteString(tag=enums.Tags.DATA) + self._data.read(local_stream) + else: + raise ValueError("invalid payload missing the data attribute") + + if self.is_tag_next(enums.Tags.IV_COUNTER_NONCE, local_stream): + self._iv_counter_nonce = primitives.ByteString( + tag=enums.Tags.IV_COUNTER_NONCE + ) + self._iv_counter_nonce.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Encrypt response payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the unique_identifier or data attributes + are not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + else: + raise ValueError( + "invalid payload missing the unique identifier attribute" + ) + + if self._data: + self._data.write(local_stream) + else: + raise ValueError("invalid payload missing the data attribute") + + if self._iv_counter_nonce: + self._iv_counter_nonce.write(local_stream) + + self.length = local_stream.length() + super(EncryptResponsePayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, EncryptResponsePayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.data != other.data: + return False + elif self.iv_counter_nonce != other.iv_counter_nonce: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, EncryptResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "data={0}".format(self.data), + "iv_counter_nonce={0}".format(self.iv_counter_nonce) + ]) + return "EncryptResponsePayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'data': self.data, + 'iv_counter_nonce': self.iv_counter_nonce + }) diff --git a/kmip/tests/unit/core/factories/payloads/test_request.py b/kmip/tests/unit/core/factories/payloads/test_request.py index 1b16474..fe35b4c 100644 --- a/kmip/tests/unit/core/factories/payloads/test_request.py +++ b/kmip/tests/unit/core/factories/payloads/test_request.py @@ -23,6 +23,7 @@ from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes @@ -208,10 +209,8 @@ class TestRequestPayloadFactory(testtools.TestCase): ) def test_create_encrypt_payload(self): - self._test_not_implemented( - self.factory.create, - enums.Operation.ENCRYPT - ) + payload = self.factory.create(enums.Operation.ENCRYPT) + self._test_payload_type(payload, encrypt.EncryptRequestPayload) def test_create_decrypt_payload(self): self._test_not_implemented( diff --git a/kmip/tests/unit/core/factories/payloads/test_response.py b/kmip/tests/unit/core/factories/payloads/test_response.py index 95043b1..73f9c87 100644 --- a/kmip/tests/unit/core/factories/payloads/test_response.py +++ b/kmip/tests/unit/core/factories/payloads/test_response.py @@ -23,6 +23,7 @@ from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions +from kmip.core.messages.payloads import encrypt from kmip.core.messages.payloads import get from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import get_attributes @@ -206,10 +207,8 @@ class TestResponsePayloadFactory(testtools.TestCase): ) def test_create_encrypt_payload(self): - self._test_not_implemented( - self.factory.create, - enums.Operation.ENCRYPT - ) + payload = self.factory.create(enums.Operation.ENCRYPT) + self._test_payload_type(payload, encrypt.EncryptResponsePayload) def test_create_decrypt_payload(self): self._test_not_implemented( diff --git a/kmip/tests/unit/core/messages/payloads/test_encrypt.py b/kmip/tests/unit/core/messages/payloads/test_encrypt.py new file mode 100644 index 0000000..8c71258 --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_encrypt.py @@ -0,0 +1,1109 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import attributes +from kmip.core import enums +from kmip.core import utils + +from kmip.core.messages.payloads import encrypt + + +class TestEncryptRequestPayload(testtools.TestCase): + """ + Test suite for the Encrypt request payload. + """ + + def setUp(self): + super(TestEncryptRequestPayload, self).setUp() + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 11.1. The rest of the encoding for KMIP 1.2+ features was + # built by hand; later KMIP testing documents do not include the + # encoding, so a manual construction is necessary. + # + # This encoding matches the following set of values: + # Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959 + # Cryptographic Parameters + # Block Cipher Mode - CBC + # Padding Method - PKCS5 + # Hashing Algorithm - SHA-1 + # Key Role Type - KEK + # Digital Signature Algorithm - SHA-256 with RSA + # Cryptographic Algorithm - AES + # Random IV - True + # IV Length - 96 + # Tag Length - 128 + # Fixed Field Length - 32 + # Invocation Field Length - 64 + # Counter Length - 0 + # Initial Counter Value - 1 + # Data - 0x0123456789ABCDEF + # IV/Counter/Nonce - 0x01 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x01\x28' + b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + b'\x42\x00\x2B\x01\x00\x00\x00\xD0' + b'\x42\x00\x11\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x5F\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x38\x05\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00' + b'\x42\x00\x83\x05\x00\x00\x00\x04\x00\x00\x00\x0B\x00\x00\x00\x00' + b'\x42\x00\xAE\x05\x00\x00\x00\x04\x00\x00\x00\x05\x00\x00\x00\x00' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\xC5\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01' + b'\x42\x00\xCD\x02\x00\x00\x00\x04\x00\x00\x00\x60\x00\x00\x00\x00' + b'\x42\x00\xCE\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00' + b'\x42\x00\xCF\x02\x00\x00\x00\x04\x00\x00\x00\x20\x00\x00\x00\x00' + b'\x42\x00\xD2\x02\x00\x00\x00\x04\x00\x00\x00\x40\x00\x00\x00\x00' + b'\x42\x00\xD0\x02\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\xD1\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00' + ) + + # Adapted from the full encoding above. This encoding matches the + # following set of values: + # Data - 0x0123456789ABCDEF + + self.minimum_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x10' + b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x02\x03\x04\x05\x06\x07\x08' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestEncryptRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that an Encrypt request payload can be constructed with no + arguments. + """ + payload = encrypt.EncryptRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.cryptographic_parameters) + self.assertEqual(None, payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + def test_init_with_args(self): + """ + Test that an Encrypt request payload can be constructed with valid + values + """ + payload = encrypt.EncryptRequestPayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + cryptographic_parameters=attributes.CryptographicParameters(), + data=b'\x01\x02\x03', + iv_counter_nonce=b'\x01' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + self.assertEqual( + attributes.CryptographicParameters(), + payload.cryptographic_parameters + ) + self.assertEqual(b'\x01\x02\x03', payload.data) + self.assertEqual(b'\x01', payload.iv_counter_nonce) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of an Encrypt request payload. + """ + payload = encrypt.EncryptRequestPayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "unique identifier must be a string", + setattr, + *args + ) + + def test_invalid_cryptographic_parameters(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the cryptographic parameters of an Encrypt request payload. + """ + payload = encrypt.EncryptRequestPayload() + args = (payload, 'cryptographic_parameters', 'invalid') + self.assertRaisesRegexp( + TypeError, + "cryptographic parameters must be a CryptographicParameters " + "struct", + setattr, + *args + ) + + def test_invalid_data(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the data of an Encrypt request payload. + """ + payload = encrypt.EncryptRequestPayload() + args = (payload, 'data', 0) + self.assertRaisesRegexp( + TypeError, + "data must be bytes", + setattr, + *args + ) + + def test_invalid_iv_counter_nonce(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the IV/counter/nonce of an Encrypt request payload. + """ + payload = encrypt.EncryptRequestPayload() + args = (payload, 'iv_counter_nonce', 0) + self.assertRaisesRegexp( + TypeError, + "IV/counter/nonce must be bytes", + setattr, + *args + ) + + def test_read(self): + """ + Test that an Encrypt request payload can be read from a data stream. + """ + payload = encrypt.EncryptRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.cryptographic_parameters) + self.assertEqual(None, payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + payload.read(self.full_encoding) + + self.assertEqual( + 'b4faee10-aa2a-4446-8ad4-0881f3422959', + payload.unique_identifier + ) + self.assertIsNotNone(payload.cryptographic_parameters) + self.assertEqual( + enums.BlockCipherMode.CBC, + payload.cryptographic_parameters.block_cipher_mode + ) + self.assertEqual( + enums.PaddingMethod.PKCS5, + payload.cryptographic_parameters.padding_method + ) + self.assertEqual( + enums.HashingAlgorithm.SHA_1, + payload.cryptographic_parameters.hashing_algorithm + ) + self.assertEqual( + enums.KeyRoleType.KEK, + payload.cryptographic_parameters.key_role_type + ) + self.assertEqual( + enums.DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, + payload.cryptographic_parameters.digital_signature_algorithm + ) + self.assertEqual( + enums.CryptographicAlgorithm.AES, + payload.cryptographic_parameters.cryptographic_algorithm + ) + self.assertEqual(True, payload.cryptographic_parameters.random_iv) + self.assertEqual(96, payload.cryptographic_parameters.iv_length) + self.assertEqual(128, payload.cryptographic_parameters.tag_length) + self.assertEqual( + 32, + payload.cryptographic_parameters.fixed_field_length + ) + self.assertEqual( + 64, + payload.cryptographic_parameters.invocation_field_length + ) + self.assertEqual(0, payload.cryptographic_parameters.counter_length) + self.assertEqual( + 1, + payload.cryptographic_parameters.initial_counter_value + ) + self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data) + self.assertEqual(b'\x01', payload.iv_counter_nonce) + + def test_read_partial(self): + """ + Test that an Encrypt request payload can be read from a partial data + stream containing the minimum required attributes. + """ + payload = encrypt.EncryptRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.cryptographic_parameters) + self.assertEqual(None, payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + payload.read(self.minimum_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.cryptographic_parameters) + self.assertEqual(b'\x01\x02\x03\x04\x05\x06\x07\x08', payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + def test_read_invalid(self): + """ + Test that a ValueError gets raised when a required Encrypt request + payload attribute is missing from the payload encoding. + """ + payload = encrypt.EncryptRequestPayload() + args = (self.empty_encoding, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the data attribute", + payload.read, + *args + ) + + def test_write(self): + """ + Test that an Encrypt request payload can be written to a data stream. + """ + payload = encrypt.EncryptRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_1, + key_role_type=enums.KeyRoleType.KEK, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION, + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + random_iv=True, + iv_length=96, + tag_length=128, + fixed_field_length=32, + invocation_field_length=64, + counter_length=0, + initial_counter_value=1 + ), + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partially defined Encrypt request payload can be written + to a data stream. + """ + payload = encrypt.EncryptRequestPayload( + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.minimum_encoding), len(stream)) + self.assertEqual(str(self.minimum_encoding), str(stream)) + + def test_write_invalid(self): + """ + Test that a ValueError gets raised when a required Encrypt request + payload attribute is missing when encoding the payload. + """ + payload = encrypt.EncryptRequestPayload() + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the data attribute", + payload.write, + *args + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Encrypt request payloads with the same data. + """ + a = encrypt.EncryptRequestPayload() + b = encrypt.EncryptRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = encrypt.EncryptRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_1, + key_role_type=enums.KeyRoleType.KEK, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION, + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + random_iv=True, + iv_length=96, + tag_length=128, + fixed_field_length=32, + invocation_field_length=64, + counter_length=0, + initial_counter_value=1 + ), + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + b = encrypt.EncryptRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_1, + key_role_type=enums.KeyRoleType.KEK, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION, + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + random_iv=True, + iv_length=96, + tag_length=128, + fixed_field_length=32, + invocation_field_length=64, + counter_length=0, + initial_counter_value=1 + ), + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Encrypt request payloads with different unique identifiers. + """ + a = encrypt.EncryptRequestPayload( + unique_identifier='a' + ) + b = encrypt.EncryptRequestPayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_cryptographic_parameters(self): + """ + Test that the equality operator returns False when comparing two + Encrypt request payloads with different cryptographic parameters. + """ + a = encrypt.EncryptRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC + ) + ) + b = encrypt.EncryptRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.MD5 + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_data(self): + """ + Test that the equality operator returns False when comparing two + Encrypt request payloads with different data. + """ + a = encrypt.EncryptRequestPayload(data=b'\x11') + b = encrypt.EncryptRequestPayload(data=b'\xFF') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_iv_counter_nonce(self): + """ + Test that the equality operator returns False when comparing two + Encrypt request payloads with different IV/counter/nonce values. + """ + a = encrypt.EncryptRequestPayload(iv_counter_nonce=b'\x22') + b = encrypt.EncryptRequestPayload(iv_counter_nonce=b'\xAA') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Encrypt request payloads with different types. + """ + a = encrypt.EncryptRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Encrypt request payloads with the same data. + """ + a = encrypt.EncryptRequestPayload() + b = encrypt.EncryptRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = encrypt.EncryptRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_1, + key_role_type=enums.KeyRoleType.KEK, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION, + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + random_iv=True, + iv_length=96, + tag_length=128, + fixed_field_length=32, + invocation_field_length=64, + counter_length=0, + initial_counter_value=1 + ), + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + b = encrypt.EncryptRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_1, + key_role_type=enums.KeyRoleType.KEK, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION, + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + random_iv=True, + iv_length=96, + tag_length=128, + fixed_field_length=32, + invocation_field_length=64, + counter_length=0, + initial_counter_value=1 + ), + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt request payloads with different unique identifiers. + """ + a = encrypt.EncryptRequestPayload( + unique_identifier='a' + ) + b = encrypt.EncryptRequestPayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_cryptographic_parameters(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt request payloads with different cryptographic parameters. + """ + a = encrypt.EncryptRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC + ) + ) + b = encrypt.EncryptRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.MD5 + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_data(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt request payloads with different data. + """ + a = encrypt.EncryptRequestPayload(data=b'\x11') + b = encrypt.EncryptRequestPayload(data=b'\xFF') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_iv_counter_nonce(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt request payloads with different IV/counter/nonce values. + """ + a = encrypt.EncryptRequestPayload(iv_counter_nonce=b'\x22') + b = encrypt.EncryptRequestPayload(iv_counter_nonce=b'\xAA') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt request payloads with different types. + """ + a = encrypt.EncryptRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to an Encrypt request payload. + """ + payload = encrypt.EncryptRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_1, + key_role_type=enums.KeyRoleType.KEK, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION, + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + random_iv=True, + iv_length=96, + tag_length=128, + fixed_field_length=32, + invocation_field_length=64, + counter_length=0, + initial_counter_value=1 + ), + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + expected = ( + "EncryptRequestPayload(" + "unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', " + "cryptographic_parameters=CryptographicParameters(" + "block_cipher_mode=BlockCipherMode.CBC, " + "padding_method=PaddingMethod.PKCS5, " + "hashing_algorithm=HashingAlgorithm.SHA_1, " + "key_role_type=KeyRoleType.KEK, " + "digital_signature_algorithm=" + "DigitalSignatureAlgorithm.SHA256_WITH_RSA_ENCRYPTION, " + "cryptographic_algorithm=CryptographicAlgorithm.AES, " + "random_iv=True, " + "iv_length=96, " + "tag_length=128, " + "fixed_field_length=32, " + "invocation_field_length=64, " + "counter_length=0, " + "initial_counter_value=1), " + "data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", " + "iv_counter_nonce=" + str(b'\x01') + ")" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to an Encrypt request payload + """ + cryptographic_parameters = attributes.CryptographicParameters( + block_cipher_mode=enums.BlockCipherMode.CBC, + padding_method=enums.PaddingMethod.PKCS5, + hashing_algorithm=enums.HashingAlgorithm.SHA_1, + key_role_type=enums.KeyRoleType.KEK, + digital_signature_algorithm=enums.DigitalSignatureAlgorithm. + SHA256_WITH_RSA_ENCRYPTION, + cryptographic_algorithm=enums.CryptographicAlgorithm.AES, + random_iv=True, + iv_length=96, + tag_length=128, + fixed_field_length=32, + invocation_field_length=64, + counter_length=0, + initial_counter_value=1 + ) + payload = encrypt.EncryptRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=cryptographic_parameters, + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + + expected = str({ + 'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959', + 'cryptographic_parameters': cryptographic_parameters, + 'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + 'iv_counter_nonce': b'\x01' + }) + observed = str(payload) + + self.assertEqual(expected, observed) + + +class TestEncryptResponsePayload(testtools.TestCase): + """ + Test suite for the Encrypt response payload. + """ + + def setUp(self): + super(TestEncryptResponsePayload, self).setUp() + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 11.1. The rest of the encoding for KMIP 1.2+ features was + # built by hand; later KMIP testing documents do not include the + # encoding, so a manual construction is necessary. + # + # This encoding matches the following set of values: + # Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959 + # Data - 0x0123456789ABCDEF + # IV/Counter/Nonce - 0x01 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x50' + b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x23\x45\x67\x89\xAB\xCD\xEF' + b'\x42\x00\x3D\x08\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00' + ) + + # Adapted from the full encoding above. This encoding matches the + # following set of values: + # Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959 + # Data - 0x0123456789ABCDEF + + self.minimum_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x40' + b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + b'\x42\x00\xC2\x08\x00\x00\x00\x08\x01\x02\x03\x04\x05\x06\x07\x08' + ) + + # Adapted from the minimum encoding above. This encoding matches the + # following set of values: + # Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959 + + self.incomplete_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestEncryptResponsePayload, self).tearDown() + + def test_init(self): + """ + Test that an Encrypt response payload can be constructed with no + arguments. + """ + payload = encrypt.EncryptResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + def test_init_with_args(self): + """ + Test that an Encrypt response payload can be constructed with valid + values + """ + payload = encrypt.EncryptResponsePayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + data=b'\x01\x02\x03', + iv_counter_nonce=b'\x01' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + self.assertEqual(b'\x01\x02\x03', payload.data) + self.assertEqual(b'\x01', payload.iv_counter_nonce) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of an Encrypt response payload. + """ + payload = encrypt.EncryptResponsePayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "unique identifier must be a string", + setattr, + *args + ) + + def test_invalid_data(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the data of an Encrypt response payload. + """ + payload = encrypt.EncryptResponsePayload() + args = (payload, 'data', 0) + self.assertRaisesRegexp( + TypeError, + "data must be bytes", + setattr, + *args + ) + + def test_invalid_iv_counter_nonce(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the IV/counter/nonce of an Encrypt response payload. + """ + payload = encrypt.EncryptResponsePayload() + args = (payload, 'iv_counter_nonce', 0) + self.assertRaisesRegexp( + TypeError, + "IV/counter/nonce must be bytes", + setattr, + *args + ) + + def test_read(self): + """ + Test that an Encrypt response payload can be read from a data stream. + """ + payload = encrypt.EncryptResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + payload.read(self.full_encoding) + + self.assertEqual( + 'b4faee10-aa2a-4446-8ad4-0881f3422959', + payload.unique_identifier + ) + self.assertEqual(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', payload.data) + self.assertEqual(b'\x01', payload.iv_counter_nonce) + + def test_read_partial(self): + """ + Test that an Encrypt response payload can be read from a partial data + stream containing the minimum required attributes. + """ + payload = encrypt.EncryptResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + payload.read(self.minimum_encoding) + + self.assertEqual( + 'b4faee10-aa2a-4446-8ad4-0881f3422959', + payload.unique_identifier + ) + self.assertEqual(b'\x01\x02\x03\x04\x05\x06\x07\x08', payload.data) + self.assertEqual(None, payload.iv_counter_nonce) + + def test_read_invalid(self): + """ + Test that a ValueError gets raised when required Encrypt response + payload attributes are missing from the payload encoding. + """ + payload = encrypt.EncryptResponsePayload() + args = (self.empty_encoding, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the unique identifier attribute", + payload.read, + *args + ) + + payload = encrypt.EncryptResponsePayload() + args = (self.incomplete_encoding, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the data attribute", + payload.read, + *args + ) + + def test_write(self): + """ + Test that an Encrypt response payload can be written to a data stream. + """ + payload = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partially defined Encrypt response payload can be written + to a data stream. + """ + payload = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.minimum_encoding), len(stream)) + self.assertEqual(str(self.minimum_encoding), str(stream)) + + def test_write_invalid(self): + """ + Test that a ValueError gets raised when required Encrypt response + payload attributes are missing when encoding the payload. + """ + payload = encrypt.EncryptResponsePayload() + self.assertIsNone(payload.unique_identifier) + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the unique identifier attribute", + payload.write, + *args + ) + + payload = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959' + ) + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the data attribute", + payload.write, + *args + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Encrypt response payloads with the same data. + """ + a = encrypt.EncryptResponsePayload() + b = encrypt.EncryptResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + b = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Encrypt response payloads with different unique identifiers. + """ + a = encrypt.EncryptResponsePayload( + unique_identifier='a' + ) + b = encrypt.EncryptResponsePayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_data(self): + """ + Test that the equality operator returns False when comparing two + Encrypt response payloads with different data. + """ + a = encrypt.EncryptResponsePayload(data=b'\x11') + b = encrypt.EncryptResponsePayload(data=b'\xFF') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_iv_counter_nonce(self): + """ + Test that the equality operator returns False when comparing two + Encrypt response payloads with different IV/counter/nonce values. + """ + a = encrypt.EncryptResponsePayload(iv_counter_nonce=b'\x22') + b = encrypt.EncryptResponsePayload(iv_counter_nonce=b'\xAA') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Encrypt response payloads with different types. + """ + a = encrypt.EncryptResponsePayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Encrypt response payloads with the same data. + """ + a = encrypt.EncryptResponsePayload() + b = encrypt.EncryptResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + b = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt response payloads with different unique identifiers. + """ + a = encrypt.EncryptResponsePayload( + unique_identifier='a' + ) + b = encrypt.EncryptResponsePayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_data(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt response payloads with different data. + """ + a = encrypt.EncryptResponsePayload(data=b'\x11') + b = encrypt.EncryptResponsePayload(data=b'\xFF') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_iv_counter_nonce(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt response payloads with different IV/counter/nonce values. + """ + a = encrypt.EncryptResponsePayload(iv_counter_nonce=b'\x22') + b = encrypt.EncryptResponsePayload(iv_counter_nonce=b'\xAA') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Encrypt response payloads with different types. + """ + a = encrypt.EncryptResponsePayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to an Encrypt response payload. + """ + payload = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + expected = ( + "EncryptResponsePayload(" + "unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', " + "data=" + str(b'\x01\x23\x45\x67\x89\xAB\xCD\xEF') + ", " + "iv_counter_nonce=" + str(b'\x01') + ")" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to an Encrypt response payload + """ + payload = encrypt.EncryptResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + data=b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + iv_counter_nonce=b'\x01' + ) + + expected = str({ + 'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959', + 'data': b'\x01\x23\x45\x67\x89\xAB\xCD\xEF', + 'iv_counter_nonce': b'\x01' + }) + observed = str(payload) + + self.assertEqual(expected, observed)