mirror of https://github.com/OpenKMIP/PyKMIP.git
Update the GetAttributes payloads to support KMIP 2.0
This change updates the GetAttributes payloads to support KMIP 2.0 features, including swapping out Attribute Names for the Attribute Reference structure in the request payload and the Attribute list for the Attributes structure in the response payload. Unit tests have been added to cover these changes.
This commit is contained in:
parent
568e87e89e
commit
0961687d66
|
@ -124,6 +124,10 @@ class GetAttributesRequestPayload(primitives.Struct):
|
|||
kmip_version (KMIPVersion): An enumeration defining the KMIP
|
||||
version with which the object will be decoded. Optional,
|
||||
defaults to KMIP 1.0.
|
||||
|
||||
Raises:
|
||||
InvalidKmipEncoding: Raised if an invalid type is found for the
|
||||
AttributeReference encoding for KMIP 2.0+ encodings.
|
||||
"""
|
||||
super(GetAttributesRequestPayload, self).read(
|
||||
input_buffer,
|
||||
|
@ -143,11 +147,45 @@ class GetAttributesRequestPayload(primitives.Struct):
|
|||
self._unique_identifier = None
|
||||
|
||||
names = list()
|
||||
while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, local_buffer):
|
||||
name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME)
|
||||
name.read(local_buffer, kmip_version=kmip_version)
|
||||
names.append(name)
|
||||
self._attribute_names = names
|
||||
if kmip_version < enums.KMIPVersion.KMIP_2_0:
|
||||
while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, local_buffer):
|
||||
name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME)
|
||||
name.read(local_buffer, kmip_version=kmip_version)
|
||||
names.append(name)
|
||||
self._attribute_names = names
|
||||
else:
|
||||
while self.is_tag_next(
|
||||
enums.Tags.ATTRIBUTE_REFERENCE,
|
||||
local_buffer
|
||||
):
|
||||
if self.is_type_next(enums.Types.STRUCTURE, local_buffer):
|
||||
reference = objects.AttributeReference()
|
||||
reference.read(local_buffer, kmip_version=kmip_version)
|
||||
names.append(
|
||||
primitives.TextString(
|
||||
value=reference.attribute_name,
|
||||
tag=enums.Tags.ATTRIBUTE_NAME
|
||||
)
|
||||
)
|
||||
elif self.is_type_next(enums.Types.ENUMERATION, local_buffer):
|
||||
reference = primitives.Enumeration(
|
||||
enums.Tags,
|
||||
tag=enums.Tags.ATTRIBUTE_REFERENCE
|
||||
)
|
||||
reference.read(local_buffer, kmip_version=kmip_version)
|
||||
name = enums.convert_attribute_tag_to_name(reference.value)
|
||||
names.append(
|
||||
primitives.TextString(
|
||||
value=name,
|
||||
tag=enums.Tags.ATTRIBUTE_NAME
|
||||
)
|
||||
)
|
||||
else:
|
||||
raise exceptions.InvalidKmipEncoding(
|
||||
"The GetAttributes request payload encoding contains "
|
||||
"an invalid AttributeReference type."
|
||||
)
|
||||
self._attribute_names = names
|
||||
|
||||
self.is_oversized(local_buffer)
|
||||
|
||||
|
@ -172,8 +210,25 @@ class GetAttributesRequestPayload(primitives.Struct):
|
|||
kmip_version=kmip_version
|
||||
)
|
||||
|
||||
for attribute_name in self._attribute_names:
|
||||
attribute_name.write(local_buffer, kmip_version=kmip_version)
|
||||
if kmip_version < enums.KMIPVersion.KMIP_2_0:
|
||||
for attribute_name in self._attribute_names:
|
||||
attribute_name.write(local_buffer, kmip_version=kmip_version)
|
||||
else:
|
||||
# NOTE (ph) This approach simplifies backwards compatible issues
|
||||
# but limits easy support for using AttributeReference
|
||||
# structures going forward, specifically limiting the
|
||||
# use of VendorIdentification for custom attributes.
|
||||
# If custom attributes need to be retrieved using
|
||||
# the GetAttributes operation for KMIP 2.0 applications
|
||||
# this code will need to change.
|
||||
for attribute_name in self._attribute_names:
|
||||
t = enums.convert_attribute_name_to_tag(attribute_name.value)
|
||||
e = primitives.Enumeration(
|
||||
enums.Tags,
|
||||
value=t,
|
||||
tag=enums.Tags.ATTRIBUTE_REFERENCE
|
||||
)
|
||||
e.write(local_buffer, kmip_version=kmip_version)
|
||||
|
||||
self.length = local_buffer.length()
|
||||
super(GetAttributesRequestPayload, self).write(
|
||||
|
@ -321,11 +376,26 @@ class GetAttributesResponsePayload(primitives.Struct):
|
|||
"unique identifier."
|
||||
)
|
||||
|
||||
self._attributes = list()
|
||||
while self.is_tag_next(enums.Tags.ATTRIBUTE, local_buffer):
|
||||
attribute = objects.Attribute()
|
||||
attribute.read(local_buffer, kmip_version=kmip_version)
|
||||
self._attributes.append(attribute)
|
||||
if kmip_version < enums.KMIPVersion.KMIP_2_0:
|
||||
self._attributes = list()
|
||||
while self.is_tag_next(enums.Tags.ATTRIBUTE, local_buffer):
|
||||
attribute = objects.Attribute()
|
||||
attribute.read(local_buffer, kmip_version=kmip_version)
|
||||
self._attributes.append(attribute)
|
||||
else:
|
||||
if self.is_tag_next(enums.Tags.ATTRIBUTES, local_buffer):
|
||||
attributes = objects.Attributes()
|
||||
attributes.read(local_buffer, kmip_version=kmip_version)
|
||||
# TODO (ph) Add a new utility to avoid using TemplateAttributes
|
||||
temp_attr = objects.convert_attributes_to_template_attribute(
|
||||
attributes
|
||||
)
|
||||
self._attributes = temp_attr.attributes
|
||||
else:
|
||||
raise exceptions.InvalidKmipEncoding(
|
||||
"The GetAttributes response payload encoding is missing "
|
||||
"the attributes structure."
|
||||
)
|
||||
|
||||
self.is_oversized(local_buffer)
|
||||
|
||||
|
@ -355,8 +425,24 @@ class GetAttributesResponsePayload(primitives.Struct):
|
|||
"identifier field."
|
||||
)
|
||||
|
||||
for attribute in self._attributes:
|
||||
attribute.write(local_buffer, kmip_version=kmip_version)
|
||||
if kmip_version < enums.KMIPVersion.KMIP_2_0:
|
||||
for attribute in self._attributes:
|
||||
attribute.write(local_buffer, kmip_version=kmip_version)
|
||||
else:
|
||||
if self._attributes:
|
||||
# TODO (ph) Add a new utility to avoid using TemplateAttributes
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=self.attributes
|
||||
)
|
||||
attributes = objects.convert_template_attribute_to_attributes(
|
||||
template_attribute
|
||||
)
|
||||
attributes.write(local_buffer, kmip_version=kmip_version)
|
||||
else:
|
||||
raise exceptions.InvalidField(
|
||||
"The GetAttributes response payload is missing the "
|
||||
"attributes list."
|
||||
)
|
||||
|
||||
self.length = local_buffer.length()
|
||||
super(GetAttributesResponsePayload, self).write(
|
||||
|
|
|
@ -33,19 +33,105 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
|
|||
|
||||
# Encodings taken from Sections 3.1.2 of the KMIP 1.1 testing
|
||||
# documentation.
|
||||
#
|
||||
# This encoding matches the following set of values:
|
||||
# Request Payload
|
||||
# Unique Identifier - 1703250b-4d40-4de2-93a0-c494a1d4ae40
|
||||
# Attribute Name - Object Group
|
||||
# Attribute Name - Application Specific Information
|
||||
# Attribute Name - Contact Information
|
||||
# Attribute Name - x-Purpose
|
||||
self.full_encoding = utils.BytearrayStream(
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\xA8\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\xA8'
|
||||
b'\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x31\x37\x30\x33\x32\x35\x30\x62\x2D\x34\x64\x34\x30\x2D\x34\x64'
|
||||
b'\x65\x32\x2D\x39\x33\x61\x30\x2D\x63\x34\x39\x34\x61\x31\x64\x34'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x0C'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x0C'
|
||||
b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x20\x41\x70\x70\x6C\x69\x63\x61\x74'
|
||||
b'\x69\x6F\x6E\x20\x53\x70\x65\x63\x69\x66\x69\x63\x20\x49\x6E\x66'
|
||||
b'\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x42\x00\x0A\x07\x00\x00\x00\x13'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x20'
|
||||
b'\x41\x70\x70\x6C\x69\x63\x61\x74\x69\x6F\x6E\x20\x53\x70\x65\x63'
|
||||
b'\x69\x66\x69\x63\x20\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x13'
|
||||
b'\x43\x6F\x6E\x74\x61\x63\x74\x20\x49\x6E\x66\x6F\x72\x6D\x61\x74'
|
||||
b'\x69\x6F\x6E\x00\x00\x00\x00\x00\x42\x00\x0A\x07\x00\x00\x00\x09'
|
||||
b'\x69\x6F\x6E\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x09'
|
||||
b'\x78\x2D\x50\x75\x72\x70\x6F\x73\x65\x00\x00\x00\x00\x00\x00\x00'
|
||||
)
|
||||
|
||||
# Encodings adapted from Sections 3.1.2 of the KMIP 1.1 testing
|
||||
# documentation. Manually converted to the KMIP 2.0 format.
|
||||
#
|
||||
# This encoding matches the following set of values:
|
||||
# Request Payload
|
||||
# Unique Identifier - 1703250b-4d40-4de2-93a0-c494a1d4ae40
|
||||
# Attribute Reference - Object Group
|
||||
# Attribute Reference - Application Specific Information
|
||||
# Attribute Reference - Contact Information
|
||||
self.full_encoding_with_reference_enums = utils.BytearrayStream(
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\x60'
|
||||
b'\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x31\x37\x30\x33\x32\x35\x30\x62\x2D\x34\x64\x34\x30\x2D\x34\x64'
|
||||
b'\x65\x32\x2D\x39\x33\x61\x30\x2D\x63\x34\x39\x34\x61\x31\x64\x34'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00'
|
||||
b'\x42\x01\x3B\x05\x00\x00\x00\x04\x00\x42\x00\x56\x00\x00\x00\x00'
|
||||
b'\x42\x01\x3B\x05\x00\x00\x00\x04\x00\x42\x00\x04\x00\x00\x00\x00'
|
||||
b'\x42\x01\x3B\x05\x00\x00\x00\x04\x00\x42\x00\x22\x00\x00\x00\x00'
|
||||
)
|
||||
|
||||
# Encodings adapted from Sections 3.1.2 of the KMIP 1.1 testing
|
||||
# documentation. Manually converted to the KMIP 2.0 format.
|
||||
#
|
||||
# This encoding matches the following set of values:
|
||||
# Request Payload
|
||||
# Unique Identifier - 1703250b-4d40-4de2-93a0-c494a1d4ae40
|
||||
# Attribute Reference
|
||||
# Vendor Identification -
|
||||
# Attribute Name - Object Group
|
||||
# Attribute Reference
|
||||
# Vendor Identification -
|
||||
# Attribute Name - Application Specific Information
|
||||
# Attribute Reference
|
||||
# Vendor Identification -
|
||||
# Attribute Name - Contact Information
|
||||
self.full_encoding_with_reference_structs = utils.BytearrayStream(
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\xD0'
|
||||
b'\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x31\x37\x30\x33\x32\x35\x30\x62\x2D\x34\x64\x34\x30\x2D\x34\x64'
|
||||
b'\x65\x32\x2D\x39\x33\x61\x30\x2D\x63\x34\x39\x34\x61\x31\x64\x34'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00'
|
||||
b'\x42\x01\x3B\x01\x00\x00\x00\x20'
|
||||
b'\x42\x00\x9D\x07\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x0C'
|
||||
b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00'
|
||||
b'\x42\x01\x3B\x01\x00\x00\x00\x30'
|
||||
b'\x42\x00\x9D\x07\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x20'
|
||||
b'\x41\x70\x70\x6C\x69\x63\x61\x74\x69\x6F\x6E\x20\x53\x70\x65\x63'
|
||||
b'\x69\x66\x69\x63\x20\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E'
|
||||
b'\x42\x01\x3B\x01\x00\x00\x00\x28'
|
||||
b'\x42\x00\x9D\x07\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x13'
|
||||
b'\x43\x6F\x6E\x74\x61\x63\x74\x20\x49\x6E\x66\x6F\x72\x6D\x61\x74'
|
||||
b'\x69\x6F\x6E\x00\x00\x00\x00\x00'
|
||||
)
|
||||
|
||||
# Encodings adapted from Sections 3.1.2 of the KMIP 1.1 testing
|
||||
# documentation. Manually converted to the KMIP 2.0 format.
|
||||
#
|
||||
# This encoding matches the following set of values:
|
||||
# Request Payload
|
||||
# Unique Identifier - 1703250b-4d40-4de2-93a0-c494a1d4ae40
|
||||
# Attribute Reference - Object Group --> "encoded" as a ByteString
|
||||
self.invalid_attribute_reference_encoding = utils.BytearrayStream(
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\x40'
|
||||
b'\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x31\x37\x30\x33\x32\x35\x30\x62\x2D\x34\x64\x34\x30\x2D\x34\x64'
|
||||
b'\x65\x32\x2D\x39\x33\x61\x30\x2D\x63\x34\x39\x34\x61\x31\x64\x34'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00'
|
||||
b'\x42\x01\x3B\x08\x00\x00\x00\x04\x00\x42\x00\x56\x00\x00\x00\x00'
|
||||
)
|
||||
|
||||
self.encoding_sans_unique_identifier = utils.BytearrayStream(
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\x78\x42\x00\x0A\x07\x00\x00\x00\x0C'
|
||||
b'\x4F\x62\x6A\x65\x63\x74\x20\x47\x72\x6F\x75\x70\x00\x00\x00\x00'
|
||||
|
@ -239,19 +325,12 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
|
|||
"""
|
||||
payload = payloads.GetAttributesRequestPayload()
|
||||
|
||||
self.assertEqual(None, payload._unique_identifier)
|
||||
self.assertEqual(list(), payload._attribute_names)
|
||||
self.assertEqual(None, payload.unique_identifier)
|
||||
self.assertEqual(list(), payload.attribute_names)
|
||||
|
||||
payload.read(self.full_encoding)
|
||||
|
||||
self.assertEqual(self.unique_identifier, payload.unique_identifier)
|
||||
self.assertEqual(
|
||||
primitives.TextString(
|
||||
value=self.unique_identifier,
|
||||
tag=enums.Tags.UNIQUE_IDENTIFIER
|
||||
),
|
||||
payload._unique_identifier
|
||||
)
|
||||
self.assertEqual(
|
||||
set(self.attribute_names),
|
||||
set(payload.attribute_names)
|
||||
|
@ -265,6 +344,79 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
|
|||
payload._attribute_names
|
||||
)
|
||||
|
||||
def test_read_kmip_2_0_enums(self):
|
||||
"""
|
||||
Test that a GetAttributes request payload can be read from a data
|
||||
stream encoded with the KMIP 2.0 format using AttributeReference
|
||||
enumerations.
|
||||
"""
|
||||
payload = payloads.GetAttributesRequestPayload()
|
||||
|
||||
self.assertEqual(None, payload.unique_identifier)
|
||||
self.assertEqual(list(), payload.attribute_names)
|
||||
|
||||
payload.read(
|
||||
self.full_encoding_with_reference_enums,
|
||||
kmip_version=enums.KMIPVersion.KMIP_2_0
|
||||
)
|
||||
|
||||
self.assertEqual(self.unique_identifier, payload.unique_identifier)
|
||||
self.assertEqual(3, len(payload.attribute_names))
|
||||
self.assertEqual(
|
||||
[
|
||||
"Object Group",
|
||||
"Application Specific Information",
|
||||
"Contact Information"
|
||||
],
|
||||
payload.attribute_names
|
||||
)
|
||||
|
||||
def test_read_kmip_2_0_structs(self):
|
||||
"""
|
||||
Test that a GetAttributes request payload can be read from a data
|
||||
stream encoded with the KMIP 2.0 format using AttributeReference
|
||||
structures.
|
||||
"""
|
||||
payload = payloads.GetAttributesRequestPayload()
|
||||
|
||||
self.assertEqual(None, payload.unique_identifier)
|
||||
self.assertEqual(list(), payload.attribute_names)
|
||||
|
||||
payload.read(
|
||||
self.full_encoding_with_reference_structs,
|
||||
kmip_version=enums.KMIPVersion.KMIP_2_0
|
||||
)
|
||||
|
||||
self.assertEqual(self.unique_identifier, payload.unique_identifier)
|
||||
self.assertEqual(3, len(payload.attribute_names))
|
||||
self.assertEqual(
|
||||
[
|
||||
"Object Group",
|
||||
"Application Specific Information",
|
||||
"Contact Information"
|
||||
],
|
||||
payload.attribute_names
|
||||
)
|
||||
|
||||
def test_read_kmip_2_0_invalid_attribute_reference(self):
|
||||
"""
|
||||
Test that an InvalidKmipEncoding error is raised during the decoding
|
||||
of a GetAttributes request payload when the wrong type is found for
|
||||
the AttributeReference structure.
|
||||
"""
|
||||
payload = payloads.GetAttributesRequestPayload()
|
||||
|
||||
args = (self.invalid_attribute_reference_encoding, )
|
||||
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
|
||||
self.assertRaisesRegex(
|
||||
exceptions.InvalidKmipEncoding,
|
||||
"The GetAttributes request payload encoding contains an invalid "
|
||||
"AttributeReference type.",
|
||||
payload.read,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def test_read_with_no_unique_identifier(self):
|
||||
"""
|
||||
Test that a GetAttributes request payload with no ID can be read
|
||||
|
@ -347,6 +499,32 @@ class TestGetAttributesRequestPayload(testtools.TestCase):
|
|||
self.assertEqual(len(self.full_encoding), len(stream))
|
||||
self.assertEqual(str(self.full_encoding), str(stream))
|
||||
|
||||
def test_write_kmip_2_0_enums(self):
|
||||
"""
|
||||
Test that a GetAttributes request payload can be written to a data
|
||||
stream encoded in the KMIP 2.0 format using AttributeReference
|
||||
enumerations.
|
||||
"""
|
||||
payload = payloads.GetAttributesRequestPayload(
|
||||
self.unique_identifier,
|
||||
[
|
||||
"Object Group",
|
||||
"Application Specific Information",
|
||||
"Contact Information"
|
||||
]
|
||||
)
|
||||
stream = utils.BytearrayStream()
|
||||
payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0)
|
||||
|
||||
self.assertEqual(
|
||||
len(self.full_encoding_with_reference_enums),
|
||||
len(stream)
|
||||
)
|
||||
self.assertEqual(
|
||||
str(self.full_encoding_with_reference_enums),
|
||||
str(stream)
|
||||
)
|
||||
|
||||
def test_write_with_no_unique_identifier(self):
|
||||
"""
|
||||
Test that a GetAttributes request payload with no ID can be written
|
||||
|
@ -707,28 +885,78 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
|
|||
|
||||
# Encodings taken from Sections 3.1.2 of the KMIP 1.1 testing
|
||||
# documentation.
|
||||
#
|
||||
# Request Payload
|
||||
# Unique Identifier - 1703250b-4d40-4de2-93a0-c494a1d4ae40
|
||||
# Attribute
|
||||
# Attribute Name - Object Group
|
||||
# Attribute Value - Group1
|
||||
# Attribute
|
||||
# Attribute Name - Application Specific Information
|
||||
# Attribute Value
|
||||
# Application Namespace - ssl
|
||||
# Application Data - www.example.com
|
||||
# Attribute
|
||||
# Attribute Name - Contact Information
|
||||
# Attribute Value - Joe
|
||||
# Attribute
|
||||
# Attribute Name - x-Purpose
|
||||
# Attribute Value - demonstration
|
||||
self.full_encoding = utils.BytearrayStream(
|
||||
b'\x42\x00\x7C\x01\x00\x00\x01\x30\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x42\x00\x7C\x01\x00\x00\x01\x30'
|
||||
b'\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x31\x37\x30\x33\x32\x35\x30\x62\x2D\x34\x64\x34\x30\x2D\x34\x64'
|
||||
b'\x65\x32\x2D\x39\x33\x61\x30\x2D\x63\x34\x39\x34\x61\x31\x64\x34'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00\x42\x00\x08\x01\x00\x00\x00\x28'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00'
|
||||
b'\x42\x00\x08\x01\x00\x00\x00\x28'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x4F\x62\x6A\x65\x63\x74\x20\x47'
|
||||
b'\x72\x6F\x75\x70\x00\x00\x00\x00\x42\x00\x0B\x07\x00\x00\x00\x06'
|
||||
b'\x47\x72\x6F\x75\x70\x31\x00\x00\x42\x00\x08\x01\x00\x00\x00\x58'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x20\x41\x70\x70\x6C\x69\x63\x61\x74'
|
||||
b'\x69\x6F\x6E\x20\x53\x70\x65\x63\x69\x66\x69\x63\x20\x49\x6E\x66'
|
||||
b'\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x42\x00\x0B\x01\x00\x00\x00\x28'
|
||||
b'\x72\x6F\x75\x70\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0B\x07\x00\x00\x00\x06\x47\x72\x6F\x75\x70\x31\x00\x00'
|
||||
b'\x42\x00\x08\x01\x00\x00\x00\x58'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x20'
|
||||
b'\x41\x70\x70\x6C\x69\x63\x61\x74\x69\x6F\x6E\x20\x53\x70\x65\x63'
|
||||
b'\x69\x66\x69\x63\x20\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E'
|
||||
b'\x42\x00\x0B\x01\x00\x00\x00\x28'
|
||||
b'\x42\x00\x03\x07\x00\x00\x00\x03\x73\x73\x6C\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x02\x07\x00\x00\x00\x0F\x77\x77\x77\x2E\x65\x78\x61\x6D'
|
||||
b'\x70\x6C\x65\x2E\x63\x6F\x6D\x00\x42\x00\x08\x01\x00\x00\x00\x30'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x13\x43\x6F\x6E\x74\x61\x63\x74\x20'
|
||||
b'\x49\x6E\x66\x6F\x72\x6D\x61\x74\x69\x6F\x6E\x00\x00\x00\x00\x00'
|
||||
b'\x70\x6C\x65\x2E\x63\x6F\x6D\x00'
|
||||
b'\x42\x00\x08\x01\x00\x00\x00\x30'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x13'
|
||||
b'\x43\x6F\x6E\x74\x61\x63\x74\x20\x49\x6E\x66\x6F\x72\x6D\x61\x74'
|
||||
b'\x69\x6F\x6E\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0B\x07\x00\x00\x00\x03\x4A\x6F\x65\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x08\x01\x00\x00\x00\x30\x42\x00\x0A\x07\x00\x00\x00\x09'
|
||||
b'\x42\x00\x08\x01\x00\x00\x00\x30'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x09'
|
||||
b'\x78\x2D\x50\x75\x72\x70\x6F\x73\x65\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x0B\x07\x00\x00\x00\x0D\x64\x65\x6D\x6F\x6E\x73\x74\x72'
|
||||
b'\x61\x74\x69\x6F\x6E\x00\x00\x00'
|
||||
b'\x42\x00\x0B\x07\x00\x00\x00\x0D'
|
||||
b'\x64\x65\x6D\x6F\x6E\x73\x74\x72\x61\x74\x69\x6F\x6E\x00\x00\x00'
|
||||
)
|
||||
|
||||
# Encodings taken from Sections 3.1.2 of the KMIP 1.1 testing
|
||||
# documentation. Manually converted to the KMIP 2.0 format.
|
||||
#
|
||||
# Request Payload
|
||||
# Unique Identifier - 1703250b-4d40-4de2-93a0-c494a1d4ae40
|
||||
# Attributes
|
||||
# Object Group - Group1
|
||||
# Application Specific Information
|
||||
# Application Namespace - ssl
|
||||
# Application Data - www.example.com
|
||||
# Contact Information - Joe
|
||||
self.full_encoding_with_attributes = utils.BytearrayStream(
|
||||
b'\x42\x00\x7C\x01\x00\x00\x00\x78'
|
||||
b'\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x31\x37\x30\x33\x32\x35\x30\x62\x2D\x34\x64\x34\x30\x2D\x34\x64'
|
||||
b'\x65\x32\x2D\x39\x33\x61\x30\x2D\x63\x34\x39\x34\x61\x31\x64\x34'
|
||||
b'\x61\x65\x34\x30\x00\x00\x00\x00'
|
||||
b'\x42\x01\x25\x01\x00\x00\x00\x40'
|
||||
b'\x42\x00\x04\x01\x00\x00\x00\x28'
|
||||
b'\x42\x00\x03\x07\x00\x00\x00\x03\x73\x73\x6C\x00\x00\x00\x00\x00'
|
||||
b'\x42\x00\x02\x07\x00\x00\x00\x0F\x77\x77\x77\x2E\x65\x78\x61\x6D'
|
||||
b'\x70\x6C\x65\x2E\x63\x6F\x6D\x00'
|
||||
b'\x42\x00\x22\x07\x00\x00\x00\x03\x4A\x6F\x65\x00\x00\x00\x00\x00'
|
||||
)
|
||||
|
||||
self.encoding_sans_unique_identifier = utils.BytearrayStream(
|
||||
b'\x42\x00\x7C\x01\x00\x00\x01\x00\x42\x00\x08\x01\x00\x00\x00\x28'
|
||||
b'\x42\x00\x0A\x07\x00\x00\x00\x0C\x4F\x62\x6A\x65\x63\x74\x20\x47'
|
||||
|
@ -905,13 +1133,6 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
|
|||
payload.read(self.full_encoding)
|
||||
|
||||
self.assertEqual(self.unique_identifier, payload.unique_identifier)
|
||||
self.assertEqual(
|
||||
primitives.TextString(
|
||||
value=self.unique_identifier,
|
||||
tag=enums.Tags.UNIQUE_IDENTIFIER
|
||||
),
|
||||
payload._unique_identifier
|
||||
)
|
||||
self.assertEqual(
|
||||
len(self.attributes),
|
||||
len(payload.attributes)
|
||||
|
@ -922,6 +1143,45 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
|
|||
payload._attributes
|
||||
)
|
||||
|
||||
def test_read_kmip_2_0(self):
|
||||
"""
|
||||
Test that a GetAttributes response payload can be read from a data
|
||||
stream encoded with the KMIP 2.0 format.
|
||||
"""
|
||||
payload = payloads.GetAttributesResponsePayload()
|
||||
|
||||
self.assertEqual(None, payload.unique_identifier)
|
||||
self.assertEqual(list(), payload.attributes)
|
||||
|
||||
payload.read(
|
||||
self.full_encoding_with_attributes,
|
||||
kmip_version=enums.KMIPVersion.KMIP_2_0
|
||||
)
|
||||
|
||||
self.assertEqual(self.unique_identifier, payload.unique_identifier)
|
||||
self.assertEqual(2, len(payload.attributes))
|
||||
self.assertIn(
|
||||
objects.Attribute(
|
||||
attribute_name=objects.Attribute.AttributeName(
|
||||
'Application Specific Information'
|
||||
),
|
||||
attribute_value=attributes.ApplicationSpecificInformation(
|
||||
attributes.ApplicationNamespace('ssl'),
|
||||
attributes.ApplicationData('www.example.com')
|
||||
)
|
||||
),
|
||||
payload.attributes
|
||||
)
|
||||
self.assertIn(
|
||||
objects.Attribute(
|
||||
attribute_name=objects.Attribute.AttributeName(
|
||||
'Contact Information'
|
||||
),
|
||||
attribute_value=attributes.ContactInformation('Joe')
|
||||
),
|
||||
payload.attributes
|
||||
)
|
||||
|
||||
def test_read_with_no_unique_identifier(self):
|
||||
"""
|
||||
Test that an InvalidKmipEncoding error gets raised when attempting to
|
||||
|
@ -964,6 +1224,25 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
|
|||
self.assertEqual(list(), payload.attributes)
|
||||
self.assertEqual(list(), payload._attributes)
|
||||
|
||||
def test_read_missing_attributes(self):
|
||||
"""
|
||||
Test that an InvalidKmipEncoding error is raised during the decoding
|
||||
of a GetAttributes response payload when the attributes structure is
|
||||
missing from the encoding.
|
||||
"""
|
||||
payload = payloads.GetAttributesResponsePayload()
|
||||
|
||||
args = (self.encoding_sans_attributes, )
|
||||
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
|
||||
self.assertRaisesRegex(
|
||||
exceptions.InvalidKmipEncoding,
|
||||
"The GetAttributes response payload encoding is missing the "
|
||||
"attributes structure.",
|
||||
payload.read,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def test_write(self):
|
||||
"""
|
||||
Test that a GetAttributes response payload can be written to a data
|
||||
|
@ -979,6 +1258,37 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
|
|||
self.assertEqual(len(self.full_encoding), len(stream))
|
||||
self.assertEqual(str(self.full_encoding), str(stream))
|
||||
|
||||
def test_write_kmip_2_0(self):
|
||||
"""
|
||||
Test that a GetAttributes response payload can be written to a data
|
||||
stream encoded with the KMIP 2.0 format.
|
||||
"""
|
||||
payload = payloads.GetAttributesResponsePayload(
|
||||
self.unique_identifier,
|
||||
[
|
||||
objects.Attribute(
|
||||
attribute_name=objects.Attribute.AttributeName(
|
||||
'Application Specific Information'
|
||||
),
|
||||
attribute_value=attributes.ApplicationSpecificInformation(
|
||||
attributes.ApplicationNamespace('ssl'),
|
||||
attributes.ApplicationData('www.example.com')
|
||||
)
|
||||
),
|
||||
objects.Attribute(
|
||||
attribute_name=objects.Attribute.AttributeName(
|
||||
'Contact Information'
|
||||
),
|
||||
attribute_value=attributes.ContactInformation('Joe')
|
||||
)
|
||||
]
|
||||
)
|
||||
stream = utils.BytearrayStream()
|
||||
payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0)
|
||||
|
||||
self.assertEqual(len(self.full_encoding_with_attributes), len(stream))
|
||||
self.assertEqual(str(self.full_encoding_with_attributes), str(stream))
|
||||
|
||||
def test_write_with_no_unique_identifier(self):
|
||||
"""
|
||||
Test that a GetAttributes request payload with no ID can be written
|
||||
|
@ -1014,6 +1324,25 @@ class TestGetAttributesResponsePayload(testtools.TestCase):
|
|||
self.assertEqual(len(self.encoding_sans_attributes), len(stream))
|
||||
self.assertEqual(str(self.encoding_sans_attributes), str(stream))
|
||||
|
||||
def test_write_missing_attributes_kmip_2_0(self):
|
||||
"""
|
||||
Test that an InvalidField error is raised during the encoding of a
|
||||
GetAttributes request payload when the payload is missing the
|
||||
attributes list.
|
||||
"""
|
||||
payload = payloads.GetAttributesResponsePayload(self.unique_identifier)
|
||||
|
||||
args = (utils.BytearrayStream(), )
|
||||
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
|
||||
self.assertRaisesRegex(
|
||||
exceptions.InvalidField,
|
||||
"The GetAttributes response payload is missing the attributes "
|
||||
"list.",
|
||||
payload.write,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def test_repr(self):
|
||||
"""
|
||||
Test that repr can be applied to a GetAttributes response payload.
|
||||
|
|
Loading…
Reference in New Issue