From a910dccf07c771610b5552412ef6fe5d4ce17278 Mon Sep 17 00:00:00 2001 From: dane-fichter Date: Thu, 8 Jun 2017 04:52:33 -0400 Subject: [PATCH] Adding request and response payloads for the Sign operation --- kmip/core/factories/payloads/request.py | 4 + kmip/core/factories/payloads/response.py | 4 + kmip/core/messages/payloads/sign.py | 386 +++++++++ .../core/factories/payloads/test_request.py | 4 +- .../core/factories/payloads/test_response.py | 4 +- .../unit/core/messages/payloads/test_sign.py | 782 ++++++++++++++++++ 6 files changed, 1182 insertions(+), 2 deletions(-) create mode 100644 kmip/core/messages/payloads/sign.py create mode 100644 kmip/tests/unit/core/messages/payloads/test_sign.py diff --git a/kmip/core/factories/payloads/request.py b/kmip/core/factories/payloads/request.py index e405287..73c53bc 100644 --- a/kmip/core/factories/payloads/request.py +++ b/kmip/core/factories/payloads/request.py @@ -31,6 +31,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import sign from kmip.core.messages.payloads import mac @@ -86,3 +87,6 @@ class RequestPayloadFactory(PayloadFactory): def _create_decrypt_payload(self): return decrypt.DecryptRequestPayload() + + def _create_sign_payload(self): + return sign.SignRequestPayload() diff --git a/kmip/core/factories/payloads/response.py b/kmip/core/factories/payloads/response.py index 8444176..7792825 100644 --- a/kmip/core/factories/payloads/response.py +++ b/kmip/core/factories/payloads/response.py @@ -32,6 +32,7 @@ from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke from kmip.core.messages.payloads import mac +from kmip.core.messages.payloads import sign class ResponsePayloadFactory(PayloadFactory): @@ -86,3 +87,6 @@ class ResponsePayloadFactory(PayloadFactory): def _create_decrypt_payload(self): return decrypt.DecryptResponsePayload() + + def _create_sign_payload(self): + return sign.SignResponsePayload() diff --git a/kmip/core/messages/payloads/sign.py b/kmip/core/messages/payloads/sign.py new file mode 100644 index 0000000..3e88999 --- /dev/null +++ b/kmip/core/messages/payloads/sign.py @@ -0,0 +1,386 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from kmip.core import attributes +from kmip.core import enums +from kmip.core import primitives +from kmip.core import utils + + +class SignRequestPayload(primitives.Struct): + """ + A request payload for the Sign operation. + + Attributes: + unique_identifier: The unique ID of the managed object to be used for + signing some data. + cryptographic_parameters: A collection of settings relevant for the + signature operation. + data: The data to be signed in the form of a binary string. + """ + + def __init__(self, + unique_identifier=None, + cryptographic_parameters=None, + data=None): + """ + Construct a Sign request payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g. a + public key) to be used for encryption. Optional, defaults to + None. If not included, the ID placeholder will be used. + cryptographic_parameters (CryptographicParameters): A + CryptographicParameters struct containing the settings for + the signature algorithm. Optional, defaults to None. If not + included, the CryptographicParameters associated with the + managed object will be used instead. + data (bytes): The data to be signed, in binary form. + """ + super(SignRequestPayload, self).__init__( + enums.Tags.REQUEST_PAYLOAD + ) + + self._unique_identifier = None + self._cryptographic_parameters = None + self._data = None + + self.unique_identifier = unique_identifier + self.cryptographic_parameters = cryptographic_parameters + self.data = data + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") + + @property + def cryptographic_parameters(self): + return self._cryptographic_parameters + + @cryptographic_parameters.setter + def cryptographic_parameters(self, value): + if value is None: + self._cryptographic_parameters = None + elif isinstance(value, attributes.CryptographicParameters): + self._cryptographic_parameters = value + else: + raise TypeError( + "cryptographic parameters must be a CryptographicParameters " + "struct" + ) + + @property + def data(self): + if self._data: + return self._data.value + else: + return None + + @data.setter + def data(self, value): + if value is None: + self._data is None + elif isinstance(value, six.binary_type): + self._data = primitives.ByteString( + value=value, + tag=enums.Tags.DATA + ) + else: + raise TypeError("data must be bytes") + + def read(self, input_stream): + """ + Read the data encoding the Sign request payload and decode it + into its parts + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(SignRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + + self._unique_identifier.read(local_stream) + + if self.is_tag_next( + enums.Tags.CRYPTOGRAPHIC_PARAMETERS, + local_stream + ): + self._cryptographic_parameters = \ + attributes.CryptographicParameters() + self._cryptographic_parameters.read(local_stream) + + if self.is_tag_next(enums.Tags.DATA, local_stream): + self._data = primitives.ByteString(tag=enums.Tags.DATA) + self._data.read(local_stream) + + else: + raise ValueError( + "invalid payload missing the data attribute" + ) + + def write(self, output_stream): + """ + Write the data encoding the Sign request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + if self._cryptographic_parameters: + self._cryptographic_parameters.write(local_stream) + + if self._data: + self._data.write(local_stream) + else: + raise ValueError("invalid payload missing the data attribute") + + self.length = local_stream.length() + super(SignRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, SignRequestPayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.cryptographic_parameters !=\ + other.cryptographic_parameters: + return False + elif self.data != other.data: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, SignRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "cryptographic_parameters={0}".format( + repr(self.cryptographic_parameters) + ), + "data={0}".format(self.data) + ]) + return "SignRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'cryptographic_parameters': self.cryptographic_parameters, + 'data': self.data + }) + + +class SignResponsePayload(primitives.Struct): + """ + A response payload for the Sign operation. + + Attributes: + unique_identifier: The unique ID of the managed object used to sign + the data. + signature_data: The signature data as a byte string. + """ + + def __init__(self, + unique_identifier=None, + signature_data=None): + super(SignResponsePayload, self).__init__( + enums.Tags.RESPONSE_PAYLOAD + ) + + self._unique_identifier = None + self._signature_data = None + + self.unique_identifier = unique_identifier + self.signature_data = signature_data + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("unique identifier must be a string") + + @property + def signature_data(self): + if self._signature_data: + return self._signature_data.value + else: + return None + + @signature_data.setter + def signature_data(self, value): + if value is None: + self._signature_data = None + elif isinstance(value, six.binary_type): + self._signature_data = primitives.ByteString( + value=value, + tag=enums.Tags.SIGNATURE_DATA + ) + else: + raise TypeError("signature data must be bytes") + + def read(self, input_stream): + """ + Read the data encoding the Sign response payload and decode it. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the unique_identifier or signature attributes + are missing from the encoded payload. + """ + + super(SignResponsePayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + else: + raise ValueError( + "invalid payload missing the unique identifier attribute" + ) + + if self.is_tag_next(enums.Tags.SIGNATURE_DATA, local_stream): + self._signature_data = primitives.ByteString( + tag=enums.Tags.SIGNATURE_DATA + ) + self._signature_data.read(local_stream) + else: + raise ValueError( + "invalid payload missing the signature data attribute" + ) + + def write(self, output_stream): + """ + Write the data encoding the Sign response to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the unique_identifier or signature + attributes are not defined. + """ + + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + else: + raise ValueError( + "invalid payload missing the unique identifier attribute" + ) + + if self._signature_data: + self._signature_data.write(local_stream) + else: + raise ValueError( + "invalid payload missing the signature attribute" + ) + + self.length = local_stream.length() + super(SignResponsePayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, SignResponsePayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.signature_data != other.signature_data: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, SignResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "signature_data={0}".format(self.signature_data) + ]) + return "SignResponsePayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'signature_data': self.signature_data + }) diff --git a/kmip/tests/unit/core/factories/payloads/test_request.py b/kmip/tests/unit/core/factories/payloads/test_request.py index 06896cd..5872aaa 100644 --- a/kmip/tests/unit/core/factories/payloads/test_request.py +++ b/kmip/tests/unit/core/factories/payloads/test_request.py @@ -34,6 +34,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import sign from kmip.core.messages.payloads import mac @@ -217,7 +218,8 @@ class TestRequestPayloadFactory(testtools.TestCase): self._test_payload_type(payload, decrypt.DecryptRequestPayload) def test_create_sign_payload(self): - self._test_not_implemented(self.factory.create, enums.Operation.SIGN) + payload = self.factory.create(enums.Operation.SIGN) + self._test_payload_type(payload, sign.SignRequestPayload) def test_create_signature_verify_payload(self): self._test_not_implemented( diff --git a/kmip/tests/unit/core/factories/payloads/test_response.py b/kmip/tests/unit/core/factories/payloads/test_response.py index b76b71c..70dc914 100644 --- a/kmip/tests/unit/core/factories/payloads/test_response.py +++ b/kmip/tests/unit/core/factories/payloads/test_response.py @@ -34,6 +34,7 @@ from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register from kmip.core.messages.payloads import revoke +from kmip.core.messages.payloads import sign from kmip.core.messages.payloads import mac @@ -215,7 +216,8 @@ class TestResponsePayloadFactory(testtools.TestCase): self._test_payload_type(payload, decrypt.DecryptResponsePayload) def test_create_sign_payload(self): - self._test_not_implemented(self.factory.create, enums.Operation.SIGN) + payload = self.factory.create(enums.Operation.SIGN) + self._test_payload_type(payload, sign.SignResponsePayload) def test_create_signature_verify_payload(self): self._test_not_implemented( diff --git a/kmip/tests/unit/core/messages/payloads/test_sign.py b/kmip/tests/unit/core/messages/payloads/test_sign.py new file mode 100644 index 0000000..5ede846 --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_sign.py @@ -0,0 +1,782 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import attributes +from kmip.core import enums +from kmip.core import utils + +from kmip.core.messages.payloads import sign + + +class TestSignRequestPayload(testtools.TestCase): + """ + Test suite for the Sign request payload. + """ + + def setUp(self): + super(TestSignRequestPayload, self).setUp() + + # Encoding obtained in part from KMIP 1.4 testing document, + # partially cobbled together by hand from other test cases + # in this code base. + # + # This encoding matches the following set of values: + # Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959 + # Cryptographic Parameters + # Cryptographic Algorithm - ECDSA + # Data - 01020304050607080910111213141516 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x60' + b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + b'\x42\x00\x2B\x01\x00\x00\x00\x10' + b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x06\x00\x00\x00\x00' + b'\x42\x00\xC2\x08\x00\x00\x00\x10\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + + self.minimum_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x18' + b'\x42\x00\xC2\x08\x00\x00\x00\x10\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestSignRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that a Sign request payload can be constructed with no arguments. + """ + payload = sign.SignRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.cryptographic_parameters) + self.assertEqual(None, payload.data) + + def test_init_with_args(self): + """ + Test that a Sign request payload can be constructed with valid values. + """ + payload = sign.SignRequestPayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + cryptographic_parameters=attributes.CryptographicParameters(), + data=b'\x01\x02\x03' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + self.assertEqual( + attributes.CryptographicParameters(), + payload.cryptographic_parameters + ) + self.assertEqual(b'\x01\x02\x03', payload.data) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Sign request payload. + """ + payload = sign.SignRequestPayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "unique identifier must be a string", + setattr, + *args + ) + + def test_invalid_cryptographic_parameters(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the cryptographic parameters of a Sign request payload. + """ + payload = sign.SignRequestPayload() + args = (payload, 'cryptographic_parameters', b'\x01\x02\x03') + self.assertRaisesRegexp( + TypeError, + "cryptographic parameters must be a CryptographicParameters " + "struct", + setattr, + *args + ) + + def test_invalid_data(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the data of a Sign request payload. + """ + payload = sign.SignRequestPayload() + args = (payload, 'data', 0) + self.assertRaisesRegexp( + TypeError, + "data must be bytes", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Sign request payload can be read from a data stream. + """ + payload = sign.SignRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.cryptographic_parameters) + self.assertEqual(None, payload.data) + + payload.read(self.full_encoding) + + self.assertEqual( + 'b4faee10-aa2a-4446-8ad4-0881f3422959', + payload.unique_identifier + ) + self.assertIsNotNone(payload.cryptographic_parameters) + self.assertEqual( + enums.CryptographicAlgorithm.ECDSA, + payload.cryptographic_parameters.cryptographic_algorithm + ) + self.assertEqual( + b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16', + payload.data + ) + + def test_read_partial(self): + """ + Test that a Sign request payload can be read from a partial data + stream containing the minimum required attributes. + """ + payload = sign.SignRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.cryptographic_parameters) + self.assertEqual(None, payload.data) + + payload.read(self.minimum_encoding) + + self.assertEqual( + b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16', + payload.data + ) + + def test_read_invalid(self): + """ + Test that a ValueError gets raised when a required Sign request + payload attribute is missing from the payload encoding. + """ + payload = sign.SignRequestPayload() + args = (self.empty_encoding, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the data attribute", + payload.read, + *args + ) + + def test_write(self): + """ + Test that a Sign request payload can be written to a data stream. + """ + payload = sign.SignRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ), + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partially defined Sign request payload can be written + to a data stream. + """ + payload = sign.SignRequestPayload( + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.minimum_encoding), len(stream)) + self.assertEqual(str(self.minimum_encoding), str(stream)) + + def test_write_invalid(self): + """ + Test that a ValueError gets raised when a required Sign request + payload attribute is missing when encoding the payload. + """ + payload = sign.SignRequestPayload() + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the data attribute", + payload.write, + *args + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Sign request payloads with the same data. + """ + a = sign.SignRequestPayload() + b = sign.SignRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = sign.SignRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ), + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + b = sign.SignRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ), + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Sign request payloads with different unique identifiers. + """ + a = sign.SignRequestPayload( + unique_identifier='a' + ) + b = sign.SignRequestPayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_cryptographic_parameters(self): + """ + Test that the equality operator returns False when comparing two + Sign request payloads with cryptographic parameters. + """ + a = sign.SignRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.MD5 + ) + ) + b = sign.SignRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_data(self): + """ + Test that the equality operator returns False when comparing two + Sign request payloads with different data. + """ + a = sign.SignRequestPayload(data=b'\x01') + b = sign.SignRequestPayload(data=b'\xFF') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Sign request payloads with different types. + """ + a = sign.SignRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Sign request payloads with the same data. + """ + a = sign.SignRequestPayload() + b = sign.SignRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = sign.SignRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ), + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + b = sign.SignRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ), + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Sign request payloads with different unique identifiers. + """ + a = sign.SignRequestPayload( + unique_identifier='a' + ) + b = sign.SignRequestPayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_cryptographic_parameters(self): + """ + Test that the equality operator returns False when comparing two + Sign request payloads with cryptographic parameters. + """ + a = sign.SignRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + hashing_algorithm=enums.HashingAlgorithm.MD5 + ) + ) + b = sign.SignRequestPayload( + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_data(self): + """ + Test that the inequality operator returns True when comparing two + Sign request payloads with different data. + """ + a = sign.SignRequestPayload(data=b'\x01') + b = sign.SignRequestPayload(data=b'\xFF') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Sign request payloads with different types. + """ + a = sign.SignRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Sign request payload. + """ + payload = sign.SignRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ), + data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + ) + expected = ( + "SignRequestPayload(" + "unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', " + "cryptographic_parameters=CryptographicParameters(" + "block_cipher_mode=None, padding_method=None, " + "hashing_algorithm=None, key_role_type=None, " + "digital_signature_algorithm=None, " + "cryptographic_algorithm=CryptographicAlgorithm.ECDSA, " + "random_iv=None, iv_length=None, tag_length=None, " + "fixed_field_length=None, invocation_field_length=None, " + "counter_length=None, initial_counter_value=None), " + "data=" + str(b'\x01\x02\x03\x04\x05\x06\x07\x08') + ")" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Sign request payload. + """ + crypto_params = attributes.CryptographicParameters( + cryptographic_algorithm=enums.CryptographicAlgorithm.ECDSA + ) + payload = sign.SignRequestPayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + cryptographic_parameters=crypto_params, + data=b'\x01\x02\x03\x04\x05\x06\x07\x08', + ) + + expected = str({ + 'unique_identifier': 'b4faee10-aa2a-4446-8ad4-0881f3422959', + 'cryptographic_parameters': crypto_params, + 'data': b'\x01\x02\x03\x04\x05\x06\x07\x08' + }) + observed = str(payload) + + self.assertEqual(expected, observed) + + +class TestSignResponsePayload(testtools.TestCase): + """ + Test suite for the Sign response payload. + """ + + def setUp(self): + super(TestSignResponsePayload, self).setUp() + # Encoding obtained in part from KMIP 1.4 testing document, + # partially cobbled together by hand from other test cases + # in this code base. + # + # This encoding matches the following set of values: + # Unique Identifier - b4faee10-aa2a-4446-8ad4-0881f3422959 + # Signature Data - 01020304050607080910111213141516 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x48' + b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + b'\x42\x00\xC3\x08\x00\x00\x00\x10\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + self.incomplete_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30' + b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D' + b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00' + ) + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestSignResponsePayload, self).tearDown() + + def test_init(self): + """ + Test that a Sign response payload can be constructed with no + arguments. + """ + payload = sign.SignResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.signature_data) + + def test_init_with_args(self): + """ + Test that a Sign response payload can be constructed with valid + values. + """ + payload = sign.SignResponsePayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + signature_data=b'\x01\x02\x03' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + self.assertEqual(b'\x01\x02\x03', payload.signature_data) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Sign response payload. + """ + payload = sign.SignResponsePayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "unique identifier must be a string", + setattr, + *args + ) + + def test_invalid_signature_data(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the signature data of a Sign response payload. + """ + payload = sign.SignResponsePayload() + args = (payload, 'signature_data', 0) + self.assertRaisesRegexp( + TypeError, + "signature data must be bytes", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Sign response payload can be read from a data stream. + """ + payload = sign.SignResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.signature_data) + + payload.read(self.full_encoding) + + self.assertEqual( + 'b4faee10-aa2a-4446-8ad4-0881f3422959', + payload.unique_identifier + ) + self.assertEqual( + b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16', + payload.signature_data + ) + + def test_read_invalid(self): + """ + Test that a ValueError gets raised when required Sign response + payload attributes are missing from the payload encoding. + """ + payload = sign.SignResponsePayload() + args = (self.empty_encoding, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the unique identifier attribute", + payload.read, + *args + ) + + payload = sign.SignResponsePayload() + args = (self.incomplete_encoding, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the signature data attribute", + payload.read, + *args + ) + + def test_write(self): + """ + Test that a Sign response payload can be written to a data stream. + """ + payload = sign.SignResponsePayload( + unique_identifier='b4faee10-aa2a-4446-8ad4-0881f3422959', + signature_data=b'\x01\x02\x03\x04\x05\x06\x07\x08' + b'\x09\x10\x11\x12\x13\x14\x15\x16' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_invalid(self): + """ + Test that a ValueError gets raised when a required Sign response + payload attribute is missing when encoding the payload. + """ + payload = sign.SignResponsePayload() + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "invalid payload missing the unique identifier attribute", + payload.write, + *args + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + equal sign response payloads + """ + encoding = utils.BytearrayStream(self.full_encoding.buffer) + a = sign.SignResponsePayload() + b = sign.SignResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a.read(encoding) + b.read(self.full_encoding) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + sign reponse payloads with different unique_identifier. + """ + a = sign.SignResponsePayload(unique_identifier='a', + signature_data=b'\x01') + + b = sign.SignResponsePayload(unique_identifier='b', + signature_data=b'\x01') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_signature_data(self): + """ + Test that the equality operator returns False when comparing two + sign response payloads with different signature_data. + """ + a = sign.SignResponsePayload(unique_identifier='a', + signature_data=b'\x01') + b = sign.SignResponsePayload(unique_identifier='a', + signature_data=b'\x02') + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing a sign + response payload to another type. + """ + a = sign.SignResponsePayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns false when comparing two + equal sign response payloads. + """ + encoding = utils.BytearrayStream(self.full_encoding.buffer) + a = sign.SignResponsePayload() + b = sign.SignResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a.read(encoding) + b.read(self.full_encoding) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + sign response payloads with different unique_identifier. + """ + a = sign.SignResponsePayload(unique_identifier='a', + signature_data=b'\x01') + + b = sign.SignResponsePayload(unique_identifier='b', + signature_data=b'\x01') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_signature_data(self): + """ + Test that the inequality operator returns True when comparing two + sign response payloads with different signature_data. + """ + a = sign.SignResponsePayload(unique_identifier='a', + signature_data=b'\x01') + b = sign.SignResponsePayload(unique_identifier='a', + signature_data=b'\x02') + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing a + sign response payload to a different type. + """ + a = sign.SignResponsePayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can applied to a sign response payload. + """ + payload = sign.SignResponsePayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + signature_data=b'\x01\x02\x03' + ) + + expected = ( + "SignResponsePayload(" + "unique_identifier='00000000-1111-2222-3333-444444444444', " + "signature_data="+str(b'\x01\x02\x03') + ")" + ) + + observed = repr(payload) + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a sign response payload. + """ + payload = sign.SignResponsePayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + signature_data=b'\x01\x02\x03' + ) + + expected = str({ + 'unique_identifier': '00000000-1111-2222-3333-444444444444', + 'signature_data': b'\x01\x02\x03' + }) + + observed = str(payload) + self.assertEqual(expected, observed)