From b84af4f88d9678a50ddc596fa22b56800160e944 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Tue, 5 Dec 2017 03:27:45 -0500 Subject: [PATCH] Add payload for the Poll operation This change adds the request payload for the Poll operation. A unit test suite for the payload is included. The payload can be imported directly from the payloads package. --- kmip/core/messages/payloads/__init__.py | 4 + kmip/core/messages/payloads/poll.py | 142 ++++++++++ .../unit/core/messages/payloads/test_poll.py | 264 ++++++++++++++++++ 3 files changed, 410 insertions(+) create mode 100644 kmip/core/messages/payloads/poll.py create mode 100644 kmip/tests/unit/core/messages/payloads/test_poll.py diff --git a/kmip/core/messages/payloads/__init__.py b/kmip/core/messages/payloads/__init__.py index ebd92d4..0d3e72c 100644 --- a/kmip/core/messages/payloads/__init__.py +++ b/kmip/core/messages/payloads/__init__.py @@ -85,6 +85,9 @@ from kmip.core.messages.payloads.obtain_lease import ( ObtainLeaseRequestPayload, ObtainLeaseResponsePayload ) +from kmip.core.messages.payloads.poll import ( + PollRequestPayload +) from kmip.core.messages.payloads.query import ( QueryRequestPayload, QueryResponsePayload @@ -152,6 +155,7 @@ __all__ = [ "MACResponsePayload", "ObtainLeaseRequestPayload", "ObtainLeaseResponsePayload", + "PollRequestPayload", "QueryRequestPayload", "QueryResponsePayload", "RecoverRequestPayload", diff --git a/kmip/core/messages/payloads/poll.py b/kmip/core/messages/payloads/poll.py new file mode 100644 index 0000000..44b7d3e --- /dev/null +++ b/kmip/core/messages/payloads/poll.py @@ -0,0 +1,142 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from kmip import enums +from kmip.core import primitives +from kmip.core import utils + + +class PollRequestPayload(primitives.Struct): + """ + A request payload for the Poll operation. + + Attributes: + asynchronous_correlation_value: The unique ID, in bytes, of the + operation to poll. + """ + + def __init__(self, asynchronous_correlation_value=None): + """ + Construct a Poll request payload struct. + + Args: + asynchronous_correlation_value (bytes): The ID of a pending + operation to poll the status of, in bytes. Optional, defaults + to None. + """ + super(PollRequestPayload, self).__init__( + enums.Tags.REQUEST_PAYLOAD + ) + + self._asynchronous_correlation_value = None + self.asynchronous_correlation_value = asynchronous_correlation_value + + @property + def asynchronous_correlation_value(self): + if self._asynchronous_correlation_value: + return self._asynchronous_correlation_value.value + else: + return None + + @asynchronous_correlation_value.setter + def asynchronous_correlation_value(self, value): + if value is None: + self._asynchronous_correlation_value = None + elif isinstance(value, six.binary_type): + self._asynchronous_correlation_value = primitives.ByteString( + value=value, + tag=enums.Tags.ASYNCHRONOUS_CORRELATION_VALUE + ) + else: + raise TypeError("Asynchronous correlation value must be bytes.") + + def read(self, input_stream): + """ + Read the data encoding the Poll request payload and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(PollRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next( + enums.Tags.ASYNCHRONOUS_CORRELATION_VALUE, + local_stream + ): + self._asynchronous_correlation_value = primitives.ByteString( + tag=enums.Tags.ASYNCHRONOUS_CORRELATION_VALUE + ) + self._asynchronous_correlation_value.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Poll request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._asynchronous_correlation_value: + self._asynchronous_correlation_value.write(local_stream) + + self.length = local_stream.length() + super(PollRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, PollRequestPayload): + if self.asynchronous_correlation_value != \ + other.asynchronous_correlation_value: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, PollRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = "asynchronous_correlation_value={0}".format( + self.asynchronous_correlation_value + ) + return "PollRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'asynchronous_correlation_value': + self.asynchronous_correlation_value + }) diff --git a/kmip/tests/unit/core/messages/payloads/test_poll.py b/kmip/tests/unit/core/messages/payloads/test_poll.py new file mode 100644 index 0000000..da92852 --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_poll.py @@ -0,0 +1,264 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import utils +from kmip.core.messages import payloads + + +class TestPollRequestPayload(testtools.TestCase): + """ + Test suite for the Poll request payload. + """ + + def setUp(self): + super(TestPollRequestPayload, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, Section 10.1. + # + # This encoding matches the following set of values: + # Request Payload + # Asynchronous Correlation Value - 0xE7125DE85B3C90A6 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x10' + b'\x42\x00\x06\x08\x00\x00\x00\x08\xE7\x12\x5D\xE8\x5B\x3C\x90\xA6' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestPollRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that a Poll request payload can be constructed with no arguments. + """ + payload = payloads.PollRequestPayload() + + self.assertEqual(None, payload.asynchronous_correlation_value) + + def test_init_with_args(self): + """ + Test that an Poll request payload can be constructed with valid + values. + """ + payload = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\x01' + ) + + self.assertEqual(b'\x01', payload.asynchronous_correlation_value) + + def test_invalid_asynchronous_correlation_value(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the asynchronous correlation value of an Poll request payload. + """ + kwargs = {'asynchronous_correlation_value': 0} + self.assertRaisesRegexp( + TypeError, + "Asynchronous correlation value must be bytes.", + payloads.PollRequestPayload, + **kwargs + ) + + payload = payloads.PollRequestPayload() + args = (payload, 'asynchronous_correlation_value', 0) + self.assertRaisesRegexp( + TypeError, + "Asynchronous correlation value must be bytes.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Poll request payload can be read from a data stream. + """ + payload = payloads.PollRequestPayload() + + self.assertEqual(None, payload.asynchronous_correlation_value) + + payload.read(self.full_encoding) + + self.assertEqual( + b'\xE7\x12\x5D\xE8\x5B\x3C\x90\xA6', + payload.asynchronous_correlation_value + ) + + def test_read_empty(self): + """ + Test that an Poll request payload can be read from an empty data + stream. + """ + payload = payloads.PollRequestPayload() + + self.assertEqual(None, payload.asynchronous_correlation_value) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.asynchronous_correlation_value) + + def test_write(self): + """ + Test that a Poll request payload can be written to a data stream. + """ + payload = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\xE7\x12\x5D\xE8\x5B\x3C\x90\xA6' + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Poll request payload can be written to a data + stream. + """ + payload = payloads.PollRequestPayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two Poll + request payloads with the same data. + """ + a = payloads.PollRequestPayload() + b = payloads.PollRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\x49\xa1\xca\x88' + ) + b = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\x49\xa1\xca\x88' + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_asynchronous_correlation_value(self): + """ + Test that the equality operator returns False when comparing two Poll + request payloads with different asynchronous correlation values. + """ + a = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\xaa' + ) + b = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\xbb' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two Poll + request payloads with different types. + """ + a = payloads.PollRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two Poll + request payloads with the same data. + """ + a = payloads.PollRequestPayload() + b = payloads.PollRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\x49\xa1\xca\x88' + ) + b = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\x49\xa1\xca\x88' + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_asynchronous_correlation_value(self): + """ + Test that the inequality operator returns True when comparing two Poll + request payloads with different asynchronous correlation values. + """ + a = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\xaa' + ) + b = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\xbb' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two Poll + request payloads with different types. + """ + a = payloads.PollRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Poll request payload. + """ + payload = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\xaa' + ) + expected = ( + "PollRequestPayload(" + "asynchronous_correlation_value=" + str(b'\xaa') + ")" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Poll request payload. + """ + payload = payloads.PollRequestPayload( + asynchronous_correlation_value=b'\xaa' + ) + + expected = str({ + 'asynchronous_correlation_value': b'\xaa' + }) + observed = str(payload) + + self.assertEqual(expected, observed)