Add support for the CurrentAttribute structure

This change adds support for the CurrentAttribute structure added
in KMIP 2.0. The CurrentAttribute structure is a basic container
structure that contains a single attribute instance for use by
attribute operations. A new unit test suite has been added to cover
the new additions.

Partially implements #547
This commit is contained in:
Peter Hamilton 2019-10-31 17:50:58 -04:00 committed by Peter Hamilton
parent cd1079afd5
commit 616e683370
2 changed files with 580 additions and 0 deletions

View File

@ -175,6 +175,187 @@ class Attribute(Struct):
return NotImplemented
class CurrentAttribute(primitives.Struct):
"""
A structure containing a single attribute.
This is intended for use with KMIP 2.0+.
Attributes:
attribute: An attribute instance.
"""
def __init__(self, attribute=None):
"""
Construct a CurrentAttribute structure.
Args:
attribute (struct): An attribute structure of varying type.
Defaults to None. Required for read/write.
"""
super(CurrentAttribute, self).__init__(
tag=enums.Tags.CURRENT_ATTRIBUTE
)
self._factory = AttributeValueFactory()
self._attribute = None
self.attribute = attribute
@property
def attribute(self):
if self._attribute:
return self._attribute
return None
@attribute.setter
def attribute(self, value):
if value is None:
self._attribute = None
elif isinstance(value, primitives.Base):
if enums.is_attribute(value.tag):
self._attribute = value
else:
raise TypeError(
"The attribute must be a supported attribute type."
)
else:
raise TypeError(
"The attribute must be a Base object, not a {}.".format(
type(value)
)
)
def read(self, input_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0):
"""
Read the data stream and decode the CurrentAttribute structure into
its parts.
Args:
input_buffer (stream): A data stream containing encoded object
data, supporting a read method.
kmip_version (enum): A KMIPVersion enumeration defining the KMIP
version with which the object will be decoded. Optional,
defaults to KMIP 2.0.
Raises:
AttributeNotSupported: Raised when an invalid value is decoded as
the attribute from the encoding.
InvalidKmipEncoding: Raised if the attribute is missing from the
encoding.
VersionNotSupported: Raised when a KMIP version is provided that
does not support the CurrentAttribute structure.
"""
if kmip_version < enums.KMIPVersion.KMIP_2_0:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the CurrentAttribute object.".format(
kmip_version.value
)
)
super(CurrentAttribute, self).read(
input_buffer,
kmip_version=kmip_version
)
local_buffer = BytearrayStream(input_buffer.read(self.length))
if len(local_buffer) < 3:
raise exceptions.InvalidKmipEncoding(
"The CurrentAttribute encoding is missing the attribute field."
)
tag = struct.unpack('!I', b'\x00' + local_buffer.peek(3))[0]
if enums.is_enum_value(enums.Tags, tag):
tag = enums.Tags(tag)
if enums.is_attribute(tag, kmip_version=kmip_version):
value = self._factory.create_attribute_value_by_enum(tag, None)
value.read(local_buffer, kmip_version=kmip_version)
self._attribute = value
else:
raise exceptions.AttributeNotSupported(
"Attribute {} is not supported by KMIP {}.".format(
tag.name,
kmip_version.value
)
)
else:
raise exceptions.InvalidKmipEncoding(
"The CurrentAttribute encoding is missing the attribute field."
)
self.is_oversized(local_buffer)
def write(self, output_buffer, kmip_version=enums.KMIPVersion.KMIP_2_0):
"""
Write the CurrentAttribute structure encoding to the data stream.
Args:
output_buffer (stream): A data stream in which to encode
CurrentAttribute structure data, supporting a write method.
kmip_version (enum): A KMIPVersion enumeration defining the KMIP
version with which the object will be encoded. Optional,
defaults to KMIP 2.0.
Raises:
AttributeNotSupported: Raised if an unsupported attribute is
found while encoding.
InvalidField: Raised when the attribute is unspecified at write
time.
VersionNotSupported: Raised when a KMIP version is provided that
does not support the CurrentAttribute object.
"""
if kmip_version < enums.KMIPVersion.KMIP_2_0:
raise exceptions.VersionNotSupported(
"KMIP {} does not support the CurrentAttribute object.".format(
kmip_version.value
)
)
local_buffer = BytearrayStream()
if self._attribute:
tag = self._attribute.tag
if not enums.is_attribute(tag, kmip_version=kmip_version):
raise exceptions.AttributeNotSupported(
"Attribute {} is not supported by KMIP {}.".format(
tag.name,
kmip_version.value
)
)
self._attribute.write(local_buffer, kmip_version=kmip_version)
else:
raise exceptions.InvalidField(
"The CurrentAttribute object is missing the attribute field."
)
self.length = local_buffer.length()
super(CurrentAttribute, self).write(
output_buffer,
kmip_version=kmip_version
)
output_buffer.write(local_buffer.buffer)
def __repr__(self):
return "CurrentAttribute(attribute={})".format(repr(self.attribute))
def __str__(self):
value = '"attribute": {}'.format(repr(self.attribute))
return '{' + value + '}'
def __eq__(self, other):
if not isinstance(other, CurrentAttribute):
return NotImplemented
elif self.attribute != other.attribute:
return False
return True
def __ne__(self, other):
if isinstance(other, CurrentAttribute):
return not (self == other)
else:
return NotImplemented
class AttributeReference(primitives.Struct):
"""
A structure containing reference information for an attribute.

View File

@ -0,0 +1,399 @@
# Copyright (c) 2019 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from kmip.core import enums
from kmip.core import exceptions
from kmip.core import objects
from kmip.core import primitives
from kmip.core import utils
class TestCurrentAttribute(testtools.TestCase):
"""
A unit test suite for the CurrentAttribute structure.
"""
def setUp(self):
super(TestCurrentAttribute, self).setUp()
# This encoding matches the following set of values:
# CurrentAttribute
# Cryptographic Algorithm - AES
self.full_encoding = utils.BytearrayStream(
b'\x42\x01\x3C\x01\x00\x00\x00\x10'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
)
self.empty_encoding = utils.BytearrayStream(
b'\x42\x01\x3C\x01\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
# CurrentAttribute
# Non-existent Tag
self.invalid_encoding = utils.BytearrayStream(
b'\x42\x01\x3C\x01\x00\x00\x00\x10'
b'\x42\xFF\xFF\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
# CurrentAttribute
# Operation Policy Name - b4faee10-aa2a-4446-8ad4-0881f3422959
self.unsupported_encoding = utils.BytearrayStream(
b'\x42\x01\x3C\x01\x00\x00\x00\x30'
b'\x42\x00\x5D\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
)
def tearDown(self):
super(TestCurrentAttribute, self).tearDown()
def test_unrecognized_attribute(self):
"""
Test that a TypeError is raised when an unrecognized attribute is used
to create a CurrentAttribute object. Note that this unrecognized
attribute is a valid PyKMIP object derived from Base, it just isn't an
attribute.
"""
kwargs = {
"attribute": primitives.Enumeration(
enums.WrappingMethod,
enums.WrappingMethod.ENCRYPT,
enums.Tags.WRAPPING_METHOD
)
}
self.assertRaisesRegex(
TypeError,
"The attribute must be a supported attribute type.",
objects.CurrentAttribute,
**kwargs
)
args = (
objects.CurrentAttribute(),
"attribute",
primitives.Enumeration(
enums.WrappingMethod,
enums.WrappingMethod.ENCRYPT,
enums.Tags.WRAPPING_METHOD
)
)
self.assertRaisesRegex(
TypeError,
"The attribute must be a supported attribute type.",
setattr,
*args
)
def test_invalid_attribute(self):
"""
Test that a TypeError is raised when an invalid value is used to
create a CurrentAttribute object. Note that the value is not a valid
PyKMIP object derived from Base and therefore cannot be an attribute.
"""
kwargs = {"attribute": "invalid"}
self.assertRaisesRegex(
TypeError,
"The attribute must be a Base object, not a {}.".format(
type("invalid")
),
objects.CurrentAttribute,
**kwargs
)
args = (
objects.CurrentAttribute(),
"attribute",
"invalid"
)
self.assertRaisesRegex(
TypeError,
"The attribute must be a Base object, not a {}.".format(
type("invalid")
),
setattr,
*args
)
def test_read(self):
"""
Test that a CurrentAttribute structure can be correctly read in from
a data stream.
"""
current_attribute = objects.CurrentAttribute()
self.assertIsNone(current_attribute.attribute)
current_attribute.read(self.full_encoding)
self.assertIsInstance(
current_attribute.attribute,
primitives.Enumeration
)
self.assertEqual(
current_attribute.attribute.value,
enums.CryptographicAlgorithm.AES
)
def test_read_no_attribute(self):
"""
Test that an InvalidKmipEncoding error is raised when an invalid
encoding with no encoded attribute is used to decode a CurrentAttribute
object.
"""
current_attribute = objects.CurrentAttribute()
args = (self.empty_encoding, )
self.assertRaisesRegex(
exceptions.InvalidKmipEncoding,
"The CurrentAttribute encoding is missing the attribute field.",
current_attribute.read,
*args
)
def test_read_invalid_attribute(self):
"""
Test that an InvalidKmipEncoding error is raised when an invalid
encoding containing an invalid attribute is used to decode a
CurrentAttribute object.
"""
current_attribute = objects.CurrentAttribute()
args = (self.invalid_encoding, )
self.assertRaisesRegex(
exceptions.InvalidKmipEncoding,
"The CurrentAttribute encoding is missing the attribute field.",
current_attribute.read,
*args
)
def test_read_unsupported_attribute(self):
"""
Test that an AttributeNotSupported error is raised when an unsupported
attribute is parsed while reading in a CurrentAttribute object from a
data stream. This can occur when an older attribute is no longer
supported by a newer version of KMIP, or vice versa.
"""
current_attribute = objects.CurrentAttribute()
args = (self.unsupported_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.AttributeNotSupported,
"Attribute OPERATION_POLICY_NAME is not supported by KMIP 2.0.",
current_attribute.read,
*args,
**kwargs
)
def test_read_version_not_supported(self):
"""
Test that a VersionNotSupported error is raised when an unsupported
KMIP version is provided while reading in a CurrentAttribute object
from a data stream. The CurrentAttribute structure is only supported
in KMIP 2.0+.
"""
current_attribute = objects.CurrentAttribute()
args = (self.full_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_2}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.2 does not support the CurrentAttribute object.",
current_attribute.read,
*args,
**kwargs
)
def test_write(self):
"""
Test that a CurrentAttribute object can be written to a data stream.
"""
current_attribute = objects.CurrentAttribute(
attribute=primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
)
stream = utils.BytearrayStream()
current_attribute.write(stream)
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_no_attribute(self):
"""
Test that an InvalidField error is raised when an empty
CurrentAttribute object is written to a data stream.
"""
current_attribute = objects.CurrentAttribute()
args = (utils.BytearrayStream(), )
self.assertRaisesRegex(
exceptions.InvalidField,
"The CurrentAttribute object is missing the attribute field.",
current_attribute.write,
*args
)
def test_write_unsupported_attribute(self):
"""
Test that an AttributeNotSupported error is raised when an unsupported
attribute is found while writing a CurrentAttribute object to a data
stream. This can occur when an older attribute is no longer supported
by a newer version of KMIP, or vice versa.
"""
current_attribute = objects.CurrentAttribute(
attribute=primitives.TextString(
"default",
tag=enums.Tags.OPERATION_POLICY_NAME
)
)
args = (utils.BytearrayStream(), )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.AttributeNotSupported,
"Attribute OPERATION_POLICY_NAME is not supported by KMIP 2.0.",
current_attribute.write,
*args,
**kwargs
)
def test_write_version_not_supported(self):
"""
Test that a VersionNotSupported error is raised when an unsupported
KMIP version is provided while writing a CurrentAttribute object to
a data stream. The CurrentAttribute structure is only supported in
KMIP 2.0+.
"""
current_attribute = objects.CurrentAttribute()
args = (utils.BytearrayStream(), )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_1_4}
self.assertRaisesRegex(
exceptions.VersionNotSupported,
"KMIP 1.4 does not support the CurrentAttribute object.",
current_attribute.write,
*args,
**kwargs
)
def test_repr(self):
"""
Test that repr can be applied to a CurrentAttribute object.
"""
current_attribute = objects.CurrentAttribute(
attribute=primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
)
self.assertEqual(
"CurrentAttribute("
"attribute=Enumeration("
"enum=CryptographicAlgorithm, "
"value=CryptographicAlgorithm.AES, "
"tag=Tags.CRYPTOGRAPHIC_ALGORITHM))",
repr(current_attribute)
)
def test_str(self):
"""
Test that str can be applied to a CurrentAttribute object.
"""
current_attribute = objects.CurrentAttribute(
attribute=primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
)
self.assertEqual(
'{"attribute": Enumeration('
'enum=CryptographicAlgorithm, '
'value=CryptographicAlgorithm.AES, '
'tag=Tags.CRYPTOGRAPHIC_ALGORITHM)}',
str(current_attribute)
)
def test_comparison(self):
"""
Test that the equality/inequality operators return True/False when
comparing two CurrentAttribute objects with the same data.
"""
a = objects.CurrentAttribute()
b = objects.CurrentAttribute()
self.assertTrue(a == b)
self.assertTrue(b == a)
self.assertFalse(a != b)
self.assertFalse(b != a)
a = objects.CurrentAttribute(
attribute=primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
)
b = objects.CurrentAttribute(
attribute=primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
)
self.assertTrue(a == b)
self.assertTrue(b == a)
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_comparison_on_different_attributes(self):
"""
Test that the equality/inequality operators return False/True when
comparing two CurrentAttribute objects with different attributes.
"""
a = objects.CurrentAttribute(
attribute=primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
)
b = objects.CurrentAttribute(
attribute=primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
)
self.assertFalse(a == b)
self.assertFalse(b == a)
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_comparison_on_type_mismatch(self):
"""
Test that the equality/inequality operators return False/True when
comparing two CurrentAttribute objects with different types.
"""
a = objects.CurrentAttribute()
b = "invalid"
self.assertFalse(a == b)
self.assertFalse(b == a)
self.assertTrue(a != b)
self.assertTrue(b != a)