From 3b7d9bc21f588622fb0e21e1ddbe560513b16778 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Thu, 9 May 2019 15:08:53 -0400 Subject: [PATCH] Update the ResponseHeader to support the server hashed password This change updates the ResponseHeader to support the new server hashed password field added in KMIP 2.0. Unit tests have been added to cover the change. --- kmip/core/messages/messages.py | 43 +++++- .../tests/unit/core/messages/test_messages.py | 131 ++++++++++++++++-- 2 files changed, 165 insertions(+), 9 deletions(-) diff --git a/kmip/core/messages/messages.py b/kmip/core/messages/messages.py index fd4e811..359f38b 100644 --- a/kmip/core/messages/messages.py +++ b/kmip/core/messages/messages.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import six + from kmip.core import enums from kmip.core.enums import Tags @@ -143,13 +145,36 @@ class ResponseHeader(Struct): def __init__(self, protocol_version=None, time_stamp=None, - batch_count=None): + batch_count=None, + server_hashed_password=None): super(ResponseHeader, self).__init__(tag=Tags.RESPONSE_HEADER) self.protocol_version = protocol_version self.time_stamp = time_stamp self.batch_count = batch_count + self.server_hashed_password = server_hashed_password + self.validate() + @property + def server_hashed_password(self): + if self._server_hashed_password: + return self._server_hashed_password.value + return None + + @server_hashed_password.setter + def server_hashed_password(self, value): + if value is None: + self._server_hashed_password = None + elif isinstance(value, six.binary_type): + self._server_hashed_password = primitives.ByteString( + value=value, + tag=enums.Tags.SERVER_HASHED_PASSWORD + ) + else: + raise TypeError( + "The server hashed password must be a binary string." + ) + def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): super(ResponseHeader, self).read( istream, @@ -163,6 +188,14 @@ class ResponseHeader(Struct): self.time_stamp = contents.TimeStamp() self.time_stamp.read(tstream, kmip_version=kmip_version) + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self.is_tag_next(enums.Tags.SERVER_HASHED_PASSWORD, tstream): + server_hashed_password = primitives.ByteString( + tag=enums.Tags.SERVER_HASHED_PASSWORD + ) + server_hashed_password.read(tstream, kmip_version=kmip_version) + self._server_hashed_password = server_hashed_password + self.batch_count = contents.BatchCount() self.batch_count.read(tstream, kmip_version=kmip_version) @@ -175,6 +208,14 @@ class ResponseHeader(Struct): # Write the contents of a response header to the stream self.protocol_version.write(tstream, kmip_version=kmip_version) self.time_stamp.write(tstream, kmip_version=kmip_version) + + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self._server_hashed_password: + self._server_hashed_password.write( + tstream, + kmip_version=kmip_version + ) + self.batch_count.write(tstream, kmip_version=kmip_version) # Write the length and value of the request header diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index ad25de0..ff776a4 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -126,16 +126,12 @@ class TestRequestBatchItem(testtools.TestCase): request_batch_item.operation.value ) self.assertTrue(request_batch_item.ephemeral) - # self.assertEqual( - # payloads.DestroyRequestPayload( - # unique_identifier=attr.UniqueIdentifier( - # value="fb4b5b9c-6188-4c63-8142-fe9c328129fc" - # ) - # ), - # request_batch_item.request_payload - # ) def test_write_kmip_2_0(self): + """ + Test that a RequestBatchItem structure can be written to a data + stream when including KMIP 2.0 fields. + """ request_batch_item = messages.RequestBatchItem( operation=contents.Operation(enums.Operation.DESTROY), ephemeral=True, @@ -156,6 +152,125 @@ class TestRequestBatchItem(testtools.TestCase): self.assertEqual(str(self.encoding_kmip_2_0), str(buffer)) +class TestResponseHeader(testtools.TestCase): + + def setUp(self): + super(TestResponseHeader, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, + # Section 3.1.1. Modified manually to include the server hashed + # password. + # + # This encoding matches the following set of values: + # Response Header + # Protocol Version + # Protocol Version Major - 1 + # Protocol Version Minor - 1 + # Time Stamp - 0x4F9A54E5 (Fri Apr 27 10:12:21 CEST 2012) + # Server Hashed Password - + # d3c3bf31b8bfbbef53120fd0951be538341a3fffbd75cb7076e556ab0129 + # 0f6b + # Batch Count - 1 + self.encoding_kmip_2_0 = utils.BytearrayStream( + b'\x42\x00\x7A\x01\x00\x00\x00\x70' + b'\x42\x00\x69\x01\x00\x00\x00\x20' + b'\x42\x00\x6A\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x6B\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x92\x09\x00\x00\x00\x08\x00\x00\x00\x00\x4F\x9A\x54\xE5' + b'\x42\x01\x55\x08\x00\x00\x00\x20' + b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF\x53\x12\x0F\xD0\x95\x1B\xE5\x38' + b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70\x76\xE5\x56\xAB\x01\x29\x0F\x6B' + b'\x42\x00\x0D\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestResponseHeader, self).tearDown() + + def test_invalid_server_hashed_password(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the server hashed password of a ResponseHeader. + """ + kwargs = {"server_hashed_password": []} + self.assertRaisesRegex( + TypeError, + "The server hashed password must be a binary string.", + messages.ResponseHeader, + **kwargs + ) + + args = ( + messages.ResponseHeader(), + "server_hashed_password", + [] + ) + self.assertRaisesRegex( + TypeError, + "The server hashed password must be a binary string.", + setattr, + *args + ) + + def test_read_kmip_2_0(self): + """ + Test that a ResponseHeader structure can be correctly read in + from a data stream when including KMIP 2.0 fields. + """ + response_header = messages.ResponseHeader() + + self.assertIsNone(response_header.protocol_version) + self.assertIsNone(response_header.time_stamp) + self.assertIsNone(response_header.server_hashed_password) + self.assertIsNone(response_header.batch_count) + + response_header.read( + self.encoding_kmip_2_0, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual( + contents.ProtocolVersion(major=1, minor=1), + response_header.protocol_version + ) + self.assertEqual(0x4F9A54E5, response_header.time_stamp.value) + self.assertEqual( + ( + b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF' + b'\x53\x12\x0F\xD0\x95\x1B\xE5\x38' + b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70' + b'\x76\xE5\x56\xAB\x01\x29\x0F\x6B' + ), + response_header.server_hashed_password + ) + self.assertEqual(1, response_header.batch_count.value) + + def test_write_kmip_2_0(self): + """ + Test that a ResponseHeader structure can be written to a data + stream when including KMIP 2.0 fields. + """ + response_header = messages.ResponseHeader( + protocol_version=contents.ProtocolVersion(major=1, minor=1), + time_stamp=contents.TimeStamp(value=0x4F9A54E5), + server_hashed_password=( + b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF' + b'\x53\x12\x0F\xD0\x95\x1B\xE5\x38' + b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70' + b'\x76\xE5\x56\xAB\x01\x29\x0F\x6B' + ), + batch_count=contents.BatchCount(value=1) + ) + + buffer = utils.BytearrayStream() + response_header.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual(len(self.encoding_kmip_2_0), len(buffer)) + self.assertEqual(str(self.encoding_kmip_2_0), str(buffer)) + + class TestRequestMessage(TestCase): def setUp(self):