Add KMIP 2.0-style attribute handling

This change adds a new Attributes object to the object hierarchy,
which replaces TemplateAttributes in KMIP 2.0. The old attribute
components, like the AttributeName and AttributeIndex, are no
longer used and are instead replaced with the KMIP TTLV tag for
the attributes in question. This brings the attribute encoding
process in line with the rest of the KMIP specification.

To support this change, additional attribute and enumeration
utility functions have been added to simply attribute building
and attribute/enumeration validity checking. New test cases
covering this new functionality are also included.
This commit is contained in:
Peter Hamilton 2019-02-20 14:07:23 -05:00 committed by Peter Hamilton
parent e986488ebe
commit bc3e81b577
6 changed files with 1373 additions and 6 deletions

View File

@ -16,6 +16,7 @@
# In case of new content, remove the following line to enable flake8 tests
# flake8: noqa
import copy
import enum
@ -473,11 +474,12 @@ class KeyWrapType(enum.Enum):
class KMIPVersion(enum.Enum):
KMIP_1_0 = "KMIP 1.0"
KMIP_1_1 = "KMIP 1.1"
KMIP_1_2 = "KMIP 1.2"
KMIP_1_3 = "KMIP 1.3"
KMIP_1_4 = "KMIP 1.4"
KMIP_1_0 = 1.0
KMIP_1_1 = 1.1
KMIP_1_2 = 1.2
KMIP_1_3 = 1.3
KMIP_1_4 = 1.4
KMIP_2_0 = 2.0
class LinkType(enum.Enum):
@ -1304,7 +1306,7 @@ class Tags(enum.Enum):
ISSUER_DISTINGUISHED_NAME = 0x4200B2
SUBJECT_ALTERNATIVE_NAME = 0x4200B3
SUBJECT_DISTINGUISHED_NAME = 0x4200B4
X_509_CERTIFICATE_IDENTIFER = 0x4200B5
X_509_CERTIFICATE_IDENTIFIER = 0x4200B5
X_509_CERTIFICATE_ISSUER = 0x4200B6
X_509_CERTIFICATE_SUBJECT = 0x4200B7
KEY_VALUE_LOCATION = 0x4200B8
@ -1558,3 +1560,168 @@ class WrappingMethod(enum.Enum):
ENCRYPT_THEN_MAC_SIGN = 0x00000003
MAC_SIGN_THEN_ENCRYPT = 0x00000004
TR_31 = 0x00000005
def is_enum_value(enum_class, potential_value):
"""
A utility function that checks if the enumeration class contains the
provided value.
Args:
enum_class (class): One of the enumeration classes found in this file.
potential_value (int, string): A potential value of the enumeration
class.
Returns:
True: if the potential value is a valid value of the enumeration class
False: otherwise
"""
try:
enum_class(potential_value)
except ValueError:
return False
return True
def is_attribute(tag, kmip_version=None):
"""
A utility function that checks if the tag is a valid attribute tag.
Args:
tag (enum): A Tags enumeration that may or may not correspond to a
KMIP attribute type.
kmip_version (enum): The KMIPVersion enumeration that should be used
when checking if the tag is a valid attribute tag. Optional,
defaults to None. If None, the tag is compared with all possible
attribute tags across all KMIP versions. Otherwise, only the
attribute tags for a specific KMIP version are checked.
Returns:
True: if the tag is a valid attribute tag
False: otherwise
"""
kmip_1_0_attribute_tags = [
Tags.UNIQUE_IDENTIFIER,
Tags.NAME,
Tags.OBJECT_TYPE,
Tags.CRYPTOGRAPHIC_ALGORITHM,
Tags.CRYPTOGRAPHIC_LENGTH,
Tags.CRYPTOGRAPHIC_PARAMETERS,
Tags.CRYPTOGRAPHIC_DOMAIN_PARAMETERS,
Tags.CERTIFICATE_TYPE,
Tags.CERTIFICATE_IDENTIFIER,
Tags.CERTIFICATE_SUBJECT,
Tags.CERTIFICATE_ISSUER,
Tags.DIGEST,
Tags.OPERATION_POLICY_NAME,
Tags.CRYPTOGRAPHIC_USAGE_MASK,
Tags.LEASE_TIME,
Tags.USAGE_LIMITS,
Tags.STATE,
Tags.INITIAL_DATE,
Tags.ACTIVATION_DATE,
Tags.PROCESS_START_DATE,
Tags.PROTECT_STOP_DATE,
Tags.DEACTIVATION_DATE,
Tags.DESTROY_DATE,
Tags.COMPROMISE_OCCURRENCE_DATE,
Tags.COMPROMISE_DATE,
Tags.REVOCATION_REASON,
Tags.ARCHIVE_DATE,
Tags.OBJECT_GROUP,
Tags.LINK,
Tags.APPLICATION_SPECIFIC_INFORMATION,
Tags.CONTACT_INFORMATION,
Tags.LAST_CHANGE_DATE,
Tags.CUSTOM_ATTRIBUTE
]
kmip_1_1_attribute_tags = copy.deepcopy(kmip_1_0_attribute_tags) + [
Tags.CERTIFICATE_LENGTH,
Tags.X_509_CERTIFICATE_IDENTIFIER,
Tags.X_509_CERTIFICATE_SUBJECT,
Tags.X_509_CERTIFICATE_ISSUER,
Tags.DIGITAL_SIGNATURE_ALGORITHM,
Tags.FRESH
]
kmip_1_2_attribute_tags = copy.deepcopy(kmip_1_1_attribute_tags) + [
Tags.ALTERNATIVE_NAME,
Tags.KEY_VALUE_PRESENT,
Tags.KEY_VALUE_LOCATION,
Tags.ORIGINAL_CREATION_DATE
]
kmip_1_3_attribute_tags = copy.deepcopy(kmip_1_2_attribute_tags) + [
Tags.RANDOM_NUMBER_GENERATOR
]
kmip_1_4_attribute_tags = copy.deepcopy(kmip_1_3_attribute_tags) + [
Tags.PKCS12_FRIENDLY_NAME,
Tags.DESCRIPTION,
Tags.COMMENT,
Tags.SENSITIVE,
Tags.ALWAYS_SENSITIVE,
Tags.EXTRACTABLE,
Tags.NEVER_EXTRACTABLE
]
kmip_2_0_attribute_tags = copy.deepcopy(kmip_1_4_attribute_tags) + [
Tags.CERTIFICATE_SUBJECT_CN,
Tags.CERTIFICATE_SUBJECT_O,
Tags.CERTIFICATE_SUBJECT_OU,
Tags.CERTIFICATE_SUBJECT_EMAIL,
Tags.CERTIFICATE_SUBJECT_C,
Tags.CERTIFICATE_SUBJECT_ST,
Tags.CERTIFICATE_SUBJECT_L,
Tags.CERTIFICATE_SUBJECT_UID,
Tags.CERTIFICATE_SUBJECT_SERIAL_NUMBER,
Tags.CERTIFICATE_SUBJECT_TITLE,
Tags.CERTIFICATE_SUBJECT_DC,
Tags.CERTIFICATE_SUBJECT_DN_QUALIFIER,
Tags.CERTIFICATE_ISSUER_CN,
Tags.CERTIFICATE_ISSUER_O,
Tags.CERTIFICATE_ISSUER_OU,
Tags.CERTIFICATE_ISSUER_EMAIL,
Tags.CERTIFICATE_ISSUER_C,
Tags.CERTIFICATE_ISSUER_ST,
Tags.CERTIFICATE_ISSUER_L,
Tags.CERTIFICATE_ISSUER_UID,
Tags.CERTIFICATE_ISSUER_SERIAL_NUMBER,
Tags.CERTIFICATE_ISSUER_TITLE,
Tags.CERTIFICATE_ISSUER_DC,
Tags.CERTIFICATE_ISSUER_DN_QUALIFIER,
Tags.KEY_FORMAT_TYPE,
Tags.NIST_KEY_TYPE,
Tags.OPAQUE_DATA_TYPE,
Tags.PROTECTION_LEVEL,
Tags.PROTECTION_PERIOD,
Tags.PROTECTION_STORAGE_MASK,
Tags.QUANTUM_SAFE,
Tags.SHORT_UNIQUE_IDENTIFIER,
Tags.ATTRIBUTE
]
kmip_2_0_attribute_tags.remove(Tags.CERTIFICATE_IDENTIFIER)
kmip_2_0_attribute_tags.remove(Tags.CERTIFICATE_SUBJECT)
kmip_2_0_attribute_tags.remove(Tags.CERTIFICATE_ISSUER)
kmip_2_0_attribute_tags.remove(Tags.OPERATION_POLICY_NAME)
kmip_2_0_attribute_tags.remove(Tags.CUSTOM_ATTRIBUTE)
if kmip_version == KMIPVersion.KMIP_1_0:
return tag in kmip_1_0_attribute_tags
elif kmip_version == KMIPVersion.KMIP_1_1:
return tag in kmip_1_1_attribute_tags
elif kmip_version == KMIPVersion.KMIP_1_2:
return tag in kmip_1_2_attribute_tags
elif kmip_version == KMIPVersion.KMIP_1_3:
return tag in kmip_1_3_attribute_tags
elif kmip_version == KMIPVersion.KMIP_1_4:
return tag in kmip_1_4_attribute_tags
elif kmip_version == KMIPVersion.KMIP_2_0:
return tag in kmip_2_0_attribute_tags
else:
all_attribute_tags = set(
kmip_1_0_attribute_tags +
kmip_1_1_attribute_tags +
kmip_1_2_attribute_tags +
kmip_1_3_attribute_tags +
kmip_1_4_attribute_tags +
kmip_2_0_attribute_tags
)
return tag in all_attribute_tags

View File

@ -264,6 +264,13 @@ class PermissionDenied(KmipError):
)
class AttributeNotSupported(Exception):
"""
An error generated when an unsupported attribute is processed.
"""
pass
class ConfigurationError(Exception):
"""
An error generated when a problem occurs with a client or server

View File

@ -110,6 +110,90 @@ class AttributeValueFactory(object):
# Custom attribute indicated
return attributes.CustomAttribute(value)
def create_attribute_value_by_enum(self, enum, value):
# Switch on the name of the attribute
if enum is enums.Tags.UNIQUE_IDENTIFIER:
return attributes.UniqueIdentifier(value)
elif enum is enums.Tags.NAME:
return self._create_name(value)
elif enum is enums.Tags.OBJECT_TYPE:
return attributes.ObjectType(value)
elif enum is enums.Tags.CRYPTOGRAPHIC_ALGORITHM:
return attributes.CryptographicAlgorithm(value)
elif enum is enums.Tags.CRYPTOGRAPHIC_LENGTH:
return self._create_cryptographic_length(value)
elif enum is enums.Tags.CRYPTOGRAPHIC_PARAMETERS:
return self._create_cryptographic_parameters(value)
elif enum is enums.Tags.CRYPTOGRAPHIC_DOMAIN_PARAMETERS:
raise NotImplementedError()
elif enum is enums.Tags.CERTIFICATE_TYPE:
raise NotImplementedError()
elif enum is enums.Tags.CERTIFICATE_LENGTH:
return primitives.Integer(value, enums.Tags.CERTIFICATE_LENGTH)
elif enum is enums.Tags.X_509_CERTIFICATE_IDENTIFIER:
raise NotImplementedError()
elif enum is enums.Tags.X_509_CERTIFICATE_SUBJECT:
raise NotImplementedError()
elif enum is enums.Tags.X_509_CERTIFICATE_ISSUER:
raise NotImplementedError()
elif enum is enums.Tags.CERTIFICATE_IDENTIFIER:
raise NotImplementedError()
elif enum is enums.Tags.CERTIFICATE_SUBJECT:
raise NotImplementedError()
elif enum is enums.Tags.CERTIFICATE_ISSUER:
raise NotImplementedError()
elif enum is enums.Tags.DIGITAL_SIGNATURE_ALGORITHM:
raise NotImplementedError()
elif enum is enums.Tags.DIGEST:
return attributes.Digest()
elif enum is enums.Tags.OPERATION_POLICY_NAME:
return attributes.OperationPolicyName(value)
elif enum is enums.Tags.CRYPTOGRAPHIC_USAGE_MASK:
return self._create_cryptographic_usage_mask(value)
elif enum is enums.Tags.LEASE_TIME:
return primitives.Interval(value, enums.Tags.LEASE_TIME)
elif enum is enums.Tags.USAGE_LIMITS:
raise NotImplementedError()
elif enum is enums.Tags.STATE:
return attributes.State(value)
elif enum is enums.Tags.INITIAL_DATE:
return primitives.DateTime(value, enums.Tags.INITIAL_DATE)
elif enum is enums.Tags.ACTIVATION_DATE:
return primitives.DateTime(value, enums.Tags.ACTIVATION_DATE)
elif enum is enums.Tags.PROCESS_START_DATE:
return primitives.DateTime(value, enums.Tags.PROCESS_START_DATE)
elif enum is enums.Tags.PROTECT_STOP_DATE:
return primitives.DateTime(value, enums.Tags.PROTECT_STOP_DATE)
elif enum is enums.Tags.DEACTIVATION_DATE:
return primitives.DateTime(value, enums.Tags.DEACTIVATION_DATE)
elif enum is enums.Tags.DESTROY_DATE:
return primitives.DateTime(value, enums.Tags.DESTROY_DATE)
elif enum is enums.Tags.COMPROMISE_OCCURRENCE_DATE:
return primitives.DateTime(
value, enums.Tags.COMPROMISE_OCCURRENCE_DATE)
elif enum is enums.Tags.COMPROMISE_DATE:
return primitives.DateTime(value, enums.Tags.COMPROMISE_DATE)
elif enum is enums.Tags.REVOCATION_REASON:
raise NotImplementedError()
elif enum is enums.Tags.ARCHIVE_DATE:
return primitives.DateTime(value, enums.Tags.ARCHIVE_DATE)
elif enum is enums.Tags.OBJECT_GROUP:
return self._create_object_group(value)
elif enum is enums.Tags.FRESH:
return primitives.Boolean(value, enums.Tags.FRESH)
elif enum is enums.Tags.LINK:
raise NotImplementedError()
elif enum is enums.Tags.APPLICATION_SPECIFIC_INFORMATION:
return self._create_application_specific_information(value)
elif enum is enums.Tags.CONTACT_INFORMATION:
return self._create_contact_information(value)
elif enum is enums.Tags.LAST_CHANGE_DATE:
return primitives.DateTime(value, enums.Tags.LAST_CHANGE_DATE)
elif enum is enums.Tags.CUSTOM_ATTRIBUTE:
return attributes.CustomAttribute(value)
else:
raise ValueError("Unrecognized attribute type: {}".format(enum))
def _create_name(self, name):
if name is not None:
if isinstance(name, attributes.Name):

View File

@ -16,6 +16,7 @@
import abc
import six
from six.moves import xrange
import struct
from kmip.core import attributes
from kmip.core.attributes import CryptographicParameters
@ -174,6 +175,183 @@ class Attribute(Struct):
return NotImplemented
class Attributes(primitives.Struct):
"""
A collection of KMIP attributes.
This is intended for use with KMIP 2.0+ and replaces the old
TemplateAttribute-style used for older KMIP versions.
Attributes:
attributes: A list of attribute objects.
tag: A Tags enumeration specifying what type of Attributes structure
is in use. Valid values include:
* Tags.ATTRIBUTES
* Tags.COMMON_ATTRIBUTES
* Tags.PRIVATE_KEY_ATTRIBUTES
* Tags.PUBLIC_KEY_ATTRIBUTES
"""
def __init__(self, attributes=None, tag=enums.Tags.ATTRIBUTES):
"""
Construct an Attributes structure.
Args:
attributes (list): A list of attribute objects. Each object must
be some form of primitive, derived from Base. Optional,
defaults to None which is interpreted as an empty list.
tag (enum): A Tags enumeration specifying what type of Attributes
structure is in use. Valid values include:
* Tags.ATTRIBUTES
* Tags.COMMON_ATTRIBUTES
* Tags.PRIVATE_KEY_ATTRIBUTES
* Tags.PUBLIC_KEY_ATTRIBUTES
Optional, defaults to Tags.ATTRIBUTES.
"""
super(Attributes, self).__init__(tag=tag)
self._factory = AttributeValueFactory()
self._attributes = []
self.attributes = attributes
@property
def attributes(self):
return self._attributes
@attributes.setter
def attributes(self, value):
if (value is None) or (value == []):
self._attributes = []
elif isinstance(value, list):
for i, attribute in enumerate(value):
if isinstance(attribute, primitives.Base):
if not enums.is_attribute(attribute.tag):
raise TypeError(
"Item {} must be a supported attribute.".format(
i + 1
)
)
else:
raise TypeError(
"Item {} must be a Base object, not a {}.".format(
i + 1,
type(attribute)
)
)
self._attributes = value
else:
raise TypeError("Attributes must be a list of Base objects.")
def read(self, input_stream, kmip_version=enums.KMIPVersion.KMIP_2_0):
"""
Read the data stream and decode the Attributes structure into its
parts.
Args:
input_stream (stream): A data stream containing encoded object
data, supporting a read method.
kmip_version (enum): A KMIPVersion enumeration defining the KMIP
version with which the object will be decoded. Optional,
defaults to KMIP 2.0.
Raises:
AttributeNotSupported: Raised if an unsupported attribute is
encountered while decoding.
"""
super(Attributes, self).read(input_stream, kmip_version=kmip_version)
local_stream = BytearrayStream(input_stream.read(self.length))
while True:
if len(local_stream) < 3:
break
tag = struct.unpack('!I', b'\x00' + local_stream.peek(3))[0]
if enums.is_enum_value(enums.Tags, tag):
tag = enums.Tags(tag)
if not enums.is_attribute(tag, kmip_version=kmip_version):
raise exceptions.AttributeNotSupported(
"Attribute {} is not supported by KMIP {}.".format(
tag.name,
kmip_version.value
)
)
value = self._factory.create_attribute_value_by_enum(tag, None)
value.read(local_stream, kmip_version=kmip_version)
self._attributes.append(value)
else:
break
self.is_oversized(local_stream)
def write(self, output_stream, kmip_version=enums.KMIPVersion.KMIP_2_0):
"""
Write the Attributes structure encoding to the data stream.
Args:
output_stream (stream): A data stream in which to encode
Attributes structure data, supporting a write method.
kmip_version (enum): A KMIPVersion enumeration defining the KMIP
version with which the object will be encoded. Optional,
defaults to KMIP 2.0.
Raises:
AttributeNotSupported: Raised if an unsupported attribute is
found in the attribute list while encoding.
"""
local_stream = BytearrayStream()
for attribute in self._attributes:
tag = attribute.tag
if not enums.is_attribute(tag, kmip_version=kmip_version):
raise exceptions.AttributeNotSupported(
"Attribute {} is not supported by KMIP {}.".format(
tag.name,
kmip_version.value
)
)
attribute.write(local_stream, kmip_version=kmip_version)
self.length = local_stream.length()
super(Attributes, self).write(output_stream, kmip_version=kmip_version)
output_stream.write(local_stream.buffer)
def __repr__(self):
values = ", ".join([repr(x) for x in self.attributes])
return "Attributes(attributes=[{}], tag={})".format(
values,
self.tag
)
def __str__(self):
values = ", ".join([str(x) for x in self.attributes])
value = '"attributes": [{}]'.format(values)
return '{' + value + '}'
def __eq__(self, other):
if not isinstance(other, Attributes):
return NotImplemented
if len(self.attributes) != len(other.attributes):
return False
# TODO (ph) Allow order independence?
for i in six.moves.range(len(self.attributes)):
a = self.attributes[i]
b = other.attributes[i]
if a != b:
return False
return True
def __ne__(self, other):
if isinstance(other, Attributes):
return not (self == other)
else:
return NotImplemented
class Nonce(primitives.Struct):
"""
A struct representing a Nonce object.

View File

@ -26,6 +26,8 @@ from kmip.core.enums import KeyRoleType
from kmip.core.enums import PaddingMethod
from kmip.core.enums import Tags
from kmip.core import exceptions
from kmip.core.factories.attributes import AttributeValueFactory
from kmip.core import objects
@ -35,6 +37,8 @@ from kmip.core.objects import ExtensionTag
from kmip.core.objects import ExtensionType
from kmip.core.objects import KeyMaterialStruct
from kmip.core import primitives
from kmip.core import utils
from kmip.core.utils import BytearrayStream
@ -132,6 +136,609 @@ class TestAttributeClass(TestCase):
self.assertTrue(self.attributeObj_a != self.attributeObj_b)
class TestAttributes(TestCase):
def setUp(self):
super(TestAttributes, self).setUp()
# This encoding matches the following set of values:
# Attributes
# Cryptographic Algorithm - AES
# Cryptographic Length - 128
self.full_encoding = utils.BytearrayStream(
b'\x42\x01\x25\x01\x00\x00\x00\x20'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00'
)
self.empty_encoding = utils.BytearrayStream(
b'\x42\x01\x25\x01\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
# Attributes
# Cryptographic Algorithm - AES
# Non-existent Tag
self.invalid_encoding = utils.BytearrayStream(
b'\x42\x01\x25\x01\x00\x00\x00\x20'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
b'\x42\xFF\xFF\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
# Attributes
# Operation Policy Name - b4faee10-aa2a-4446-8ad4-0881f3422959
self.unsupported_encoding = utils.BytearrayStream(
b'\x42\x01\x25\x01\x00\x00\x00\x30'
b'\x42\x00\x5D\x07\x00\x00\x00\x24\x62\x34\x66\x61\x65\x65\x31\x30'
b'\x2D\x61\x61\x32\x61\x2D\x34\x34\x34\x36\x2D\x38\x61\x64\x34\x2D'
b'\x30\x38\x38\x31\x66\x33\x34\x32\x32\x39\x35\x39\x00\x00\x00\x00'
)
# This encoding matches the following set of values:
# Private Key Attributes
# Cryptographic Algorithm - AES
# Cryptographic Length - 128
self.alt_encoding = utils.BytearrayStream(
b'\x42\x01\x27\x01\x00\x00\x00\x20'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x00\x80\x00\x00\x00\x00'
)
def tearDown(self):
super(TestAttributes, self).tearDown()
def test_unrecognized_attributes(self):
"""
Test that a TypeError is raised when an unrecognized attribute is
included in the attribute list. Note that this unrecognized attribute
is a valid PyKMIP object derived from Base, it just isn't an attribute.
"""
kwargs = {
'attributes': [
primitives.Enumeration(
enums.WrappingMethod,
enums.WrappingMethod.ENCRYPT,
enums.Tags.WRAPPING_METHOD
)
]
}
self.assertRaisesRegex(
TypeError,
"Item 1 must be a supported attribute.",
objects.Attributes,
**kwargs
)
attrs = objects.Attributes()
args = (
attrs,
'attributes',
[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Enumeration(
enums.WrappingMethod,
enums.WrappingMethod.ENCRYPT,
enums.Tags.WRAPPING_METHOD
)
]
)
self.assertRaisesRegex(
TypeError,
"Item 2 must be a supported attribute.",
setattr,
*args
)
def test_invalid_attributes(self):
"""
Test that a TypeError is raised when an invalid value is included
in the attribute list. Note that the value is not a valid PyKMIP
object derived from Base and therefore cannot be an attribute.
"""
kwargs = {
'attributes': [0]
}
self.assertRaisesRegex(
TypeError,
"Item 1 must be a Base object, not a {}.".format(type(0)),
objects.Attributes,
**kwargs
)
attrs = objects.Attributes()
args = (
attrs,
'attributes',
[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Enumeration(
enums.KeyFormatType,
enums.KeyFormatType.RAW,
enums.Tags.KEY_FORMAT_TYPE
),
1
]
)
self.assertRaisesRegex(
TypeError,
"Item 3 must be a Base object, not a {}.".format(type(0)),
setattr,
*args
)
def test_invalid_attributes_list(self):
"""
Test that a TypeError is raised when an invalid attribute list is
used with the Attributes structure.
"""
kwargs = {
'attributes': 'invalid'
}
self.assertRaisesRegex(
TypeError,
"Attributes must be a list of Base objects.",
objects.Attributes,
**kwargs
)
attrs = objects.Attributes()
args = (
attrs,
'attributes',
'invalid'
)
self.assertRaisesRegex(
TypeError,
"Attributes must be a list of Base objects.",
setattr,
*args
)
def test_read(self):
"""
Test that an Attributes structure can be correctly read in from a data
stream.
"""
attrs = objects.Attributes()
self.assertEqual([], attrs.attributes)
attrs.read(self.full_encoding)
self.assertEqual(2, len(attrs.attributes))
attr_1 = attrs.attributes[0]
self.assertIsInstance(attr_1, primitives.Enumeration)
self.assertEqual(enums.CryptographicAlgorithm.AES, attr_1.value)
attr_2 = attrs.attributes[1]
self.assertIsInstance(attr_2, primitives.Integer)
self.assertEqual(128, attr_2.value)
def test_read_no_attributes(self):
"""
Test that an empty Attributes structure can be correctly read in from
a data stream.
"""
attrs = objects.Attributes()
self.assertEqual([], attrs.attributes)
attrs.read(self.empty_encoding)
self.assertEqual([], attrs.attributes)
def test_read_invalid_attribute(self):
"""
Test that an unrecognized tag is correctly handled when reading in an
Attributes structure from a data stream. Specifically, structure
parsing should stop and an error should be raised indicating that more
encoding data is available but could not be parsed.
"""
attrs = objects.Attributes()
self.assertEqual([], attrs.attributes)
args = (self.invalid_encoding, )
self.assertRaisesRegex(
exceptions.StreamNotEmptyError,
"Invalid length used to read Base, bytes remaining: 16",
attrs.read,
*args
)
def test_read_unsupported_attribute(self):
"""
Test that an AttributeNotSupported error is raised when an unsupported
attribute is parsed while reading in an Attributes structure from a
data stream. This can occur when an older attribute is no longer
supported by a newer version of KMIP, or vice versa.
"""
attrs = objects.Attributes()
self.assertEqual([], attrs.attributes)
args = (self.unsupported_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.AttributeNotSupported,
"Attribute OPERATION_POLICY_NAME is not supported by KMIP 2.0.",
attrs.read,
*args,
**kwargs
)
def test_read_alternative_tag(self):
"""
Test that an Attributes structure can be correctly read in from a data
stream with an alternative tag. This can occur if a variant of the
Attributes structure is being used, like the Common Attributes, Public
Key Attributes, or Private Key Attributes structures.
"""
attrs = objects.Attributes(tag=enums.Tags.PRIVATE_KEY_ATTRIBUTES)
self.assertEqual([], attrs.attributes)
attrs.read(self.alt_encoding)
self.assertEqual(2, len(attrs.attributes))
attr_1 = attrs.attributes[0]
self.assertIsInstance(attr_1, primitives.Enumeration)
self.assertEqual(enums.CryptographicAlgorithm.AES, attr_1.value)
attr_2 = attrs.attributes[1]
self.assertIsInstance(attr_2, primitives.Integer)
self.assertEqual(128, attr_2.value)
def test_write(self):
"""
Test that an Attributes structure can be correctly written to a data
stream.
"""
attrs = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
stream = utils.BytearrayStream()
attrs.write(stream)
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_no_attributes(self):
"""
Test that an empty Attributes structure can be correctly written to
a data stream.
"""
attrs = objects.Attributes()
stream = utils.BytearrayStream()
attrs.write(stream)
self.assertEqual(len(self.empty_encoding), len(stream))
self.assertEqual(str(self.empty_encoding), str(stream))
def test_write_unsupported_attribute(self):
"""
Test that an AttributeNotSupported error is raised when an unsupported
attribute is found while writing an Attributes structure to a data
stream. This can occur when an older attribute is no longer supported
by a newer version of KMIP, or vice versa.
"""
attrs = objects.Attributes(attributes=[
primitives.TextString(
"default",
tag=enums.Tags.OPERATION_POLICY_NAME
)
])
stream = utils.BytearrayStream()
args = (stream, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.AttributeNotSupported,
"Attribute OPERATION_POLICY_NAME is not supported by KMIP 2.0.",
attrs.write,
*args,
**kwargs
)
def test_write_alternative_tag(self):
"""
Test that an Attributes structure can be correctly written to a data
stream with an alternative tag. This can occur if a variant of the
Attributes structure is being used, like the Common Attributes, Public
Key Attributes, or Private Key Attributes structures.
"""
attrs = objects.Attributes(
attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
],
tag=enums.Tags.PRIVATE_KEY_ATTRIBUTES
)
stream = utils.BytearrayStream()
attrs.write(stream)
self.assertEqual(len(self.alt_encoding), len(stream))
self.assertEqual(str(self.alt_encoding), str(stream))
def test_repr(self):
"""
Test that repr can be applied to an Attributes structure.
"""
attrs = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
self.assertEqual(
"Attributes(attributes=["
"Enumeration("
"enum=CryptographicAlgorithm, "
"value=CryptographicAlgorithm.AES, "
"tag=Tags.CRYPTOGRAPHIC_ALGORITHM), "
"Integer(value=128)], "
"tag=Tags.ATTRIBUTES)",
repr(attrs)
)
def test_repr_alternative_tag(self):
"""
Test that repr can be applied to an Attribute structure with an
alternative tag. This can occur if a variant of the Attributes
structure is being used, like the Common Attributes, Public Key
Attributes, or Private Key Attributes structure.
"""
attrs = objects.Attributes(tag=enums.Tags.COMMON_ATTRIBUTES)
self.assertEqual(
"Attributes(attributes=[], tag=Tags.COMMON_ATTRIBUTES)",
repr(attrs)
)
def test_str(self):
"""
Test that str can be applied to an Attributes structure.
"""
attrs = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
self.assertEqual(
'{"attributes": [CryptographicAlgorithm.AES, 128]}',
str(attrs)
)
def test_equal_on_equal(self):
"""
Test that the equality operator returns True when comparing two
identical Attributes structures.
"""
a = objects.Attributes()
b = objects.Attributes()
self.assertTrue(a == b)
self.assertTrue(b == a)
a = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
b = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
self.assertTrue(a == b)
self.assertTrue(b == a)
def test_equal_on_not_equal_attributes(self):
"""
Test that the equality operator returns False when comparing two
Attributes structures with different attributes lists.
"""
a = objects.Attributes()
b = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
self.assertFalse(a == b)
self.assertFalse(b == a)
a = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
b = objects.Attributes(attributes=[
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
),
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
])
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_equal_on_type_mismatch(self):
"""
Test that the equality operator returns False when comparing an
Attributes structure with another type.
"""
a = objects.Attributes()
b = 'invalid'
self.assertFalse(a == b)
self.assertFalse(b == a)
def test_not_equal_on_equal(self):
"""
Test that the inequality operator returns False when comparing two
identical Attributes structures.
"""
a = objects.Attributes()
b = objects.Attributes()
self.assertFalse(a != b)
self.assertFalse(b != a)
a = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
b = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
self.assertFalse(a != b)
self.assertFalse(b != a)
def test_not_equal_on_not_equal_attributes(self):
"""
Test that the inequality operator returns True when comparing two
Attributes structures with different attributes lists.
"""
a = objects.Attributes()
b = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
self.assertTrue(a != b)
self.assertTrue(b != a)
a = objects.Attributes(attributes=[
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
),
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
)
])
b = objects.Attributes(attributes=[
primitives.Integer(
128,
enums.Tags.CRYPTOGRAPHIC_LENGTH
),
primitives.Enumeration(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES,
enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
])
self.assertTrue(a != b)
self.assertTrue(b != a)
def test_not_equal_on_type_mismatch(self):
"""
Test that the inequality operator returns True when comparing an
Attributes structure with another type.
"""
a = objects.Attributes()
b = 'invalid'
self.assertTrue(a != b)
self.assertTrue(b != a)
class TestKeyMaterialStruct(TestCase):
"""
A test suite for the KeyMaterialStruct.

View File

@ -0,0 +1,324 @@
# Copyright (c) 2019 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from kmip.core import enums
class TestEnumUtilityFunctions(testtools.TestCase):
def setUp(self):
super(TestEnumUtilityFunctions, self).setUp()
def tearDown(self):
super(TestEnumUtilityFunctions, self).tearDown()
def test_is_enum_value(self):
result = enums.is_enum_value(
enums.CryptographicAlgorithm,
enums.CryptographicAlgorithm.AES
)
self.assertTrue(result)
result = enums.is_enum_value(
enums.WrappingMethod,
'invalid'
)
self.assertFalse(result)
def test_is_attribute(self):
# Test an attribute introduced in KMIP 1.0
result = enums.is_attribute(enums.Tags.UNIQUE_IDENTIFIER)
self.assertTrue(result)
# Test an attribute introduced in KMIP 1.1
result = enums.is_attribute(enums.Tags.FRESH)
self.assertTrue(result)
# Test an attribute introduced in KMIP 1.2
result = enums.is_attribute(enums.Tags.KEY_VALUE_PRESENT)
self.assertTrue(result)
# Test an attribute introduced in KMIP 1.3
result = enums.is_attribute(enums.Tags.RANDOM_NUMBER_GENERATOR)
self.assertTrue(result)
# Test an attribute introduced in KMIP 1.4
result = enums.is_attribute(enums.Tags.COMMENT)
self.assertTrue(result)
# Test an attribute introduced in KMIP 2.0
result = enums.is_attribute(enums.Tags.QUANTUM_SAFE)
self.assertTrue(result)
def test_is_attribute_added_in_kmip_1_0(self):
result = enums.is_attribute(
enums.Tags.UNIQUE_IDENTIFIER,
enums.KMIPVersion.KMIP_1_0
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.UNIQUE_IDENTIFIER,
enums.KMIPVersion.KMIP_1_1
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.UNIQUE_IDENTIFIER,
enums.KMIPVersion.KMIP_1_2
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.UNIQUE_IDENTIFIER,
enums.KMIPVersion.KMIP_1_3
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.UNIQUE_IDENTIFIER,
enums.KMIPVersion.KMIP_1_4
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.UNIQUE_IDENTIFIER,
enums.KMIPVersion.KMIP_2_0
)
self.assertTrue(result)
def test_is_attribute_added_in_kmip_1_1(self):
result = enums.is_attribute(
enums.Tags.FRESH,
enums.KMIPVersion.KMIP_1_0
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.FRESH,
enums.KMIPVersion.KMIP_1_1
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.FRESH,
enums.KMIPVersion.KMIP_1_2
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.FRESH,
enums.KMIPVersion.KMIP_1_3
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.FRESH,
enums.KMIPVersion.KMIP_1_4
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.FRESH,
enums.KMIPVersion.KMIP_2_0
)
self.assertTrue(result)
def test_is_attribute_added_in_kmip_1_2(self):
result = enums.is_attribute(
enums.Tags.KEY_VALUE_PRESENT,
enums.KMIPVersion.KMIP_1_0
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.KEY_VALUE_PRESENT,
enums.KMIPVersion.KMIP_1_1
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.KEY_VALUE_PRESENT,
enums.KMIPVersion.KMIP_1_2
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.KEY_VALUE_PRESENT,
enums.KMIPVersion.KMIP_1_3
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.KEY_VALUE_PRESENT,
enums.KMIPVersion.KMIP_1_4
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.KEY_VALUE_PRESENT,
enums.KMIPVersion.KMIP_2_0
)
self.assertTrue(result)
def test_is_attribute_added_in_kmip_1_3(self):
result = enums.is_attribute(
enums.Tags.RANDOM_NUMBER_GENERATOR,
enums.KMIPVersion.KMIP_1_0
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.RANDOM_NUMBER_GENERATOR,
enums.KMIPVersion.KMIP_1_1
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.RANDOM_NUMBER_GENERATOR,
enums.KMIPVersion.KMIP_1_2
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.RANDOM_NUMBER_GENERATOR,
enums.KMIPVersion.KMIP_1_3
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.RANDOM_NUMBER_GENERATOR,
enums.KMIPVersion.KMIP_1_4
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.RANDOM_NUMBER_GENERATOR,
enums.KMIPVersion.KMIP_2_0
)
self.assertTrue(result)
def test_is_attribute_added_in_kmip_1_4(self):
result = enums.is_attribute(
enums.Tags.COMMENT,
enums.KMIPVersion.KMIP_1_0
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.COMMENT,
enums.KMIPVersion.KMIP_1_1
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.COMMENT,
enums.KMIPVersion.KMIP_1_2
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.COMMENT,
enums.KMIPVersion.KMIP_1_3
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.COMMENT,
enums.KMIPVersion.KMIP_1_4
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.COMMENT,
enums.KMIPVersion.KMIP_2_0
)
self.assertTrue(result)
def test_is_attribute_added_in_kmip_2_0(self):
result = enums.is_attribute(
enums.Tags.QUANTUM_SAFE,
enums.KMIPVersion.KMIP_1_0
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.QUANTUM_SAFE,
enums.KMIPVersion.KMIP_1_1
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.QUANTUM_SAFE,
enums.KMIPVersion.KMIP_1_2
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.QUANTUM_SAFE,
enums.KMIPVersion.KMIP_1_3
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.QUANTUM_SAFE,
enums.KMIPVersion.KMIP_1_4
)
self.assertFalse(result)
result = enums.is_attribute(
enums.Tags.QUANTUM_SAFE,
enums.KMIPVersion.KMIP_2_0
)
self.assertTrue(result)
def test_is_attribute_removed_in_kmip_2_0(self):
result = enums.is_attribute(
enums.Tags.CUSTOM_ATTRIBUTE,
enums.KMIPVersion.KMIP_1_0
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.CUSTOM_ATTRIBUTE,
enums.KMIPVersion.KMIP_1_1
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.CUSTOM_ATTRIBUTE,
enums.KMIPVersion.KMIP_1_2
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.CUSTOM_ATTRIBUTE,
enums.KMIPVersion.KMIP_1_3
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.CUSTOM_ATTRIBUTE,
enums.KMIPVersion.KMIP_1_4
)
self.assertTrue(result)
result = enums.is_attribute(
enums.Tags.CUSTOM_ATTRIBUTE,
enums.KMIPVersion.KMIP_2_0
)
self.assertFalse(result)