diff --git a/kmip/core/messages/payloads/__init__.py b/kmip/core/messages/payloads/__init__.py index 711f50a..c089d2a 100644 --- a/kmip/core/messages/payloads/__init__.py +++ b/kmip/core/messages/payloads/__init__.py @@ -17,6 +17,10 @@ from kmip.core.messages.payloads.activate import ( ActivateRequestPayload, ActivateResponsePayload ) +from kmip.core.messages.payloads.archive import ( + ArchiveRequestPayload, + ArchiveResponsePayload +) from kmip.core.messages.payloads.create import ( CreateRequestPayload, CreateResponsePayload @@ -94,6 +98,8 @@ from kmip.core.messages.payloads.signature_verify import ( __all__ = [ "ActivateRequestPayload", "ActivateResponsePayload", + "ArchiveRequestPayload", + "ArchiveResponsePayload", "CreateRequestPayload", "CreateResponsePayload", "CreateKeyPairRequestPayload", diff --git a/kmip/core/messages/payloads/archive.py b/kmip/core/messages/payloads/archive.py new file mode 100644 index 0000000..0b28021 --- /dev/null +++ b/kmip/core/messages/payloads/archive.py @@ -0,0 +1,246 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from kmip import enums +from kmip.core import primitives +from kmip.core import utils + + +class ArchiveRequestPayload(primitives.Struct): + """ + A request payload for the Archive operation. + + Attributes: + unique_identifier: The unique ID of the object to archive. + """ + + def __init__(self, unique_identifier=None): + """ + Construct an Archive request payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a public key) to archive. Optional, defaults to None. + """ + super(ArchiveRequestPayload, self).__init__( + enums.Tags.REQUEST_PAYLOAD + ) + + self._unique_identifier = None + self.unique_identifier = unique_identifier + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + def read(self, input_stream): + """ + Read the data encoding the Archive request payload and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(ArchiveRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Archive request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + + self.length = local_stream.length() + super(ArchiveRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, ArchiveRequestPayload): + if self.unique_identifier != other.unique_identifier: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ArchiveRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = "unique_identifier='{0}'".format(self.unique_identifier) + return "ArchiveRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier + }) + + +class ArchiveResponsePayload(primitives.Struct): + """ + A response payload for the Archive operation. + + Attributes: + unique_identifier: The unique ID of the object that was archived. + """ + + def __init__(self, unique_identifier=None): + """ + Construct an Archive response payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a public key) that was archived. Optional, defaults to None. + """ + super(ArchiveResponsePayload, self).__init__( + enums.Tags.RESPONSE_PAYLOAD + ) + + self._unique_identifier = None + self.unique_identifier = unique_identifier + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + def read(self, input_stream): + """ + Read the data encoding the Archive response payload and decode it + into its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(ArchiveResponsePayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Archive response payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + + self.length = local_stream.length() + super(ArchiveResponsePayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, ArchiveResponsePayload): + if self.unique_identifier != other.unique_identifier: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, ArchiveResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = "unique_identifier='{0}'".format(self.unique_identifier) + return "ArchiveResponsePayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + }) diff --git a/kmip/tests/unit/core/messages/payloads/test_archive.py b/kmip/tests/unit/core/messages/payloads/test_archive.py new file mode 100644 index 0000000..2073109 --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_archive.py @@ -0,0 +1,523 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import utils +from kmip.core.messages import payloads + + +class TestArchiveRequestPayload(testtools.TestCase): + """ + Test suite for the Archive request payload. + """ + + def setUp(self): + super(TestArchiveRequestPayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 10.1. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - f613dba1-b557-489a-87c5-3c0ecd4294e3 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x36\x31\x33\x64\x62\x61\x31\x2D\x62\x35\x35\x37\x2D\x34\x38' + b'\x39\x61\x2D\x38\x37\x63\x35\x2D\x33\x63\x30\x65\x63\x64\x34\x32' + b'\x39\x34\x65\x33\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestArchiveRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that an Archive request payload can be constructed with no + arguments. + """ + payload = payloads.ArchiveRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + + def test_init_with_args(self): + """ + Test that an Archive request payload can be constructed with valid + values. + """ + payload = payloads.ArchiveRequestPayload( + unique_identifier='00000000-1111-2222-3333-444444444444' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of an Archive request payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.ArchiveRequestPayload, + **kwargs + ) + + payload = payloads.ArchiveRequestPayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_read(self): + """ + Test that an Archive request payload can be read from a data stream. + """ + payload = payloads.ArchiveRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.full_encoding) + + self.assertEqual( + 'f613dba1-b557-489a-87c5-3c0ecd4294e3', + payload.unique_identifier + ) + + def test_read_empty(self): + """ + Test that an Archive request payload can be read from an empty data + stream. + """ + payload = payloads.ArchiveRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + + def test_write(self): + """ + Test that an Archive request payload can be written to a data stream. + """ + payload = payloads.ArchiveRequestPayload( + unique_identifier='f613dba1-b557-489a-87c5-3c0ecd4294e3' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Archive request payload can be written + to a data stream. + """ + payload = payloads.ArchiveRequestPayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Archive request payloads with the same data. + """ + a = payloads.ArchiveRequestPayload() + b = payloads.ArchiveRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.ArchiveRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.ArchiveRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Archive request payloads with different unique identifiers. + """ + a = payloads.ArchiveRequestPayload( + unique_identifier='a' + ) + b = payloads.ArchiveRequestPayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Archive request payloads with different types. + """ + a = payloads.ArchiveRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Archive request payloads with the same data. + """ + a = payloads.ArchiveRequestPayload() + b = payloads.ArchiveRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.ArchiveRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.ArchiveRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Archive request payloads with different unique identifiers. + """ + a = payloads.ArchiveRequestPayload( + unique_identifier='a' + ) + b = payloads.ArchiveRequestPayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Archive request payloads with different types. + """ + a = payloads.ArchiveRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to an Archive request payload. + """ + payload = payloads.ArchiveRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + expected = ( + "ArchiveRequestPayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038')" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to an Archive request payload. + """ + payload = payloads.ArchiveRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038' + }) + observed = str(payload) + + self.assertEqual(expected, observed) + + +class TestArchiveResponsePayload(testtools.TestCase): + """ + Test suite for the Archive response payload. + """ + + def setUp(self): + super(TestArchiveResponsePayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 10.1. + # + # This encoding matches the following set of values: + # Response Payload + # Unique Identifier - f613dba1-b557-489a-87c5-3c0ecd4294e3 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x36\x31\x33\x64\x62\x61\x31\x2D\x62\x35\x35\x37\x2D\x34\x38' + b'\x39\x61\x2D\x38\x37\x63\x35\x2D\x33\x63\x30\x65\x63\x64\x34\x32' + b'\x39\x34\x65\x33\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestArchiveResponsePayload, self).tearDown() + + def test_init(self): + """ + Test that an Archive response payload can be constructed with no + arguments. + """ + payload = payloads.ArchiveResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + + def test_init_with_args(self): + """ + Test that an Archive response payload can be constructed with valid + values. + """ + payload = payloads.ArchiveResponsePayload( + unique_identifier='00000000-1111-2222-3333-444444444444' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of an Archive response payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.ArchiveResponsePayload, + **kwargs + ) + + payload = payloads.ArchiveResponsePayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_read(self): + """ + Test that an Archive response payload can be read from a data stream. + """ + payload = payloads.ArchiveResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.full_encoding) + + self.assertEqual( + 'f613dba1-b557-489a-87c5-3c0ecd4294e3', + payload.unique_identifier + ) + + def test_read_empty(self): + """ + Test that an Archive response payload can be read from an empty data + stream. + """ + payload = payloads.ArchiveResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + + def test_write(self): + """ + Test that an Archive response payload can be written to a data stream. + """ + payload = payloads.ArchiveResponsePayload( + unique_identifier='f613dba1-b557-489a-87c5-3c0ecd4294e3' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Archive response payload can be written to a data + stream. + """ + payload = payloads.ArchiveResponsePayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Archive response payloads with the same data. + """ + a = payloads.ArchiveResponsePayload() + b = payloads.ArchiveResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.ArchiveResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.ArchiveResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Archive response payloads with different unique identifiers. + """ + a = payloads.ArchiveResponsePayload( + unique_identifier='a' + ) + b = payloads.ArchiveResponsePayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Archive response payloads with different types. + """ + a = payloads.ArchiveResponsePayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Archive response payloads with the same data. + """ + a = payloads.ArchiveResponsePayload() + b = payloads.ArchiveResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.ArchiveResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.ArchiveResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Archive response payloads with different unique identifiers. + """ + a = payloads.ArchiveResponsePayload( + unique_identifier='a' + ) + b = payloads.ArchiveResponsePayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Archive response payloads with different types. + """ + a = payloads.ArchiveResponsePayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Archive response payload. + """ + payload = payloads.ArchiveResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + expected = ( + "ArchiveResponsePayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038')" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Archive response payload + """ + payload = payloads.ArchiveResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038' + }) + observed = str(payload) + + self.assertEqual(expected, observed)