From 53cbab73965b3907619b1a0ac6eacff7f9ea34b0 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Wed, 29 Nov 2017 00:40:07 -0500 Subject: [PATCH] Add payloads for the Check operation This change adds the request and response payloads needed for the Check operation. Unit tests have been added to cover the new payload structures. --- kmip/core/messages/payloads/__init__.py | 6 + kmip/core/messages/payloads/check.py | 484 ++++++++ .../unit/core/messages/payloads/test_check.py | 1043 +++++++++++++++++ 3 files changed, 1533 insertions(+) create mode 100644 kmip/core/messages/payloads/check.py create mode 100644 kmip/tests/unit/core/messages/payloads/test_check.py diff --git a/kmip/core/messages/payloads/__init__.py b/kmip/core/messages/payloads/__init__.py index 55c3981..ebe3e8b 100644 --- a/kmip/core/messages/payloads/__init__.py +++ b/kmip/core/messages/payloads/__init__.py @@ -25,6 +25,10 @@ from kmip.core.messages.payloads.cancel import ( CancelRequestPayload, CancelResponsePayload ) +from kmip.core.messages.payloads.check import ( + CheckRequestPayload, + CheckResponsePayload +) from kmip.core.messages.payloads.create import ( CreateRequestPayload, CreateResponsePayload @@ -110,6 +114,8 @@ __all__ = [ "ArchiveResponsePayload", "CancelRequestPayload", "CancelResponsePayload", + "CheckRequestPayload", + "CheckResponsePayload", "CreateRequestPayload", "CreateResponsePayload", "CreateKeyPairRequestPayload", diff --git a/kmip/core/messages/payloads/check.py b/kmip/core/messages/payloads/check.py new file mode 100644 index 0000000..7dd19dd --- /dev/null +++ b/kmip/core/messages/payloads/check.py @@ -0,0 +1,484 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from kmip import enums +from kmip.core import primitives +from kmip.core import utils + + +class CheckRequestPayload(primitives.Struct): + """ + A request payload for the Check operation. + + Attributes: + unique_identifier: The unique ID of the object to be checked. + usage_limits_count: The number of usage limits units that should be + available on the checked object. + cryptographic_usage_mask: The numeric representation of a set of usage + masks that should be set on the checked object. + lease_time: The date in seconds since the epoch that a lease should be + available for on the checked object. + """ + + def __init__(self, + unique_identifier=None, + usage_limits_count=None, + cryptographic_usage_mask=None, + lease_time=None): + """ + Construct a Check request payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a public key) to be checked. Optional, defaults to None. + usage_limits_count (int): The number of usage limits units that + should be available on the checked object. Optional, defaults + to None. + cryptographic_usage_mask (int): The numeric representation of a + set of usage masks that should be set on the checked object. + Optional, defaults to None. + lease_time (int): The date in seconds since the epoch that a + lease should be available for on the checked object. Optional, + defaults to None. + """ + super(CheckRequestPayload, self).__init__(enums.Tags.REQUEST_PAYLOAD) + + self._unique_identifier = None + self._usage_limits_count = None + self._cryptographic_usage_mask = None + self._lease_time = None + + self.unique_identifier = unique_identifier + self.usage_limits_count = usage_limits_count + self.cryptographic_usage_mask = cryptographic_usage_mask + self.lease_time = lease_time + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + @property + def usage_limits_count(self): + if self._usage_limits_count: + return self._usage_limits_count.value + else: + return None + + @usage_limits_count.setter + def usage_limits_count(self, value): + if value is None: + self._usage_limits_count = None + elif isinstance(value, six.integer_types): + self._usage_limits_count = primitives.LongInteger( + value=value, + tag=enums.Tags.USAGE_LIMITS_COUNT + ) + else: + raise TypeError("Usage limits count must be an integer.") + + @property + def cryptographic_usage_mask(self): + if self._cryptographic_usage_mask: + return self._cryptographic_usage_mask.value + else: + return None + + @cryptographic_usage_mask.setter + def cryptographic_usage_mask(self, value): + if value is None: + self._cryptographic_usage_mask = None + elif isinstance(value, six.integer_types): + self._cryptographic_usage_mask = primitives.Integer( + value=value, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + else: + raise TypeError("Cryptographic usage mask must be an integer.") + + @property + def lease_time(self): + if self._lease_time: + return self._lease_time.value + else: + return None + + @lease_time.setter + def lease_time(self, value): + if value is None: + self._lease_time = None + elif isinstance(value, six.integer_types): + self._lease_time = primitives.Interval( + value=value, + tag=enums.Tags.LEASE_TIME + ) + else: + raise TypeError("Lease time must be an integer.") + + def read(self, input_stream): + """ + Read the data encoding the Check request payload and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(CheckRequestPayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + if self.is_tag_next(enums.Tags.USAGE_LIMITS_COUNT, local_stream): + self._usage_limits_count = primitives.LongInteger( + tag=enums.Tags.USAGE_LIMITS_COUNT + ) + self._usage_limits_count.read(local_stream) + if self.is_tag_next(enums.Tags.CRYPTOGRAPHIC_USAGE_MASK, local_stream): + self._cryptographic_usage_mask = primitives.Integer( + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + self._cryptographic_usage_mask.read(local_stream) + if self.is_tag_next(enums.Tags.LEASE_TIME, local_stream): + self._lease_time = primitives.Interval( + tag=enums.Tags.LEASE_TIME + ) + self._lease_time.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Check request payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + if self._usage_limits_count: + self._usage_limits_count.write(local_stream) + if self._cryptographic_usage_mask: + self._cryptographic_usage_mask.write(local_stream) + if self._lease_time: + self._lease_time.write(local_stream) + + self.length = local_stream.length() + super(CheckRequestPayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, CheckRequestPayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.usage_limits_count != other.usage_limits_count: + return False + elif self.cryptographic_usage_mask != \ + other.cryptographic_usage_mask: + return False + elif self.lease_time != other.lease_time: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, CheckRequestPayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "usage_limits_count={0}".format(self.usage_limits_count), + "cryptographic_usage_mask={0}".format( + self.cryptographic_usage_mask + ), + "lease_time={0}".format(self.lease_time) + ]) + return "CheckRequestPayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'usage_limits_count': self.usage_limits_count, + 'cryptographic_usage_mask': self.cryptographic_usage_mask, + 'lease_time': self.lease_time + }) + + +class CheckResponsePayload(primitives.Struct): + """ + A response payload for the Check operation. + + Attributes: + unique_identifier: The unique ID of the object that was checked. + usage_limits_count: The number of usage limits units that should be + available on the checked object. + cryptographic_usage_mask: The numeric representation of a set of usage + masks that should be set on the checked object. + lease_time: The date in seconds since the epoch that a lease should be + available for on the checked object. + """ + + def __init__(self, + unique_identifier=None, + usage_limits_count=None, + cryptographic_usage_mask=None, + lease_time=None): + """ + Construct a Check response payload struct. + + Args: + unique_identifier (string): The ID of the managed object (e.g., + a public key) that was checked. Optional, defaults to None. + usage_limits_count (int): The number of usage limits units that + should be available on the checked object. Optional, defaults + to None. + cryptographic_usage_mask (int): The numeric representation of a + set of usage masks that should be set on the checked object. + Optional, defaults to None. + lease_time (int): The date in seconds since the epoch that a + lease should be available for on the checked object. Optional, + defaults to None. + """ + super(CheckResponsePayload, self).__init__(enums.Tags.RESPONSE_PAYLOAD) + + self._unique_identifier = None + self._usage_limits_count = None + self._cryptographic_usage_mask = None + self._lease_time = None + + self.unique_identifier = unique_identifier + self.usage_limits_count = usage_limits_count + self.cryptographic_usage_mask = cryptographic_usage_mask + self.lease_time = lease_time + + @property + def unique_identifier(self): + if self._unique_identifier: + return self._unique_identifier.value + else: + return None + + @unique_identifier.setter + def unique_identifier(self, value): + if value is None: + self._unique_identifier = None + elif isinstance(value, six.string_types): + self._unique_identifier = primitives.TextString( + value=value, + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + else: + raise TypeError("Unique identifier must be a string.") + + @property + def usage_limits_count(self): + if self._usage_limits_count: + return self._usage_limits_count.value + else: + return None + + @usage_limits_count.setter + def usage_limits_count(self, value): + if value is None: + self._usage_limits_count = None + elif isinstance(value, six.integer_types): + self._usage_limits_count = primitives.LongInteger( + value=value, + tag=enums.Tags.USAGE_LIMITS_COUNT + ) + else: + raise TypeError("Usage limits count must be an integer.") + + @property + def cryptographic_usage_mask(self): + if self._cryptographic_usage_mask: + return self._cryptographic_usage_mask.value + else: + return None + + @cryptographic_usage_mask.setter + def cryptographic_usage_mask(self, value): + if value is None: + self._cryptographic_usage_mask = None + elif isinstance(value, six.integer_types): + self._cryptographic_usage_mask = primitives.Integer( + value=value, + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + else: + raise TypeError("Cryptographic usage mask must be an integer.") + + @property + def lease_time(self): + if self._lease_time: + return self._lease_time.value + else: + return None + + @lease_time.setter + def lease_time(self, value): + if value is None: + self._lease_time = None + elif isinstance(value, six.integer_types): + self._lease_time = primitives.Interval( + value=value, + tag=enums.Tags.LEASE_TIME + ) + else: + raise TypeError("Lease time must be an integer.") + + def read(self, input_stream): + """ + Read the data encoding the Check response payload and decode it into + its constituent parts. + + Args: + input_stream (stream): A data stream containing encoded object + data, supporting a read method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is missing from the + encoded payload. + """ + super(CheckResponsePayload, self).read(input_stream) + local_stream = utils.BytearrayStream(input_stream.read(self.length)) + + if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_stream): + self._unique_identifier = primitives.TextString( + tag=enums.Tags.UNIQUE_IDENTIFIER + ) + self._unique_identifier.read(local_stream) + if self.is_tag_next(enums.Tags.USAGE_LIMITS_COUNT, local_stream): + self._usage_limits_count = primitives.LongInteger( + tag=enums.Tags.USAGE_LIMITS_COUNT + ) + self._usage_limits_count.read(local_stream) + if self.is_tag_next(enums.Tags.CRYPTOGRAPHIC_USAGE_MASK, local_stream): + self._cryptographic_usage_mask = primitives.Integer( + tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK + ) + self._cryptographic_usage_mask.read(local_stream) + if self.is_tag_next(enums.Tags.LEASE_TIME, local_stream): + self._lease_time = primitives.Interval( + tag=enums.Tags.LEASE_TIME + ) + self._lease_time.read(local_stream) + + self.is_oversized(local_stream) + + def write(self, output_stream): + """ + Write the data encoding the Check response payload to a stream. + + Args: + output_stream (stream): A data stream in which to encode object + data, supporting a write method; usually a BytearrayStream + object. + + Raises: + ValueError: Raised if the data attribute is not defined. + """ + local_stream = utils.BytearrayStream() + + if self._unique_identifier: + self._unique_identifier.write(local_stream) + if self._usage_limits_count: + self._usage_limits_count.write(local_stream) + if self._cryptographic_usage_mask: + self._cryptographic_usage_mask.write(local_stream) + if self._lease_time: + self._lease_time.write(local_stream) + + self.length = local_stream.length() + super(CheckResponsePayload, self).write(output_stream) + output_stream.write(local_stream.buffer) + + def __eq__(self, other): + if isinstance(other, CheckResponsePayload): + if self.unique_identifier != other.unique_identifier: + return False + elif self.usage_limits_count != other.usage_limits_count: + return False + elif self.cryptographic_usage_mask != \ + other.cryptographic_usage_mask: + return False + elif self.lease_time != other.lease_time: + return False + else: + return True + else: + return NotImplemented + + def __ne__(self, other): + if isinstance(other, CheckResponsePayload): + return not (self == other) + else: + return NotImplemented + + def __repr__(self): + args = ", ".join([ + "unique_identifier='{0}'".format(self.unique_identifier), + "usage_limits_count={0}".format(self.usage_limits_count), + "cryptographic_usage_mask={0}".format( + self.cryptographic_usage_mask + ), + "lease_time={0}".format(self.lease_time) + ]) + return "CheckResponsePayload({0})".format(args) + + def __str__(self): + return str({ + 'unique_identifier': self.unique_identifier, + 'usage_limits_count': self.usage_limits_count, + 'cryptographic_usage_mask': self.cryptographic_usage_mask, + 'lease_time': self.lease_time + }) diff --git a/kmip/tests/unit/core/messages/payloads/test_check.py b/kmip/tests/unit/core/messages/payloads/test_check.py new file mode 100644 index 0000000..484be6b --- /dev/null +++ b/kmip/tests/unit/core/messages/payloads/test_check.py @@ -0,0 +1,1043 @@ +# Copyright (c) 2017 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from kmip.core import utils +from kmip.core.messages import payloads + + +class TestCheckRequestPayload(testtools.TestCase): + """ + Test suite for the Check request payload. + """ + + def setUp(self): + super(TestCheckRequestPayload, self).setUp() + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 5.1. The rest of the encoding was built by hand. + # + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6 + # Usage Limits Count - 500 + # Cryptographic Usage Mask - Encrypt | Decrypt (4 | 8 -> 12 or C) + # Lease Time - 0 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x60' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x32\x63\x32\x33\x32\x31\x37\x65\x2D\x66\x35\x33\x63\x2D\x34\x62' + b'\x64\x66\x2D\x61\x64\x30\x61\x2D\x35\x38\x61\x33\x31\x66\x64\x33' + b'\x64\x34\x62\x36\x00\x00\x00\x00' + b'\x42\x00\x96\x03\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x01\xF4' + b'\x42\x00\x2C\x02\x00\x00\x00\x04\x00\x00\x00\x0C\x00\x00\x00\x00' + b'\x42\x00\x49\x0A\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Request Payload + # Unique Identifier - 2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6 + # Usage Limits Count - 500 + self.partial_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x40' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x32\x63\x32\x33\x32\x31\x37\x65\x2D\x66\x35\x33\x63\x2D\x34\x62' + b'\x64\x66\x2D\x61\x64\x30\x61\x2D\x35\x38\x61\x33\x31\x66\x64\x33' + b'\x64\x34\x62\x36\x00\x00\x00\x00' + b'\x42\x00\x96\x03\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x01\xF4' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x79\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestCheckRequestPayload, self).tearDown() + + def test_init(self): + """ + Test that a Check request payload can be constructed with no arguments. + """ + payload = payloads.CheckRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + def test_init_with_args(self): + """ + Test that a Check request payload can be constructed with valid values. + """ + payload = payloads.CheckRequestPayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + usage_limits_count=10, + cryptographic_usage_mask=12, + lease_time=1000000000 + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + self.assertEqual(10, payload.usage_limits_count) + self.assertEqual(12, payload.cryptographic_usage_mask) + self.assertEqual(1000000000, payload.lease_time) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Check request payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.CheckRequestPayload, + **kwargs + ) + + payload = payloads.CheckRequestPayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_invalid_usage_limits_count(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the usage limits count of a Check request payload. + """ + kwargs = {'usage_limits_count': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Usage limits count must be an integer.", + payloads.CheckRequestPayload, + **kwargs + ) + + payload = payloads.CheckRequestPayload() + args = (payload, 'usage_limits_count', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Usage limits count must be an integer.", + setattr, + *args + ) + + def test_invalid_cryptographic_usage_mask(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the cryptographic usage mask of a Check request payload. + """ + kwargs = {'cryptographic_usage_mask': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Cryptographic usage mask must be an integer.", + payloads.CheckRequestPayload, + **kwargs + ) + + payload = payloads.CheckRequestPayload() + args = (payload, 'cryptographic_usage_mask', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Cryptographic usage mask must be an integer.", + setattr, + *args + ) + + def test_invalid_lease_time(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the lease time of a Check request payload. + """ + kwargs = {'lease_time': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Lease time must be an integer.", + payloads.CheckRequestPayload, + **kwargs + ) + + payload = payloads.CheckRequestPayload() + args = (payload, 'lease_time', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Lease time must be an integer.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Check request payload can be read from a data stream. + """ + payload = payloads.CheckRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + payload.read(self.full_encoding) + + self.assertEqual( + '2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + payload.unique_identifier + ) + self.assertEqual(500, payload.usage_limits_count) + self.assertEqual(12, payload.cryptographic_usage_mask) + self.assertEqual(0, payload.lease_time) + + def test_read_partial(self): + """ + Test that a Check request payload can be read from a partial data + stream. + """ + payload = payloads.CheckRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + payload.read(self.partial_encoding) + + self.assertEqual( + '2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + payload.unique_identifier + ) + self.assertEqual(500, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + def test_read_empty(self): + """ + Test that a Check request payload can be read from an empty data + stream. + """ + payload = payloads.CheckRequestPayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + def test_write(self): + """ + Test that a Check request payload can be written to a data stream. + """ + payload = payloads.CheckRequestPayload( + unique_identifier='2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + usage_limits_count=500, + cryptographic_usage_mask=12, + lease_time=0 + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partial Check request payload can be written to a data + stream. + """ + payload = payloads.CheckRequestPayload( + unique_identifier='2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + usage_limits_count=500 + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.partial_encoding), len(stream)) + self.assertEqual(str(self.partial_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Check request payload can be written to a data + stream. + """ + payload = payloads.CheckRequestPayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Check request payloads with the same data. + """ + a = payloads.CheckRequestPayload() + b = payloads.CheckRequestPayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.CheckRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + b = payloads.CheckRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Check request payloads with different unique identifiers. + """ + a = payloads.CheckRequestPayload( + unique_identifier='a' + ) + b = payloads.CheckRequestPayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_usage_limits_count(self): + """ + Test that the equality operator returns False when comparing two + Check request payloads with different usage limits counts. + """ + a = payloads.CheckRequestPayload( + usage_limits_count=0 + ) + b = payloads.CheckRequestPayload( + usage_limits_count=1 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_cryptographic_usage_mask(self): + """ + Test that the equality operator returns False when comparing two + Check request payloads with different cryptographic usage masks. + """ + a = payloads.CheckRequestPayload( + cryptographic_usage_mask=4 + ) + b = payloads.CheckRequestPayload( + cryptographic_usage_mask=12 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_lease_time(self): + """ + Test that the equality operator returns False when comparing two + Check request payloads with different lease times. + """ + a = payloads.CheckRequestPayload( + lease_time=0 + ) + b = payloads.CheckRequestPayload( + lease_time=1511882848 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Check request payloads with different types. + """ + a = payloads.CheckRequestPayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Check request payloads with the same data. + """ + a = payloads.CheckRequestPayload() + b = payloads.CheckRequestPayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.CheckRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + b = payloads.CheckRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Check request payloads with different unique identifiers. + """ + a = payloads.CheckRequestPayload( + unique_identifier='a' + ) + b = payloads.CheckRequestPayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_usage_limits_count(self): + """ + Test that the inequality operator returns True when comparing two + Check request payloads with different usage limits counts. + """ + a = payloads.CheckRequestPayload( + usage_limits_count=0 + ) + b = payloads.CheckRequestPayload( + usage_limits_count=1 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_cryptographic_usage_mask(self): + """ + Test that the inequality operator returns True when comparing two + Check request payloads with different cryptographic usage masks. + """ + a = payloads.CheckRequestPayload( + cryptographic_usage_mask=4 + ) + b = payloads.CheckRequestPayload( + cryptographic_usage_mask=12 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_lease_time(self): + """ + Test that the inequality operator returns True when comparing two + Check request payloads with different lease times. + """ + a = payloads.CheckRequestPayload( + lease_time=0 + ) + b = payloads.CheckRequestPayload( + lease_time=1511882848 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Check request payloads with different types. + """ + a = payloads.CheckRequestPayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Check request payload. + """ + payload = payloads.CheckRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=1000, + cryptographic_usage_mask=8, + lease_time=1511882898 + ) + expected = ( + "CheckRequestPayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', " + "usage_limits_count=1000, " + "cryptographic_usage_mask=8, " + "lease_time=1511882898)" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Check request payload + """ + payload = payloads.CheckRequestPayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=1000, + cryptographic_usage_mask=8, + lease_time=1511882898 + ) + + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038', + 'usage_limits_count': 1000, + 'cryptographic_usage_mask': 8, + 'lease_time': 1511882898 + }) + observed = str(payload) + + self.assertEqual(expected, observed) + + +class TestCheckResponsePayload(testtools.TestCase): + """ + Test suite for the Check response payload. + """ + + def setUp(self): + super(TestCheckResponsePayload, self).setUp() + + # Encoding obtained in part from the KMIP 1.1 testing document, + # Section 5.1. The rest of the encoding was built by hand. + # + # This encoding matches the following set of values: + # Response Payload + # Unique Identifier - 2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6 + # Usage Limits Count - 500 + # Cryptographic Usage Mask - Encrypt | Decrypt (4 | 8 -> 12 or C) + # Lease Time - 0 + + self.full_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x60' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x32\x63\x32\x33\x32\x31\x37\x65\x2D\x66\x35\x33\x63\x2D\x34\x62' + b'\x64\x66\x2D\x61\x64\x30\x61\x2D\x35\x38\x61\x33\x31\x66\x64\x33' + b'\x64\x34\x62\x36\x00\x00\x00\x00' + b'\x42\x00\x96\x03\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x01\xF4' + b'\x42\x00\x2C\x02\x00\x00\x00\x04\x00\x00\x00\x0C\x00\x00\x00\x00' + b'\x42\x00\x49\x0A\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00' + ) + + # This encoding matches the following set of values: + # Response Payload + # Unique Identifier - 2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6 + # Usage Limits Count - 500 + self.partial_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x40' + b'\x42\x00\x94\x07\x00\x00\x00\x24' + b'\x32\x63\x32\x33\x32\x31\x37\x65\x2D\x66\x35\x33\x63\x2D\x34\x62' + b'\x64\x66\x2D\x61\x64\x30\x61\x2D\x35\x38\x61\x33\x31\x66\x64\x33' + b'\x64\x34\x62\x36\x00\x00\x00\x00' + b'\x42\x00\x96\x03\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x01\xF4' + ) + + self.empty_encoding = utils.BytearrayStream( + b'\x42\x00\x7C\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestCheckResponsePayload, self).tearDown() + + def test_init(self): + """ + Test that a Check response payload can be constructed with no + arguments. + """ + payload = payloads.CheckResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + def test_init_with_args(self): + """ + Test that a Check response payload can be constructed with valid + values. + """ + payload = payloads.CheckResponsePayload( + unique_identifier='00000000-1111-2222-3333-444444444444', + usage_limits_count=10, + cryptographic_usage_mask=12, + lease_time=1000000000 + ) + + self.assertEqual( + '00000000-1111-2222-3333-444444444444', + payload.unique_identifier + ) + self.assertEqual(10, payload.usage_limits_count) + self.assertEqual(12, payload.cryptographic_usage_mask) + self.assertEqual(1000000000, payload.lease_time) + + def test_invalid_unique_identifier(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the unique identifier of a Check response payload. + """ + kwargs = {'unique_identifier': 0} + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + payloads.CheckResponsePayload, + **kwargs + ) + + payload = payloads.CheckResponsePayload() + args = (payload, 'unique_identifier', 0) + self.assertRaisesRegexp( + TypeError, + "Unique identifier must be a string.", + setattr, + *args + ) + + def test_invalid_usage_limits_count(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the usage limits count of a Check response payload. + """ + kwargs = {'usage_limits_count': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Usage limits count must be an integer.", + payloads.CheckResponsePayload, + **kwargs + ) + + payload = payloads.CheckResponsePayload() + args = (payload, 'usage_limits_count', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Usage limits count must be an integer.", + setattr, + *args + ) + + def test_invalid_cryptographic_usage_mask(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the cryptographic usage mask of a Check response payload. + """ + kwargs = {'cryptographic_usage_mask': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Cryptographic usage mask must be an integer.", + payloads.CheckResponsePayload, + **kwargs + ) + + payload = payloads.CheckResponsePayload() + args = (payload, 'cryptographic_usage_mask', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Cryptographic usage mask must be an integer.", + setattr, + *args + ) + + def test_invalid_lease_time(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the lease time of a Check response payload. + """ + kwargs = {'lease_time': 'invalid'} + self.assertRaisesRegexp( + TypeError, + "Lease time must be an integer.", + payloads.CheckResponsePayload, + **kwargs + ) + + payload = payloads.CheckResponsePayload() + args = (payload, 'lease_time', 'invalid') + self.assertRaisesRegexp( + TypeError, + "Lease time must be an integer.", + setattr, + *args + ) + + def test_read(self): + """ + Test that a Check response payload can be read from a data stream. + """ + payload = payloads.CheckResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + payload.read(self.full_encoding) + + self.assertEqual( + '2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + payload.unique_identifier + ) + self.assertEqual(500, payload.usage_limits_count) + self.assertEqual(12, payload.cryptographic_usage_mask) + self.assertEqual(0, payload.lease_time) + + def test_read_partial(self): + """ + Test that a Check response payload can be read from a partial data + stream. + """ + payload = payloads.CheckResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + payload.read(self.partial_encoding) + + self.assertEqual( + '2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + payload.unique_identifier + ) + self.assertEqual(500, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + def test_read_empty(self): + """ + Test that a Check response payload can be read from an empty data + stream. + """ + payload = payloads.CheckResponsePayload() + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + payload.read(self.empty_encoding) + + self.assertEqual(None, payload.unique_identifier) + self.assertEqual(None, payload.usage_limits_count) + self.assertEqual(None, payload.cryptographic_usage_mask) + self.assertEqual(None, payload.lease_time) + + def test_write(self): + """ + Test that a Check response payload can be written to a data stream. + """ + payload = payloads.CheckResponsePayload( + unique_identifier='2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + usage_limits_count=500, + cryptographic_usage_mask=12, + lease_time=0 + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.full_encoding), len(stream)) + self.assertEqual(str(self.full_encoding), str(stream)) + + def test_write_partial(self): + """ + Test that a partial Check response payload can be written to a data + stream. + """ + payload = payloads.CheckResponsePayload( + unique_identifier='2c23217e-f53c-4bdf-ad0a-58a31fd3d4b6', + usage_limits_count=500 + ) + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.partial_encoding), len(stream)) + self.assertEqual(str(self.partial_encoding), str(stream)) + + def test_write_empty(self): + """ + Test that an empty Check response payload can be written to a data + stream. + """ + payload = payloads.CheckResponsePayload() + stream = utils.BytearrayStream() + payload.write(stream) + + self.assertEqual(len(self.empty_encoding), len(stream)) + self.assertEqual(str(self.empty_encoding), str(stream)) + + def test_equal_on_equal(self): + """ + Test that the equality operator returns True when comparing two + Check response payloads with the same data. + """ + a = payloads.CheckResponsePayload() + b = payloads.CheckResponsePayload() + + self.assertTrue(a == b) + self.assertTrue(b == a) + + a = payloads.CheckResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + b = payloads.CheckResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + + self.assertTrue(a == b) + self.assertTrue(b == a) + + def test_equal_on_not_equal_unique_identifier(self): + """ + Test that the equality operator returns False when comparing two + Check response payloads with different unique identifiers. + """ + a = payloads.CheckResponsePayload( + unique_identifier='a' + ) + b = payloads.CheckResponsePayload( + unique_identifier='b' + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_usage_limits_count(self): + """ + Test that the equality operator returns False when comparing two + Check response payloads with different usage limits counts. + """ + a = payloads.CheckResponsePayload( + usage_limits_count=0 + ) + b = payloads.CheckResponsePayload( + usage_limits_count=1 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_cryptographic_usage_mask(self): + """ + Test that the equality operator returns False when comparing two + Check response payloads with different cryptographic usage masks. + """ + a = payloads.CheckResponsePayload( + cryptographic_usage_mask=4 + ) + b = payloads.CheckResponsePayload( + cryptographic_usage_mask=12 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_not_equal_lease_time(self): + """ + Test that the equality operator returns False when comparing two + Check response payloads with different lease times. + """ + a = payloads.CheckResponsePayload( + lease_time=0 + ) + b = payloads.CheckResponsePayload( + lease_time=1511882848 + ) + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_equal_on_type_mismatch(self): + """ + Test that the equality operator returns False when comparing two + Check response payloads with different types. + """ + a = payloads.CheckResponsePayload() + b = 'invalid' + + self.assertFalse(a == b) + self.assertFalse(b == a) + + def test_not_equal_on_equal(self): + """ + Test that the inequality operator returns False when comparing two + Check response payloads with the same data. + """ + a = payloads.CheckResponsePayload() + b = payloads.CheckResponsePayload() + + self.assertFalse(a != b) + self.assertFalse(b != a) + + a = payloads.CheckResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + b = payloads.CheckResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=200, + cryptographic_usage_mask=4, + lease_time=1511882848 + ) + + self.assertFalse(a != b) + self.assertFalse(b != a) + + def test_not_equal_on_not_equal_unique_identifier(self): + """ + Test that the inequality operator returns True when comparing two + Check response payloads with different unique identifiers. + """ + a = payloads.CheckResponsePayload( + unique_identifier='a' + ) + b = payloads.CheckResponsePayload( + unique_identifier='b' + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_usage_limits_count(self): + """ + Test that the inequality operator returns True when comparing two + Check response payloads with different usage limits counts. + """ + a = payloads.CheckResponsePayload( + usage_limits_count=0 + ) + b = payloads.CheckResponsePayload( + usage_limits_count=1 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_cryptographic_usage_mask(self): + """ + Test that the inequality operator returns True when comparing two + Check response payloads with different cryptographic usage masks. + """ + a = payloads.CheckResponsePayload( + cryptographic_usage_mask=4 + ) + b = payloads.CheckResponsePayload( + cryptographic_usage_mask=12 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_not_equal_lease_time(self): + """ + Test that the inequality operator returns True when comparing two + Check response payloads with different lease times. + """ + a = payloads.CheckResponsePayload( + lease_time=0 + ) + b = payloads.CheckResponsePayload( + lease_time=1511882848 + ) + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_not_equal_on_type_mismatch(self): + """ + Test that the inequality operator returns True when comparing two + Check response payloads with different types. + """ + a = payloads.CheckResponsePayload() + b = 'invalid' + + self.assertTrue(a != b) + self.assertTrue(b != a) + + def test_repr(self): + """ + Test that repr can be applied to a Check response payload. + """ + payload = payloads.CheckResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=1000, + cryptographic_usage_mask=8, + lease_time=1511882898 + ) + expected = ( + "CheckResponsePayload(" + "unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', " + "usage_limits_count=1000, " + "cryptographic_usage_mask=8, " + "lease_time=1511882898)" + ) + observed = repr(payload) + + self.assertEqual(expected, observed) + + def test_str(self): + """ + Test that str can be applied to a Check response payload + """ + payload = payloads.CheckResponsePayload( + unique_identifier='49a1ca88-6bea-4fb2-b450-7e58802c3038', + usage_limits_count=1000, + cryptographic_usage_mask=8, + lease_time=1511882898 + ) + + expected = str({ + 'unique_identifier': '49a1ca88-6bea-4fb2-b450-7e58802c3038', + 'usage_limits_count': 1000, + 'cryptographic_usage_mask': 8, + 'lease_time': 1511882898 + }) + observed = str(payload) + + self.assertEqual(expected, observed)