diff --git a/kmip/core/messages/messages.py b/kmip/core/messages/messages.py index fd4e811..359f38b 100644 --- a/kmip/core/messages/messages.py +++ b/kmip/core/messages/messages.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import six + from kmip.core import enums from kmip.core.enums import Tags @@ -143,13 +145,36 @@ class ResponseHeader(Struct): def __init__(self, protocol_version=None, time_stamp=None, - batch_count=None): + batch_count=None, + server_hashed_password=None): super(ResponseHeader, self).__init__(tag=Tags.RESPONSE_HEADER) self.protocol_version = protocol_version self.time_stamp = time_stamp self.batch_count = batch_count + self.server_hashed_password = server_hashed_password + self.validate() + @property + def server_hashed_password(self): + if self._server_hashed_password: + return self._server_hashed_password.value + return None + + @server_hashed_password.setter + def server_hashed_password(self, value): + if value is None: + self._server_hashed_password = None + elif isinstance(value, six.binary_type): + self._server_hashed_password = primitives.ByteString( + value=value, + tag=enums.Tags.SERVER_HASHED_PASSWORD + ) + else: + raise TypeError( + "The server hashed password must be a binary string." + ) + def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): super(ResponseHeader, self).read( istream, @@ -163,6 +188,14 @@ class ResponseHeader(Struct): self.time_stamp = contents.TimeStamp() self.time_stamp.read(tstream, kmip_version=kmip_version) + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self.is_tag_next(enums.Tags.SERVER_HASHED_PASSWORD, tstream): + server_hashed_password = primitives.ByteString( + tag=enums.Tags.SERVER_HASHED_PASSWORD + ) + server_hashed_password.read(tstream, kmip_version=kmip_version) + self._server_hashed_password = server_hashed_password + self.batch_count = contents.BatchCount() self.batch_count.read(tstream, kmip_version=kmip_version) @@ -175,6 +208,14 @@ class ResponseHeader(Struct): # Write the contents of a response header to the stream self.protocol_version.write(tstream, kmip_version=kmip_version) self.time_stamp.write(tstream, kmip_version=kmip_version) + + if kmip_version >= enums.KMIPVersion.KMIP_2_0: + if self._server_hashed_password: + self._server_hashed_password.write( + tstream, + kmip_version=kmip_version + ) + self.batch_count.write(tstream, kmip_version=kmip_version) # Write the length and value of the request header diff --git a/kmip/tests/unit/core/messages/test_messages.py b/kmip/tests/unit/core/messages/test_messages.py index ad25de0..ff776a4 100644 --- a/kmip/tests/unit/core/messages/test_messages.py +++ b/kmip/tests/unit/core/messages/test_messages.py @@ -126,16 +126,12 @@ class TestRequestBatchItem(testtools.TestCase): request_batch_item.operation.value ) self.assertTrue(request_batch_item.ephemeral) - # self.assertEqual( - # payloads.DestroyRequestPayload( - # unique_identifier=attr.UniqueIdentifier( - # value="fb4b5b9c-6188-4c63-8142-fe9c328129fc" - # ) - # ), - # request_batch_item.request_payload - # ) def test_write_kmip_2_0(self): + """ + Test that a RequestBatchItem structure can be written to a data + stream when including KMIP 2.0 fields. + """ request_batch_item = messages.RequestBatchItem( operation=contents.Operation(enums.Operation.DESTROY), ephemeral=True, @@ -156,6 +152,125 @@ class TestRequestBatchItem(testtools.TestCase): self.assertEqual(str(self.encoding_kmip_2_0), str(buffer)) +class TestResponseHeader(testtools.TestCase): + + def setUp(self): + super(TestResponseHeader, self).setUp() + + # Encoding obtained from the KMIP 1.1 testing document, + # Section 3.1.1. Modified manually to include the server hashed + # password. + # + # This encoding matches the following set of values: + # Response Header + # Protocol Version + # Protocol Version Major - 1 + # Protocol Version Minor - 1 + # Time Stamp - 0x4F9A54E5 (Fri Apr 27 10:12:21 CEST 2012) + # Server Hashed Password - + # d3c3bf31b8bfbbef53120fd0951be538341a3fffbd75cb7076e556ab0129 + # 0f6b + # Batch Count - 1 + self.encoding_kmip_2_0 = utils.BytearrayStream( + b'\x42\x00\x7A\x01\x00\x00\x00\x70' + b'\x42\x00\x69\x01\x00\x00\x00\x20' + b'\x42\x00\x6A\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x6B\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + b'\x42\x00\x92\x09\x00\x00\x00\x08\x00\x00\x00\x00\x4F\x9A\x54\xE5' + b'\x42\x01\x55\x08\x00\x00\x00\x20' + b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF\x53\x12\x0F\xD0\x95\x1B\xE5\x38' + b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70\x76\xE5\x56\xAB\x01\x29\x0F\x6B' + b'\x42\x00\x0D\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00' + ) + + def tearDown(self): + super(TestResponseHeader, self).tearDown() + + def test_invalid_server_hashed_password(self): + """ + Test that a TypeError is raised when an invalid value is used to set + the server hashed password of a ResponseHeader. + """ + kwargs = {"server_hashed_password": []} + self.assertRaisesRegex( + TypeError, + "The server hashed password must be a binary string.", + messages.ResponseHeader, + **kwargs + ) + + args = ( + messages.ResponseHeader(), + "server_hashed_password", + [] + ) + self.assertRaisesRegex( + TypeError, + "The server hashed password must be a binary string.", + setattr, + *args + ) + + def test_read_kmip_2_0(self): + """ + Test that a ResponseHeader structure can be correctly read in + from a data stream when including KMIP 2.0 fields. + """ + response_header = messages.ResponseHeader() + + self.assertIsNone(response_header.protocol_version) + self.assertIsNone(response_header.time_stamp) + self.assertIsNone(response_header.server_hashed_password) + self.assertIsNone(response_header.batch_count) + + response_header.read( + self.encoding_kmip_2_0, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual( + contents.ProtocolVersion(major=1, minor=1), + response_header.protocol_version + ) + self.assertEqual(0x4F9A54E5, response_header.time_stamp.value) + self.assertEqual( + ( + b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF' + b'\x53\x12\x0F\xD0\x95\x1B\xE5\x38' + b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70' + b'\x76\xE5\x56\xAB\x01\x29\x0F\x6B' + ), + response_header.server_hashed_password + ) + self.assertEqual(1, response_header.batch_count.value) + + def test_write_kmip_2_0(self): + """ + Test that a ResponseHeader structure can be written to a data + stream when including KMIP 2.0 fields. + """ + response_header = messages.ResponseHeader( + protocol_version=contents.ProtocolVersion(major=1, minor=1), + time_stamp=contents.TimeStamp(value=0x4F9A54E5), + server_hashed_password=( + b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF' + b'\x53\x12\x0F\xD0\x95\x1B\xE5\x38' + b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70' + b'\x76\xE5\x56\xAB\x01\x29\x0F\x6B' + ), + batch_count=contents.BatchCount(value=1) + ) + + buffer = utils.BytearrayStream() + response_header.write( + buffer, + kmip_version=enums.KMIPVersion.KMIP_2_0 + ) + + self.assertEqual(len(self.encoding_kmip_2_0), len(buffer)) + self.assertEqual(str(self.encoding_kmip_2_0), str(buffer)) + + class TestRequestMessage(TestCase): def setUp(self):