Update the ResponseHeader to support the server hashed password

This change updates the ResponseHeader to support the new server
hashed password field added in KMIP 2.0. Unit tests have been
added to cover the change.
This commit is contained in:
Peter Hamilton 2019-05-09 15:08:53 -04:00 committed by Peter Hamilton
parent 4a9690165a
commit 3b7d9bc21f
2 changed files with 165 additions and 9 deletions

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from kmip.core import enums
from kmip.core.enums import Tags
@ -143,13 +145,36 @@ class ResponseHeader(Struct):
def __init__(self,
protocol_version=None,
time_stamp=None,
batch_count=None):
batch_count=None,
server_hashed_password=None):
super(ResponseHeader, self).__init__(tag=Tags.RESPONSE_HEADER)
self.protocol_version = protocol_version
self.time_stamp = time_stamp
self.batch_count = batch_count
self.server_hashed_password = server_hashed_password
self.validate()
@property
def server_hashed_password(self):
if self._server_hashed_password:
return self._server_hashed_password.value
return None
@server_hashed_password.setter
def server_hashed_password(self, value):
if value is None:
self._server_hashed_password = None
elif isinstance(value, six.binary_type):
self._server_hashed_password = primitives.ByteString(
value=value,
tag=enums.Tags.SERVER_HASHED_PASSWORD
)
else:
raise TypeError(
"The server hashed password must be a binary string."
)
def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0):
super(ResponseHeader, self).read(
istream,
@ -163,6 +188,14 @@ class ResponseHeader(Struct):
self.time_stamp = contents.TimeStamp()
self.time_stamp.read(tstream, kmip_version=kmip_version)
if kmip_version >= enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(enums.Tags.SERVER_HASHED_PASSWORD, tstream):
server_hashed_password = primitives.ByteString(
tag=enums.Tags.SERVER_HASHED_PASSWORD
)
server_hashed_password.read(tstream, kmip_version=kmip_version)
self._server_hashed_password = server_hashed_password
self.batch_count = contents.BatchCount()
self.batch_count.read(tstream, kmip_version=kmip_version)
@ -175,6 +208,14 @@ class ResponseHeader(Struct):
# Write the contents of a response header to the stream
self.protocol_version.write(tstream, kmip_version=kmip_version)
self.time_stamp.write(tstream, kmip_version=kmip_version)
if kmip_version >= enums.KMIPVersion.KMIP_2_0:
if self._server_hashed_password:
self._server_hashed_password.write(
tstream,
kmip_version=kmip_version
)
self.batch_count.write(tstream, kmip_version=kmip_version)
# Write the length and value of the request header

View File

@ -126,16 +126,12 @@ class TestRequestBatchItem(testtools.TestCase):
request_batch_item.operation.value
)
self.assertTrue(request_batch_item.ephemeral)
# self.assertEqual(
# payloads.DestroyRequestPayload(
# unique_identifier=attr.UniqueIdentifier(
# value="fb4b5b9c-6188-4c63-8142-fe9c328129fc"
# )
# ),
# request_batch_item.request_payload
# )
def test_write_kmip_2_0(self):
"""
Test that a RequestBatchItem structure can be written to a data
stream when including KMIP 2.0 fields.
"""
request_batch_item = messages.RequestBatchItem(
operation=contents.Operation(enums.Operation.DESTROY),
ephemeral=True,
@ -156,6 +152,125 @@ class TestRequestBatchItem(testtools.TestCase):
self.assertEqual(str(self.encoding_kmip_2_0), str(buffer))
class TestResponseHeader(testtools.TestCase):
def setUp(self):
super(TestResponseHeader, self).setUp()
# Encoding obtained from the KMIP 1.1 testing document,
# Section 3.1.1. Modified manually to include the server hashed
# password.
#
# This encoding matches the following set of values:
# Response Header
# Protocol Version
# Protocol Version Major - 1
# Protocol Version Minor - 1
# Time Stamp - 0x4F9A54E5 (Fri Apr 27 10:12:21 CEST 2012)
# Server Hashed Password -
# d3c3bf31b8bfbbef53120fd0951be538341a3fffbd75cb7076e556ab0129
# 0f6b
# Batch Count - 1
self.encoding_kmip_2_0 = utils.BytearrayStream(
b'\x42\x00\x7A\x01\x00\x00\x00\x70'
b'\x42\x00\x69\x01\x00\x00\x00\x20'
b'\x42\x00\x6A\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\x6B\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\x92\x09\x00\x00\x00\x08\x00\x00\x00\x00\x4F\x9A\x54\xE5'
b'\x42\x01\x55\x08\x00\x00\x00\x20'
b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF\x53\x12\x0F\xD0\x95\x1B\xE5\x38'
b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70\x76\xE5\x56\xAB\x01\x29\x0F\x6B'
b'\x42\x00\x0D\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
)
def tearDown(self):
super(TestResponseHeader, self).tearDown()
def test_invalid_server_hashed_password(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the server hashed password of a ResponseHeader.
"""
kwargs = {"server_hashed_password": []}
self.assertRaisesRegex(
TypeError,
"The server hashed password must be a binary string.",
messages.ResponseHeader,
**kwargs
)
args = (
messages.ResponseHeader(),
"server_hashed_password",
[]
)
self.assertRaisesRegex(
TypeError,
"The server hashed password must be a binary string.",
setattr,
*args
)
def test_read_kmip_2_0(self):
"""
Test that a ResponseHeader structure can be correctly read in
from a data stream when including KMIP 2.0 fields.
"""
response_header = messages.ResponseHeader()
self.assertIsNone(response_header.protocol_version)
self.assertIsNone(response_header.time_stamp)
self.assertIsNone(response_header.server_hashed_password)
self.assertIsNone(response_header.batch_count)
response_header.read(
self.encoding_kmip_2_0,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(
contents.ProtocolVersion(major=1, minor=1),
response_header.protocol_version
)
self.assertEqual(0x4F9A54E5, response_header.time_stamp.value)
self.assertEqual(
(
b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF'
b'\x53\x12\x0F\xD0\x95\x1B\xE5\x38'
b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70'
b'\x76\xE5\x56\xAB\x01\x29\x0F\x6B'
),
response_header.server_hashed_password
)
self.assertEqual(1, response_header.batch_count.value)
def test_write_kmip_2_0(self):
"""
Test that a ResponseHeader structure can be written to a data
stream when including KMIP 2.0 fields.
"""
response_header = messages.ResponseHeader(
protocol_version=contents.ProtocolVersion(major=1, minor=1),
time_stamp=contents.TimeStamp(value=0x4F9A54E5),
server_hashed_password=(
b'\xD3\xC3\xBF\x31\xB8\xBF\xBB\xEF'
b'\x53\x12\x0F\xD0\x95\x1B\xE5\x38'
b'\x34\x1A\x3F\xFF\xBD\x75\xCB\x70'
b'\x76\xE5\x56\xAB\x01\x29\x0F\x6B'
),
batch_count=contents.BatchCount(value=1)
)
buffer = utils.BytearrayStream()
response_header.write(
buffer,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(len(self.encoding_kmip_2_0), len(buffer))
self.assertEqual(str(self.encoding_kmip_2_0), str(buffer))
class TestRequestMessage(TestCase):
def setUp(self):