Update the Locate payloads to support KMIP 2.0

This change updates the Locate payloads to support KMIP 2.0
features, including swapping out individual Attribute structures
for the new Attributes structure in the request payload. Unit
tests have been added to cover these changes.
This commit is contained in:
Peter Hamilton 2019-03-11 16:27:21 -04:00 committed by Peter Hamilton
parent 938a0a3b16
commit fe3095c22b
2 changed files with 189 additions and 8 deletions

View File

@ -16,6 +16,7 @@
import six
from kmip.core import enums
from kmip.core import exceptions
from kmip.core import objects
from kmip.core import primitives
from kmip.core import utils
@ -61,7 +62,8 @@ class LocateRequestPayload(primitives.Struct):
objects. Optional, defaults to None.
attributes (list): A list of Attribute structures containing the
attribute values that should be used to filter and match
objects. Optional, defaults to None.
objects. Optional, defaults to None. Required for read/write
for KMIP 2.0+.
"""
super(LocateRequestPayload, self).__init__(enums.Tags.REQUEST_PAYLOAD)
@ -198,6 +200,10 @@ class LocateRequestPayload(primitives.Struct):
kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be decoded. Optional,
defaults to KMIP 1.0.
Raises:
InvalidKmipEncoding: Raised if the attributes structure is missing
from the encoded payload for KMIP 2.0+ encodings.
"""
super(LocateRequestPayload, self).read(
input_buffer,
@ -242,10 +248,25 @@ class LocateRequestPayload(primitives.Struct):
kmip_version=kmip_version
)
while self.is_tag_next(enums.Tags.ATTRIBUTE, local_buffer):
attribute = objects.Attribute()
attribute.read(local_buffer, kmip_version=kmip_version)
self._attributes.append(attribute)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
while self.is_tag_next(enums.Tags.ATTRIBUTE, local_buffer):
attribute = objects.Attribute()
attribute.read(local_buffer, kmip_version=kmip_version)
self._attributes.append(attribute)
else:
if self.is_tag_next(enums.Tags.ATTRIBUTES, local_buffer):
attributes = objects.Attributes()
attributes.read(local_buffer, kmip_version=kmip_version)
# TODO (ph) Add a new utility to avoid using TemplateAttributes
temp_attr = objects.convert_attributes_to_template_attribute(
attributes
)
self._attributes = temp_attr.attributes
else:
raise exceptions.InvalidKmipEncoding(
"The Locate request payload encoding is missing the "
"attributes structure."
)
def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
"""
@ -278,9 +299,28 @@ class LocateRequestPayload(primitives.Struct):
kmip_version=kmip_version
)
if self._attributes:
for attribute in self.attributes:
attribute.write(local_buffer, kmip_version=kmip_version)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self._attributes:
for attribute in self.attributes:
attribute.write(
local_buffer,
kmip_version=kmip_version
)
else:
if self._attributes:
# TODO (ph) Add a new utility to avoid using TemplateAttributes
template_attribute = objects.TemplateAttribute(
attributes=self.attributes
)
attributes = objects.convert_template_attribute_to_attributes(
template_attribute
)
attributes.write(local_buffer, kmip_version=kmip_version)
else:
raise exceptions.InvalidField(
"The Locate request payload is missing the attributes "
"list."
)
self.length = local_buffer.length()
super(LocateRequestPayload, self).write(

View File

@ -16,6 +16,7 @@
import testtools
from kmip.core import enums
from kmip.core import exceptions
from kmip.core import objects
from kmip.core import primitives
from kmip.core import utils
@ -54,6 +55,28 @@ class TestLocateRequestPayload(testtools.TestCase):
b'\x6F\x75\x70\x00\x00\x00\x00\x00'
)
# Encoding obtained from the KMIP 1.1 testing document, Section 13.3.5.
# Modified to include the Offset Items, Storage Status Mask, and Object
# Group Member fields. Manually converted to the KMIP 2.0 format.
#
# This encoding matches the following set of values:
# Request Payload
# Maximum Items - 1
# Offset Items - 1
# Storage Status Mask - Online Storage | Archival Storage
# Object Group Member - Group Member Default
# Attributes
# Object Type - Public Key
self.full_encoding_with_attributes = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x00\x58'
b'\x42\x00\x4F\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\xD4\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\x8E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
b'\x42\x00\xAC\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
b'\x42\x01\x25\x01\x00\x00\x00\x10'
b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
)
# Encoding obtained from the KMIP 1.1 testing document, Section 15.3.4.
# Modified to include the Offset Items and Storage Status Mask fields.
#
@ -374,6 +397,53 @@ class TestLocateRequestPayload(testtools.TestCase):
payload.attributes[0]
)
def test_read_kmip_2_0(self):
"""
Test that a Locate request payload can be read from a data stream
encoded with the KMIP 2.0 format.
"""
payload = payloads.LocateRequestPayload()
self.assertIsNone(payload.maximum_items)
self.assertIsNone(payload.offset_items)
self.assertIsNone(payload.storage_status_mask)
self.assertIsNone(payload.object_group_member)
self.assertEqual([], payload.attributes)
payload.read(
self.full_encoding_with_attributes,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(1, payload.maximum_items)
self.assertEqual(1, payload.offset_items)
self.assertEqual(
enums.get_bit_mask_from_enumerations(
[
enums.StorageStatusMask.ONLINE_STORAGE,
enums.StorageStatusMask.ARCHIVAL_STORAGE
]
),
payload.storage_status_mask
)
self.assertEqual(
enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT,
payload.object_group_member
)
self.assertIsInstance(payload.attributes, list)
self.assertEqual(1, len(payload.attributes))
self.assertEqual(
objects.Attribute(
attribute_name=objects.Attribute.AttributeName("Object Type"),
attribute_value=primitives.Enumeration(
enums.ObjectType,
value=enums.ObjectType.PUBLIC_KEY,
tag=enums.Tags.OBJECT_TYPE
)
),
payload.attributes[0]
)
def test_read_missing_maximum_items(self):
"""
Test that a Locate request payload can be read from a data stream
@ -567,6 +637,25 @@ class TestLocateRequestPayload(testtools.TestCase):
)
self.assertEqual([], payload.attributes)
def test_read_missing_attributes_kmip_2_0(self):
"""
Test that an InvalidKmipEncoding error is raised during the decoding
of a Locate request payload when the attributes structure is missing
from the encoding.
"""
payload = payloads.LocateRequestPayload()
args = (self.no_attributes_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.InvalidKmipEncoding,
"The Locate request payload encoding is missing the attributes "
"structure.",
payload.read,
*args,
**kwargs
)
def test_read_missing_everything(self):
"""
Test that a Locate request payload can be read from a data stream
@ -616,6 +705,36 @@ class TestLocateRequestPayload(testtools.TestCase):
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_kmip_2_0(self):
"""
Test that a Locate request payload can be written to a data stream
encoded with the KMIP 2.0 format.
"""
payload = payloads.LocateRequestPayload(
maximum_items=1,
offset_items=1,
storage_status_mask=3,
object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT,
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"Object Type"
),
attribute_value=primitives.Enumeration(
enums.ObjectType,
value=enums.ObjectType.PUBLIC_KEY,
tag=enums.Tags.OBJECT_TYPE
)
)
]
)
stream = utils.BytearrayStream()
payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0)
self.assertEqual(len(self.full_encoding_with_attributes), len(stream))
self.assertEqual(str(self.full_encoding_with_attributes), str(stream))
def test_write_missing_maximum_items(self):
"""
Test that a Locate request payload can be written to a data stream
@ -758,6 +877,28 @@ class TestLocateRequestPayload(testtools.TestCase):
self.assertEqual(len(self.no_attributes_encoding), len(stream))
self.assertEqual(str(self.no_attributes_encoding), str(stream))
def test_write_missing_attributes_kmip_2_0(self):
"""
Test that an InvalidField error is raised during the encoding of a
Locate request payload when the payload is missing the attributes list.
"""
payload = payloads.LocateRequestPayload(
maximum_items=1,
offset_items=1,
storage_status_mask=3,
object_group_member=enums.ObjectGroupMember.GROUP_MEMBER_DEFAULT
)
args = (utils.BytearrayStream(), )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.InvalidField,
"The Locate request payload is missing the attributes list.",
payload.write,
*args,
**kwargs
)
def test_write_missing_everything(self):
"""
Test that a Locate request payload can be written to a data stream