Add the ProfileInformation structure

This change adds the ProfileInformation structure, a KMIP 1.3
addition that is used to specify details for supported KMIP
profiles. A unit test suite is included to cover the new
structure.
This commit is contained in:
Peter Hamilton 2019-04-15 11:03:36 -04:00 committed by Peter Hamilton
parent 4b19fc81a3
commit b68312119f
2 changed files with 696 additions and 0 deletions

View File

@ -4500,3 +4500,244 @@ class RNGParameters(primitives.Struct):
return not (self == other)
else:
return NotImplemented
class ProfileInformation(primitives.Struct):
"""
A structure containing details of supported KMIP profiles.
This is intended for use with KMIP 1.3+.
Attributes:
profile_name: A ProfileName enumeration identifying the specific
profile supported.
server_uri: A string specifying a Uniform Resource Identifier that
points to the location of the server supporting the profile.
server_port: An integer specifying the port number to use when
accessing the server supporting the profile.
"""
def __init__(self, profile_name=None, server_uri=None, server_port=None):
"""
Construct a ProfileInformation structure.
Args:
profile_name (enum): A ProfileName enumeration identifying the
specific profile supported. Optional, defaults to None.
Required for read/write.
server_uri (string): A string specifying a Uniform Resource
Identifier that points to the location of the server
supporting the profile. Optional, defaults to None.
server_port (int): An integer specifying the port number to use
when accessing the server supporting the profile. Optional,
defaults to None.
"""
super(ProfileInformation, self).__init__(
tag=enums.Tags.PROFILE_INFORMATION
)
self._profile_name = None
self._server_uri = None
self._server_port = None
self.profile_name = profile_name
self.server_uri = server_uri
self.server_port = server_port
@property
def profile_name(self):
if self._profile_name:
return self._profile_name.value
return None
@profile_name.setter
def profile_name(self, value):
if value is None:
self._profile_name = None
elif isinstance(value, enums.ProfileName):
self._profile_name = primitives.Enumeration(
enums.ProfileName,
value=value,
tag=enums.Tags.PROFILE_NAME
)
else:
raise TypeError(
"The profile name must be a ProfileName enumeration."
)
@property
def server_uri(self):
if self._server_uri:
return self._server_uri.value
return None
@server_uri.setter
def server_uri(self, value):
if value is None:
self._server_uri = None
elif isinstance(value, six.string_types):
self._server_uri = primitives.TextString(
value=value,
tag=enums.Tags.SERVER_URI
)
else:
raise TypeError("The server URI must be a string.")
@property
def server_port(self):
if self._server_port:
return self._server_port.value
return None
@server_port.setter
def server_port(self, value):
if value is None:
self._server_port = None
elif isinstance(value, six.integer_types):
self._server_port = primitives.Integer(
value=value,
tag=enums.Tags.SERVER_PORT
)
else:
raise TypeError("The server port must be an integer.")
def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3):
"""
Read the data encoding the ProfileInformation structure and decode it
into its constituent parts.
Args:
input_buffer (stream): A data stream containing encoded object
data, supporting a read method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be decoded. Optional,
defaults to KMIP 2.0.
Raises:
InvalidKmipEncoding: Raised if the profile name is missing from
the encoding.
VersionNotSupported: Raised when a KMIP version is provided that
does not support the ProfileInformation structure.
"""
if kmip_version < enums.KMIPVersion.KMIP_1_3:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the ProfileInformation "
"object.".format(
kmip_version.value
)
)
super(ProfileInformation, self).read(
input_buffer,
kmip_version=kmip_version
)
local_buffer = utils.BytearrayStream(input_buffer.read(self.length))
if self.is_tag_next(enums.Tags.PROFILE_NAME, local_buffer):
profile_name = primitives.Enumeration(
enums.ProfileName,
tag=enums.Tags.PROFILE_NAME
)
profile_name.read(local_buffer, kmip_version=kmip_version)
self._profile_name = profile_name
else:
raise exceptions.InvalidKmipEncoding(
"The ProfileInformation encoding is missing the profile name."
)
if self.is_tag_next(enums.Tags.SERVER_URI, local_buffer):
server_uri = primitives.TextString(tag=enums.Tags.SERVER_URI)
server_uri.read(local_buffer, kmip_version=kmip_version)
self._server_uri = server_uri
if self.is_tag_next(enums.Tags.SERVER_PORT, local_buffer):
server_port = primitives.Integer(tag=enums.Tags.SERVER_PORT)
server_port.read(local_buffer, kmip_version=kmip_version)
self._server_port = server_port
self.is_oversized(local_buffer)
def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3):
"""
Write the ProfileInformation structure encoding to the data stream.
Args:
output_buffer (stream): A data stream in which to encode
ProfileInformation structure data, supporting a write method.
kmip_version (enum): A KMIPVersion enumeration defining the KMIP
version with which the object will be encoded. Optional,
defaults to KMIP 2.0.
Raises:
InvalidField: Raised if the profile name field is not defined.
VersionNotSupported: Raised when a KMIP version is provided that
does not support the ProfileInformation structure.
"""
if kmip_version < enums.KMIPVersion.KMIP_1_3:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the ProfileInformation "
"object.".format(
kmip_version.value
)
)
local_buffer = BytearrayStream()
if self._profile_name:
self._profile_name.write(local_buffer, kmip_version=kmip_version)
else:
raise exceptions.InvalidField(
"The ProfileInformation structure is missing the profile "
"name field."
)
if self._server_uri:
self._server_uri.write(local_buffer, kmip_version=kmip_version)
if self._server_port:
self._server_port.write(local_buffer, kmip_version=kmip_version)
self.length = local_buffer.length()
super(ProfileInformation, self).write(
output_buffer,
kmip_version=kmip_version
)
output_buffer.write(local_buffer.buffer)
def __repr__(self):
n = "profile_name={}".format(self.profile_name)
u = 'server_uri="{}"'.format(self.server_uri)
p = "server_port={}".format(self.server_port)
v = ", ".join([n, u, p])
return "ProfileInformation({})".format(v)
def __str__(self):
n = '"profile_name": {}'.format(self.profile_name)
u = '"server_uri": "{}"'.format(self.server_uri)
p = '"server_port": {}'.format(self.server_port)
v = ", ".join([n, u, p])
return '{' + v + '}'
def __eq__(self, other):
if isinstance(other, ProfileInformation):
if self.profile_name != other.profile_name:
return False
elif self.server_uri != other.server_uri:
return False
elif self.server_port != other.server_port:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, ProfileInformation):
return not (self == other)
else:
return NotImplemented

View File

@ -6513,3 +6513,458 @@ class TestRNGParameters(testtools.TestCase):
self.assertTrue(a != b)
self.assertTrue(b != a)
class TestProfileInformation(testtools.TestCase):
def setUp(self):
super(TestProfileInformation, self).setUp()
# This encoding matches the following set of values:
#
# Profile Information
# Profile Name - BASELINE_SERVER_BASIC_KMIPv12
# Server URI - https://example.com
# Server Port - 5696
self.full_encoding = utils.BytearrayStream(
b'\x42\x00\xEB\x01\x00\x00\x00\x40'
b'\x42\x00\xEC\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\xED\x07\x00\x00\x00\x13'
b'\x68\x74\x74\x70\x73\x3A\x2F\x2F\x65\x78\x61\x6D\x70\x6C\x65\x2E'
b'\x63\x6F\x6D\x00\x00\x00\x00\x00'
b'\x42\x00\xEE\x02\x00\x00\x00\x04\x00\x00\x16\x40\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
#
# Profile Information
# Server URI - https://example.com
# Server Port - 5696
self.no_profile_name_encoding = utils.BytearrayStream(
b'\x42\x00\xEB\x01\x00\x00\x00\x30'
b'\x42\x00\xED\x07\x00\x00\x00\x13'
b'\x68\x74\x74\x70\x73\x3A\x2F\x2F\x65\x78\x61\x6D\x70\x6C\x65\x2E'
b'\x63\x6F\x6D\x00\x00\x00\x00\x00'
b'\x42\x00\xEE\x02\x00\x00\x00\x04\x00\x00\x16\x40\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
#
# Profile Information
# Profile Name - BASELINE_SERVER_BASIC_KMIPv12
self.only_profile_name_encoding = utils.BytearrayStream(
b'\x42\x00\xEB\x01\x00\x00\x00\x10'
b'\x42\x00\xEC\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
)
def tearDown(self):
super(TestProfileInformation, self).tearDown()
def test_invalid_profile_name(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the profile name of a ProfileInformation structure.
"""
kwargs = {"profile_name": "invalid"}
self.assertRaisesRegex(
TypeError,
"The profile name must be a ProfileName enumeration.",
objects.ProfileInformation,
**kwargs
)
args = (objects.ProfileInformation(), "profile_name", "invalid")
self.assertRaisesRegex(
TypeError,
"The profile name must be a ProfileName enumeration.",
setattr,
*args
)
def test_invalid_server_uri(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the server URI of a ProfileInformation structure.
"""
kwargs = {"server_uri": 0}
self.assertRaisesRegex(
TypeError,
"The server URI must be a string.",
objects.ProfileInformation,
**kwargs
)
args = (objects.ProfileInformation(), "server_uri", 0)
self.assertRaisesRegex(
TypeError,
"The server URI must be a string.",
setattr,
*args
)
def test_invalid_server_port(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the server port of a ProfileInformation structure.
"""
kwargs = {"server_port": "invalid"}
self.assertRaisesRegex(
TypeError,
"The server port must be an integer.",
objects.ProfileInformation,
**kwargs
)
args = (objects.ProfileInformation(), "server_port", "invalid")
self.assertRaisesRegex(
TypeError,
"The server port must be an integer.",
setattr,
*args
)
def test_read(self):
"""
Test that a ProfileInformation structure can be correctly read in
from a data stream.
"""
profile_information = objects.ProfileInformation()
self.assertIsNone(profile_information.profile_name)
self.assertIsNone(profile_information.server_uri)
self.assertIsNone(profile_information.server_port)
profile_information.read(
self.full_encoding,
kmip_version=enums.KMIPVersion.KMIP_1_3
)
self.assertEqual(
enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
profile_information.profile_name
)
self.assertEqual("https://example.com", profile_information.server_uri)
self.assertEqual(5696, profile_information.server_port)
def test_read_unsupported_kmip_version(self):
"""
Test that a VersionNotSupported error is raised during the decoding of
a ProfileInformation structure when the structure is read for an
unsupported KMIP version.
"""
profile_information = objects.ProfileInformation()
args = (self.full_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.2 does not support the ProfileInformation object.",
profile_information.read,
*args,
**kwargs
)
def test_read_missing_profile_name(self):
"""
Test that an InvalidKmipEncoding error is raised during the decoding
of a ProfileInformation structure when the profile name is missing
from the encoding.
"""
profile_information = objects.ProfileInformation()
args = (self.no_profile_name_encoding, )
self.assertRaisesRegex(
exceptions.InvalidKmipEncoding,
"The ProfileInformation encoding is missing the profile name.",
profile_information.read,
*args
)
def test_read_only_profile_name(self):
"""
Test that a ProfileInformation structure can be correctly read in
from a data stream even when missing all fields except the profile
name.
"""
profile_information = objects.ProfileInformation()
self.assertIsNone(profile_information.profile_name)
self.assertIsNone(profile_information.server_uri)
self.assertIsNone(profile_information.server_port)
profile_information.read(
self.only_profile_name_encoding,
kmip_version=enums.KMIPVersion.KMIP_1_3
)
self.assertEqual(
enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
profile_information.profile_name
)
self.assertIsNone(profile_information.server_uri)
self.assertIsNone(profile_information.server_port)
def test_write(self):
"""
Test that a ProfileInformation structure can be written to a data
stream.
"""
profile_information = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
buffer = utils.BytearrayStream()
profile_information.write(
buffer,
kmip_version=enums.KMIPVersion.KMIP_1_3
)
self.assertEqual(len(self.full_encoding), len(buffer))
self.assertEqual(str(self.full_encoding), str(buffer))
def test_write_unsupported_kmip_version(self):
"""
Test that a VersionNotSupported error is raised during the encoding of
a ProfileInformation structure when the structure is written for an
unsupported KMIP version.
"""
profile_information = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
args = (utils.BytearrayStream(), )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.2 does not support the ProfileInformation object.",
profile_information.write,
*args,
**kwargs
)
def test_write_missing_profile_name(self):
"""
Test that an InvalidField error is raised during the encoding of an
ProfileInformation structure when the structure is missing the profile
name field.
"""
profile_information = objects.ProfileInformation()
args = (utils.BytearrayStream(), )
self.assertRaisesRegex(
exceptions.InvalidField,
"The ProfileInformation structure is missing the profile name "
"field.",
profile_information.write,
*args
)
def test_write_only_profile_name(self):
"""
Test that a ProfileInformation structure can be written to a data
stream even when missing all fields except the profile name.
"""
profile_information = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12
)
buffer = utils.BytearrayStream()
profile_information.write(
buffer,
kmip_version=enums.KMIPVersion.KMIP_1_3
)
self.assertEqual(len(self.only_profile_name_encoding), len(buffer))
self.assertEqual(str(self.only_profile_name_encoding), str(buffer))
def test_repr(self):
"""
Test that repr can be applied to a ProfileInformation structure.
"""
profile_information = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
n = "profile_name=ProfileName.BASELINE_SERVER_BASIC_KMIPv12"
u = 'server_uri="https://example.com"'
p = "server_port=5696"
v = ", ".join([n, u, p])
self.assertEqual(
"ProfileInformation({})".format(v),
repr(profile_information)
)
def test_str(self):
"""
Test that str can be applied to a ProfileInformation structure.
"""
profile_information = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
n = '"profile_name": ProfileName.BASELINE_SERVER_BASIC_KMIPv12'
u = '"server_uri": "https://example.com"'
p = '"server_port": 5696'
v = ", ".join([n, u, p])
self.assertEqual(
"{" + v + "}",
str(profile_information)
)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
ProfileInformation structures with the same data.
"""
a = objects.ProfileInformation()
b = objects.ProfileInformation()
self.assertTrue(a == b)
self.assertTrue(b == a)
a = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
b = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_profile_name(self):
"""
Test that the equality operator returns False when comparing two
ProfileInformation structures with different profile name fields.
"""
a = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12
)
b = objects.ProfileInformation(
profile_name=enums.ProfileName.TAPE_LIBRARY_CLIENT_KMIPv10
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_server_uri(self):
"""
Test that the equality operator returns False when comparing two
ProfileInformation structures with different server URI fields.
"""
a = objects.ProfileInformation(server_uri="https://example.com")
b = objects.ProfileInformation(server_uri="https://test.com")
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_server_port(self):
"""
Test that the equality operator returns False when comparing two
ProfileInformation structures with different server port fields.
"""
a = objects.ProfileInformation(server_port=5696)
b = objects.ProfileInformation(server_port=5697)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing two
ProfileInformation structures with different types.
"""
a = objects.ProfileInformation()
b = "invalid"
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns False when comparing two
ProfileInformation structures with the same data.
"""
a = objects.ProfileInformation()
b = objects.ProfileInformation()
self.assertFalse(a != b)
self.assertFalse(b != a)
a = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
b = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12,
server_uri="https://example.com",
server_port=5696
)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_profile_name(self):
"""
Test that the inequality operator returns True when comparing two
ProfileInformation structures with different profile name fields.
"""
a = objects.ProfileInformation(
profile_name=enums.ProfileName.BASELINE_SERVER_BASIC_KMIPv12
)
b = objects.ProfileInformation(
profile_name=enums.ProfileName.TAPE_LIBRARY_CLIENT_KMIPv10
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_server_uri(self):
"""
Test that the inequality operator returns True when comparing two
ProfileInformation structures with different server URI fields.
"""
a = objects.ProfileInformation(server_uri="https://example.com")
b = objects.ProfileInformation(server_uri="https://test.com")
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_server_port(self):
"""
Test that the inequality operator returns True when comparing two
ProfileInformation structures with different server port fields.
"""
a = objects.ProfileInformation(server_port=5696)
b = objects.ProfileInformation(server_port=5697)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing two
ProfileInformation structures with different types.
"""
a = objects.ProfileInformation()
b = "invalid"
self.assertTrue(a != b)
self.assertTrue(b != a)