Add the CapabilityInformation structure

This change adds the CapabilityInformation structure, a KMIP 1.3
addition that is used to specify details on capabilities supported
by a KMIP server. A unit test suite is included to cover the new
structure.
This commit is contained in:
Peter Hamilton 2019-04-19 12:01:39 -04:00 committed by Peter Hamilton
parent a778aa4224
commit b968378eb8
2 changed files with 1474 additions and 0 deletions

View File

@ -5499,3 +5499,550 @@ class ValidationInformation(primitives.Struct):
return not (self == other)
else:
return NotImplemented
class CapabilityInformation(primitives.Struct):
"""
A structure containing details of supported server capabilities.
This is intended for use with KMIP 1.3+.
Attributes:
streaming_capability: A boolean flag indicating whether or not
the server supports streaming data.
asynchronous_capability: A boolean flag indicating whether or not
the server supports asynchronous operations.
attestation_capability: A boolean flag indicating whether or not
the server supports attestation.
batch_undo_capability: A boolean flag indicating whether or not
the server supports batch undo. Added in KMIP 1.4.
batch_continue_capability: A boolean flag indicating whether or not
the server supports batch continue. Added in KMIP 1.4.
unwrap_mode: An UnwrapMode enumeration identifying the unwrap mode
supported by the server.
destroy_action: A DestroyAction enumeration identifying the destroy
action supported by the server.
shredding_algorithm: A ShreddingAlgorithm enumeration identifying
the shredding algorithm supported by the server.
rng_mode: An RNGMode enumeration identifying the RNG mode supported
by the server.
"""
def __init__(self,
streaming_capability=None,
asynchronous_capability=None,
attestation_capability=None,
batch_undo_capability=None,
batch_continue_capability=None,
unwrap_mode=None,
destroy_action=None,
shredding_algorithm=None,
rng_mode=None):
"""
Construct a CapabilityInformation structure.
Args:
streaming_capability (bool): A boolean flag indicating whether or
not the server supports streaming data. Optional, defaults to
None.
asynchronous_capability (bool): A boolean flag indicating whether
or not the server supports asynchronous operations. Optional,
defaults to None.
attestation_capability (bool): A boolean flag indicating whether
or not the server supports attestation. Optional, defaults to
None.
batch_undo_capability (bool): A boolean flag indicating whether or
not the server supports batch undo. Added in KMIP 1.4.
Optional, defaults to None.
batch_continue_capability (bool): A boolean flag indicating whether
or not the server supports batch continue. Added in KMIP 1.4.
Optional, defaults to None.
unwrap_mode (enum): An UnwrapMode enumeration identifying the
unwrap mode supported by the server. Optional, defaults to
None.
destroy_action (enum): A DestroyAction enumeration identifying the
destroy action supported by the server. Optional, defaults to
None.
shredding_algorithm (enum): A ShreddingAlgorithm enumeration
identifying the shredding algorithm supported by the server.
Optional, defaults to None.
rng_mode (enum): An RNGMode enumeration identifying the RNG mode
supported by the server. Optional, defaults to None.
"""
super(CapabilityInformation, self).__init__(
tag=enums.Tags.CAPABILITY_INFORMATION
)
self._streaming_capability = None
self._asynchronous_capability = None
self._attestation_capability = None
self._batch_undo_capability = None
self._batch_continue_capability = None
self._unwrap_mode = None
self._destroy_action = None
self._shredding_algorithm = None
self._rng_mode = None
self.streaming_capability = streaming_capability
self.asynchronous_capability = asynchronous_capability
self.attestation_capability = attestation_capability
self.batch_undo_capability = batch_undo_capability
self.batch_continue_capability = batch_continue_capability
self.unwrap_mode = unwrap_mode
self.destroy_action = destroy_action
self.shredding_algorithm = shredding_algorithm
self.rng_mode = rng_mode
@property
def streaming_capability(self):
if self._streaming_capability:
return self._streaming_capability.value
return None
@streaming_capability.setter
def streaming_capability(self, value):
if value is None:
self._streaming_capability = None
elif isinstance(value, bool):
self._streaming_capability = primitives.Boolean(
value=value,
tag=enums.Tags.STREAMING_CAPABILITY
)
else:
raise TypeError("The streaming capability must be a boolean.")
@property
def asynchronous_capability(self):
if self._asynchronous_capability:
return self._asynchronous_capability.value
return None
@asynchronous_capability.setter
def asynchronous_capability(self, value):
if value is None:
self._asynchronous_capability = None
elif isinstance(value, bool):
self._asynchronous_capability = primitives.Boolean(
value=value,
tag=enums.Tags.ASYNCHRONOUS_CAPABILITY
)
else:
raise TypeError(
"The asynchronous capability must be a boolean."
)
@property
def attestation_capability(self):
if self._attestation_capability:
return self._attestation_capability.value
return None
@attestation_capability.setter
def attestation_capability(self, value):
if value is None:
self._attestation_capability = None
elif isinstance(value, bool):
self._attestation_capability = primitives.Boolean(
value=value,
tag=enums.Tags.ATTESTATION_CAPABILITY
)
else:
raise TypeError("The attestation capability must be a boolean.")
@property
def batch_undo_capability(self):
if self._batch_undo_capability:
return self._batch_undo_capability.value
return None
@batch_undo_capability.setter
def batch_undo_capability(self, value):
if value is None:
self._batch_undo_capability = None
elif isinstance(value, bool):
self._batch_undo_capability = primitives.Boolean(
value=value,
tag=enums.Tags.BATCH_UNDO_CAPABILITY
)
else:
raise TypeError("The batch undo capability must be a boolean.")
@property
def batch_continue_capability(self):
if self._batch_continue_capability:
return self._batch_continue_capability.value
return None
@batch_continue_capability.setter
def batch_continue_capability(self, value):
if value is None:
self._batch_continue_capability = None
elif isinstance(value, bool):
self._batch_continue_capability = primitives.Boolean(
value=value,
tag=enums.Tags.BATCH_CONTINUE_CAPABILITY
)
else:
raise TypeError(
"The batch continue capability must be a boolean."
)
@property
def unwrap_mode(self):
if self._unwrap_mode:
return self._unwrap_mode.value
return None
@unwrap_mode.setter
def unwrap_mode(self, value):
if value is None:
self._unwrap_mode = None
elif isinstance(value, enums.UnwrapMode):
self._unwrap_mode = primitives.Enumeration(
enums.UnwrapMode,
value=value,
tag=enums.Tags.UNWRAP_MODE
)
else:
raise TypeError(
"The unwrap mode must be an UnwrapMode enumeration."
)
@property
def destroy_action(self):
if self._destroy_action:
return self._destroy_action.value
return None
@destroy_action.setter
def destroy_action(self, value):
if value is None:
self._destroy_action = None
elif isinstance(value, enums.DestroyAction):
self._destroy_action = primitives.Enumeration(
enums.DestroyAction,
value=value,
tag=enums.Tags.DESTROY_ACTION
)
else:
raise TypeError(
"The destroy action must be a DestroyAction enumeration."
)
@property
def shredding_algorithm(self):
if self._shredding_algorithm:
return self._shredding_algorithm.value
return None
@shredding_algorithm.setter
def shredding_algorithm(self, value):
if value is None:
self._shredding_algorithm = None
elif isinstance(value, enums.ShreddingAlgorithm):
self._shredding_algorithm = primitives.Enumeration(
enums.ShreddingAlgorithm,
value=value,
tag=enums.Tags.SHREDDING_ALGORITHM
)
else:
raise TypeError(
"The shredding algorithm must be a ShreddingAlgorithm "
"enumeration."
)
@property
def rng_mode(self):
if self._rng_mode:
return self._rng_mode.value
return None
@rng_mode.setter
def rng_mode(self, value):
if value is None:
self._rng_mode = None
elif isinstance(value, enums.RNGMode):
self._rng_mode = primitives.Enumeration(
enums.RNGMode,
value=value,
tag=enums.Tags.RNG_MODE
)
else:
raise TypeError("The RNG mode must be an RNGMode enumeration.")
def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3):
"""
Read the data encoding the CapabilityInformation structure and decode
it into its constituent parts.
Args:
input_buffer (stream): A data stream containing encoded object
data, supporting a read method; usually a BytearrayStream
object.
kmip_version (KMIPVersion): An enumeration defining the KMIP
version with which the object will be decoded. Optional,
defaults to KMIP 2.0.
Raises:
VersionNotSupported: Raised when a KMIP version is provided that
does not support the CapabilityInformation structure.
"""
if kmip_version < enums.KMIPVersion.KMIP_1_3:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the CapabilityInformation "
"object.".format(
kmip_version.value
)
)
super(CapabilityInformation, self).read(
input_buffer,
kmip_version=kmip_version
)
local_buffer = utils.BytearrayStream(input_buffer.read(self.length))
if self.is_tag_next(enums.Tags.STREAMING_CAPABILITY, local_buffer):
streaming_capability = primitives.Boolean(
tag=enums.Tags.STREAMING_CAPABILITY
)
streaming_capability.read(local_buffer, kmip_version=kmip_version)
self._streaming_capability = streaming_capability
if self.is_tag_next(enums.Tags.ASYNCHRONOUS_CAPABILITY, local_buffer):
asynchronous_capability = primitives.Boolean(
tag=enums.Tags.ASYNCHRONOUS_CAPABILITY
)
asynchronous_capability.read(
local_buffer,
kmip_version=kmip_version
)
self._asynchronous_capability = asynchronous_capability
if self.is_tag_next(enums.Tags.ATTESTATION_CAPABILITY, local_buffer):
attestation_capability = primitives.Boolean(
tag=enums.Tags.ATTESTATION_CAPABILITY
)
attestation_capability.read(
local_buffer,
kmip_version=kmip_version
)
self._attestation_capability = attestation_capability
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self.is_tag_next(
enums.Tags.BATCH_UNDO_CAPABILITY,
local_buffer
):
batch_undo_capability = primitives.Boolean(
tag=enums.Tags.BATCH_UNDO_CAPABILITY
)
batch_undo_capability.read(
local_buffer,
kmip_version=kmip_version
)
self._batch_continue_capability = batch_undo_capability
if self.is_tag_next(
enums.Tags.BATCH_CONTINUE_CAPABILITY,
local_buffer
):
batch_continue_capability = primitives.Boolean(
tag=enums.Tags.BATCH_CONTINUE_CAPABILITY
)
batch_continue_capability.read(
local_buffer,
kmip_version=kmip_version
)
self._batch_continue_capability = batch_continue_capability
if self.is_tag_next(enums.Tags.UNWRAP_MODE, local_buffer):
unwrap_mode = primitives.Enumeration(
enums.UnwrapMode,
tag=enums.Tags.UNWRAP_MODE
)
unwrap_mode.read(local_buffer, kmip_version=kmip_version)
self._unwrap_mode = unwrap_mode
if self.is_tag_next(enums.Tags.DESTROY_ACTION, local_buffer):
destroy_action = primitives.Enumeration(
enums.DestroyAction,
tag=enums.Tags.DESTROY_ACTION
)
destroy_action.read(local_buffer, kmip_version=kmip_version)
self._destroy_action = destroy_action
if self.is_tag_next(enums.Tags.SHREDDING_ALGORITHM, local_buffer):
shredding_algorithm = primitives.Enumeration(
enums.ShreddingAlgorithm,
tag=enums.Tags.SHREDDING_ALGORITHM
)
shredding_algorithm.read(local_buffer, kmip_version=kmip_version)
self._shredding_algorithm = shredding_algorithm
if self.is_tag_next(enums.Tags.RNG_MODE, local_buffer):
rng_mode = primitives.Enumeration(
enums.RNGMode,
tag=enums.Tags.RNG_MODE
)
rng_mode.read(local_buffer, kmip_version=kmip_version)
self._rng_mode = rng_mode
self.is_oversized(local_buffer)
def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_1_3):
"""
Write the CapabilityInformation structure encoding to the data stream.
Args:
output_buffer (stream): A data stream in which to encode
CapabilityInformation structure data, supporting a write
method.
kmip_version (enum): A KMIPVersion enumeration defining the KMIP
version with which the object will be encoded. Optional,
defaults to KMIP 2.0.
Raises:
VersionNotSupported: Raised when a KMIP version is provided that
does not support the CapabilityInformation structure.
"""
if kmip_version < enums.KMIPVersion.KMIP_1_3:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the CapabilityInformation "
"object.".format(
kmip_version.value
)
)
local_buffer = BytearrayStream()
if self._streaming_capability:
self._streaming_capability.write(
local_buffer,
kmip_version=kmip_version
)
if self._asynchronous_capability:
self._asynchronous_capability.write(
local_buffer,
kmip_version=kmip_version
)
if self._attestation_capability:
self._attestation_capability.write(
local_buffer,
kmip_version=kmip_version
)
if kmip_version >= enums.KMIPVersion.KMIP_1_4:
if self._batch_undo_capability:
self._batch_undo_capability.write(
local_buffer,
kmip_version=kmip_version
)
if self._batch_continue_capability:
self._batch_continue_capability.write(
local_buffer,
kmip_version=kmip_version
)
if self._unwrap_mode:
self._unwrap_mode.write(
local_buffer,
kmip_version=kmip_version
)
if self._destroy_action:
self._destroy_action.write(
local_buffer,
kmip_version=kmip_version
)
if self._shredding_algorithm:
self._shredding_algorithm.write(
local_buffer,
kmip_version=kmip_version
)
if self._rng_mode:
self._rng_mode.write(
local_buffer,
kmip_version=kmip_version
)
self.length = local_buffer.length()
super(CapabilityInformation, self).write(
output_buffer,
kmip_version=kmip_version
)
output_buffer.write(local_buffer.buffer)
def __repr__(self):
sc = "streaming_capability={}".format(self.streaming_capability)
rc = "asynchronous_capability={}".format(self.asynchronous_capability)
tc = "attestation_capability={}".format(self.attestation_capability)
buc = "batch_undo_capability={}".format(self.batch_undo_capability)
bcc = "batch_continue_capability={}".format(
self.batch_continue_capability
)
um = "unwrap_mode={}".format(self.unwrap_mode)
da = "destroy_action={}".format(self.destroy_action)
sa = "shredding_algorithm={}".format(self.shredding_algorithm)
rm = "rng_mode={}".format(self.rng_mode)
v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm])
return "CapabilityInformation({})".format(v)
def __str__(self):
sc = '"streaming_capability": {}'.format(self.streaming_capability)
rc = '"asynchronous_capability": {}'.format(
self.asynchronous_capability
)
tc = '"attestation_capability": {}'.format(
self.attestation_capability
)
buc = '"batch_undo_capability": {}'.format(self.batch_undo_capability)
bcc = '"batch_continue_capability": {}'.format(
self.batch_continue_capability
)
um = '"unwrap_mode": {}'.format(self.unwrap_mode)
da = '"destroy_action": {}'.format(self.destroy_action)
sa = '"shredding_algorithm": {}'.format(self.shredding_algorithm)
rm = '"rng_mode": {}'.format(self.rng_mode)
v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm])
return '{' + v + '}'
def __eq__(self, other):
if isinstance(other, CapabilityInformation):
if self.streaming_capability != other.streaming_capability:
return False
elif self.asynchronous_capability != other.asynchronous_capability:
return False
elif self.attestation_capability != other.attestation_capability:
return False
elif self.batch_undo_capability != other.batch_undo_capability:
return False
elif self.batch_continue_capability != \
other.batch_continue_capability:
return False
elif self.unwrap_mode != other.unwrap_mode:
return False
elif self.destroy_action != other.destroy_action:
return False
elif self.shredding_algorithm != other.shredding_algorithm:
return False
elif self.rng_mode != other.rng_mode:
return False
else:
return True
else:
return NotImplemented
def __ne__(self, other):
if isinstance(other, CapabilityInformation):
return not (self == other)
else:
return NotImplemented

View File

@ -8444,3 +8444,930 @@ class TestValidationInformation(testtools.TestCase):
self.assertTrue(a != b)
self.assertTrue(b != a)
class TestCapabilityInformation(testtools.TestCase):
def setUp(self):
super(TestCapabilityInformation, self).setUp()
# This encoding matches the following set of values:
#
# Capability Information
# Streaming Capability - False
# Asynchronous Capability - True
# Attestation Capability - True
# Batch Undo Capability - False
# Batch Continue Capability - True
# Unwrap Mode - PROCESSED
# Destroy Action - SHREDDED
# Shredding Algorithm - CRYPTOGRAPHIC
# RNG Mode - NON_SHARED_INSTANTIATION
self.full_encoding = utils.BytearrayStream(
b'\x42\x00\xF7\x01\x00\x00\x00\x90'
b'\x42\x00\xEF\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\xF0\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01'
b'\x42\x00\xF1\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01'
b'\x42\x00\xF9\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\xFA\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01'
b'\x42\x00\xF2\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
b'\x42\x00\xF3\x05\x00\x00\x00\x04\x00\x00\x00\x07\x00\x00\x00\x00'
b'\x42\x00\xF4\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
b'\x42\x00\xF5\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
#
# Capability Information
# Streaming Capability - False
# Asynchronous Capability - True
# Attestation Capability - True
# Unwrap Mode - PROCESSED
# Destroy Action - SHREDDED
# Shredding Algorithm - CRYPTOGRAPHIC
# RNG Mode - NON_SHARED_INSTANTIATION
self.full_encoding_kmip_1_3 = utils.BytearrayStream(
b'\x42\x00\xF7\x01\x00\x00\x00\x70'
b'\x42\x00\xEF\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x42\x00\xF0\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01'
b'\x42\x00\xF1\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01'
b'\x42\x00\xF2\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
b'\x42\x00\xF3\x05\x00\x00\x00\x04\x00\x00\x00\x07\x00\x00\x00\x00'
b'\x42\x00\xF4\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
b'\x42\x00\xF5\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
#
# Capability Information
self.empty_encoding = utils.BytearrayStream(
b'\x42\x00\xF7\x01\x00\x00\x00\x00'
)
def tearDown(self):
super(TestCapabilityInformation, self).tearDown()
def test_invalid_streaming_capability(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the streaming capability of a CapabilityInformation structure.
"""
kwargs = {"streaming_capability": "invalid"}
self.assertRaisesRegex(
TypeError,
"The streaming capability must be a boolean.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"streaming_capability",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The streaming capability must be a boolean.",
setattr,
*args
)
def test_invalid_asynchronous_capability(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the asynchronous capability of a CapabilityInformation structure.
"""
kwargs = {"asynchronous_capability": "invalid"}
self.assertRaisesRegex(
TypeError,
"The asynchronous capability must be a boolean.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"asynchronous_capability",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The asynchronous capability must be a boolean.",
setattr,
*args
)
def test_invalid_attestation_capability(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the attestation capability of a CapabilityInformation structure.
"""
kwargs = {"attestation_capability": "invalid"}
self.assertRaisesRegex(
TypeError,
"The attestation capability must be a boolean.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"attestation_capability",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The attestation capability must be a boolean.",
setattr,
*args
)
def test_invalid_batch_undo_capability(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the batch undo capability of a CapabilityInformation structure.
"""
kwargs = {"batch_undo_capability": "invalid"}
self.assertRaisesRegex(
TypeError,
"The batch undo capability must be a boolean.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"batch_undo_capability",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The batch undo capability must be a boolean.",
setattr,
*args
)
def test_invalid_batch_continue_capability(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the batch continue capability of a CapabilityInformation structure.
"""
kwargs = {"batch_continue_capability": "invalid"}
self.assertRaisesRegex(
TypeError,
"The batch continue capability must be a boolean.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"batch_continue_capability",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The batch continue capability must be a boolean.",
setattr,
*args
)
def test_invalid_unwrap_mode(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the unwrap mode of a CapabilityInformation structure.
"""
kwargs = {"unwrap_mode": "invalid"}
self.assertRaisesRegex(
TypeError,
"The unwrap mode must be an UnwrapMode enumeration.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"unwrap_mode",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The unwrap mode must be an UnwrapMode enumeration.",
setattr,
*args
)
def test_invalid_destroy_action(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the destroy action of a CapabilityInformation structure.
"""
kwargs = {"destroy_action": "invalid"}
self.assertRaisesRegex(
TypeError,
"The destroy action must be a DestroyAction enumeration.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"destroy_action",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The destroy action must be a DestroyAction enumeration.",
setattr,
*args
)
def test_invalid_shredding_algorithm(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the shredding algorithm of a CapabilityInformation structure.
"""
kwargs = {"shredding_algorithm": "invalid"}
self.assertRaisesRegex(
TypeError,
"The shredding algorithm must be a ShreddingAlgorithm "
"enumeration.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"shredding_algorithm",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The shredding algorithm must be a ShreddingAlgorithm "
"enumeration.",
setattr,
*args
)
def test_invalid_rng_mode(self):
"""
Test that a TypeError is raised when an invalid value is used to set
the RNG mode of a CapabilityInformation structure.
"""
kwargs = {"rng_mode": "invalid"}
self.assertRaisesRegex(
TypeError,
"The RNG mode must be an RNGMode enumeration.",
objects.CapabilityInformation,
**kwargs
)
args = (
objects.CapabilityInformation(),
"rng_mode",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The RNG mode must be an RNGMode enumeration.",
setattr,
*args
)
def test_read(self):
"""
Test that a CapabilityInformation structure can be correctly read in
from a data stream.
"""
capability_information = objects.CapabilityInformation()
self.assertIsNone(capability_information.streaming_capability)
self.assertIsNone(capability_information.asynchronous_capability)
self.assertIsNone(capability_information.attestation_capability)
self.assertIsNone(capability_information.batch_undo_capability)
self.assertIsNone(capability_information.batch_continue_capability)
self.assertIsNone(capability_information.unwrap_mode)
self.assertIsNone(capability_information.destroy_action)
self.assertIsNone(capability_information.shredding_algorithm)
self.assertIsNone(capability_information.rng_mode)
capability_information.read(
self.full_encoding,
kmip_version=enums.KMIPVersion.KMIP_1_4
)
self.assertFalse(capability_information.streaming_capability)
self.assertTrue(capability_information.asynchronous_capability)
self.assertTrue(capability_information.attestation_capability)
self.assertFalse(capability_information.batch_undo_capability)
self.assertTrue(capability_information.batch_continue_capability)
self.assertEqual(
enums.UnwrapMode.PROCESSED,
capability_information.unwrap_mode
)
self.assertEqual(
enums.DestroyAction.SHREDDED,
capability_information.destroy_action
)
self.assertEqual(
enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
capability_information.shredding_algorithm
)
self.assertEqual(
enums.RNGMode.NON_SHARED_INSTANTIATION,
capability_information.rng_mode
)
def test_read_unsupported_kmip_version(self):
"""
Test that a VersionNotSupported error is raised during the decoding of
a CapabilityInformation structure when the structure is read for an
unsupported KMIP version.
"""
capability_information = objects.CapabilityInformation()
args = (self.full_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.2 does not support the CapabilityInformation object.",
capability_information.read,
*args,
**kwargs
)
def test_read_kmip_1_3(self):
"""
Test that a CapabilityInformation structure can be correctly read in
from a data stream with only KMIP 1.3 features.
"""
capability_information = objects.CapabilityInformation()
self.assertIsNone(capability_information.streaming_capability)
self.assertIsNone(capability_information.asynchronous_capability)
self.assertIsNone(capability_information.attestation_capability)
self.assertIsNone(capability_information.batch_undo_capability)
self.assertIsNone(capability_information.batch_continue_capability)
self.assertIsNone(capability_information.unwrap_mode)
self.assertIsNone(capability_information.destroy_action)
self.assertIsNone(capability_information.shredding_algorithm)
self.assertIsNone(capability_information.rng_mode)
capability_information.read(
self.full_encoding_kmip_1_3,
kmip_version=enums.KMIPVersion.KMIP_1_4
)
self.assertFalse(capability_information.streaming_capability)
self.assertTrue(capability_information.asynchronous_capability)
self.assertTrue(capability_information.attestation_capability)
self.assertIsNone(capability_information.batch_undo_capability)
self.assertIsNone(capability_information.batch_continue_capability)
self.assertEqual(
enums.UnwrapMode.PROCESSED,
capability_information.unwrap_mode
)
self.assertEqual(
enums.DestroyAction.SHREDDED,
capability_information.destroy_action
)
self.assertEqual(
enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
capability_information.shredding_algorithm
)
self.assertEqual(
enums.RNGMode.NON_SHARED_INSTANTIATION,
capability_information.rng_mode
)
def test_read_empty(self):
"""
Test that a CapabilityInformation structure can be correctly read in
from an empty data stream.
"""
capability_information = objects.CapabilityInformation()
self.assertIsNone(capability_information.streaming_capability)
self.assertIsNone(capability_information.asynchronous_capability)
self.assertIsNone(capability_information.attestation_capability)
self.assertIsNone(capability_information.batch_undo_capability)
self.assertIsNone(capability_information.batch_continue_capability)
self.assertIsNone(capability_information.unwrap_mode)
self.assertIsNone(capability_information.destroy_action)
self.assertIsNone(capability_information.shredding_algorithm)
self.assertIsNone(capability_information.rng_mode)
capability_information.read(
self.empty_encoding,
kmip_version=enums.KMIPVersion.KMIP_1_4
)
self.assertIsNone(capability_information.streaming_capability)
self.assertIsNone(capability_information.asynchronous_capability)
self.assertIsNone(capability_information.attestation_capability)
self.assertIsNone(capability_information.batch_undo_capability)
self.assertIsNone(capability_information.batch_continue_capability)
self.assertIsNone(capability_information.unwrap_mode)
self.assertIsNone(capability_information.destroy_action)
self.assertIsNone(capability_information.shredding_algorithm)
self.assertIsNone(capability_information.rng_mode)
def test_write(self):
"""
Test that a CapabilityInformation structure can be written to a data
stream.
"""
capability_information = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
buffer = utils.BytearrayStream()
capability_information.write(
buffer,
kmip_version=enums.KMIPVersion.KMIP_1_4
)
self.assertEqual(len(self.full_encoding), len(buffer))
self.assertEqual(str(self.full_encoding), str(buffer))
def test_write_unsupported_kmip_version(self):
"""
Test that a VersionNotSupported error is raised during the encoding of
a CapabilityInformation structure when the structure is written for an
unsupported KMIP version.
"""
capability_information = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
args = (utils.BytearrayStream(), )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.2 does not support the CapabilityInformation object.",
capability_information.write,
*args,
**kwargs
)
def test_write_kmip_1_3(self):
"""
Test that a CapabilityInformation structure can be written to a data
stream with only KMIP 1.3 features.
"""
capability_information = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
buffer = utils.BytearrayStream()
capability_information.write(
buffer,
kmip_version=enums.KMIPVersion.KMIP_1_3
)
self.assertEqual(len(self.full_encoding_kmip_1_3), len(buffer))
self.assertEqual(str(self.full_encoding_kmip_1_3), str(buffer))
def test_write_empty(self):
"""
Test that an empty CapabilityInformation structure can be correctly
written to a data stream.
"""
capability_information = objects.CapabilityInformation()
buffer = utils.BytearrayStream()
capability_information.write(
buffer,
kmip_version=enums.KMIPVersion.KMIP_1_3
)
self.assertEqual(len(self.empty_encoding), len(buffer))
self.assertEqual(str(self.empty_encoding), str(buffer))
def test_repr(self):
"""
Test that repr can be applied to a CapabilityInformation structure.
"""
capability_information = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
sc = "streaming_capability=False"
rc = "asynchronous_capability=True"
tc = "attestation_capability=True"
buc = "batch_undo_capability=False"
bcc = "batch_continue_capability=True"
um = "unwrap_mode=UnwrapMode.PROCESSED"
da = "destroy_action=DestroyAction.SHREDDED"
sa = "shredding_algorithm=ShreddingAlgorithm.CRYPTOGRAPHIC"
rm = "rng_mode=RNGMode.NON_SHARED_INSTANTIATION"
v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm])
self.assertEqual(
"CapabilityInformation({})".format(v),
repr(capability_information)
)
def test_str(self):
"""
Test that str can be applied to a CapabilityInformation structure.
"""
capability_information = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
sc = '"streaming_capability": False'
rc = '"asynchronous_capability": True'
tc = '"attestation_capability": True'
buc = '"batch_undo_capability": False'
bcc = '"batch_continue_capability": True'
um = '"unwrap_mode": UnwrapMode.PROCESSED'
da = '"destroy_action": DestroyAction.SHREDDED'
sa = '"shredding_algorithm": ShreddingAlgorithm.CRYPTOGRAPHIC'
rm = '"rng_mode": RNGMode.NON_SHARED_INSTANTIATION'
v = ", ".join([sc, rc, tc, buc, bcc, um, da, sa, rm])
self.assertEqual(
"{" + v + "}",
str(capability_information)
)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
CapabilityInformation structures with the same data.
"""
a = objects.CapabilityInformation()
b = objects.CapabilityInformation()
self.assertTrue(a == b)
self.assertTrue(b == a)
a = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
b = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_streaming_capability(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different streaming capability
fields.
"""
a = objects.CapabilityInformation(streaming_capability=True)
b = objects.CapabilityInformation(streaming_capability=False)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_asynchronous_capability(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different asynchronous
capability fields.
"""
a = objects.CapabilityInformation(asynchronous_capability=True)
b = objects.CapabilityInformation(asynchronous_capability=False)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_attestation_capability(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different attestation capability
fields.
"""
a = objects.CapabilityInformation(attestation_capability=True)
b = objects.CapabilityInformation(attestation_capability=False)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_batch_undo_capability(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different batch undo capability
fields.
"""
a = objects.CapabilityInformation(batch_undo_capability=True)
b = objects.CapabilityInformation(batch_undo_capability=False)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_batch_continue_capability(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different batch continue
capability fields.
"""
a = objects.CapabilityInformation(batch_continue_capability=True)
b = objects.CapabilityInformation(batch_continue_capability=False)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_unwrap_mode(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different unwrap mode fields.
"""
a = objects.CapabilityInformation(
unwrap_mode=enums.UnwrapMode.PROCESSED
)
b = objects.CapabilityInformation(
unwrap_mode=enums.UnwrapMode.NOT_PROCESSED
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_destroy_action(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different destroy action fields.
"""
a = objects.CapabilityInformation(
destroy_action=enums.DestroyAction.DELETED
)
b = objects.CapabilityInformation(
destroy_action=enums.DestroyAction.SHREDDED
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_shredding_algorithm(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different shredding algorithm
fields.
"""
a = objects.CapabilityInformation(
shredding_algorithm=enums.ShreddingAlgorithm.UNSPECIFIED
)
b = objects.CapabilityInformation(
shredding_algorithm=enums.ShreddingAlgorithm.UNSUPPORTED
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_not_equal_rng_mode(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different RNG mode fields.
"""
a = objects.CapabilityInformation(
rng_mode=enums.RNGMode.SHARED_INSTANTIATION
)
b = objects.CapabilityInformation(
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing two
CapabilityInformation structures with different types.
"""
a = objects.CapabilityInformation()
b = "invalid"
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns False when comparing two
CapabilityInformation structures with the same data.
"""
a = objects.CapabilityInformation()
b = objects.CapabilityInformation()
self.assertFalse(a != b)
self.assertFalse(b != a)
a = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
b = objects.CapabilityInformation(
streaming_capability=False,
asynchronous_capability=True,
attestation_capability=True,
batch_undo_capability=False,
batch_continue_capability=True,
unwrap_mode=enums.UnwrapMode.PROCESSED,
destroy_action=enums.DestroyAction.SHREDDED,
shredding_algorithm=enums.ShreddingAlgorithm.CRYPTOGRAPHIC,
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_streaming_capability(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different streaming capability
fields.
"""
a = objects.CapabilityInformation(streaming_capability=True)
b = objects.CapabilityInformation(streaming_capability=False)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_asynchronous_capability(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different asynchronous
capability fields.
"""
a = objects.CapabilityInformation(asynchronous_capability=True)
b = objects.CapabilityInformation(asynchronous_capability=False)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_attestation_capability(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different attestation capability
fields.
"""
a = objects.CapabilityInformation(attestation_capability=True)
b = objects.CapabilityInformation(attestation_capability=False)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_batch_undo_capability(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different batch undo capability
fields.
"""
a = objects.CapabilityInformation(batch_undo_capability=True)
b = objects.CapabilityInformation(batch_undo_capability=False)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_batch_continue_capability(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different batch continue
capability fields.
"""
a = objects.CapabilityInformation(batch_continue_capability=True)
b = objects.CapabilityInformation(batch_continue_capability=False)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_unwrap_mode(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different unwrap mode fields.
"""
a = objects.CapabilityInformation(
unwrap_mode=enums.UnwrapMode.PROCESSED
)
b = objects.CapabilityInformation(
unwrap_mode=enums.UnwrapMode.NOT_PROCESSED
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_destroy_action(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different destroy action fields.
"""
a = objects.CapabilityInformation(
destroy_action=enums.DestroyAction.DELETED
)
b = objects.CapabilityInformation(
destroy_action=enums.DestroyAction.SHREDDED
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_shredding_algorithm(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different shredding algorithm
fields.
"""
a = objects.CapabilityInformation(
shredding_algorithm=enums.ShreddingAlgorithm.UNSPECIFIED
)
b = objects.CapabilityInformation(
shredding_algorithm=enums.ShreddingAlgorithm.UNSUPPORTED
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_not_equal_rng_mode(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different RNG mode fields.
"""
a = objects.CapabilityInformation(
rng_mode=enums.RNGMode.SHARED_INSTANTIATION
)
b = objects.CapabilityInformation(
rng_mode=enums.RNGMode.NON_SHARED_INSTANTIATION
)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing two
CapabilityInformation structures with different types.
"""
a = objects.CapabilityInformation()
b = "invalid"
self.assertTrue(a != b)
self.assertTrue(b != a)