Update the Register payloads to support KMIP 2.0

This change updates the Register payloads to support KMIP 2.0
features, including swapping out TemplateAttributes for the new
Attributes structure in the request payload and removing all
attribute-related encodings from the response payload. Unit tests
have been added to cover these changes.
This commit is contained in:
Peter Hamilton 2019-03-06 14:32:14 -05:00 committed by Peter Hamilton
parent a81233aa2a
commit 8e7dae6629
2 changed files with 264 additions and 30 deletions

View File

@ -174,17 +174,37 @@ class RegisterRequestPayload(primitives.Struct):
"type."
)
if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer):
self._template_attribute = objects.TemplateAttribute()
self._template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer):
self._template_attribute = objects.TemplateAttribute()
self._template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
else:
raise exceptions.InvalidKmipEncoding(
"The Register request payload encoding is missing the "
"template attribute."
)
else:
raise exceptions.InvalidKmipEncoding(
"The Register request payload encoding is missing the "
"template attribute."
)
# NOTE (ph) For now, leave attributes natively in TemplateAttribute
# form and just convert to the KMIP 2.0 Attributes form as needed
# for encoding/decoding purposes. Changing the payload to require
# the new Attributes structure will trigger a bunch of second-order
# effects across the client and server codebases that is beyond
# the scope of updating the Register payloads to support KMIP 2.0.
if self.is_tag_next(enums.Tags.ATTRIBUTES, local_buffer):
attributes = objects.Attributes()
attributes.read(local_buffer, kmip_version=kmip_version)
value = objects.convert_attributes_to_template_attribute(
attributes
)
self._template_attribute = value
else:
raise exceptions.InvalidKmipEncoding(
"The Register request payload encoding is missing the "
"attributes structure."
)
managed_object = self.secret_factory.create(self.object_type)
@ -224,16 +244,34 @@ class RegisterRequestPayload(primitives.Struct):
"field."
)
if self._template_attribute:
self._template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self._template_attribute:
self._template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
else:
raise exceptions.InvalidField(
"The Register request payload is missing the template "
"attribute field."
)
else:
raise exceptions.InvalidField(
"The Register request payload is missing the template "
"attribute field."
)
# NOTE (ph) For now, leave attributes natively in TemplateAttribute
# form and just convert to the KMIP 2.0 Attributes form as needed
# for encoding/decoding purposes. Changing the payload to require
# the new Attributes structure will trigger a bunch of second-order
# effects across the client and server codebases that is beyond
# the scope of updating the Register payloads to support KMIP 2.0.
if self._template_attribute:
attributes = objects.convert_template_attribute_to_attributes(
self._template_attribute
)
attributes.write(local_buffer, kmip_version=kmip_version)
else:
raise exceptions.InvalidField(
"The Register request payload is missing the template "
"attribute field."
)
if self._managed_object:
self._managed_object.write(local_buffer, kmip_version=kmip_version)
@ -391,12 +429,13 @@ class RegisterResponsePayload(primitives.Struct):
"identifier."
)
if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer):
self._template_attribute = objects.TemplateAttribute()
self._template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self.is_tag_next(enums.Tags.TEMPLATE_ATTRIBUTE, local_buffer):
self._template_attribute = objects.TemplateAttribute()
self._template_attribute.read(
local_buffer,
kmip_version=kmip_version
)
self.is_oversized(local_buffer)
@ -427,11 +466,12 @@ class RegisterResponsePayload(primitives.Struct):
"identifier field."
)
if self._template_attribute:
self._template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
if kmip_version < enums.KMIPVersion.KMIP_2_0:
if self._template_attribute:
self._template_attribute.write(
local_buffer,
kmip_version=kmip_version
)
self.length = local_buffer.length()
super(RegisterResponsePayload, self).write(

View File

@ -142,6 +142,31 @@ class TestRegisterRequestPayload(testtools.TestCase):
b'\x00\x00'
)
# Encoding obtained from the KMIP 1.1 testing document, Section 13.2.2.
# Modified to exclude the Link attribute. Manually converted into the
# KMIP 2.0 format.
#
# TODO (ph) Add the Link attribute back in once Links are supported.
#
# This encoding matches the following set of values:
# Request Payload
# Object Type - Certificate
# Attributes
# Cryptographic Usage Mask - Sign | Verify
# Certificate
# Certificate Type - X.509
# Certificate Value - See comment for the full encoding.
self.full_encoding_with_attributes = utils.BytearrayStream(
b'\x42\x00\x79\x01\x00\x00\x03\x60'
b'\x42\x00\x57\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x01\x25\x01\x00\x00\x00\x10'
b'\x42\x00\x2C\x02\x00\x00\x00\x04\x00\x00\x00\x03\x00\x00\x00\x00'
b'\x42\x00\x13\x01\x00\x00\x03\x30'
b'\x42\x00\x1D\x05\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x42\x00\x1E\x08\x00\x00\x03\x16' + self.certificate_value +
b'\x00\x00'
)
# Encoding obtained from the KMIP 1.1 testing document, Section 13.2.2.
# Modified to exclude the Link attribute.
#
@ -326,6 +351,48 @@ class TestRegisterRequestPayload(testtools.TestCase):
payload.managed_object
)
def test_read_kmip_2_0(self):
"""
Test that a Register request payload can be read from a data stream
encoded with the KMIP 2.0 format.
"""
payload = payloads.RegisterRequestPayload()
self.assertIsNone(payload.object_type)
self.assertIsNone(payload.template_attribute)
self.assertIsNone(payload.managed_object)
payload.read(
self.full_encoding_with_attributes,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(enums.ObjectType.CERTIFICATE, payload.object_type)
self.assertEqual(
objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"Cryptographic Usage Mask"
),
attribute_value=primitives.Integer(
enums.CryptographicUsageMask.SIGN.value |
enums.CryptographicUsageMask.VERIFY.value,
tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK
)
)
]
),
payload.template_attribute
)
self.assertEqual(
secrets.Certificate(
certificate_type=enums.CertificateType.X_509,
certificate_value=self.certificate_value
),
payload.managed_object
)
def test_read_missing_object_type(self):
"""
Test that an InvalidKmipEncoding error is raised during the decoding
@ -360,6 +427,25 @@ class TestRegisterRequestPayload(testtools.TestCase):
*args
)
def test_read_missing_attributes(self):
"""
Test that an InvalidKmipEncoding error is raised during the decoding
of a Register request payload when the attributes structure is missing
from the encoding.
"""
payload = payloads.RegisterRequestPayload()
args = (self.no_template_attribute_encoding, )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.InvalidKmipEncoding,
"The Register request payload encoding is missing the attributes "
"structure.",
payload.read,
*args,
**kwargs
)
def test_read_missing_managed_object(self):
"""
Test that an InvalidKmipEncoding error is raised during the decoding
@ -409,6 +495,39 @@ class TestRegisterRequestPayload(testtools.TestCase):
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_kmip_2_0(self):
"""
Test that a Register request payload can be written to a data stream
encoded with the KMIP 2.0 format.
"""
payload = payloads.RegisterRequestPayload(
object_type=enums.ObjectType.CERTIFICATE,
template_attribute=objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"Cryptographic Usage Mask"
),
attribute_value=primitives.Integer(
enums.CryptographicUsageMask.SIGN.value |
enums.CryptographicUsageMask.VERIFY.value,
tag=enums.Tags.CRYPTOGRAPHIC_USAGE_MASK
)
)
]
),
managed_object=secrets.Certificate(
certificate_type=enums.CertificateType.X_509,
certificate_value=self.certificate_value
)
)
stream = utils.BytearrayStream()
payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0)
self.assertEqual(len(self.full_encoding_with_attributes), len(stream))
self.assertEqual(str(self.full_encoding_with_attributes), str(stream))
def test_write_missing_object_type(self):
"""
Test that an InvalidField error is raised during the encoding of a
@ -466,6 +585,31 @@ class TestRegisterRequestPayload(testtools.TestCase):
*args
)
def test_write_missing_attributes(self):
"""
Test that an InvalidField error is raised during the encoding of a
Register request payload when the payload is missing the attributes
structure.
"""
payload = payloads.RegisterRequestPayload(
object_type=enums.ObjectType.CERTIFICATE,
managed_object=secrets.Certificate(
certificate_type=enums.CertificateType.X_509,
certificate_value=self.certificate_value
)
)
args = (utils.BytearrayStream(), )
kwargs = {"kmip_version": enums.KMIPVersion.KMIP_2_0}
self.assertRaisesRegex(
exceptions.InvalidField,
"The Register request payload is missing the template attribute "
"field.",
payload.write,
*args,
**kwargs
)
def test_write_missing_managed_object(self):
"""
Test that an InvalidField error is raised during the encoding of a
@ -1020,6 +1164,27 @@ class TestRegisterResponsePayload(testtools.TestCase):
payload.template_attribute
)
def test_read_kmip_2_0(self):
"""
Test that a Register response payload can be read from a data stream
encoded with the KMIP 2.0 format.
"""
payload = payloads.RegisterResponsePayload()
self.assertIsNone(payload.unique_identifier)
self.assertIsNone(payload.template_attribute)
payload.read(
self.no_template_attribute_encoding,
kmip_version=enums.KMIPVersion.KMIP_2_0
)
self.assertEqual(
"7091d0bf-548a-4d4a-93a6-6dd71cf75221",
payload.unique_identifier
)
self.assertIsNone(payload.template_attribute)
def test_read_missing_unique_identifier(self):
"""
Test that an InvalidKmipEncoding error is raised during the decoding
@ -1086,6 +1251,35 @@ class TestRegisterResponsePayload(testtools.TestCase):
self.assertEqual(len(self.full_encoding), len(stream))
self.assertEqual(str(self.full_encoding), str(stream))
def test_write_kmip_2_0(self):
"""
Test that a Register response payload can be written to a data stream
encoded with the KMIP 2.0 format.
"""
payload = payloads.RegisterResponsePayload(
unique_identifier="7091d0bf-548a-4d4a-93a6-6dd71cf75221",
template_attribute=objects.TemplateAttribute(
attributes=[
objects.Attribute(
attribute_name=objects.Attribute.AttributeName(
"State"
),
attribute_value=primitives.Enumeration(
enums.State,
value=enums.State.PRE_ACTIVE,
tag=enums.Tags.STATE
)
)
]
)
)
stream = utils.BytearrayStream()
payload.write(stream, kmip_version=enums.KMIPVersion.KMIP_2_0)
self.assertEqual(len(self.no_template_attribute_encoding), len(stream))
self.assertEqual(str(self.no_template_attribute_encoding), str(stream))
def test_write_missing_unique_identifier(self):
"""
Test that an InvalidField error is raised during the encoding of a