Update the CreateKeyPair payloads to support KMIP 2.0

This change updates the CreateKeyPair payloads to support KMIP 2.0
features, including swapping out TemplateAttributes for the new
Attributes structure in the request payload and removing all
attribute-related encodings from the response payload. Unit tests
have been added to cover these changes.
This commit is contained in:
Peter Hamilton 2019-03-18 11:58:24 -04:00 committed by Peter Hamilton
parent 1c85295d89
commit 6f81d79c53
2 changed files with 380 additions and 70 deletions

View File

@ -154,41 +154,82 @@ class CreateKeyPairRequestPayload(primitives.Struct):
)
local_buffer = utils.BytearrayStream(input_buffer.read(self.length))
if self.is_tag_next(
enums.Tags.COMMON_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._common_template_attribute = objects.TemplateAttribute(
tag=enums.Tags.COMMON_TEMPLATE_ATTRIBUTE
)
self._common_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(
enums.Tags.COMMON_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._common_template_attribute = objects.TemplateAttribute(
tag=enums.Tags.COMMON_TEMPLATE_ATTRIBUTE
)
self._common_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
else:
if self.is_tag_next(enums.Tags.COMMON_ATTRIBUTES, local_buffer):
attributes = objects.Attributes(
tag=enums.Tags.COMMON_ATTRIBUTES
)
attributes.read(local_buffer, kmip_version=kmip_version)
self._common_template_attribute = \
objects.convert_attributes_to_template_attribute(
attributes
)
if self.is_tag_next(
enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._private_key_template_attribute = objects.TemplateAttribute(
tag=enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE
)
self._private_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(
enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._private_key_template_attribute = \
objects.TemplateAttribute(
tag=enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE
)
self._private_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
else:
if self.is_tag_next(
enums.Tags.PRIVATE_KEY_ATTRIBUTES,
local_buffer
):
attributes = objects.Attributes(
tag=enums.Tags.PRIVATE_KEY_ATTRIBUTES
)
attributes.read(local_buffer, kmip_version=kmip_version)
self._private_key_template_attribute = \
objects.convert_attributes_to_template_attribute(
attributes
)
if self.is_tag_next(
enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._public_key_template_attribute = objects.TemplateAttribute(
tag=enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE
)
self._public_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(
enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._public_key_template_attribute = \
objects.TemplateAttribute(
tag=enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE
)
self._public_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
else:
if self.is_tag_next(
enums.Tags.PUBLIC_KEY_ATTRIBUTES,
local_buffer
):
attributes = objects.Attributes(
tag=enums.Tags.PUBLIC_KEY_ATTRIBUTES
)
attributes.read(local_buffer, kmip_version=kmip_version)
self._public_key_template_attribute = \
objects.convert_attributes_to_template_attribute(
attributes
)
self.is_oversized(local_buffer)
@ -205,23 +246,44 @@ class CreateKeyPairRequestPayload(primitives.Struct):
"""
local_buffer = utils.BytearrayStream()
if self._common_template_attribute is not None:
self._common_template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self._common_template_attribute is not None:
self._common_template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
else:
if self._common_template_attribute is not None:
attributes = objects.convert_template_attribute_to_attributes(
self._common_template_attribute
)
attributes.write(local_buffer, kmip_version=kmip_version)
if self._private_key_template_attribute is not None:
self._private_key_template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self._private_key_template_attribute is not None:
self._private_key_template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
else:
if self._private_key_template_attribute is not None:
attributes = objects.convert_template_attribute_to_attributes(
self._private_key_template_attribute
)
attributes.write(local_buffer, kmip_version=kmip_version)
if self._public_key_template_attribute is not None:
self._public_key_template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self._public_key_template_attribute is not None:
self._public_key_template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
else:
if self._public_key_template_attribute is not None:
attributes = objects.convert_template_attribute_to_attributes(
self._public_key_template_attribute
)
attributes.write(local_buffer, kmip_version=kmip_version)
self.length = local_buffer.length()
super(CreateKeyPairRequestPayload, self).write(
@ -301,6 +363,26 @@ class CreateKeyPairResponsePayload(primitives.Struct):
public_key_unique_identifier=None,
private_key_template_attribute=None,
public_key_template_attribute=None):
"""
Construct a CreateKeyPair response payload structure.
Args:
private_key_unique_identifier (string): A string specifying the
ID of the new private key. Optional, defaults to None. Required
for read/write.
public_key_unique_identifier (string): A string specifying the
ID of the new public key. Optional, defaults to None. Required
for read/write.
private_key_template_attribute (TemplateAttribute): A
TemplateAttribute structure with the
PrivateKeyTemplateAttribute tag containing the set of
attributes that were set on the new private key. Optional,
defaults to None.
public_key_template_attribute (TemplateAttribute): A
TemplateAttribute structure with the PublicKeyTemplateAttribute
tag containing the set of attributes that were set on the new
public key. Optional, defaults to None.
"""
super(CreateKeyPairResponsePayload, self).__init__(
enums.Tags.RESPONSE_PAYLOAD
)
@ -456,29 +538,32 @@ class CreateKeyPairResponsePayload(primitives.Struct):
"public key unique identifier."
)
if self.is_tag_next(
enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._private_key_template_attribute = objects.TemplateAttribute(
tag=enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE
)
self._private_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(
enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._private_key_template_attribute = \
objects.TemplateAttribute(
tag=enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE
)
self._private_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if self.is_tag_next(
enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._public_key_template_attribute = objects.TemplateAttribute(
tag=enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE
)
self._public_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if self.is_tag_next(
enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE,
local_buffer
):
self._public_key_template_attribute = \
objects.TemplateAttribute(
tag=enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE
)
self._public_key_template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
self.is_oversized(local_buffer)

View File

@ -100,6 +100,43 @@ class TestCreateKeyPairRequestPayload(testtools.TestCase):
b'\x42\x00\x0B\x02\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
)
# Encoding obtained from the KMIP 1.1 testing document, Section 8.1.0.
# Manually converted to the KMIP 2.0 format.
#
# This encoding matches the following set of values:
# Request Payload
# Common Attributes
# Cryptographic Algorithm - RSA
# Cryptographic Length - 1024
# Private Key Attributes
# Name
# Name Value - PrivateKey1
# Name Type - Uninterpreted Text String
# Cryptographic Usage Mask - Sign
# Public Key Attributes
# Name
# Name Value - PublicKey1
# Name Type - Uninterpreted Text String
# Cryptographic Usage Mask - Verify
self.full_encoding_with_attributes = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x00\xB8'
b'\x42\x01\x26\x01\x00\x00\x00\x20'
b'\x42\x00\x28\x05\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00'
b'\x42\x00\x2A\x02\x00\x00\x00\x04\x00\x00\x04\x00\x00\x00\x00\x00'
b'\x42\x01\x27\x01\x00\x00\x00\x40'
b'\x42\x00\x53\x01\x00\x00\x00\x28'
b'\x42\x00\x55\x07\x00\x00\x00\x0B'
b'\x50\x72\x69\x76\x61\x74\x65\x4B\x65\x79\x31\x00\x00\x00\x00\x00'
b'\x42\x00\x54\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\x2C\x02\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x01\x28\x01\x00\x00\x00\x40'
b'\x42\x00\x53\x01\x00\x00\x00\x28'
b'\x42\x00\x55\x07\x00\x00\x00\x0A'
b'\x50\x75\x62\x6C\x69\x63\x4B\x65\x79\x31\x00\x00\x00\x00\x00\x00'
b'\x42\x00\x54\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\x2C\x02\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
)
# Encoding obtained from the KMIP 1.1 testing document, Section 8.1.0.
#
# This encoding matches the following set of values:
@ -510,6 +547,106 @@ class TestCreateKeyPairRequestPayload(testtools.TestCase):
payload.public_key_template_attribute
)
def test_read_kmip_2_0(self):
"""
Test that a CreateKeyPair request payload can be read from a data
stream encoded with the KMIP 2.0 format.
"""
payload = payloads.CreateKeyPairRequestPayload()
self.assertIsNone(payload.common_template_attribute)
self.assertIsNone(payload.private_key_template_attribute)
self.assertIsNone(payload.public_key_template_attribute)
payload.read(
self.full_encoding_with_attributes,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(
objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Algorithm'
),
attribute_value=primitives.Enumeration(
enums.CryptographicAlgorithm,
value=enums.CryptographicAlgorithm.RSA,
tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Length'
),
attribute_value=primitives.Integer(
value=1024,
tag=enums.Tags.CRYPTOGRAPHIC_LENGTH
)
)
],
tag=enums.Tags.COMMON_TEMPLATE_ATTRIBUTE
),
payload.common_template_attribute
)
self.assertEqual(
objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName("Name"),
attribute_value=attributes.Name(
name_value=attributes.Name.NameValue(
"PrivateKey1"
),
name_type=attributes.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"Cryptographic Usage Mask"
),
attribute_value=primitives.Integer(
value=enums.CryptographicUsageMask.SIGN.value,
tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK
)
)
],
tag=enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE
),
payload.private_key_template_attribute
)
self.assertEqual(
objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName("Name"),
attribute_value=attributes.Name(
name_value=attributes.Name.NameValue(
"PublicKey1"
),
name_type=attributes.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"Cryptographic Usage Mask"
),
attribute_value=primitives.Integer(
value=enums.CryptographicUsageMask.VERIFY.value,
tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK
)
)
],
tag=enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE
),
payload.public_key_template_attribute
)
def test_read_missing_common_template_attribute(self):
"""
Test that a CreateKeyPair request payload can be read from a data
@ -826,6 +963,94 @@ class TestCreateKeyPairRequestPayload(testtools.TestCase):
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_kmip_2_0(self):
"""
Test that a CreateKeyPair request payload can be written to a data
stream encoded with the KMIP 2.0 format.
"""
payload = payloads.CreateKeyPairRequestPayload(
common_template_attribute=objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Algorithm'
),
attribute_value=primitives.Enumeration(
enums.CryptographicAlgorithm,
value=enums.CryptographicAlgorithm.RSA,
tag=enums.Tags.CRYPTOGRAPHIC_ALGORITHM
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
'Cryptographic Length'
),
attribute_value=primitives.Integer(
value=1024,
tag=enums.Tags.CRYPTOGRAPHIC_LENGTH
)
)
],
tag=enums.Tags.COMMON_TEMPLATE_ATTRIBUTE
),
private_key_template_attribute=objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName("Name"),
attribute_value=attributes.Name(
name_value=attributes.Name.NameValue(
"PrivateKey1"
),
name_type=attributes.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"Cryptographic Usage Mask"
),
attribute_value=primitives.Integer(
value=enums.CryptographicUsageMask.SIGN.value,
tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK
)
)
],
tag=enums.Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE
),
public_key_template_attribute=objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName("Name"),
attribute_value=attributes.Name(
name_value=attributes.Name.NameValue(
"PublicKey1"
),
name_type=attributes.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"Cryptographic Usage Mask"
),
attribute_value=primitives.Integer(
value=enums.CryptographicUsageMask.VERIFY.value,
tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK
)
)
],
tag=enums.Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE
)
)
stream = utils.BytearrayStream()
payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0)
self.assertEqual(len(self.full_encoding_with_attributes), len(stream))
self.assertEqual(str(self.full_encoding_with_attributes), str(stream))
def test_write_missing_common_template_attribute(self):
"""
Test that a CreateKeyPair request payload can be written to a data