Adding KmipEngine support for Register

This change adds support for the Register operation to the KmipEngine.
New exceptions and test cases are included.
This commit is contained in:
Peter 2016-03-21 11:22:47 -04:00
parent 0a499b7b12
commit 89cba73821
2 changed files with 981 additions and 7 deletions

View File

@ -14,6 +14,7 @@
# under the License.
import logging
import six
import sqlalchemy
from sqlalchemy.orm import exc
@ -35,12 +36,15 @@ from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register
from kmip.core import misc
from kmip.pie import sqltypes
from kmip.pie import factory
from kmip.pie import objects
from kmip.pie import sqltypes
from kmip.services.server import policy
from kmip.services.server.crypto import engine
@ -63,6 +67,9 @@ class KmipEngine(object):
* Key compression
* Key wrapping
* Key format conversions
* Registration of empty managed objects (e.g., Private Keys)
* Managed object state tracking
* Managed object usage limit tracking and enforcement
"""
def __init__(self):
@ -105,6 +112,8 @@ class KmipEngine(object):
enums.ObjectType.OPAQUE_DATA: objects.OpaqueObject
}
self._attribute_policy = policy.AttributePolicy(self._protocol_version)
def _kmip_version_supported(supported):
def decorator(function):
def wrapper(self, *args, **kwargs):
@ -133,6 +142,9 @@ class KmipEngine(object):
def _set_protocol_version(self, protocol_version):
if protocol_version in self._protocol_versions:
self._protocol_version = protocol_version
self._attribute_policy = policy.AttributePolicy(
self._protocol_version
)
else:
raise exceptions.InvalidMessage(
"KMIP {0} is not supported by the server.".format(
@ -479,8 +491,134 @@ class KmipEngine(object):
secret_factory = secrets.SecretFactory()
return secret_factory.create(object_type, value)
def _process_template_attribute(self, template_attribute):
"""
Given a kmip.core TemplateAttribute object, extract the attribute
value data into a usable dictionary format.
"""
attributes = {}
if len(template_attribute.names) > 0:
raise exceptions.ItemNotFound(
"Attribute templates are not supported."
)
for attribute in template_attribute.attributes:
name = attribute.attribute_name.value
if not self._attribute_policy.is_attribute_supported(name):
raise exceptions.InvalidField(
"The {0} attribute is unsupported.".format(name)
)
if self._attribute_policy.is_attribute_multivalued(name):
values = attributes.get(name, list())
if (not attribute.attribute_index) and len(values) > 0:
raise exceptions.InvalidField(
"Attribute index missing from multivalued attribute."
)
values.append(attribute.attribute_value)
attributes.update([(name, values)])
else:
if attribute.attribute_index:
if attribute.attribute_index.value != 0:
raise exceptions.InvalidField(
"Non-zero attribute index found for "
"single-valued attribute."
)
value = attributes.get(name, None)
if value:
raise exceptions.IndexOutOfBounds(
"Cannot set multiple instances of the "
"{0} attribute.".format(name)
)
else:
attributes.update([(name, attribute.attribute_value)])
return attributes
def _set_attributes_on_managed_object(self, managed_object, attributes):
"""
Given a kmip.pie object and a dictionary of attributes, attempt to set
the attribute values on the object.
"""
for attribute_name, attribute_value in six.iteritems(attributes):
object_type = managed_object._object_type
if self._attribute_policy.is_attribute_applicable_to_object_type(
attribute_name,
object_type):
self._set_attribute_on_managed_object(
managed_object,
(attribute_name, attribute_value)
)
else:
name = object_type.name
raise exceptions.InvalidField(
"Cannot set {0} attribute on {1} object.".format(
attribute_name,
''.join([x.capitalize() for x in name.split('_')])
)
)
def _set_attribute_on_managed_object(self, managed_object, attribute):
"""
Set the attribute value on the kmip.pie managed object.
"""
attribute_name = attribute[0]
attribute_value = attribute[1]
if self._attribute_policy.is_attribute_multivalued(attribute_name):
if attribute_name == 'Name':
managed_object.names.extend(
[x.name_value.value for x in attribute_value]
)
for name in managed_object.names:
if managed_object.names.count(name) > 1:
raise exceptions.InvalidField(
"Cannot set duplicate name values."
)
else:
# TODO (peterhamilton) Remove when all attributes are supported
raise exceptions.InvalidField(
"The {0} attribute is unsupported.".format(attribute_name)
)
else:
field = None
value = attribute_value.value
if attribute_name == 'Cryptographic Algorithm':
field = 'cryptographic_algorithm'
elif attribute_name == 'Cryptographic Length':
field = 'cryptographic_length'
elif attribute_name == 'Cryptographic Usage Mask':
field = 'cryptographic_usage_masks'
value = list()
for e in enums.CryptographicUsageMask:
if e.value & attribute_value.value:
value.append(e)
if field:
existing_value = getattr(managed_object, field)
if existing_value:
if existing_value != value:
raise exceptions.InvalidField(
"Cannot overwrite the {0} attribute.".format(
attribute_name
)
)
else:
setattr(managed_object, field, value)
else:
# TODO (peterhamilton) Remove when all attributes are supported
raise exceptions.InvalidField(
"The {0} attribute is unsupported.".format(attribute_name)
)
def _process_operation(self, operation, payload):
if operation == enums.Operation.GET:
if operation == enums.Operation.REGISTER:
return self._process_register(payload)
elif operation == enums.Operation.GET:
return self._process_get(payload)
elif operation == enums.Operation.DESTROY:
return self._process_destroy(payload)
@ -495,6 +633,65 @@ class KmipEngine(object):
)
)
@_kmip_version_supported('1.0')
def _process_register(self, payload):
self._logger.info("Processing operation: Register")
object_type = payload.object_type.value
template_attribute = payload.template_attribute
if self._object_map.get(object_type) is None:
name = object_type.name
raise exceptions.InvalidField(
"The {0} object type is not supported.".format(
''.join(
[x.capitalize() for x in name.split('_')]
)
)
)
if payload.secret:
secret = payload.secret
else:
# TODO (peterhamilton) It is possible to register 'empty' secrets
# like Private Keys. For now, that feature is not supported.
raise exceptions.InvalidField(
"Cannot register a secret in absentia."
)
object_attributes = {}
if template_attribute:
object_attributes = self._process_template_attribute(
template_attribute
)
managed_object_factory = factory.ObjectFactory()
managed_object = managed_object_factory.convert(secret)
managed_object.names = []
self._set_attributes_on_managed_object(
managed_object,
object_attributes
)
# TODO (peterhamilton) Set additional server-only attributes.
self._data_session.add(managed_object)
# NOTE (peterhamilton) SQLAlchemy will *not* assign an ID until
# commit is called. This makes future support for UNDO problematic.
self._data_session.commit()
response_payload = register.RegisterResponsePayload(
unique_identifier=attributes.UniqueIdentifier(
str(managed_object.unique_identifier)
)
)
self._id_placeholder = str(managed_object.unique_identifier)
return response_payload
@_kmip_version_supported('1.0')
def _process_get(self, payload):
self._logger.info("Processing operation: Get")
@ -592,6 +789,7 @@ class KmipEngine(object):
if enums.QueryFunction.QUERY_OPERATIONS in queries:
operations = list([
contents.Operation(enums.Operation.REGISTER),
contents.Operation(enums.Operation.GET),
contents.Operation(enums.Operation.DESTROY),
contents.Operation(enums.Operation.QUERY)

View File

@ -30,6 +30,8 @@ from kmip.core import misc
from kmip.core import objects
from kmip.core import secrets
from kmip.core.factories import attributes as factory
from kmip.core.messages import contents
from kmip.core.messages import messages
@ -37,6 +39,7 @@ from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register
from kmip.pie import objects as pie_objects
from kmip.pie import sqltypes
@ -640,16 +643,19 @@ class TestKmipEngine(testtools.TestCase):
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._process_register = mock.MagicMock()
e._process_get = mock.MagicMock()
e._process_destroy = mock.MagicMock()
e._process_query = mock.MagicMock()
e._process_discover_versions = mock.MagicMock()
e._process_operation(enums.Operation.REGISTER, None)
e._process_operation(enums.Operation.GET, None)
e._process_operation(enums.Operation.DESTROY, None)
e._process_operation(enums.Operation.QUERY, None)
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
e._process_register.assert_called_with(None)
e._process_get.assert_called_with(None)
e._process_destroy.assert_called_with(None)
e._process_query.assert_called_with(None)
@ -942,6 +948,634 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_process_template_attribute(self):
"""
Test that a template attribute structure can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
name = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
algorithm = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
length = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
128
)
mask = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
template_attribute = objects.TemplateAttribute(
attributes=[name, algorithm, length, mask]
)
result = e._process_template_attribute(template_attribute)
self.assertIsInstance(result, dict)
self.assertEqual(4, len(result.keys()))
self.assertIn('Name', result.keys())
self.assertIn('Cryptographic Algorithm', result.keys())
self.assertIn('Cryptographic Length', result.keys())
self.assertIn('Cryptographic Usage Mask', result.keys())
self.assertEqual([name.attribute_value], result.get('Name'))
self.assertEqual(
algorithm.attribute_value,
result.get('Cryptographic Algorithm')
)
self.assertEqual(
length.attribute_value,
result.get('Cryptographic Length')
)
self.assertEqual(
mask.attribute_value,
result.get('Cryptographic Usage Mask')
)
def test_process_template_attribute_unsupported_features(self):
"""
Test that the right errors are generated when unsupported features
are referenced while processing a template attribute.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
# Test that providing template names generates an InvalidField error.
template_attribute = objects.TemplateAttribute(
names=[
attributes.Name.create(
'invalid',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
]
)
args = (template_attribute, )
regex = "Attribute templates are not supported."
self.assertRaisesRegexp(
exceptions.ItemNotFound,
regex,
e._process_template_attribute,
*args
)
# Test that an unrecognized attribute generates an InvalidField error.
name = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
name.attribute_name.value = 'invalid'
template_attribute = objects.TemplateAttribute(attributes=[name])
args = (template_attribute, )
regex = "The invalid attribute is unsupported."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_template_attribute,
*args
)
# Test that missing indices generate an InvalidField error.
name_a = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
name_b = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
template_attribute = objects.TemplateAttribute(
attributes=[name_a, name_b]
)
args = (template_attribute, )
regex = "Attribute index missing from multivalued attribute."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_template_attribute,
*args
)
# Test that a non-zero index generates an InvalidField error.
algorithm = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES,
1
)
template_attribute = objects.TemplateAttribute(attributes=[algorithm])
args = (template_attribute, )
regex = "Non-zero attribute index found for single-valued attribute."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_template_attribute,
*args
)
# Test that setting multiple values for a single-value attribute
# generates an InvalidField error.
algorithm_a = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
algorithm_b = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.TRIPLE_DES
)
template_attribute = objects.TemplateAttribute(
attributes=[algorithm_a, algorithm_b]
)
args = (template_attribute, )
regex = (
"Cannot set multiple instances of the Cryptographic Algorithm "
"attribute."
)
self.assertRaisesRegexp(
exceptions.IndexOutOfBounds,
regex,
e._process_template_attribute,
*args
)
def test_set_attributes_on_managed_object(self):
"""
Test that multiple attributes can be set on a given managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.SecretData(
b'',
enums.SecretDataType.PASSWORD
)
managed_object.names = []
attribute_factory = factory.AttributeFactory()
name = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Secret Data',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
mask = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
template_attribute = objects.TemplateAttribute(
attributes=[name, mask]
)
object_attributes = e._process_template_attribute(template_attribute)
self.assertEqual([], managed_object.names)
self.assertEqual([], managed_object.cryptographic_usage_masks)
e._set_attributes_on_managed_object(
managed_object,
object_attributes
)
self.assertEqual(['Test Secret Data'], managed_object.names)
self.assertEqual(
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
],
managed_object.cryptographic_usage_masks
)
def test_set_attributes_on_managed_object_attribute_mismatch(self):
"""
Test that an InvalidField error is generated when attempting to set
an attribute that is not applicable for a given managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
managed_object = pie_objects.OpaqueObject(
b'',
enums.OpaqueDataType.NONE
)
attribute_factory = factory.AttributeFactory()
mask = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
template_attribute = objects.TemplateAttribute(attributes=[mask])
object_attributes = e._process_template_attribute(template_attribute)
args = (managed_object, object_attributes)
regex = (
"Cannot set Cryptographic Usage Mask attribute on OpaqueData "
"object."
)
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._set_attributes_on_managed_object,
*args
)
def test_set_attribute_on_managed_object(self):
"""
Test that various attributes can be set correctly on a given
managed object.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
name = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
algorithm = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
)
length = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
0
)
mask = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
0,
b''
)
managed_object.names = []
self.assertEqual([], managed_object.names)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
managed_object.cryptographic_algorithm
)
self.assertEqual(0, managed_object.cryptographic_length)
self.assertEqual([], managed_object.cryptographic_usage_masks)
e._set_attribute_on_managed_object(
managed_object,
('Name', [name.attribute_value])
)
self.assertEqual(['Test Symmetric Key'], managed_object.names)
e._set_attribute_on_managed_object(
managed_object,
('Cryptographic Algorithm', algorithm.attribute_value)
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
managed_object.cryptographic_algorithm
)
e._set_attribute_on_managed_object(
managed_object,
('Cryptographic Length', length.attribute_value)
)
self.assertEqual(0, managed_object.cryptographic_length)
e._set_attribute_on_managed_object(
managed_object,
('Cryptographic Usage Mask', mask.attribute_value)
)
self.assertEqual(
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
],
managed_object.cryptographic_usage_masks
)
def test_set_attribute_on_managed_object_unsupported_features(self):
"""
Test that the right errors are generated when unsupported features
are referenced while setting managed object attributes.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
managed_object = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
8,
b'\x00'
)
# Test that multiple duplicate names cannot be set on an object.
name_a = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
name_b = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
args = (
managed_object,
('Name', [name_a.attribute_value, name_b.attribute_value])
)
regex = "Cannot set duplicate name values."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._set_attribute_on_managed_object,
*args
)
# Test that a multivalued, unsupported attribute cannot be set on an
# object.
name_a = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
name_b = attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
args = (
managed_object,
('Digest', [name_a.attribute_value, name_b.attribute_value])
)
regex = "The Digest attribute is unsupported."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._set_attribute_on_managed_object,
*args
)
# Test that a set attribute cannot be overwritten.
length = attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
128
)
args = (
managed_object,
('Cryptographic Length', length.attribute_value)
)
regex = "Cannot overwrite the Cryptographic Length attribute."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._set_attribute_on_managed_object,
*args
)
# Test that an unsupported attribute cannot be set.
object_group = attribute_factory.create_attribute(
enums.AttributeType.OBJECT_GROUP,
'Test Group'
)
args = (
managed_object,
('Object Group', object_group.attribute_value)
)
regex = "The Object Group attribute is unsupported."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._set_attribute_on_managed_object,
*args
)
def test_register(self):
"""
Test that a Register request can be processed correctly.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
# Build a SymmetricKey for registration.
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
template_attribute = objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
128
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
]
)
key_bytes = (
b'\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00'
)
secret = secrets.SymmetricKey(
key_block=objects.KeyBlock(
key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW),
key_value=objects.KeyValue(
key_material=objects.KeyMaterial(key_bytes)
),
cryptographic_algorithm=attributes.CryptographicAlgorithm(
enums.CryptographicAlgorithm.AES
),
cryptographic_length=attributes.CryptographicLength(128)
)
)
payload = register.RegisterRequestPayload(
object_type=object_type,
template_attribute=template_attribute,
secret=secret
)
response_payload = e._process_register(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_called_once_with(
"Processing operation: Register"
)
uid = response_payload.unique_identifier.value
self.assertEqual('1', uid)
# Retrieve the stored object and verify all attributes were set
# appropriately.
symmetric_key = e._data_session.query(
pie_objects.SymmetricKey
).filter(
pie_objects.ManagedObject.unique_identifier == uid
).one()
self.assertEqual(
enums.KeyFormatType.RAW,
symmetric_key.key_format_type
)
self.assertEqual(1, len(symmetric_key.names))
self.assertIn('Test Symmetric Key', symmetric_key.names)
self.assertEqual(key_bytes, symmetric_key.value)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
symmetric_key.cryptographic_algorithm
)
self.assertEqual(128, symmetric_key.cryptographic_length)
self.assertEqual(2, len(symmetric_key.cryptographic_usage_masks))
self.assertIn(
enums.CryptographicUsageMask.ENCRYPT,
symmetric_key.cryptographic_usage_masks
)
self.assertIn(
enums.CryptographicUsageMask.DECRYPT,
symmetric_key.cryptographic_usage_masks
)
self.assertEqual(uid, e._id_placeholder)
def test_register_unsupported_object_type(self):
"""
Test that an InvalidField error is generated when attempting to
register an unsupported object type.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
object_type = attributes.ObjectType(enums.ObjectType.SPLIT_KEY)
payload = register.RegisterRequestPayload(object_type=object_type)
args = (payload, )
regex = "The SplitKey object type is not supported."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_register,
*args
)
def test_request_omitting_secret(self):
"""
Test that an InvalidField error is generate when trying to register
a secret in absentia.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
payload = register.RegisterRequestPayload(object_type=object_type)
args = (payload, )
regex = "Cannot register a secret in absentia."
self.assertRaisesRegexp(
exceptions.InvalidField,
regex,
e._process_register,
*args
)
def test_get(self):
"""
Test that a Get request can be processed correctly.
@ -1278,19 +1912,23 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(3, len(result.operations))
self.assertEqual(4, len(result.operations))
self.assertEqual(
enums.Operation.GET,
enums.Operation.REGISTER,
result.operations[0].value
)
self.assertEqual(
enums.Operation.DESTROY,
enums.Operation.GET,
result.operations[1].value
)
self.assertEqual(
enums.Operation.QUERY,
enums.Operation.DESTROY,
result.operations[2].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[3].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -1309,7 +1947,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsNotNone(result.operations)
self.assertEqual(4, len(result.operations))
self.assertEqual(5, len(result.operations))
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[-1].value
@ -1380,3 +2018,141 @@ class TestKmipEngine(testtools.TestCase):
"Processing operation: DiscoverVersions"
)
self.assertEqual([], result.protocol_versions)
def test_register_get_destroy(self):
"""
Test that a managed object can be registered, retrieved, and destroyed
without error.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
attribute_factory = factory.AttributeFactory()
# Build a SymmetricKey for registration.
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
template_attribute = objects.TemplateAttribute(
attributes=[
attribute_factory.create_attribute(
enums.AttributeType.NAME,
attributes.Name.create(
'Test Symmetric Key',
enums.NameType.UNINTERPRETED_TEXT_STRING
)
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
enums.CryptographicAlgorithm.AES
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
128
),
attribute_factory.create_attribute(
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
]
)
]
)
key_bytes = (
b'\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00'
)
secret = secrets.SymmetricKey(
key_block=objects.KeyBlock(
key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW),
key_value=objects.KeyValue(
key_material=objects.KeyMaterial(key_bytes)
),
cryptographic_algorithm=attributes.CryptographicAlgorithm(
enums.CryptographicAlgorithm.AES
),
cryptographic_length=attributes.CryptographicLength(128)
)
)
# Register the symmetric key with the corresponding attributes
payload = register.RegisterRequestPayload(
object_type=object_type,
template_attribute=template_attribute,
secret=secret
)
response_payload = e._process_register(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_called_once_with(
"Processing operation: Register"
)
uid = response_payload.unique_identifier.value
self.assertEqual('1', uid)
e._logger.reset_mock()
# Retrieve the registered key using Get and verify all fields set
payload = get.GetRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uid)
)
response_payload = e._process_get(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_called_once_with(
"Processing operation: Get"
)
self.assertEqual(
enums.ObjectType.SYMMETRIC_KEY,
response_payload.object_type.value
)
self.assertEqual(str(uid), response_payload.unique_identifier.value)
self.assertIsInstance(response_payload.secret, secrets.SymmetricKey)
self.assertEqual(
key_bytes,
response_payload.secret.key_block.key_value.key_material.value
)
self.assertEqual(
enums.KeyFormatType.RAW,
response_payload.secret.key_block.key_format_type.value
)
self.assertEqual(
enums.CryptographicAlgorithm.AES,
response_payload.secret.key_block.cryptographic_algorithm.value
)
self.assertEqual(
128,
response_payload.secret.key_block.cryptographic_length.value
)
e._logger.reset_mock()
# Destroy the symmetric key and verify it cannot be accessed again
payload = destroy.DestroyRequestPayload(
unique_identifier=attributes.UniqueIdentifier(uid)
)
response_payload = e._process_destroy(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_called_once_with(
"Processing operation: Destroy"
)
self.assertEqual(str(uid), response_payload.unique_identifier.value)
self.assertRaises(
exc.NoResultFound,
e._data_session.query(pie_objects.OpaqueObject).filter(
pie_objects.ManagedObject.unique_identifier == uid
).one
)
e._data_session.commit()
e._data_store_session_factory()