mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #146 from OpenKMIP/feat/add-kmip-engine-register
Adding KmipEngine support for Register
This commit is contained in:
commit
995d458654
|
@ -14,6 +14,7 @@
|
|||
# under the License.
|
||||
|
||||
import logging
|
||||
import six
|
||||
import sqlalchemy
|
||||
|
||||
from sqlalchemy.orm import exc
|
||||
|
@ -35,12 +36,15 @@ from kmip.core.messages.payloads import destroy
|
|||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import register
|
||||
|
||||
from kmip.core import misc
|
||||
|
||||
from kmip.pie import sqltypes
|
||||
from kmip.pie import factory
|
||||
from kmip.pie import objects
|
||||
from kmip.pie import sqltypes
|
||||
|
||||
from kmip.services.server import policy
|
||||
from kmip.services.server.crypto import engine
|
||||
|
||||
|
||||
|
@ -63,6 +67,9 @@ class KmipEngine(object):
|
|||
* Key compression
|
||||
* Key wrapping
|
||||
* Key format conversions
|
||||
* Registration of empty managed objects (e.g., Private Keys)
|
||||
* Managed object state tracking
|
||||
* Managed object usage limit tracking and enforcement
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
|
@ -105,6 +112,8 @@ class KmipEngine(object):
|
|||
enums.ObjectType.OPAQUE_DATA: objects.OpaqueObject
|
||||
}
|
||||
|
||||
self._attribute_policy = policy.AttributePolicy(self._protocol_version)
|
||||
|
||||
def _kmip_version_supported(supported):
|
||||
def decorator(function):
|
||||
def wrapper(self, *args, **kwargs):
|
||||
|
@ -133,6 +142,9 @@ class KmipEngine(object):
|
|||
def _set_protocol_version(self, protocol_version):
|
||||
if protocol_version in self._protocol_versions:
|
||||
self._protocol_version = protocol_version
|
||||
self._attribute_policy = policy.AttributePolicy(
|
||||
self._protocol_version
|
||||
)
|
||||
else:
|
||||
raise exceptions.InvalidMessage(
|
||||
"KMIP {0} is not supported by the server.".format(
|
||||
|
@ -479,8 +491,134 @@ class KmipEngine(object):
|
|||
secret_factory = secrets.SecretFactory()
|
||||
return secret_factory.create(object_type, value)
|
||||
|
||||
def _process_template_attribute(self, template_attribute):
|
||||
"""
|
||||
Given a kmip.core TemplateAttribute object, extract the attribute
|
||||
value data into a usable dictionary format.
|
||||
"""
|
||||
attributes = {}
|
||||
|
||||
if len(template_attribute.names) > 0:
|
||||
raise exceptions.ItemNotFound(
|
||||
"Attribute templates are not supported."
|
||||
)
|
||||
|
||||
for attribute in template_attribute.attributes:
|
||||
name = attribute.attribute_name.value
|
||||
|
||||
if not self._attribute_policy.is_attribute_supported(name):
|
||||
raise exceptions.InvalidField(
|
||||
"The {0} attribute is unsupported.".format(name)
|
||||
)
|
||||
|
||||
if self._attribute_policy.is_attribute_multivalued(name):
|
||||
values = attributes.get(name, list())
|
||||
if (not attribute.attribute_index) and len(values) > 0:
|
||||
raise exceptions.InvalidField(
|
||||
"Attribute index missing from multivalued attribute."
|
||||
)
|
||||
|
||||
values.append(attribute.attribute_value)
|
||||
attributes.update([(name, values)])
|
||||
else:
|
||||
if attribute.attribute_index:
|
||||
if attribute.attribute_index.value != 0:
|
||||
raise exceptions.InvalidField(
|
||||
"Non-zero attribute index found for "
|
||||
"single-valued attribute."
|
||||
)
|
||||
value = attributes.get(name, None)
|
||||
if value:
|
||||
raise exceptions.IndexOutOfBounds(
|
||||
"Cannot set multiple instances of the "
|
||||
"{0} attribute.".format(name)
|
||||
)
|
||||
else:
|
||||
attributes.update([(name, attribute.attribute_value)])
|
||||
|
||||
return attributes
|
||||
|
||||
def _set_attributes_on_managed_object(self, managed_object, attributes):
|
||||
"""
|
||||
Given a kmip.pie object and a dictionary of attributes, attempt to set
|
||||
the attribute values on the object.
|
||||
"""
|
||||
for attribute_name, attribute_value in six.iteritems(attributes):
|
||||
object_type = managed_object._object_type
|
||||
if self._attribute_policy.is_attribute_applicable_to_object_type(
|
||||
attribute_name,
|
||||
object_type):
|
||||
self._set_attribute_on_managed_object(
|
||||
managed_object,
|
||||
(attribute_name, attribute_value)
|
||||
)
|
||||
else:
|
||||
name = object_type.name
|
||||
raise exceptions.InvalidField(
|
||||
"Cannot set {0} attribute on {1} object.".format(
|
||||
attribute_name,
|
||||
''.join([x.capitalize() for x in name.split('_')])
|
||||
)
|
||||
)
|
||||
|
||||
def _set_attribute_on_managed_object(self, managed_object, attribute):
|
||||
"""
|
||||
Set the attribute value on the kmip.pie managed object.
|
||||
"""
|
||||
attribute_name = attribute[0]
|
||||
attribute_value = attribute[1]
|
||||
|
||||
if self._attribute_policy.is_attribute_multivalued(attribute_name):
|
||||
if attribute_name == 'Name':
|
||||
managed_object.names.extend(
|
||||
[x.name_value.value for x in attribute_value]
|
||||
)
|
||||
for name in managed_object.names:
|
||||
if managed_object.names.count(name) > 1:
|
||||
raise exceptions.InvalidField(
|
||||
"Cannot set duplicate name values."
|
||||
)
|
||||
else:
|
||||
# TODO (peterhamilton) Remove when all attributes are supported
|
||||
raise exceptions.InvalidField(
|
||||
"The {0} attribute is unsupported.".format(attribute_name)
|
||||
)
|
||||
else:
|
||||
field = None
|
||||
value = attribute_value.value
|
||||
|
||||
if attribute_name == 'Cryptographic Algorithm':
|
||||
field = 'cryptographic_algorithm'
|
||||
elif attribute_name == 'Cryptographic Length':
|
||||
field = 'cryptographic_length'
|
||||
elif attribute_name == 'Cryptographic Usage Mask':
|
||||
field = 'cryptographic_usage_masks'
|
||||
value = list()
|
||||
for e in enums.CryptographicUsageMask:
|
||||
if e.value & attribute_value.value:
|
||||
value.append(e)
|
||||
|
||||
if field:
|
||||
existing_value = getattr(managed_object, field)
|
||||
if existing_value:
|
||||
if existing_value != value:
|
||||
raise exceptions.InvalidField(
|
||||
"Cannot overwrite the {0} attribute.".format(
|
||||
attribute_name
|
||||
)
|
||||
)
|
||||
else:
|
||||
setattr(managed_object, field, value)
|
||||
else:
|
||||
# TODO (peterhamilton) Remove when all attributes are supported
|
||||
raise exceptions.InvalidField(
|
||||
"The {0} attribute is unsupported.".format(attribute_name)
|
||||
)
|
||||
|
||||
def _process_operation(self, operation, payload):
|
||||
if operation == enums.Operation.GET:
|
||||
if operation == enums.Operation.REGISTER:
|
||||
return self._process_register(payload)
|
||||
elif operation == enums.Operation.GET:
|
||||
return self._process_get(payload)
|
||||
elif operation == enums.Operation.DESTROY:
|
||||
return self._process_destroy(payload)
|
||||
|
@ -495,6 +633,65 @@ class KmipEngine(object):
|
|||
)
|
||||
)
|
||||
|
||||
@_kmip_version_supported('1.0')
|
||||
def _process_register(self, payload):
|
||||
self._logger.info("Processing operation: Register")
|
||||
|
||||
object_type = payload.object_type.value
|
||||
template_attribute = payload.template_attribute
|
||||
|
||||
if self._object_map.get(object_type) is None:
|
||||
name = object_type.name
|
||||
raise exceptions.InvalidField(
|
||||
"The {0} object type is not supported.".format(
|
||||
''.join(
|
||||
[x.capitalize() for x in name.split('_')]
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
if payload.secret:
|
||||
secret = payload.secret
|
||||
else:
|
||||
# TODO (peterhamilton) It is possible to register 'empty' secrets
|
||||
# like Private Keys. For now, that feature is not supported.
|
||||
raise exceptions.InvalidField(
|
||||
"Cannot register a secret in absentia."
|
||||
)
|
||||
|
||||
object_attributes = {}
|
||||
if template_attribute:
|
||||
object_attributes = self._process_template_attribute(
|
||||
template_attribute
|
||||
)
|
||||
|
||||
managed_object_factory = factory.ObjectFactory()
|
||||
managed_object = managed_object_factory.convert(secret)
|
||||
managed_object.names = []
|
||||
|
||||
self._set_attributes_on_managed_object(
|
||||
managed_object,
|
||||
object_attributes
|
||||
)
|
||||
|
||||
# TODO (peterhamilton) Set additional server-only attributes.
|
||||
|
||||
self._data_session.add(managed_object)
|
||||
|
||||
# NOTE (peterhamilton) SQLAlchemy will *not* assign an ID until
|
||||
# commit is called. This makes future support for UNDO problematic.
|
||||
self._data_session.commit()
|
||||
|
||||
response_payload = register.RegisterResponsePayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(
|
||||
str(managed_object.unique_identifier)
|
||||
)
|
||||
)
|
||||
|
||||
self._id_placeholder = str(managed_object.unique_identifier)
|
||||
|
||||
return response_payload
|
||||
|
||||
@_kmip_version_supported('1.0')
|
||||
def _process_get(self, payload):
|
||||
self._logger.info("Processing operation: Get")
|
||||
|
@ -592,6 +789,7 @@ class KmipEngine(object):
|
|||
|
||||
if enums.QueryFunction.QUERY_OPERATIONS in queries:
|
||||
operations = list([
|
||||
contents.Operation(enums.Operation.REGISTER),
|
||||
contents.Operation(enums.Operation.GET),
|
||||
contents.Operation(enums.Operation.DESTROY),
|
||||
contents.Operation(enums.Operation.QUERY)
|
||||
|
|
|
@ -30,6 +30,8 @@ from kmip.core import misc
|
|||
from kmip.core import objects
|
||||
from kmip.core import secrets
|
||||
|
||||
from kmip.core.factories import attributes as factory
|
||||
|
||||
from kmip.core.messages import contents
|
||||
from kmip.core.messages import messages
|
||||
|
||||
|
@ -37,6 +39,7 @@ from kmip.core.messages.payloads import destroy
|
|||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import register
|
||||
|
||||
from kmip.pie import objects as pie_objects
|
||||
from kmip.pie import sqltypes
|
||||
|
@ -640,16 +643,19 @@ class TestKmipEngine(testtools.TestCase):
|
|||
e = engine.KmipEngine()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
e._process_register = mock.MagicMock()
|
||||
e._process_get = mock.MagicMock()
|
||||
e._process_destroy = mock.MagicMock()
|
||||
e._process_query = mock.MagicMock()
|
||||
e._process_discover_versions = mock.MagicMock()
|
||||
|
||||
e._process_operation(enums.Operation.REGISTER, None)
|
||||
e._process_operation(enums.Operation.GET, None)
|
||||
e._process_operation(enums.Operation.DESTROY, None)
|
||||
e._process_operation(enums.Operation.QUERY, None)
|
||||
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
|
||||
|
||||
e._process_register.assert_called_with(None)
|
||||
e._process_get.assert_called_with(None)
|
||||
e._process_destroy.assert_called_with(None)
|
||||
e._process_query.assert_called_with(None)
|
||||
|
@ -942,6 +948,634 @@ class TestKmipEngine(testtools.TestCase):
|
|||
*args
|
||||
)
|
||||
|
||||
def test_process_template_attribute(self):
|
||||
"""
|
||||
Test that a template attribute structure can be processed correctly.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
name = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
algorithm = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
)
|
||||
length = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
128
|
||||
)
|
||||
mask = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[name, algorithm, length, mask]
|
||||
)
|
||||
|
||||
result = e._process_template_attribute(template_attribute)
|
||||
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertEqual(4, len(result.keys()))
|
||||
self.assertIn('Name', result.keys())
|
||||
self.assertIn('Cryptographic Algorithm', result.keys())
|
||||
self.assertIn('Cryptographic Length', result.keys())
|
||||
self.assertIn('Cryptographic Usage Mask', result.keys())
|
||||
|
||||
self.assertEqual([name.attribute_value], result.get('Name'))
|
||||
self.assertEqual(
|
||||
algorithm.attribute_value,
|
||||
result.get('Cryptographic Algorithm')
|
||||
)
|
||||
self.assertEqual(
|
||||
length.attribute_value,
|
||||
result.get('Cryptographic Length')
|
||||
)
|
||||
self.assertEqual(
|
||||
mask.attribute_value,
|
||||
result.get('Cryptographic Usage Mask')
|
||||
)
|
||||
|
||||
def test_process_template_attribute_unsupported_features(self):
|
||||
"""
|
||||
Test that the right errors are generated when unsupported features
|
||||
are referenced while processing a template attribute.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
# Test that providing template names generates an InvalidField error.
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
names=[
|
||||
attributes.Name.create(
|
||||
'invalid',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
args = (template_attribute, )
|
||||
regex = "Attribute templates are not supported."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.ItemNotFound,
|
||||
regex,
|
||||
e._process_template_attribute,
|
||||
*args
|
||||
)
|
||||
|
||||
# Test that an unrecognized attribute generates an InvalidField error.
|
||||
name = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
name.attribute_name.value = 'invalid'
|
||||
template_attribute = objects.TemplateAttribute(attributes=[name])
|
||||
|
||||
args = (template_attribute, )
|
||||
regex = "The invalid attribute is unsupported."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_template_attribute,
|
||||
*args
|
||||
)
|
||||
|
||||
# Test that missing indices generate an InvalidField error.
|
||||
name_a = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
name_b = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[name_a, name_b]
|
||||
)
|
||||
|
||||
args = (template_attribute, )
|
||||
regex = "Attribute index missing from multivalued attribute."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_template_attribute,
|
||||
*args
|
||||
)
|
||||
|
||||
# Test that a non-zero index generates an InvalidField error.
|
||||
algorithm = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
1
|
||||
)
|
||||
template_attribute = objects.TemplateAttribute(attributes=[algorithm])
|
||||
|
||||
args = (template_attribute, )
|
||||
regex = "Non-zero attribute index found for single-valued attribute."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_template_attribute,
|
||||
*args
|
||||
)
|
||||
|
||||
# Test that setting multiple values for a single-value attribute
|
||||
# generates an InvalidField error.
|
||||
algorithm_a = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
)
|
||||
algorithm_b = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.TRIPLE_DES
|
||||
)
|
||||
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[algorithm_a, algorithm_b]
|
||||
)
|
||||
|
||||
args = (template_attribute, )
|
||||
regex = (
|
||||
"Cannot set multiple instances of the Cryptographic Algorithm "
|
||||
"attribute."
|
||||
)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.IndexOutOfBounds,
|
||||
regex,
|
||||
e._process_template_attribute,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_set_attributes_on_managed_object(self):
|
||||
"""
|
||||
Test that multiple attributes can be set on a given managed object.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
managed_object = pie_objects.SecretData(
|
||||
b'',
|
||||
enums.SecretDataType.PASSWORD
|
||||
)
|
||||
managed_object.names = []
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
name = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Secret Data',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
mask = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[name, mask]
|
||||
)
|
||||
object_attributes = e._process_template_attribute(template_attribute)
|
||||
|
||||
self.assertEqual([], managed_object.names)
|
||||
self.assertEqual([], managed_object.cryptographic_usage_masks)
|
||||
|
||||
e._set_attributes_on_managed_object(
|
||||
managed_object,
|
||||
object_attributes
|
||||
)
|
||||
|
||||
self.assertEqual(['Test Secret Data'], managed_object.names)
|
||||
self.assertEqual(
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
],
|
||||
managed_object.cryptographic_usage_masks
|
||||
)
|
||||
|
||||
def test_set_attributes_on_managed_object_attribute_mismatch(self):
|
||||
"""
|
||||
Test that an InvalidField error is generated when attempting to set
|
||||
an attribute that is not applicable for a given managed object.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
managed_object = pie_objects.OpaqueObject(
|
||||
b'',
|
||||
enums.OpaqueDataType.NONE
|
||||
)
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
mask = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
template_attribute = objects.TemplateAttribute(attributes=[mask])
|
||||
object_attributes = e._process_template_attribute(template_attribute)
|
||||
|
||||
args = (managed_object, object_attributes)
|
||||
regex = (
|
||||
"Cannot set Cryptographic Usage Mask attribute on OpaqueData "
|
||||
"object."
|
||||
)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._set_attributes_on_managed_object,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_set_attribute_on_managed_object(self):
|
||||
"""
|
||||
Test that various attributes can be set correctly on a given
|
||||
managed object.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
name = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
algorithm = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
)
|
||||
length = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
0
|
||||
)
|
||||
mask = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
managed_object = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
0,
|
||||
b''
|
||||
)
|
||||
managed_object.names = []
|
||||
|
||||
self.assertEqual([], managed_object.names)
|
||||
self.assertEqual(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
managed_object.cryptographic_algorithm
|
||||
)
|
||||
self.assertEqual(0, managed_object.cryptographic_length)
|
||||
self.assertEqual([], managed_object.cryptographic_usage_masks)
|
||||
|
||||
e._set_attribute_on_managed_object(
|
||||
managed_object,
|
||||
('Name', [name.attribute_value])
|
||||
)
|
||||
|
||||
self.assertEqual(['Test Symmetric Key'], managed_object.names)
|
||||
|
||||
e._set_attribute_on_managed_object(
|
||||
managed_object,
|
||||
('Cryptographic Algorithm', algorithm.attribute_value)
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
managed_object.cryptographic_algorithm
|
||||
)
|
||||
|
||||
e._set_attribute_on_managed_object(
|
||||
managed_object,
|
||||
('Cryptographic Length', length.attribute_value)
|
||||
)
|
||||
|
||||
self.assertEqual(0, managed_object.cryptographic_length)
|
||||
|
||||
e._set_attribute_on_managed_object(
|
||||
managed_object,
|
||||
('Cryptographic Usage Mask', mask.attribute_value)
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
],
|
||||
managed_object.cryptographic_usage_masks
|
||||
)
|
||||
|
||||
def test_set_attribute_on_managed_object_unsupported_features(self):
|
||||
"""
|
||||
Test that the right errors are generated when unsupported features
|
||||
are referenced while setting managed object attributes.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
managed_object = pie_objects.SymmetricKey(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
8,
|
||||
b'\x00'
|
||||
)
|
||||
|
||||
# Test that multiple duplicate names cannot be set on an object.
|
||||
name_a = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
name_b = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
|
||||
args = (
|
||||
managed_object,
|
||||
('Name', [name_a.attribute_value, name_b.attribute_value])
|
||||
)
|
||||
regex = "Cannot set duplicate name values."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._set_attribute_on_managed_object,
|
||||
*args
|
||||
)
|
||||
|
||||
# Test that a multivalued, unsupported attribute cannot be set on an
|
||||
# object.
|
||||
name_a = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
name_b = attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
)
|
||||
|
||||
args = (
|
||||
managed_object,
|
||||
('Digest', [name_a.attribute_value, name_b.attribute_value])
|
||||
)
|
||||
regex = "The Digest attribute is unsupported."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._set_attribute_on_managed_object,
|
||||
*args
|
||||
)
|
||||
|
||||
# Test that a set attribute cannot be overwritten.
|
||||
length = attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
128
|
||||
)
|
||||
|
||||
args = (
|
||||
managed_object,
|
||||
('Cryptographic Length', length.attribute_value)
|
||||
)
|
||||
regex = "Cannot overwrite the Cryptographic Length attribute."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._set_attribute_on_managed_object,
|
||||
*args
|
||||
)
|
||||
|
||||
# Test that an unsupported attribute cannot be set.
|
||||
object_group = attribute_factory.create_attribute(
|
||||
enums.AttributeType.OBJECT_GROUP,
|
||||
'Test Group'
|
||||
)
|
||||
|
||||
args = (
|
||||
managed_object,
|
||||
('Object Group', object_group.attribute_value)
|
||||
)
|
||||
regex = "The Object Group attribute is unsupported."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._set_attribute_on_managed_object,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_register(self):
|
||||
"""
|
||||
Test that a Register request can be processed correctly.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
# Build a SymmetricKey for registration.
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
128
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
key_bytes = (
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
)
|
||||
secret = secrets.SymmetricKey(
|
||||
key_block=objects.KeyBlock(
|
||||
key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
key_value=objects.KeyValue(
|
||||
key_material=objects.KeyMaterial(key_bytes)
|
||||
),
|
||||
cryptographic_algorithm=attributes.CryptographicAlgorithm(
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
cryptographic_length=attributes.CryptographicLength(128)
|
||||
)
|
||||
)
|
||||
|
||||
payload = register.RegisterRequestPayload(
|
||||
object_type=object_type,
|
||||
template_attribute=template_attribute,
|
||||
secret=secret
|
||||
)
|
||||
|
||||
response_payload = e._process_register(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Register"
|
||||
)
|
||||
|
||||
uid = response_payload.unique_identifier.value
|
||||
self.assertEqual('1', uid)
|
||||
|
||||
# Retrieve the stored object and verify all attributes were set
|
||||
# appropriately.
|
||||
symmetric_key = e._data_session.query(
|
||||
pie_objects.SymmetricKey
|
||||
).filter(
|
||||
pie_objects.ManagedObject.unique_identifier == uid
|
||||
).one()
|
||||
self.assertEqual(
|
||||
enums.KeyFormatType.RAW,
|
||||
symmetric_key.key_format_type
|
||||
)
|
||||
self.assertEqual(1, len(symmetric_key.names))
|
||||
self.assertIn('Test Symmetric Key', symmetric_key.names)
|
||||
self.assertEqual(key_bytes, symmetric_key.value)
|
||||
self.assertEqual(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
symmetric_key.cryptographic_algorithm
|
||||
)
|
||||
self.assertEqual(128, symmetric_key.cryptographic_length)
|
||||
self.assertEqual(2, len(symmetric_key.cryptographic_usage_masks))
|
||||
self.assertIn(
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
symmetric_key.cryptographic_usage_masks
|
||||
)
|
||||
self.assertIn(
|
||||
enums.CryptographicUsageMask.DECRYPT,
|
||||
symmetric_key.cryptographic_usage_masks
|
||||
)
|
||||
|
||||
self.assertEqual(uid, e._id_placeholder)
|
||||
|
||||
def test_register_unsupported_object_type(self):
|
||||
"""
|
||||
Test that an InvalidField error is generated when attempting to
|
||||
register an unsupported object type.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SPLIT_KEY)
|
||||
payload = register.RegisterRequestPayload(object_type=object_type)
|
||||
|
||||
args = (payload, )
|
||||
regex = "The SplitKey object type is not supported."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_register,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_request_omitting_secret(self):
|
||||
"""
|
||||
Test that an InvalidField error is generate when trying to register
|
||||
a secret in absentia.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
payload = register.RegisterRequestPayload(object_type=object_type)
|
||||
|
||||
args = (payload, )
|
||||
regex = "Cannot register a secret in absentia."
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.InvalidField,
|
||||
regex,
|
||||
e._process_register,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_get(self):
|
||||
"""
|
||||
Test that a Get request can be processed correctly.
|
||||
|
@ -1278,19 +1912,23 @@ class TestKmipEngine(testtools.TestCase):
|
|||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsInstance(result, query.QueryResponsePayload)
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(3, len(result.operations))
|
||||
self.assertEqual(4, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.GET,
|
||||
enums.Operation.REGISTER,
|
||||
result.operations[0].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.DESTROY,
|
||||
enums.Operation.GET,
|
||||
result.operations[1].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
enums.Operation.DESTROY,
|
||||
result.operations[2].value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.Operation.QUERY,
|
||||
result.operations[3].value
|
||||
)
|
||||
self.assertEqual(list(), result.object_types)
|
||||
self.assertIsNotNone(result.vendor_identification)
|
||||
self.assertEqual(
|
||||
|
@ -1309,7 +1947,7 @@ class TestKmipEngine(testtools.TestCase):
|
|||
|
||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||
self.assertIsNotNone(result.operations)
|
||||
self.assertEqual(4, len(result.operations))
|
||||
self.assertEqual(5, len(result.operations))
|
||||
self.assertEqual(
|
||||
enums.Operation.DISCOVER_VERSIONS,
|
||||
result.operations[-1].value
|
||||
|
@ -1380,3 +2018,141 @@ class TestKmipEngine(testtools.TestCase):
|
|||
"Processing operation: DiscoverVersions"
|
||||
)
|
||||
self.assertEqual([], result.protocol_versions)
|
||||
|
||||
def test_register_get_destroy(self):
|
||||
"""
|
||||
Test that a managed object can be registered, retrieved, and destroyed
|
||||
without error.
|
||||
"""
|
||||
e = engine.KmipEngine()
|
||||
e._data_store = self.engine
|
||||
e._data_store_session_factory = self.session_factory
|
||||
e._data_session = e._data_store_session_factory()
|
||||
e._logger = mock.MagicMock()
|
||||
|
||||
attribute_factory = factory.AttributeFactory()
|
||||
|
||||
# Build a SymmetricKey for registration.
|
||||
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||
template_attribute = objects.TemplateAttribute(
|
||||
attributes=[
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.NAME,
|
||||
attributes.Name.create(
|
||||
'Test Symmetric Key',
|
||||
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||
)
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||
128
|
||||
),
|
||||
attribute_factory.create_attribute(
|
||||
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
]
|
||||
)
|
||||
]
|
||||
)
|
||||
key_bytes = (
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
)
|
||||
secret = secrets.SymmetricKey(
|
||||
key_block=objects.KeyBlock(
|
||||
key_format_type=misc.KeyFormatType(enums.KeyFormatType.RAW),
|
||||
key_value=objects.KeyValue(
|
||||
key_material=objects.KeyMaterial(key_bytes)
|
||||
),
|
||||
cryptographic_algorithm=attributes.CryptographicAlgorithm(
|
||||
enums.CryptographicAlgorithm.AES
|
||||
),
|
||||
cryptographic_length=attributes.CryptographicLength(128)
|
||||
)
|
||||
)
|
||||
|
||||
# Register the symmetric key with the corresponding attributes
|
||||
payload = register.RegisterRequestPayload(
|
||||
object_type=object_type,
|
||||
template_attribute=template_attribute,
|
||||
secret=secret
|
||||
)
|
||||
|
||||
response_payload = e._process_register(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Register"
|
||||
)
|
||||
|
||||
uid = response_payload.unique_identifier.value
|
||||
self.assertEqual('1', uid)
|
||||
|
||||
e._logger.reset_mock()
|
||||
|
||||
# Retrieve the registered key using Get and verify all fields set
|
||||
payload = get.GetRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(uid)
|
||||
)
|
||||
|
||||
response_payload = e._process_get(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Get"
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.ObjectType.SYMMETRIC_KEY,
|
||||
response_payload.object_type.value
|
||||
)
|
||||
self.assertEqual(str(uid), response_payload.unique_identifier.value)
|
||||
self.assertIsInstance(response_payload.secret, secrets.SymmetricKey)
|
||||
self.assertEqual(
|
||||
key_bytes,
|
||||
response_payload.secret.key_block.key_value.key_material.value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.KeyFormatType.RAW,
|
||||
response_payload.secret.key_block.key_format_type.value
|
||||
)
|
||||
self.assertEqual(
|
||||
enums.CryptographicAlgorithm.AES,
|
||||
response_payload.secret.key_block.cryptographic_algorithm.value
|
||||
)
|
||||
self.assertEqual(
|
||||
128,
|
||||
response_payload.secret.key_block.cryptographic_length.value
|
||||
)
|
||||
|
||||
e._logger.reset_mock()
|
||||
|
||||
# Destroy the symmetric key and verify it cannot be accessed again
|
||||
payload = destroy.DestroyRequestPayload(
|
||||
unique_identifier=attributes.UniqueIdentifier(uid)
|
||||
)
|
||||
|
||||
response_payload = e._process_destroy(payload)
|
||||
e._data_session.commit()
|
||||
e._data_session = e._data_store_session_factory()
|
||||
|
||||
e._logger.info.assert_called_once_with(
|
||||
"Processing operation: Destroy"
|
||||
)
|
||||
self.assertEqual(str(uid), response_payload.unique_identifier.value)
|
||||
self.assertRaises(
|
||||
exc.NoResultFound,
|
||||
e._data_session.query(pie_objects.OpaqueObject).filter(
|
||||
pie_objects.ManagedObject.unique_identifier == uid
|
||||
).one
|
||||
)
|
||||
|
||||
e._data_session.commit()
|
||||
e._data_store_session_factory()
|
||||
|
|
Loading…
Reference in New Issue