Update the GetAttributeList payloads

This change makes minor updates to the GetAttributeList payloads,
fixing error messages, comments, and local variable names to
comply with the current payload format. The corresponding unit
test suite has been updated to reflect these changes.

This change prepares the GetAttributeList payloads for future
updates to support KMIP 2.0.
This commit is contained in:
Peter Hamilton 2019-04-04 10:42:39 -04:00 committed by Peter Hamilton
parent 35b2381341
commit 05f4d7aef4
2 changed files with 129 additions and 123 deletions

View File

@ -16,6 +16,7 @@
import six import six
from kmip.core import enums from kmip.core import enums
from kmip.core import exceptions
from kmip.core import primitives from kmip.core import primitives
from kmip.core import utils from kmip.core import utils
@ -66,59 +67,67 @@ class GetAttributeListRequestPayload(primitives.Struct):
tag=enums.Tags.UNIQUE_IDENTIFIER tag=enums.Tags.UNIQUE_IDENTIFIER
) )
else: else:
raise TypeError("unique identifier must be a string") raise TypeError("Unique identifier must be a string.")
def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
""" """
Read the data encoding the GetAttributeList request payload and decode Read the data encoding the GetAttributeList request payload and decode
it into its constituent parts. it into its constituent parts.
Args: Args:
istream (stream): A data stream containing encoded object data, input_buffer (stream): A data stream containing encoded object
supporting a read method; usually a BytearrayStream object. data, supporting a read method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be decoded. Optional, version with which the object will be decoded. Optional,
defaults to KMIP 1.0. defaults to KMIP 1.0.
""" """
super(GetAttributeListRequestPayload, self).read( super(GetAttributeListRequestPayload, self).read(
istream, input_buffer,
kmip_version=kmip_version kmip_version=kmip_version
) )
tstream = utils.BytearrayStream(istream.read(self.length)) local_buffer = utils.BytearrayStream(input_buffer.read(self.length))
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_buffer):
self._unique_identifier = primitives.TextString( self._unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER tag=enums.Tags.UNIQUE_IDENTIFIER
) )
self._unique_identifier.read(tstream, kmip_version=kmip_version) self._unique_identifier.read(
local_buffer,
kmip_version=kmip_version
)
else: else:
self._unique_identifier = None self._unique_identifier = None
self.is_oversized(tstream) self.is_oversized(local_buffer)
def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
""" """
Write the data encoding the GetAttributeList request payload to a Write the data encoding the GetAttributeList request payload to a
stream. stream.
Args: Args:
ostream (stream): A data stream in which to encode object data, output_buffer (stream): A data stream in which to encode object
supporting a write method; usually a BytearrayStream object. data, supporting a write method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be encoded. Optional, version with which the object will be encoded. Optional,
defaults to KMIP 1.0. defaults to KMIP 1.0.
""" """
tstream = utils.BytearrayStream() local_buffer = utils.BytearrayStream()
if self._unique_identifier: if self._unique_identifier:
self._unique_identifier.write(tstream, kmip_version=kmip_version) self._unique_identifier.write(
local_buffer,
kmip_version=kmip_version
)
self.length = tstream.length() self.length = local_buffer.length()
super(GetAttributeListRequestPayload, self).write( super(GetAttributeListRequestPayload, self).write(
ostream, output_buffer,
kmip_version=kmip_version kmip_version=kmip_version
) )
ostream.write(tstream.buffer) output_buffer.write(local_buffer.buffer)
def __repr__(self): def __repr__(self):
uid = "unique_identifier={0}".format(self.unique_identifier) uid = "unique_identifier={0}".format(self.unique_identifier)
@ -199,7 +208,7 @@ class GetAttributeListResponsePayload(primitives.Struct):
tag=enums.Tags.UNIQUE_IDENTIFIER tag=enums.Tags.UNIQUE_IDENTIFIER
) )
else: else:
raise TypeError("unique identifier must be a string") raise TypeError("Unique identifier must be a string.")
@property @property
def attribute_names(self): def attribute_names(self):
@ -221,7 +230,7 @@ class GetAttributeListResponsePayload(primitives.Struct):
name = value[i] name = value[i]
if not isinstance(name, six.string_types): if not isinstance(name, six.string_types):
raise TypeError( raise TypeError(
"attribute_names must be a list of strings; " "Attribute names must be a list of strings; "
"item {0} has type {1}".format(i + 1, type(name)) "item {0} has type {1}".format(i + 1, type(name))
) )
if name not in names: if name not in names:
@ -235,69 +244,104 @@ class GetAttributeListResponsePayload(primitives.Struct):
) )
) )
else: else:
raise TypeError("attribute_names must be a list of strings") raise TypeError("Attribute names must be a list of strings.")
def read(self, istream, kmip_version=enums.KMIPVersion.KMIP_1_0): def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
""" """
Read the data encoding the GetAttributeList response payload and Read the data encoding the GetAttributeList response payload and
decode it into its constituent parts. decode it into its constituent parts.
Args: Args:
istream (stream): A data stream containing encoded object data, input_buffer (stream): A data stream containing encoded object
supporting a read method; usually a BytearrayStream object. data, supporting a read method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be decoded. Optional, version with which the object will be decoded. Optional,
defaults to KMIP 1.0. defaults to KMIP 1.0.
Raises:
InvalidKmipEncoding: Raised if the unique identifier or attribute
names are missing from the encoded payload.
""" """
super(GetAttributeListResponsePayload, self).read( super(GetAttributeListResponsePayload, self).read(
istream, input_buffer,
kmip_version=kmip_version kmip_version=kmip_version
) )
tstream = utils.BytearrayStream(istream.read(self.length)) local_buffer = utils.BytearrayStream(input_buffer.read(self.length))
if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, tstream): if self.is_tag_next(enums.Tags.UNIQUE_IDENTIFIER, local_buffer):
self._unique_identifier = primitives.TextString( self._unique_identifier = primitives.TextString(
tag=enums.Tags.UNIQUE_IDENTIFIER tag=enums.Tags.UNIQUE_IDENTIFIER
) )
self._unique_identifier.read(tstream, kmip_version=kmip_version) self._unique_identifier.read(
local_buffer,
kmip_version=kmip_version
)
else: else:
self._unique_identifier = None raise exceptions.InvalidKmipEncoding(
"The GetAttributeList response payload encoding is missing "
"the unique identifier."
)
names = list() names = list()
while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, tstream): while self.is_tag_next(enums.Tags.ATTRIBUTE_NAME, local_buffer):
name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME) name = primitives.TextString(tag=enums.Tags.ATTRIBUTE_NAME)
name.read(tstream, kmip_version=kmip_version) name.read(local_buffer, kmip_version=kmip_version)
names.append(name) names.append(name)
if len(names) == 0:
raise exceptions.InvalidKmipEncoding(
"The GetAttributeList response payload encoding is missing "
"the attribute names."
)
self._attribute_names = names self._attribute_names = names
self.is_oversized(tstream) self.is_oversized(local_buffer)
def write(self, ostream, kmip_version=enums.KMIPVersion.KMIP_1_0): def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
""" """
Write the data encoding the GetAttributeList response payload to a Write the data encoding the GetAttributeList response payload to a
stream. stream.
Args: Args:
ostream (stream): A data stream in which to encode object data, output_buffer (stream): A data stream in which to encode object
supporting a write method; usually a BytearrayStream object. data, supporting a write method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be encoded. Optional, version with which the object will be encoded. Optional,
defaults to KMIP 1.0. defaults to KMIP 1.0.
Raises:
InvalidField: Raised if the unique identifier or attribute name
are not defined.
""" """
tstream = utils.BytearrayStream() local_buffer = utils.BytearrayStream()
if self._unique_identifier: if self._unique_identifier:
self._unique_identifier.write(tstream, kmip_version=kmip_version) self._unique_identifier.write(
local_buffer,
kmip_version=kmip_version
)
else:
raise exceptions.InvalidField(
"The GetAttributeList response payload is missing the unique "
"identifier field."
)
for attribute_name in self._attribute_names: if self._attribute_names:
attribute_name.write(tstream, kmip_version=kmip_version) for attribute_name in self._attribute_names:
attribute_name.write(local_buffer, kmip_version=kmip_version)
else:
raise exceptions.InvalidField(
"The GetAttributeList response payload is missing the "
"attribute names field."
)
self.length = tstream.length() self.length = local_buffer.length()
super(GetAttributeListResponsePayload, self).write( super(GetAttributeListResponsePayload, self).write(
ostream, output_buffer,
kmip_version=kmip_version kmip_version=kmip_version
) )
ostream.write(tstream.buffer) output_buffer.write(local_buffer.buffer)
def __repr__(self): def __repr__(self):
unique_identifier = "unique_identifier={0}".format( unique_identifier = "unique_identifier={0}".format(

View File

@ -16,6 +16,7 @@
import testtools import testtools
from kmip.core import enums from kmip.core import enums
from kmip.core import exceptions
from kmip.core import primitives from kmip.core import primitives
from kmip.core import utils from kmip.core import utils
@ -93,7 +94,7 @@ class TestGetAttributeListRequestPayload(testtools.TestCase):
args = (payload, 'unique_identifier', 0) args = (payload, 'unique_identifier', 0)
self.assertRaisesRegex( self.assertRaisesRegex(
TypeError, TypeError,
"unique identifier must be a string", "Unique identifier must be a string.",
setattr, setattr,
*args *args
) )
@ -441,7 +442,7 @@ class TestGetAttributeListResponsePayload(testtools.TestCase):
args = (payload, 'unique_identifier', 0) args = (payload, 'unique_identifier', 0)
self.assertRaisesRegex( self.assertRaisesRegex(
TypeError, TypeError,
"unique identifier must be a string", "Unique identifier must be a string.",
setattr, setattr,
*args *args
) )
@ -489,7 +490,7 @@ class TestGetAttributeListResponsePayload(testtools.TestCase):
args = (payload, 'attribute_names', 0) args = (payload, 'attribute_names', 0)
self.assertRaisesRegex( self.assertRaisesRegex(
TypeError, TypeError,
"attribute_names must be a list of strings", "Attribute names must be a list of strings.",
setattr, setattr,
*args *args
) )
@ -508,7 +509,7 @@ class TestGetAttributeListResponsePayload(testtools.TestCase):
) )
self.assertRaisesRegex( self.assertRaisesRegex(
TypeError, TypeError,
"attribute_names must be a list of strings; " "Attribute names must be a list of strings; "
"item 2 has type {0}".format(type(0)), "item 2 has type {0}".format(type(0)),
setattr, setattr,
*args *args
@ -584,70 +585,43 @@ class TestGetAttributeListResponsePayload(testtools.TestCase):
def test_read_with_no_unique_identifier(self): def test_read_with_no_unique_identifier(self):
""" """
Test that a GetAttributeList response payload with no ID can be read Test that an InvalidKmipEncoding error is raised when a
from a data stream. GetAttributeList response payload is read from a data stream with no
unique identifier.
""" """
payload = payloads.GetAttributeListResponsePayload() payload = payloads.GetAttributeListResponsePayload()
self.assertEqual(None, payload._unique_identifier) self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attribute_names) self.assertEqual(list(), payload._attribute_names)
payload.read(self.encoding_sans_unique_identifier) args = (self.encoding_sans_unique_identifier, )
self.assertRaisesRegex(
self.assertEqual(None, payload.unique_identifier) exceptions.InvalidKmipEncoding,
self.assertEqual(None, payload._unique_identifier) "The GetAttributeList response payload encoding is missing the "
self.assertEqual( "unique identifier.",
set(self.attribute_names), payload.read,
set(payload.attribute_names) *args
) )
for attribute_name in self.attribute_names:
self.assertIn(
primitives.TextString(
value=attribute_name,
tag=enums.Tags.ATTRIBUTE_NAME
),
payload._attribute_names
)
def test_read_with_no_attribute_names(self): def test_read_with_no_attribute_names(self):
""" """
Test that a GetAttributeList response payload with no attribute names Test that an InvalidKmipEncoding error is raised when a
can be read from a data stream. GetAttributeList response payload is read from a data stream with no
attribute names.
""" """
payload = payloads.GetAttributeListResponsePayload() payload = payloads.GetAttributeListResponsePayload()
self.assertEqual(None, payload._unique_identifier) self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attribute_names) self.assertEqual(list(), payload._attribute_names)
payload.read(self.encoding_sans_attribute_names) args = (self.encoding_sans_attribute_names, )
self.assertRaisesRegex(
self.assertEqual(self.unique_identifier, payload.unique_identifier) exceptions.InvalidKmipEncoding,
self.assertEqual( "The GetAttributeList response payload encoding is missing the "
primitives.TextString( "attribute names.",
value=self.unique_identifier, payload.read,
tag=enums.Tags.UNIQUE_IDENTIFIER *args
),
payload._unique_identifier
) )
self.assertEqual(list(), payload.attribute_names)
self.assertEqual(list(), payload._attribute_names)
def test_read_with_no_content(self):
"""
Test that a GetAttributeList response payload with no ID or attribute
names can be read from a data stream.
"""
payload = payloads.GetAttributeListResponsePayload()
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload._attribute_names)
payload.read(self.empty_encoding)
self.assertEqual(None, payload.unique_identifier)
self.assertEqual(None, payload._unique_identifier)
self.assertEqual(list(), payload.attribute_names)
self.assertEqual(list(), payload._attribute_names)
def test_write(self): def test_write(self):
""" """
@ -666,51 +640,39 @@ class TestGetAttributeListResponsePayload(testtools.TestCase):
def test_write_with_no_unique_identifier(self): def test_write_with_no_unique_identifier(self):
""" """
Test that a GetAttributeList response payload with no ID can be Test that an InvalidField error is raised when a GetAttributeList
written to a data stream. response payload is written to a data stream with no unique identifier.
""" """
payload = payloads.GetAttributeListResponsePayload( payload = payloads.GetAttributeListResponsePayload(
None, None,
self.attribute_names self.attribute_names
) )
stream = utils.BytearrayStream() args = (utils.BytearrayStream(), )
payload.write(stream) self.assertRaisesRegex(
exceptions.InvalidField,
self.assertEqual( "The GetAttributeList response payload is missing the unique "
len(self.encoding_sans_unique_identifier), "identifier field.",
len(stream) payload.write,
) *args
self.assertEqual(
str(self.encoding_sans_unique_identifier),
str(stream)
) )
def test_write_with_no_attribute_names(self): def test_write_with_no_attribute_names(self):
""" """
Test that a GetAttributeList response payload with no attribute names Test that an InvalidField error is raised when a GetAttributeList
can be written to a data stream. response payload is written to a data stream with no attribute names.
""" """
payload = payloads.GetAttributeListResponsePayload( payload = payloads.GetAttributeListResponsePayload(
self.unique_identifier, self.unique_identifier,
None None
) )
stream = utils.BytearrayStream() args = (utils.BytearrayStream(), )
payload.write(stream) self.assertRaisesRegex(
exceptions.InvalidField,
self.assertEqual(len(self.encoding_sans_attribute_names), len(stream)) "The GetAttributeList response payload is missing the attribute "
self.assertEqual(str(self.encoding_sans_attribute_names), str(stream)) "names field.",
payload.write,
def test_write_with_no_content(self): *args
""" )
Test that a GetAttributeList response payload with no ID or attribute
names can be written to a data stream.
"""
payload = payloads.GetAttributeListResponsePayload()
stream = utils.BytearrayStream()
payload.write(stream)
self.assertEqual(len(self.empty_encoding), len(stream))
self.assertEqual(str(self.empty_encoding), str(stream))
def test_repr(self): def test_repr(self):
""" """