mirror of https://github.com/OpenKMIP/PyKMIP.git
Update the Register payloads to support Protection Storage Masks
This change updates the Register payloads, adding support for Protection Storage Masks which were added in KMIP 2.0. The payload unit tests have been updated to reflect this change.
This commit is contained in:
parent
cc4c1775d1
commit
a2712e7541
|
@ -33,12 +33,16 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
object_type: The type of the object to register.
|
||||
template_attribute: A group of attributes to set on the new object.
|
||||
managed_object: The object to register.
|
||||
protection_storage_masks: An integer representing all of the
|
||||
protection storage mask selections for the new object. Added in
|
||||
KMIP 2.0.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
object_type=None,
|
||||
template_attribute=None,
|
||||
managed_object=None):
|
||||
managed_object=None,
|
||||
protection_storage_masks=None):
|
||||
"""
|
||||
Construct a Register request payload structure.
|
||||
|
||||
|
@ -60,6 +64,9 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
* secrets.SymmetricKey
|
||||
* secrets.Template
|
||||
Optional, defaults to None. Required for read/write.
|
||||
protection_storage_masks (int): An integer representing all of
|
||||
the protection storage mask selections for the new object.
|
||||
Optional, defaults to None. Added in KMIP 2.0.
|
||||
"""
|
||||
|
||||
super(RegisterRequestPayload, self).__init__(
|
||||
|
@ -71,10 +78,12 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
self._object_type = None
|
||||
self._template_attribute = None
|
||||
self._secret = None
|
||||
self._protection_storage_masks = None
|
||||
|
||||
self.object_type = object_type
|
||||
self.template_attribute = template_attribute
|
||||
self.managed_object = managed_object
|
||||
self.protection_storage_masks = protection_storage_masks
|
||||
|
||||
@property
|
||||
def object_type(self):
|
||||
|
@ -140,6 +149,26 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
"Managed object must be a supported managed object structure."
|
||||
)
|
||||
|
||||
@property
|
||||
def protection_storage_masks(self):
|
||||
if self._protection_storage_masks:
|
||||
return self._protection_storage_masks.value
|
||||
return None
|
||||
|
||||
@protection_storage_masks.setter
|
||||
def protection_storage_masks(self, value):
|
||||
if value is None:
|
||||
self._protection_storage_masks = None
|
||||
elif isinstance(value, six.integer_types):
|
||||
self._protection_storage_masks = primitives.Integer(
|
||||
value=value,
|
||||
tag=enums.Tags.PROTECTION_STORAGE_MASKS
|
||||
)
|
||||
else:
|
||||
raise TypeError(
|
||||
"The protection storage masks must be an integer."
|
||||
)
|
||||
|
||||
def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
|
||||
"""
|
||||
Read the data encoding the Register request payload and decode it into
|
||||
|
@ -217,6 +246,20 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
"object."
|
||||
)
|
||||
|
||||
if kmip_version >= enums.KMIPVersion.KMIP_2_0:
|
||||
if self.is_tag_next(
|
||||
enums.Tags.PROTECTION_STORAGE_MASKS,
|
||||
local_buffer
|
||||
):
|
||||
protection_storage_masks = primitives.Integer(
|
||||
tag=enums.Tags.PROTECTION_STORAGE_MASKS
|
||||
)
|
||||
protection_storage_masks.read(
|
||||
local_buffer,
|
||||
kmip_version=kmip_version
|
||||
)
|
||||
self._protection_storage_masks = protection_storage_masks
|
||||
|
||||
self.is_oversized(local_buffer)
|
||||
|
||||
def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_0):
|
||||
|
@ -281,6 +324,13 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
"field."
|
||||
)
|
||||
|
||||
if kmip_version >= enums.KMIPVersion.KMIP_2_0:
|
||||
if self._protection_storage_masks:
|
||||
self._protection_storage_masks.write(
|
||||
local_buffer,
|
||||
kmip_version=kmip_version
|
||||
)
|
||||
|
||||
self.length = local_buffer.length()
|
||||
super(RegisterRequestPayload, self).write(
|
||||
output_buffer,
|
||||
|
@ -296,6 +346,9 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
return False
|
||||
elif self.managed_object != other.managed_object:
|
||||
return False
|
||||
elif self.protection_storage_masks != \
|
||||
other.protection_storage_masks:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
else:
|
||||
|
@ -311,7 +364,12 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
args = ", ".join([
|
||||
"object_type={}".format(self.object_type),
|
||||
"template_attribute={}".format(repr(self.template_attribute)),
|
||||
"managed_object={}".format(repr(self.managed_object))
|
||||
"managed_object={}".format(repr(self.managed_object)),
|
||||
"protection_storage_masks={}".format(
|
||||
"{}".format(
|
||||
repr(self.protection_storage_masks)
|
||||
) if self._protection_storage_masks else None
|
||||
)
|
||||
])
|
||||
return "RegisterRequestPayload({})".format(args)
|
||||
|
||||
|
@ -320,7 +378,12 @@ class RegisterRequestPayload(primitives.Struct):
|
|||
[
|
||||
'"object_type": {}'.format(self.object_type),
|
||||
'"template_attribute": {}'.format(self.template_attribute),
|
||||
'"managed_object": {}'.format(self.managed_object)
|
||||
'"managed_object": {}'.format(self.managed_object),
|
||||
'"protection_storage_masks": {}'.format(
|
||||
"{}".format(
|
||||
str(self.protection_storage_masks)
|
||||
) if self._protection_storage_masks else None
|
||||
)
|
||||
]
|
||||
)
|
||||
return '{' + value + '}'
|
||||
|
|
|
@ -156,8 +156,9 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
# Certificate
|
||||
# Certificate Type - X.509
|
||||
# Certificate Value - See comment for the full encoding.
|
||||
# Protection Storage Masks - Software | Hardware
|
||||
self.full_encoding_with_attributes = utils.BytearrayStream(
|
||||
b'\x42\x00\x79\x01\x00\x00\x03\x60'
|
||||
b'\x42\x00\x79\x01\x00\x00\x03\x70'
|
||||
b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||
b'\x42\x01\x25\x01\x00\x00\x00\x10'
|
||||
b'\x42\x00\x2C\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
|
||||
|
@ -165,6 +166,7 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
b'\x42\x00\x1D\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
|
||||
b'\x42\x00\x1E\x08\x00\x00\x03\x16' + self.certificate_value +
|
||||
b'\x00\x00'
|
||||
b'\x42\x01\x5F\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
|
||||
)
|
||||
|
||||
# Encoding obtained from the KMIP 1.1 testing document, Section 13.2.2.
|
||||
|
@ -313,6 +315,31 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
*args
|
||||
)
|
||||
|
||||
def test_invalid_protection_storage_masks(self):
|
||||
"""
|
||||
Test that a TypeError is raised when an invalid value is used to set
|
||||
the protection storage masks of a Register request payload.
|
||||
"""
|
||||
kwargs = {"protection_storage_masks": "invalid"}
|
||||
self.assertRaisesRegex(
|
||||
TypeError,
|
||||
"The protection storage masks must be an integer.",
|
||||
payloads.RegisterRequestPayload,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
args = (
|
||||
payloads.RegisterRequestPayload(),
|
||||
"protection_storage_masks",
|
||||
"invalid"
|
||||
)
|
||||
self.assertRaisesRegex(
|
||||
TypeError,
|
||||
"The protection storage masks must be an integer.",
|
||||
setattr,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_read(self):
|
||||
"""
|
||||
Test that a Register request payload can be read from a data stream.
|
||||
|
@ -322,6 +349,7 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
self.assertIsNone(payload.object_type)
|
||||
self.assertIsNone(payload.template_attribute)
|
||||
self.assertIsNone(payload.managed_object)
|
||||
self.assertIsNone(payload.protection_storage_masks)
|
||||
|
||||
payload.read(self.full_encoding)
|
||||
|
||||
|
@ -350,6 +378,7 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
),
|
||||
payload.managed_object
|
||||
)
|
||||
self.assertIsNone(payload.protection_storage_masks)
|
||||
|
||||
def test_read_kmip_2_0(self):
|
||||
"""
|
||||
|
@ -361,6 +390,7 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
self.assertIsNone(payload.object_type)
|
||||
self.assertIsNone(payload.template_attribute)
|
||||
self.assertIsNone(payload.managed_object)
|
||||
self.assertIsNone(payload.protection_storage_masks)
|
||||
|
||||
payload.read(
|
||||
self.full_encoding_with_attributes,
|
||||
|
@ -392,6 +422,7 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
),
|
||||
payload.managed_object
|
||||
)
|
||||
self.assertEqual(3, payload.protection_storage_masks)
|
||||
|
||||
def test_read_missing_object_type(self):
|
||||
"""
|
||||
|
@ -519,6 +550,10 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
managed_object=secrets.Certificate(
|
||||
certificate_type=enums.CertificateType.X_509,
|
||||
certificate_value=self.certificate_value
|
||||
),
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -680,13 +715,18 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
"RegisterRequestPayload("
|
||||
"object_type=ObjectType.SECRET_DATA, "
|
||||
"template_attribute=Struct(), "
|
||||
"managed_object=Struct())",
|
||||
"managed_object=Struct(), "
|
||||
"protection_storage_masks=3)",
|
||||
repr(payload)
|
||||
)
|
||||
|
||||
|
@ -728,13 +768,18 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
self.assertEqual(
|
||||
'{'
|
||||
'"object_type": ObjectType.SECRET_DATA, '
|
||||
'"template_attribute": Struct(), '
|
||||
'"managed_object": Struct()'
|
||||
'"managed_object": Struct(), '
|
||||
'"protection_storage_masks": 3'
|
||||
'}',
|
||||
str(payload)
|
||||
)
|
||||
|
@ -769,6 +814,10 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
managed_object=secrets.Certificate(
|
||||
certificate_type=enums.CertificateType.X_509,
|
||||
certificate_value=self.certificate_value
|
||||
),
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
b = payloads.RegisterRequestPayload(
|
||||
|
@ -790,6 +839,10 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
managed_object=secrets.Certificate(
|
||||
certificate_type=enums.CertificateType.X_509,
|
||||
certificate_value=self.certificate_value
|
||||
),
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -871,6 +924,27 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_not_equal_protection_storage_masks(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two Create
|
||||
request payloads with different protection storage masks.
|
||||
"""
|
||||
a = payloads.RegisterRequestPayload(
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
b = payloads.RegisterRequestPayload(
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.ON_SYSTEM.value |
|
||||
enums.ProtectionStorageMask.OFF_SYSTEM.value
|
||||
)
|
||||
)
|
||||
|
||||
self.assertFalse(a == b)
|
||||
self.assertFalse(b == a)
|
||||
|
||||
def test_equal_on_type_mismatch(self):
|
||||
"""
|
||||
Test that the equality operator returns False when comparing two
|
||||
|
@ -912,6 +986,10 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
managed_object=secrets.Certificate(
|
||||
certificate_type=enums.CertificateType.X_509,
|
||||
certificate_value=self.certificate_value
|
||||
),
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
b = payloads.RegisterRequestPayload(
|
||||
|
@ -933,6 +1011,10 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
managed_object=secrets.Certificate(
|
||||
certificate_type=enums.CertificateType.X_509,
|
||||
certificate_value=self.certificate_value
|
||||
),
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -1014,6 +1096,27 @@ class TestRegisterRequestPayload(testtools.TestCase):
|
|||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_not_equal_protection_storage_masks(self):
|
||||
"""
|
||||
Test that the inequality operator returns True when comparing two
|
||||
Register request payloads with different protection storage masks.
|
||||
"""
|
||||
a = payloads.RegisterRequestPayload(
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.SOFTWARE.value |
|
||||
enums.ProtectionStorageMask.HARDWARE.value
|
||||
)
|
||||
)
|
||||
b = payloads.RegisterRequestPayload(
|
||||
protection_storage_masks=(
|
||||
enums.ProtectionStorageMask.ON_SYSTEM.value |
|
||||
enums.ProtectionStorageMask.OFF_SYSTEM.value
|
||||
)
|
||||
)
|
||||
|
||||
self.assertTrue(a != b)
|
||||
self.assertTrue(b != a)
|
||||
|
||||
def test_not_equal_on_type_mismatch(self):
|
||||
"""
|
||||
Test that the inequality operator returns True when comparing two
|
||||
|
|
Loading…
Reference in New Issue