diff --git a/kmip/core/messages/payloads/__init__.py b/kmip/core/messages/payloads/__init__.py index ebe3e8b..b2a2ab5 100644 --- a/kmip/core/messages/payloads/__init__.py +++ b/kmip/core/messages/payloads/__init__.py @@ -85,6 +85,10 @@ from kmip.core.messages.payloads.query import ( QueryRequestPayload, QueryResponsePayload ) +from kmip.core.messages.payloads.recover import ( + RecoverRequestPayload, + RecoverResponsePayload +) from kmip.core.messages.payloads.register import ( RegisterRequestPayload, RegisterResponsePayload @@ -144,6 +148,8 @@ __all__ = [ "MACResponsePayload", "QueryRequestPayload", "QueryResponsePayload", + "RecoverRequestPayload", + "RecoverResponsePayload", "RegisterRequestPayload", "RegisterResponsePayload", "RekeyKeyPairRequestPayload", diff --git a/kmip/core/messages/payloads/recover.py b/kmip/core/messages/payloads/recover.py new file mode 100644 index 0000000..a6ab9a3 --- /dev/null +++ b/kmip/core/messages/payloads/recover.py @@ -0,0 +1,246 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from kmip import enums +from kmip.core import primitives +from kmip.core import utils + + +class RecoverRequestPayload(primitives.Struct): + """ + A request payload for the Recover operation. + + Attributes: + unique_identifier: The unique ID of the object to recover. + """ + + def __init__(self, unique_identifier=None): + """ + Construct a Recover request payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a public key) to recover. Optional, defaults to None. + """ + super(RecoverRequestPayload, self).__init__( + enums.Tags.REQUEST_PAYLOAD + ) + + self._unique_identifier = None + self.unique_identifier = unique_identifier + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + def read(self, input_stream): + """ + Read the data encoding the Recover request payload and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(RecoverRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Recover request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + + self.length = local_stream.length() + super(RecoverRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, RecoverRequestPayload): + if self.unique_identifier != other.unique_identifier: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, RecoverRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = "unique_identifier='{0}'".format(self.unique_identifier) + return "RecoverRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier + }) + + +class RecoverResponsePayload(primitives.Struct): + """ + A response payload for the Recover operation. + + Attributes: + unique_identifier: The unique ID of the object that was recovered. + """ + + def __init__(self, unique_identifier=None): + """ + Construct a Recover response payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a public key) that was recovered. Optional, defaults to None. + """ + super(RecoverResponsePayload, self).__init__( + enums.Tags.RESPONSE_PAYLOAD + ) + + self._unique_identifier = None + self.unique_identifier = unique_identifier + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + def read(self, input_stream): + """ + Read the data encoding the Recover response payload and decode it + into its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(RecoverResponsePayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Recover response payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + + self.length = local_stream.length() + super(RecoverResponsePayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, RecoverResponsePayload): + if self.unique_identifier != other.unique_identifier: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, RecoverResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = "unique_identifier='{0}'".format(self.unique_identifier) + return "RecoverResponsePayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + }) diff --git a/kmip/tests/unit/core/messages/payloads/test_recover.py b/kmip/tests/unit/core/messages/payloads/test_recover.py new file mode 100644 index 0000000..04e58e4 --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_recover.py @@ -0,0 +1,523 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import utils +from kmip.core.messages import payloads + + +class TestRecoverRequestPayload(testtools.TestCase): + """ + Test suite for the Recover request payload. + """ + + def setUp(self): + super(TestRecoverRequestPayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 10.1. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - f613dba1-b557-489a-87c5-3c0ecd4294e3 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x36\x31\x33\x64\x62\x61\x31\x2D\x62\x35\x35\x37\x2D\x34\x38' + b'\x39\x61\x2D\x38\x37\x63\x35\x2D\x33\x63\x30\x65\x63\x64\x34\x32' + b'\x39\x34\x65\x33\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestRecoverRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that a Recover request payload can be constructed with no + arguments. + """ + payload = payloads.RecoverRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + + def test_init_with_args(self): + """ + Test that a Recover request payload can be constructed with valid + values. + """ + payload = payloads.RecoverRequestPayload( + unique_identifier='00000000-1111-2222-3333-444444444444' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Recover request payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.RecoverRequestPayload, + **kwargs + ) + + payload = payloads.RecoverRequestPayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Recover request payload can be read from a data stream. + """ + payload = payloads.RecoverRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.full_encoding) + + self.assertEqual( + 'f613dba1-b557-489a-87c5-3c0ecd4294e3', + payload.unique_identifier + ) + + def test_read_empty(self): + """ + Test that a Recover request payload can be read from an empty data + stream. + """ + payload = payloads.RecoverRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + + def test_write(self): + """ + Test that a Recover request payload can be written to a data stream. + """ + payload = payloads.RecoverRequestPayload( + unique_identifier='f613dba1-b557-489a-87c5-3c0ecd4294e3' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Recover request payload can be written + to a data stream. + """ + payload = payloads.RecoverRequestPayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Recover request payloads with the same data. + """ + a = payloads.RecoverRequestPayload() + b = payloads.RecoverRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.RecoverRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.RecoverRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Recover request payloads with different unique identifiers. + """ + a = payloads.RecoverRequestPayload( + unique_identifier='a' + ) + b = payloads.RecoverRequestPayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Recover request payloads with different types. + """ + a = payloads.RecoverRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Recover request payloads with the same data. + """ + a = payloads.RecoverRequestPayload() + b = payloads.RecoverRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.RecoverRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.RecoverRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Recover request payloads with different unique identifiers. + """ + a = payloads.RecoverRequestPayload( + unique_identifier='a' + ) + b = payloads.RecoverRequestPayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Recover request payloads with different types. + """ + a = payloads.RecoverRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Recover request payload. + """ + payload = payloads.RecoverRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + expected = ( + "RecoverRequestPayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038')" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Recover request payload. + """ + payload = payloads.RecoverRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038' + }) + observed = str(payload) + + self.assertEqual(expected, observed) + + +class TestRecoverResponsePayload(testtools.TestCase): + """ + Test suite for the Recover response payload. + """ + + def setUp(self): + super(TestRecoverResponsePayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 10.1. + # + # This encoding matches the following set of values: + # Response Payload + # Unique Identifier - f613dba1-b557-489a-87c5-3c0ecd4294e3 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x30' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x66\x36\x31\x33\x64\x62\x61\x31\x2D\x62\x35\x35\x37\x2D\x34\x38' + b'\x39\x61\x2D\x38\x37\x63\x35\x2D\x33\x63\x30\x65\x63\x64\x34\x32' + b'\x39\x34\x65\x33\x00\x00\x00\x00' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestRecoverResponsePayload, self).tearDown() + + def test_init(self): + """ + Test that a Recover response payload can be constructed with no + arguments. + """ + payload = payloads.RecoverResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + + def test_init_with_args(self): + """ + Test that a Recover response payload can be constructed with valid + values. + """ + payload = payloads.RecoverResponsePayload( + unique_identifier='00000000-1111-2222-3333-444444444444' + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Recover response payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.RecoverResponsePayload, + **kwargs + ) + + payload = payloads.RecoverResponsePayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Recover response payload can be read from a data stream. + """ + payload = payloads.RecoverResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.full_encoding) + + self.assertEqual( + 'f613dba1-b557-489a-87c5-3c0ecd4294e3', + payload.unique_identifier + ) + + def test_read_empty(self): + """ + Test that a Recover response payload can be read from an empty data + stream. + """ + payload = payloads.RecoverResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + + def test_write(self): + """ + Test that a Recover response payload can be written to a data stream. + """ + payload = payloads.RecoverResponsePayload( + unique_identifier='f613dba1-b557-489a-87c5-3c0ecd4294e3' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Recover response payload can be written to a data + stream. + """ + payload = payloads.RecoverResponsePayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Recover response payloads with the same data. + """ + a = payloads.RecoverResponsePayload() + b = payloads.RecoverResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.RecoverResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.RecoverResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Recover response payloads with different unique identifiers. + """ + a = payloads.RecoverResponsePayload( + unique_identifier='a' + ) + b = payloads.RecoverResponsePayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Recover response payloads with different types. + """ + a = payloads.RecoverResponsePayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Recover response payloads with the same data. + """ + a = payloads.RecoverResponsePayload() + b = payloads.RecoverResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.RecoverResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + b = payloads.RecoverResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Recover response payloads with different unique identifiers. + """ + a = payloads.RecoverResponsePayload( + unique_identifier='a' + ) + b = payloads.RecoverResponsePayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Recover response payloads with different types. + """ + a = payloads.RecoverResponsePayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Recover response payload. + """ + payload = payloads.RecoverResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + expected = ( + "RecoverResponsePayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038')" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Recover response payload + """ + payload = payloads.RecoverResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038' + ) + + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038' + }) + observed = str(payload) + + self.assertEqual(expected, observed)