From 412b452adfff587b14acf6e4fbf37aed927b5a08 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Thu, 12 Apr 2018 22:34:58 -0400 Subject: [PATCH] Add the request and response payloads for the Rekey operation This change adds the request/response payloads for the Rekey operation. Unit tests are included to cover the new payloads. The payloads package is also updated to allow direct import of the Rekey payloads going forward. Partially addresses #405 --- kmip/core/messages/payloads/__init__.py | 6 + kmip/core/messages/payloads/rekey.py | 365 +++++ .../unit/core/messages/payloads/test_rekey.py | 1304 +++++++++++++++++ 3 files changed, 1675 insertions(+) create mode 100644 kmip/core/messages/payloads/rekey.py create mode 100644 kmip/tests/unit/core/messages/payloads/test_rekey.py diff --git a/kmip/core/messages/payloads/__init__.py b/kmip/core/messages/payloads/__init__.py index 0d3e72c..423ca07 100644 --- a/kmip/core/messages/payloads/__init__.py +++ b/kmip/core/messages/payloads/__init__.py @@ -104,6 +104,10 @@ from kmip.core.messages.payloads.rekey_key_pair import ( RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload ) +from kmip.core.messages.payloads.rekey import ( + RekeyRequestPayload, + RekeyResponsePayload +) from kmip.core.messages.payloads.revoke import ( RevokeRequestPayload, RevokeResponsePayload @@ -164,6 +168,8 @@ __all__ = [ "RegisterResponsePayload", "RekeyKeyPairRequestPayload", "RekeyKeyPairResponsePayload", + "RekeyRequestPayload", + "RekeyResponsePayload", "RevokeRequestPayload", "RevokeResponsePayload", "SignRequestPayload", diff --git a/kmip/core/messages/payloads/rekey.py b/kmip/core/messages/payloads/rekey.py new file mode 100644 index 0000000..94ce24e --- /dev/null +++ b/kmip/core/messages/payloads/rekey.py @@ -0,0 +1,365 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from kmip.core import enums +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils + + +class RekeyRequestPayload(primitives.Struct): + """ + A request payload for the Rekey operation. + + Attributes: + unique_identifier: The unique ID of the symmetric key to rekey. + offset: The interval between the initialization and activation dates + of the replacement key. + template_attribute: A collection of attributes that should be set on + the replacement key. + """ + def __init__(self, + unique_identifier=None, + offset=None, + template_attribute=None): + """ + Construct a Rekey request payload struct. + + Args: + unique_identifier (string): The ID of the symmetric key to rekey. + Optional, defaults to None. + offset (int): The number of seconds between the initialization and + activation dates of the replacement key. Optional, defaults to + None. + template_attribute (TemplateAttribute): A structure containing a + set of attributes (e.g., cryptographic algorithm, + cryptographic length) that should be set on the replacement + key. Optional, defaults to None. + """ + super(RekeyRequestPayload, self).__init__( + enums.Tags.REQUEST_PAYLOAD + ) + + self._unique_identifier = None + self._offset = None + self._template_attribute = None + + self.unique_identifier = unique_identifier + self.offset = offset + self.template_attribute = template_attribute + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + @property + def offset(self): + if self._offset: + return self._offset.value + else: + return None + + @offset.setter + def offset(self, value): + if value is None: + self._offset = None + elif isinstance(value, six.integer_types): + self._offset = primitives.Interval( + value=value, + tag=enums.Tags.OFFSET + ) + else: + raise TypeError("Offset must be an integer.") + + @property + def template_attribute(self): + if self._template_attribute: + return self._template_attribute + else: + return None + + @template_attribute.setter + def template_attribute(self, value): + if value is None: + self._template_attribute = None + elif isinstance(value, objects.TemplateAttribute): + self._template_attribute = value + else: + raise TypeError( + "Template attribute must be a TemplateAttribute struct." + ) + + def read(self, input_stream): + """ + Read the data encoding the Rekey request payload and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + """ + super(RekeyRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + + if self.is_tag_next(enums.Tags.OFFSET, local_stream): + self._offset = primitives.Interval( + tag=enums.Tags.OFFSET + ) + self._offset.read(local_stream) + + if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_stream): + self._template_attribute = objects.TemplateAttribute() + self._template_attribute.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Rekey request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier is not None: + self._unique_identifier.write(local_stream) + if self._offset is not None: + self._offset.write(local_stream) + if self._template_attribute is not None: + self._template_attribute.write(local_stream) + + self.length = local_stream.length() + super(RekeyRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, RekeyRequestPayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.offset != other.offset: + return False + elif self.template_attribute != other.template_attribute: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, RekeyRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "offset={0}".format(self.offset), + "template_attribute={0}".format(repr(self.template_attribute)) + ]) + return "RekeyRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'offset': self.offset, + 'template_attribute': str(self.template_attribute) + }) + + +class RekeyResponsePayload(primitives.Struct): + """ + A response payload for the Rekey operation. + + Attributes: + unique_identifier: The unique ID of the replacement key. + template_attribute: A collection of server attributes that were set on + the replacement key. + """ + def __init__(self, + unique_identifier=None, + template_attribute=None): + """ + Construct a Rekey response payload struct. + + Args: + unique_identifier (string): The ID of the replacement key. + Optional, defaults to None. Required for read/write. + template_attribute (TemplateAttribute): A structure containing a + set of attributes (e.g., cryptographic algorithm, + cryptographic length) that were set by the server on the + replacement key. Optional, defaults to None. + """ + super(RekeyResponsePayload, self).__init__( + enums.Tags.RESPONSE_PAYLOAD + ) + + self._unique_identifier = None + self._template_attribute = None + + self.unique_identifier = unique_identifier + self.template_attribute = template_attribute + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + @property + def template_attribute(self): + if self._template_attribute: + return self._template_attribute + else: + return None + + @template_attribute.setter + def template_attribute(self, value): + if value is None: + self._template_attribute = None + elif isinstance(value, objects.TemplateAttribute): + self._template_attribute = value + else: + raise TypeError( + "Template attribute must be a TemplateAttribute struct." + ) + + def read(self, input_stream): + """ + Read the data encoding the Rekey response payload and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the unique identifier attribute is missing + from the encoded payload. + """ + super(RekeyResponsePayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + else: + raise ValueError( + "The Rekey response payload encoding is missing the unique " + "identifier." + ) + + if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_stream): + self._template_attribute = objects.TemplateAttribute() + self._template_attribute.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Rekey request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the payload is missing the unique identifier. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier is not None: + self._unique_identifier.write(local_stream) + else: + raise ValueError( + "The Rekey response payload is missing the unique identifier." + ) + if self._template_attribute is not None: + self._template_attribute.write(local_stream) + + self.length = local_stream.length() + super(RekeyResponsePayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, RekeyResponsePayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.template_attribute != other.template_attribute: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, RekeyResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "template_attribute={0}".format(repr(self.template_attribute)) + ]) + return "RekeyResponsePayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'template_attribute': str(self.template_attribute) + }) diff --git a/kmip/tests/unit/core/messages/payloads/test_rekey.py b/kmip/tests/unit/core/messages/payloads/test_rekey.py new file mode 100644 index 0000000..ed1d4fc --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_rekey.py @@ -0,0 +1,1304 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import enums +from kmip.core import objects +from kmip.core import primitives +from kmip.core import utils + +from kmip.core.messages import payloads + + +class TestRekeyRequestPayload(testtools.TestCase): + """ + Test suite for the Rekey request payload. + """ + + def setUp(self): + super(TestRekeyRequestPayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, + # Sections 9.2 and 9.4. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 1346d253-69d6-474c-8cd5-ad475a3e0a81 + # Offset - 0 + # Template Attribute + # Attribute + # Attribute Name - Activation Date + # Attribute Value - Sun Jan 01 12:00:00 CET 2006 + # Attribute + # Attribute Name - Process Start Date + # Attribute Value - Sun Jan 01 12:00:00 CET 2006 + # Attribute + # Attribute Name - Protect Stop Date + # Attribute Value - Wed Jan 01 12:00:00 CET 2020 + # Attribute + # Attribute Name - Deactivation Date + # Attribute Value - Wed Jan 01 12:00:00 CET 2020 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x01\x20' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x31\x33\x34\x36\x64\x32\x35\x33\x2D\x36\x39\x64\x36\x2D\x34\x37' + b'\x34\x63\x2D\x38\x63\x64\x35\x2D\x61\x64\x34\x37\x35\x61\x33\x65' + b'\x30\x61\x38\x31\x00\x00\x00\x00' + b'\x42\x00\x58\x0A\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\x91\x01\x00\x00\x00\xD8' + b'\x42\x00\x08\x01\x00\x00\x00\x28' + b'\x42\x00\x0A\x07\x00\x00\x00\x0F' + b'\x41\x63\x74\x69\x76\x61\x74\x69\x6F\x6E\x20\x44\x61\x74\x65\x00' + b'\x42\x00\x0B\x09\x00\x00\x00\x08\x00\x00\x00\x00\x43\xB7\xB6\x30' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x12' + b'\x50\x72\x6F\x63\x65\x73\x73\x20\x53\x74\x61\x72\x74\x20\x44\x61' + b'\x74\x65\x00\x00\x00\x00\x00\x00' + b'\x42\x00\x0B\x09\x00\x00\x00\x08\x00\x00\x00\x00\x43\xB7\xB6\x30' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x11' + b'\x50\x72\x6F\x74\x65\x63\x74\x20\x53\x74\x6F\x70\x20\x44\x61\x74' + b'\x65\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\x0B\x09\x00\x00\x00\x08\x00\x00\x00\x00\x5E\x0C\x7B\xB0' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x11' + b'\x44\x65\x61\x63\x74\x69\x76\x61\x74\x69\x6F\x6E\x20\x44\x61\x74' + b'\x65\x00\x00\x00\x00\x00\x00\x00' + b'\x42\x00\x0B\x09\x00\x00\x00\x08\x00\x00\x00\x00\x5E\x0C\x7B\xB0' + ) + + # Encoding obtained from the KMIP 1.1 testing document, Section 9.1. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 964d3dd2-5f06-4529-8bb8-ae630b6ca2e0 + + self.partial_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x39\x36\x34\x64\x33\x64\x64\x32\x2D\x35\x66\x30\x36\x2D\x34\x35' + b'\x32\x39\x2D\x38\x62\x62\x38\x2D\x61\x65\x36\x33\x30\x62\x36\x63' + b'\x61\x32\x65\x30\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestRekeyRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that a Rekey request payload can be constructed with no arguments. + """ + payload = payloads.RekeyRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.offset) + self.assertEqual(None, payload.template_attribute) + + def test_init_with_args(self): + """ + Test that a Rekey request payload can be constructed with valid values. + """ + payload = payloads.RekeyRequestPayload( + unique_identifier='00000000-2222-4444-6666-888888888888', + offset=0, + template_attribute=objects.TemplateAttribute() + ) + + self.assertEqual( + '00000000-2222-4444-6666-888888888888', + payload.unique_identifier + ) + self.assertEqual(0, payload.offset) + self.assertEqual( + objects.TemplateAttribute(), + payload.template_attribute + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Rekey request payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.RekeyRequestPayload, + **kwargs + ) + + args = (payloads.RekeyRequestPayload(), 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_invalid_offset(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the offset of a Rekey request payload. + """ + kwargs = {'offset': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Offset must be an integer.", + payloads.RekeyRequestPayload, + **kwargs + ) + + args = (payloads.RekeyRequestPayload(), 'offset', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Offset must be an integer.", + setattr, + *args + ) + + def test_invalid_template_attribute(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the template attribute of a Rekey request payload. + """ + kwargs = {'template_attribute': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Template attribute must be a TemplateAttribute struct.", + payloads.RekeyRequestPayload, + **kwargs + ) + + args = ( + payloads.RekeyRequestPayload(), + 'template_attribute', + 'invalid' + ) + self.assertRaisesRegexp( + TypeError, + "Template attribute must be a TemplateAttribute struct.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Rekey request payload can be read from a data stream. + """ + payload = payloads.RekeyRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.offset) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.full_encoding) + + self.assertEqual( + '1346d253-69d6-474c-8cd5-ad475a3e0a81', + payload.unique_identifier + ) + self.assertEqual(0, payload.offset) + self.assertEqual( + objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Activation Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.ACTIVATION_DATE + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Process Start Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.PROCESS_START_DATE + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Protect Stop Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.PROTECT_STOP_DATE + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Deactivation Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.DEACTIVATION_DATE + ) + ) + ] + ), + payload.template_attribute + ) + + def test_read_partial(self): + """ + Test that a Rekey request payload can be read from a partial data + stream. + """ + payload = payloads.RekeyRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.offset) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.partial_encoding) + + self.assertEqual( + '964d3dd2-5f06-4529-8bb8-ae630b6ca2e0', + payload.unique_identifier + ) + self.assertEqual(None, payload.offset) + self.assertEqual(None, payload.template_attribute) + + def test_read_empty(self): + """ + Test that a Rekey request payload can be read from an empty data + stream. + """ + payload = payloads.RekeyRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.offset) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.offset) + self.assertEqual(None, payload.template_attribute) + + def test_write(self): + """ + Test that a Rekey request payload can be written to a data stream. + """ + payload = payloads.RekeyRequestPayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + offset=0, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Activation Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.ACTIVATION_DATE + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Process Start Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.PROCESS_START_DATE + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Protect Stop Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.PROTECT_STOP_DATE + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Deactivation Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.DEACTIVATION_DATE + ) + ) + ] + ) + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partial Rekey request payload can be written to a data + stream. + """ + payload = payloads.RekeyRequestPayload( + unique_identifier='964d3dd2-5f06-4529-8bb8-ae630b6ca2e0' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.partial_encoding), len(stream)) + self.assertEqual(str(self.partial_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Rekey request payload can be written to a data + stream. + """ + payload = payloads.RekeyRequestPayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Rekey + request payloads with the same data. + """ + a = payloads.RekeyRequestPayload() + b = payloads.RekeyRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.RekeyRequestPayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + offset=0, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Activation Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.ACTIVATION_DATE + ) + ) + ] + ) + ) + b = payloads.RekeyRequestPayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + offset=0, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Activation Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.ACTIVATION_DATE + ) + ) + ] + ) + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two Rekey + request payloads with different unique identifiers. + """ + a = payloads.RekeyRequestPayload( + unique_identifier='a' + ) + b = payloads.RekeyRequestPayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_offset(self): + """ + Test that the equality operator returns False when comparing two Rekey + request payloads with different offsets. + """ + a = payloads.RekeyRequestPayload( + offset=0 + ) + b = payloads.RekeyRequestPayload( + offset=1 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_template_attribute(self): + """ + Test that the equality operator returns False when comparing two Rekey + request payloads with different template attributes. + """ + a = payloads.RekeyRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Protect Stop Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.PROTECT_STOP_DATE + ) + ) + ] + ) + ) + b = payloads.RekeyRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Deactivation Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.DEACTIVATION_DATE + ) + ) + ] + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two Rekey + request payloads with different types. + """ + a = payloads.RekeyRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Rekey request payloads with the same data. + """ + a = payloads.RekeyRequestPayload() + b = payloads.RekeyRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.RekeyRequestPayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + offset=0, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Activation Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.ACTIVATION_DATE + ) + ) + ] + ) + ) + b = payloads.RekeyRequestPayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + offset=0, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Activation Date' + ), + attribute_value=primitives.DateTime( + value=1136113200, + tag=enums.Tags.ACTIVATION_DATE + ) + ) + ] + ) + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns True when comparing two Rekey + request payloads with different unique identifiers. + """ + a = payloads.RekeyRequestPayload( + unique_identifier='a' + ) + b = payloads.RekeyRequestPayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_offset(self): + """ + Test that the inequality operator returns True when comparing two Rekey + request payloads with different offsets. + """ + a = payloads.RekeyRequestPayload( + offset=0 + ) + b = payloads.RekeyRequestPayload( + offset=1 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_template_attribute(self): + """ + Test that the inequality operator returns True when comparing two Rekey + request payloads with different template attributes. + """ + a = payloads.RekeyRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Protect Stop Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.PROTECT_STOP_DATE + ) + ) + ] + ) + ) + b = payloads.RekeyRequestPayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Deactivation Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.DEACTIVATION_DATE + ) + ) + ] + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two Rekey + request payloads with different types. + """ + a = payloads.RekeyRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Rekey request payload. + """ + payload = payloads.RekeyRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + offset=0, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Deactivation Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.DEACTIVATION_DATE + ) + ) + ] + ) + ) + + # TODO (peter-hamilton) Update this when TemplateAttributes have repr + expected = ( + "RekeyRequestPayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', " + "offset=0, " + "template_attribute=Struct())" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Rekey request payload + """ + payload = payloads.RekeyRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + offset=0, + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Deactivation Date' + ), + attribute_value=primitives.DateTime( + value=1577876400, + tag=enums.Tags.DEACTIVATION_DATE + ) + ) + ] + ) + ) + + # TODO (peter-hamilton) Update this when TemplateAttributes have str + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038', + 'offset': 0, + 'template_attribute': 'Struct()' + }) + observed = str(payload) + + self.assertEqual(expected, observed) + + +class TestRekeyResponsePayload(testtools.TestCase): + """ + Test suite for the Rekey response payload. + """ + + def setUp(self): + super(TestRekeyResponsePayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, + # Sections 3.1.1 and 9.2. + # + # This encoding matches the following set of values: + # Response Payload + # Unique Identifier - 8efbbd67-2847-46b5-b7e7-4ab3b5e175de + # Template Attribute + # Attribute + # Attribute Name - Cryptographic Algorithm + # Attribute Value - AES + # Attribute + # Attribute Name - Cryptographic Length + # Attribute Value - 128 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\xA8' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x38\x65\x66\x62\x62\x64\x36\x37\x2D\x32\x38\x34\x37\x2D\x34\x36' + b'\x62\x35\x2D\x62\x37\x65\x37\x2D\x34\x61\x62\x33\x62\x35\x65\x31' + b'\x37\x35\x64\x65\x00\x00\x00\x00' + b'\x42\x00\x91\x01\x00\x00\x00\x70' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x17' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x41\x6C' + b'\x67\x6F\x72\x69\x74\x68\x6D\x00' + b'\x42\x00\x0B\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00' + b'\x42\x00\x08\x01\x00\x00\x00\x30' + b'\x42\x00\x0A\x07\x00\x00\x00\x14' + b'\x43\x72\x79\x70\x74\x6F\x67\x72\x61\x70\x68\x69\x63\x20\x4C\x65' + b'\x6E\x67\x74\x68\x00\x00\x00\x00' + b'\x42\x00\x0B\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00' + ) + + # Encoding obtained from the KMIP 1.1 testing document, + # Sections 3.1.1 and 9.2. + # + # This encoding matches the following set of values: + # Response Payload + # Unique Identifier - 8efbbd67-2847-46b5-b7e7-4ab3b5e175de + + self.partial_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x38\x65\x66\x62\x62\x64\x36\x37\x2D\x32\x38\x34\x37\x2D\x34\x36' + b'\x62\x35\x2D\x62\x37\x65\x37\x2D\x34\x61\x62\x33\x62\x35\x65\x31' + b'\x37\x35\x64\x65\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestRekeyResponsePayload, self).tearDown() + + def test_init(self): + """ + Test that a Rekey response payload can be constructed with no + arguments. + """ + payload = payloads.RekeyResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.template_attribute) + + def test_init_with_args(self): + """ + Test that a Rekey response payload can be constructed with valid + values. + """ + payload = payloads.RekeyResponsePayload( + unique_identifier='00000000-2222-4444-6666-888888888888', + template_attribute=objects.TemplateAttribute() + ) + + self.assertEqual( + '00000000-2222-4444-6666-888888888888', + payload.unique_identifier + ) + self.assertEqual( + objects.TemplateAttribute(), + payload.template_attribute + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Rekey response payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.RekeyResponsePayload, + **kwargs + ) + + args = (payloads.RekeyResponsePayload(), 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_invalid_template_attribute(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the template attribute of a Rekey response payload. + """ + kwargs = {'template_attribute': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Template attribute must be a TemplateAttribute struct.", + payloads.RekeyResponsePayload, + **kwargs + ) + + args = ( + payloads.RekeyResponsePayload(), + 'template_attribute', + 'invalid' + ) + self.assertRaisesRegexp( + TypeError, + "Template attribute must be a TemplateAttribute struct.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Rekey response payload can be read from a data stream. + """ + payload = payloads.RekeyResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.full_encoding) + + self.assertEqual( + '8efbbd67-2847-46b5-b7e7-4ab3b5e175de', + payload.unique_identifier + ) + self.assertEqual( + objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ), + payload.template_attribute + ) + + def test_read_partial(self): + """ + Test that a Rekey response payload can be read from a partial data + stream. + """ + payload = payloads.RekeyResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.template_attribute) + + payload.read(self.partial_encoding) + + self.assertEqual( + '8efbbd67-2847-46b5-b7e7-4ab3b5e175de', + payload.unique_identifier + ) + self.assertEqual(None, payload.template_attribute) + + def test_read_invalid(self): + """ + Test that a ValueError gets raised when a required Rekey response + payload attribute is missing from the payload encoding. + """ + payload = payloads.RekeyResponsePayload() + args = (self.empty_encoding, ) + self.assertRaisesRegexp( + ValueError, + "The Rekey response payload encoding is missing the unique " + "identifier.", + payload.read, + *args + ) + + def test_write(self): + """ + Test that a Rekey response payload can be written to a data stream. + """ + payload = payloads.RekeyResponsePayload( + unique_identifier='8efbbd67-2847-46b5-b7e7-4ab3b5e175de', + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partial Rekey response payload can be written to a data + stream. + """ + payload = payloads.RekeyResponsePayload( + unique_identifier='8efbbd67-2847-46b5-b7e7-4ab3b5e175de' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.partial_encoding), len(stream)) + self.assertEqual(str(self.partial_encoding), str(stream)) + + def test_write_invalid(self): + """ + Test that a ValueError gets raised when a required Rekey response + payload attribute is missing when encoding the payload. + """ + payload = payloads.RekeyResponsePayload() + stream = utils.BytearrayStream() + args = (stream, ) + self.assertRaisesRegexp( + ValueError, + "The Rekey response payload is missing the unique identifier.", + payload.write, + *args + ) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Rekey + response payloads with the same data. + """ + a = payloads.RekeyResponsePayload() + b = payloads.RekeyResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.RekeyResponsePayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + b = payloads.RekeyResponsePayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two Rekey + response payloads with different unique identifiers. + """ + a = payloads.RekeyResponsePayload( + unique_identifier='a' + ) + b = payloads.RekeyResponsePayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_template_attribute(self): + """ + Test that the equality operator returns False when comparing two Rekey + response payloads with different template attributes. + """ + a = payloads.RekeyResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + ) + b = payloads.RekeyResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two Rekey + response payloads with different types. + """ + a = payloads.RekeyResponsePayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Rekey response payloads with the same data. + """ + a = payloads.RekeyResponsePayload() + b = payloads.RekeyResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.RekeyResponsePayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + b = payloads.RekeyResponsePayload( + unique_identifier='1346d253-69d6-474c-8cd5-ad475a3e0a81', + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ), + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two Rekey + response payloads with different unique identifiers. + """ + a = payloads.RekeyResponsePayload( + unique_identifier='a' + ) + b = payloads.RekeyResponsePayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_template_attribute(self): + """ + Test that the inequality operator returns True when comparing two Rekey + response payloads with different template attributes. + """ + a = payloads.RekeyResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Algorithm' + ), + attribute_value=primitives.Enumeration( + enums.CryptographicAlgorithm, + value=enums.CryptographicAlgorithm.AES, + tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM + ) + ) + ] + ) + ) + b = payloads.RekeyResponsePayload( + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two Rekey + response payloads with different types. + """ + a = payloads.RekeyResponsePayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Rekey response payload. + """ + payload = payloads.RekeyResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + # TODO (peter-hamilton) Update this when TemplateAttributes have repr + expected = ( + "RekeyResponsePayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', " + "template_attribute=Struct())" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Rekey response payload + """ + payload = payloads.RekeyResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + template_attribute=objects.TemplateAttribute( + attributes=[ + objects.Attribute( + attribute_name=objects.Attribute.AttributeName( + 'Cryptographic Length' + ), + attribute_value=primitives.Integer( + value=128, + tag=enums.Tags.CRYPTOGRAPHIC_LENGTH + ) + ) + ] + ) + ) + + # TODO (peter-hamilton) Update this when TemplateAttributes have str + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038', + 'template_attribute': 'Struct()' + }) + observed = str(payload) + + self.assertEqual(expected, observed)