Add support for the ProtectionStorageMasks structure

This change adds support for the ProtectionStorageMasks structure
which is a new addition in KMIP 2.0. A unit test suite has been
added for the new structure.
This commit is contained in:
Peter Hamilton 2019-05-03 15:52:09 -04:00 committed by Peter Hamilton
parent b4c3980da0
commit cd16b20a6b
2 changed files with 617 additions and 0 deletions

View File

@ -6046,3 +6046,199 @@ class CapabilityInformation(primitives.Struct):
return not (self == other)
else:
return NotImplemented
class ProtectionStorageMasks(primitives.Struct):
"""
A structure containing a list of protection storage masks.
This is intended for use with KMIP 2.0+.
Attributes:
protection_storage_masks: A list of integers representing
combined sets of ProtectionStorageMask enumerations detailing
the storage protections supported by the server.
"""
def __init__(self,
protection_storage_masks=None,
tag=enums.Tags.PROTECTION_STORAGE_MASKS):
"""
Construct a ProtectionStorageMasks structure.
Args:
protection_storage_masks (list): A list of integers representing
combined sets of ProtectionStorageMask enumerations detailing
the storage protections supported by the server. Optional,
defaults to None.
tag (enum): A Tags enumeration specifying which type of collection
this of protection storage masks this object represents.
Optional, defaults to Tags.PROTECTION_STORAGE_MASKS.
"""
super(ProtectionStorageMasks, self).__init__(tag=tag)
self._protection_storage_masks = None
self.protection_storage_masks = protection_storage_masks
@property
def protection_storage_masks(self):
if self._protection_storage_masks:
return [x.value for x in self._protection_storage_masks]
return None
@protection_storage_masks.setter
def protection_storage_masks(self, value):
if value is None:
self._protection_storage_masks = None
elif isinstance(value, list):
protection_storage_masks = []
for x in value:
if isinstance(x, six.integer_types):
if enums.is_bit_mask(enums.ProtectionStorageMask, x):
protection_storage_masks.append(
primitives.Integer(
value=x,
tag=enums.Tags.PROTECTION_STORAGE_MASK
)
)
else:
raise TypeError(
"The protection storage masks must be a list of "
"integers representing combinations of "
"ProtectionStorageMask enumerations."
)
else:
raise TypeError(
"The protection storage masks must be a list of "
"integers representing combinations of "
"ProtectionStorageMask enumerations."
)
self._protection_storage_masks = protection_storage_masks
else:
raise TypeError(
"The protection storage masks must be a list of "
"integers representing combinations of "
"ProtectionStorageMask enumerations."
)
def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0):
"""
Read the data encoding the ProtectionStorageMasks structure and decode
it into its constituent parts.
Args:
input_buffer (stream): A data stream containing encoded object
data, supporting a read method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be decoded. Optional,
defaults to KMIP 2.0.
Raises:
VersionNotSupported: Raised when a KMIP version is provided that
does not support the ProtectionStorageMasks structure.
"""
if kmip_version < enums.KMIPVersion.KMIP_2_0:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the ProtectionStorageMasks "
"object.".format(
kmip_version.value
)
)
super(ProtectionStorageMasks, self).read(
input_buffer,
kmip_version=kmip_version
)
local_buffer = utils.BytearrayStream(input_buffer.read(self.length))
protection_storage_masks = []
while self.is_tag_next(
enums.Tags.PROTECTION_STORAGE_MASK,
local_buffer
):
protection_storage_mask = primitives.Integer(
tag=enums.Tags.PROTECTION_STORAGE_MASK
)
protection_storage_mask.read(
local_buffer,
kmip_version=kmip_version
)
protection_storage_masks.append(protection_storage_mask)
self._protection_storage_masks = protection_storage_masks
self.is_oversized(local_buffer)
def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0):
"""
Write the ProtectionStorageMasks structure encoding to the data stream.
Args:
output_buffer (stream): A data stream in which to encode
CapabilityInformation structure data, supporting a write
method.
kmip_version (enum): A KMIPVersion enumeration defining the KMIP
version with which the object will be encoded. Optional,
defaults to KMIP 2.0.
Raises:
VersionNotSupported: Raised when a KMIP version is provided that
does not support the ProtectionStorageMasks structure.
"""
if kmip_version < enums.KMIPVersion.KMIP_2_0:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the ProtectionStorageMasks "
"object.".format(
kmip_version.value
)
)
local_buffer = BytearrayStream()
if self._protection_storage_masks:
for protection_storage_mask in self._protection_storage_masks:
protection_storage_mask.write(
local_buffer,
kmip_version=kmip_version
)
self.length = local_buffer.length()
super(ProtectionStorageMasks, self).write(
output_buffer,
kmip_version=kmip_version
)
output_buffer.write(local_buffer.buffer)
def __repr__(self):
v = "protection_storage_masks={}".format(
"[{}]".format(
", ".join(str(x) for x in self.protection_storage_masks)
) if self._protection_storage_masks else None
)
return "ProtectionStorageMasks({})".format(v)
def __str__(self):
v = '"protection_storage_masks": {}'.format(
"[{}]".format(
", ".join(str(x) for x in self.protection_storage_masks)
) if self._protection_storage_masks else None
)
return '{' + v + '}'
def __eq__(self, other):
if isinstance(other, ProtectionStorageMasks):
if self.protection_storage_masks != other.protection_storage_masks:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, ProtectionStorageMasks):
return not (self == other)
else:
return NotImplemented

View File

@ -9371,3 +9371,424 @@ class TestCapabilityInformation(testtools.TestCase):
self.assertTrue(a != b)
self.assertTrue(b != a)
class TestProtectionStorageMasks(testtools.TestCase):
def setUp(self):
super(TestProtectionStorageMasks, self).setUp()
# This encoding matches the following set of values:
#
# Protection Storage Masks
# Protection Storage Mask - Software | Hardware
# Protection Storage Mask - On Premises | Off Premises
self.full_encoding = utils.BytearrayStream(
b'\x42\x01\x5F\x01\x00\x00\x00\x20'
b'\x42\x01\x5E\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
b'\x42\x01\x5E\x02\x00\x00\x00\x04\x00\x00\x03\x00\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
#
# Protection Storage Masks
self.empty_encoding = utils.BytearrayStream(
b'\x42\x01\x5F\x01\x00\x00\x00\x00'
)
def tearDown(self):
super(TestProtectionStorageMasks, self).tearDown()
def test_invalid_protection_storage_masks(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the protection storage masks of a ProtectionStorageMasks structure.
"""
kwargs = {"protection_storage_masks": "invalid"}
self.assertRaisesRegex(
TypeError,
"The protection storage masks must be a list of integers "
"representing combinations of ProtectionStorageMask enumerations.",
objects.ProtectionStorageMasks,
**kwargs
)
kwargs = {"protection_storage_masks": ["invalid"]}
self.assertRaisesRegex(
TypeError,
"The protection storage masks must be a list of integers "
"representing combinations of ProtectionStorageMask enumerations.",
objects.ProtectionStorageMasks,
**kwargs
)
kwargs = {"protection_storage_masks": [0x10000000]}
self.assertRaisesRegex(
TypeError,
"The protection storage masks must be a list of integers "
"representing combinations of ProtectionStorageMask enumerations.",
objects.ProtectionStorageMasks,
**kwargs
)
args = (
objects.ProtectionStorageMasks(),
"protection_storage_masks",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The protection storage masks must be a list of integers "
"representing combinations of ProtectionStorageMask enumerations.",
setattr,
*args
)
args = (
objects.ProtectionStorageMasks(),
"protection_storage_masks",
["invalid"]
)
self.assertRaisesRegex(
TypeError,
"The protection storage masks must be a list of integers "
"representing combinations of ProtectionStorageMask enumerations.",
setattr,
*args
)
args = (
objects.ProtectionStorageMasks(),
"protection_storage_masks",
[0x10000000]
)
self.assertRaisesRegex(
TypeError,
"The protection storage masks must be a list of integers "
"representing combinations of ProtectionStorageMask enumerations.",
setattr,
*args
)
def test_read(self):
"""
Test that a ProtectionStorageMasks structure can be correctly read in
from a data stream.
"""
protection_storage_masks = objects.ProtectionStorageMasks()
self.assertIsNone(protection_storage_masks.protection_storage_masks)
protection_storage_masks.read(self.full_encoding)
self.assertEqual(
[0x03, 0x0300],
protection_storage_masks.protection_storage_masks
)
def test_read_unsupported_kmip_version(self):
"""
Test that a VersionNotSupported error is raised during the decoding of
a ProtectionStorageMasks structure when the structure is read for an
unsupported KMIP version.
"""
protection_storage_masks = objects.ProtectionStorageMasks()
args = (self.full_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.2 does not support the ProtectionStorageMasks object.",
protection_storage_masks.read,
*args,
**kwargs
)
def test_read_empty(self):
"""
Test that a ProtectionStorageMasks structure can be correctly read in
from an empty data stream.
"""
protection_storage_masks = objects.ProtectionStorageMasks()
self.assertIsNone(protection_storage_masks.protection_storage_masks)
protection_storage_masks.read(self.empty_encoding)
self.assertIsNone(protection_storage_masks.protection_storage_masks)
def test_write(self):
"""
Test that a ProtectionStorageMasks structure can be written to a data
stream.
"""
protection_storage_masks = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
buffer = utils.BytearrayStream()
protection_storage_masks.write(buffer)
self.assertEqual(len(self.full_encoding), len(buffer))
self.assertEqual(str(self.full_encoding), str(buffer))
def test_write_unsupported_kmip_version(self):
"""
Test that a VersionNotSupported error is raised during the encoding of
a ProtectionStorageMasks structure when the structure is written for an
unsupported KMIP version.
"""
protection_storage_masks = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
args = (utils.BytearrayStream(), )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.2 does not support the ProtectionStorageMasks object.",
protection_storage_masks.write,
*args,
**kwargs
)
def test_write_empty(self):
"""
Test that an empty ProtectionStorageMasks structure can be correctly
written to a data stream.
"""
protection_storage_masks = objects.ProtectionStorageMasks()
buffer = utils.BytearrayStream()
protection_storage_masks.write(buffer)
self.assertEqual(len(self.empty_encoding), len(buffer))
self.assertEqual(str(self.empty_encoding), str(buffer))
def test_repr(self):
"""
Test that repr can be applied to a ProtectionStorageMasks structure.
"""
protection_storage_masks = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
v = "protection_storage_masks=[3, 768]"
self.assertEqual(
"ProtectionStorageMasks({})".format(v),
repr(protection_storage_masks)
)
def test_str(self):
"""
Test that str can be applied to a ProtectionStorageMasks structure.
"""
protection_storage_masks = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
v = '"protection_storage_masks": [3, 768]'
self.assertEqual(
"{" + v + "}",
str(protection_storage_masks)
)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
ProtectionStorageMasks structures with the same data.
"""
a = objects.ProtectionStorageMasks()
b = objects.ProtectionStorageMasks()
self.assertTrue(a == b)
self.assertTrue(b == a)
a = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
b = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_protection_storage_masks(self):
"""
Test that the equality operator returns False when comparing two
ProtectionStorageMasks structures with different protection storage
masks fields.
"""
a = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
b = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value
)
]
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing two
ProtectionStorageMasks structures with different types.
"""
a = objects.ProtectionStorageMasks()
b = "invalid"
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns False when comparing two
ProtectionStorageMasks structures with the same data.
"""
a = objects.ProtectionStorageMasks()
b = objects.ProtectionStorageMasks()
self.assertFalse(a != b)
self.assertFalse(b != a)
a = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
b = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_protection_storage_masks(self):
"""
Test that the inequality operator returns True when comparing two
ProtectionStorageMasks structures with different protection storage
masks fields.
"""
a = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value |
enums.ProtectionStorageMask.OFF_PREMISES.value
)
]
)
b = objects.ProtectionStorageMasks(
protection_storage_masks=[
(
enums.ProtectionStorageMask.SOFTWARE.value |
enums.ProtectionStorageMask.HARDWARE.value
),
(
enums.ProtectionStorageMask.ON_PREMISES.value
)
]
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing two
ProtectionStorageMasks structures with different types.
"""
a = objects.ProtectionStorageMasks()
b = "invalid"
self.assertTrue(a != b)
self.assertTrue(b != a)