mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-07-21 21:14:24 +02:00
Merge pull request #147 from OpenKMIP/feat/add-kmip-engine-create
Adding KmipEngine support for Create
This commit is contained in:
commit
23b27590f8
@ -32,6 +32,7 @@ from kmip.core.factories import secrets
|
|||||||
from kmip.core.messages import contents
|
from kmip.core.messages import contents
|
||||||
from kmip.core.messages import messages
|
from kmip.core.messages import messages
|
||||||
|
|
||||||
|
from kmip.core.messages.payloads import create
|
||||||
from kmip.core.messages.payloads import destroy
|
from kmip.core.messages.payloads import destroy
|
||||||
from kmip.core.messages.payloads import discover_versions
|
from kmip.core.messages.payloads import discover_versions
|
||||||
from kmip.core.messages.payloads import get
|
from kmip.core.messages.payloads import get
|
||||||
@ -616,7 +617,9 @@ class KmipEngine(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _process_operation(self, operation, payload):
|
def _process_operation(self, operation, payload):
|
||||||
if operation == enums.Operation.REGISTER:
|
if operation == enums.Operation.CREATE:
|
||||||
|
return self._process_create(payload)
|
||||||
|
elif operation == enums.Operation.REGISTER:
|
||||||
return self._process_register(payload)
|
return self._process_register(payload)
|
||||||
elif operation == enums.Operation.GET:
|
elif operation == enums.Operation.GET:
|
||||||
return self._process_get(payload)
|
return self._process_get(payload)
|
||||||
@ -633,6 +636,92 @@ class KmipEngine(object):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@_kmip_version_supported('1.0')
|
||||||
|
def _process_create(self, payload):
|
||||||
|
self._logger.info("Processing operation: Create")
|
||||||
|
|
||||||
|
object_type = payload.object_type.value
|
||||||
|
template_attribute = payload.template_attribute
|
||||||
|
|
||||||
|
if object_type != enums.ObjectType.SYMMETRIC_KEY:
|
||||||
|
name = object_type.name
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"Cannot create a {0} object with the Create operation.".format(
|
||||||
|
''.join([x.capitalize() for x in name.split('_')])
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
object_attributes = {}
|
||||||
|
if template_attribute:
|
||||||
|
object_attributes = self._process_template_attribute(
|
||||||
|
template_attribute
|
||||||
|
)
|
||||||
|
|
||||||
|
algorithm = object_attributes.get('Cryptographic Algorithm')
|
||||||
|
if algorithm:
|
||||||
|
algorithm = algorithm.value
|
||||||
|
else:
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"The cryptographic algorithm must be specified as an "
|
||||||
|
"attribute."
|
||||||
|
)
|
||||||
|
|
||||||
|
length = object_attributes.get('Cryptographic Length')
|
||||||
|
if length:
|
||||||
|
length = length.value
|
||||||
|
else:
|
||||||
|
# TODO (peterhamilton) The cryptographic length is technically not
|
||||||
|
# required per the spec. Update the CryptographyEngine to accept a
|
||||||
|
# None length, allowing it to pick the length dynamically. Default
|
||||||
|
# to the strongest key size allowed for the algorithm type.
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"The cryptographic length must be specified as an attribute."
|
||||||
|
)
|
||||||
|
|
||||||
|
usage_mask = object_attributes.get('Cryptographic Usage Mask')
|
||||||
|
if usage_mask is None:
|
||||||
|
raise exceptions.InvalidField(
|
||||||
|
"The cryptographic usage mask must be specified as an "
|
||||||
|
"attribute."
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self._cryptography_engine.create_symmetric_key(
|
||||||
|
algorithm,
|
||||||
|
length
|
||||||
|
)
|
||||||
|
|
||||||
|
managed_object = objects.SymmetricKey(
|
||||||
|
algorithm,
|
||||||
|
length,
|
||||||
|
result.get('value')
|
||||||
|
)
|
||||||
|
managed_object.names = []
|
||||||
|
|
||||||
|
self._set_attributes_on_managed_object(
|
||||||
|
managed_object,
|
||||||
|
object_attributes
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO (peterhamilton) Set additional server-only attributes.
|
||||||
|
|
||||||
|
self._data_session.add(managed_object)
|
||||||
|
|
||||||
|
# NOTE (peterhamilton) SQLAlchemy will *not* assign an ID until
|
||||||
|
# commit is called. This makes future support for UNDO problematic.
|
||||||
|
self._data_session.commit()
|
||||||
|
|
||||||
|
response_payload = create.CreateResponsePayload(
|
||||||
|
object_type=payload.object_type,
|
||||||
|
unique_identifier=attributes.UniqueIdentifier(
|
||||||
|
str(managed_object.unique_identifier)
|
||||||
|
),
|
||||||
|
template_attribute=None
|
||||||
|
)
|
||||||
|
|
||||||
|
self._id_placeholder = str(managed_object.unique_identifier)
|
||||||
|
|
||||||
|
return response_payload
|
||||||
|
|
||||||
@_kmip_version_supported('1.0')
|
@_kmip_version_supported('1.0')
|
||||||
def _process_register(self, payload):
|
def _process_register(self, payload):
|
||||||
self._logger.info("Processing operation: Register")
|
self._logger.info("Processing operation: Register")
|
||||||
@ -789,6 +878,7 @@ class KmipEngine(object):
|
|||||||
|
|
||||||
if enums.QueryFunction.QUERY_OPERATIONS in queries:
|
if enums.QueryFunction.QUERY_OPERATIONS in queries:
|
||||||
operations = list([
|
operations = list([
|
||||||
|
contents.Operation(enums.Operation.CREATE),
|
||||||
contents.Operation(enums.Operation.REGISTER),
|
contents.Operation(enums.Operation.REGISTER),
|
||||||
contents.Operation(enums.Operation.GET),
|
contents.Operation(enums.Operation.GET),
|
||||||
contents.Operation(enums.Operation.DESTROY),
|
contents.Operation(enums.Operation.DESTROY),
|
||||||
|
@ -35,6 +35,7 @@ from kmip.core.factories import attributes as factory
|
|||||||
from kmip.core.messages import contents
|
from kmip.core.messages import contents
|
||||||
from kmip.core.messages import messages
|
from kmip.core.messages import messages
|
||||||
|
|
||||||
|
from kmip.core.messages.payloads import create
|
||||||
from kmip.core.messages.payloads import destroy
|
from kmip.core.messages.payloads import destroy
|
||||||
from kmip.core.messages.payloads import discover_versions
|
from kmip.core.messages.payloads import discover_versions
|
||||||
from kmip.core.messages.payloads import get
|
from kmip.core.messages.payloads import get
|
||||||
@ -643,18 +644,21 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
e = engine.KmipEngine()
|
e = engine.KmipEngine()
|
||||||
e._logger = mock.MagicMock()
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
e._process_create = mock.MagicMock()
|
||||||
e._process_register = mock.MagicMock()
|
e._process_register = mock.MagicMock()
|
||||||
e._process_get = mock.MagicMock()
|
e._process_get = mock.MagicMock()
|
||||||
e._process_destroy = mock.MagicMock()
|
e._process_destroy = mock.MagicMock()
|
||||||
e._process_query = mock.MagicMock()
|
e._process_query = mock.MagicMock()
|
||||||
e._process_discover_versions = mock.MagicMock()
|
e._process_discover_versions = mock.MagicMock()
|
||||||
|
|
||||||
|
e._process_operation(enums.Operation.CREATE, None)
|
||||||
e._process_operation(enums.Operation.REGISTER, None)
|
e._process_operation(enums.Operation.REGISTER, None)
|
||||||
e._process_operation(enums.Operation.GET, None)
|
e._process_operation(enums.Operation.GET, None)
|
||||||
e._process_operation(enums.Operation.DESTROY, None)
|
e._process_operation(enums.Operation.DESTROY, None)
|
||||||
e._process_operation(enums.Operation.QUERY, None)
|
e._process_operation(enums.Operation.QUERY, None)
|
||||||
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
|
e._process_operation(enums.Operation.DISCOVER_VERSIONS, None)
|
||||||
|
|
||||||
|
e._process_create.assert_called_with(None)
|
||||||
e._process_register.assert_called_with(None)
|
e._process_register.assert_called_with(None)
|
||||||
e._process_get.assert_called_with(None)
|
e._process_get.assert_called_with(None)
|
||||||
e._process_destroy.assert_called_with(None)
|
e._process_destroy.assert_called_with(None)
|
||||||
@ -1425,6 +1429,267 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
*args
|
*args
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_create(self):
|
||||||
|
"""
|
||||||
|
Test that a Create request can be processed correctly.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
attribute_factory = factory.AttributeFactory()
|
||||||
|
|
||||||
|
# Build Create request
|
||||||
|
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||||
|
template_attribute = objects.TemplateAttribute(
|
||||||
|
attributes=[
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'Test Symmetric Key',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||||
|
enums.CryptographicAlgorithm.AES
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||||
|
256
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[
|
||||||
|
enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT
|
||||||
|
]
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
payload = create.CreateRequestPayload(
|
||||||
|
object_type,
|
||||||
|
template_attribute
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_create(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Create"
|
||||||
|
)
|
||||||
|
|
||||||
|
uid = response_payload.unique_identifier.value
|
||||||
|
self.assertEqual('1', uid)
|
||||||
|
|
||||||
|
# Retrieve the stored object and verify all attributes were set
|
||||||
|
# appropriately.
|
||||||
|
symmetric_key = e._data_session.query(
|
||||||
|
pie_objects.SymmetricKey
|
||||||
|
).filter(
|
||||||
|
pie_objects.ManagedObject.unique_identifier == uid
|
||||||
|
).one()
|
||||||
|
self.assertEqual(
|
||||||
|
enums.KeyFormatType.RAW,
|
||||||
|
symmetric_key.key_format_type
|
||||||
|
)
|
||||||
|
self.assertEqual(1, len(symmetric_key.names))
|
||||||
|
self.assertIn('Test Symmetric Key', symmetric_key.names)
|
||||||
|
self.assertEqual(256, len(symmetric_key.value) * 8)
|
||||||
|
self.assertEqual(
|
||||||
|
enums.CryptographicAlgorithm.AES,
|
||||||
|
symmetric_key.cryptographic_algorithm
|
||||||
|
)
|
||||||
|
self.assertEqual(256, symmetric_key.cryptographic_length)
|
||||||
|
self.assertEqual(2, len(symmetric_key.cryptographic_usage_masks))
|
||||||
|
self.assertIn(
|
||||||
|
enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
symmetric_key.cryptographic_usage_masks
|
||||||
|
)
|
||||||
|
self.assertIn(
|
||||||
|
enums.CryptographicUsageMask.DECRYPT,
|
||||||
|
symmetric_key.cryptographic_usage_masks
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(uid, e._id_placeholder)
|
||||||
|
|
||||||
|
def test_create_unsupported_object_type(self):
|
||||||
|
"""
|
||||||
|
Test that an InvalidField error is generated when attempting to
|
||||||
|
create an unsupported object type.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
object_type = attributes.ObjectType(enums.ObjectType.PUBLIC_KEY)
|
||||||
|
payload = create.CreateRequestPayload(
|
||||||
|
object_type
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
regex = "Cannot create a PublicKey object with the Create operation."
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
regex,
|
||||||
|
e._process_create,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Create"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_create_omitting_attributes(self):
|
||||||
|
"""
|
||||||
|
Test that InvalidField errors are generated when trying to create
|
||||||
|
a symmetric key without required attributes.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
attribute_factory = factory.AttributeFactory()
|
||||||
|
|
||||||
|
# Test the error for omitting the Cryptographic Algorithm
|
||||||
|
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||||
|
template_attribute = objects.TemplateAttribute(
|
||||||
|
attributes=[
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'Test Symmetric Key',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||||
|
256
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[
|
||||||
|
enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT
|
||||||
|
]
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
payload = create.CreateRequestPayload(
|
||||||
|
object_type,
|
||||||
|
template_attribute
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
regex = (
|
||||||
|
"The cryptographic algorithm must be specified as an attribute."
|
||||||
|
)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
regex,
|
||||||
|
e._process_create,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Create"
|
||||||
|
)
|
||||||
|
e._logger.reset_mock()
|
||||||
|
|
||||||
|
# Test the error for omitting the Cryptographic Length
|
||||||
|
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||||
|
template_attribute = objects.TemplateAttribute(
|
||||||
|
attributes=[
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'Test Symmetric Key',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||||
|
enums.CryptographicAlgorithm.AES
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[
|
||||||
|
enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT
|
||||||
|
]
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
payload = create.CreateRequestPayload(
|
||||||
|
object_type,
|
||||||
|
template_attribute
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
regex = (
|
||||||
|
"The cryptographic length must be specified as an attribute."
|
||||||
|
)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
regex,
|
||||||
|
e._process_create,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Create"
|
||||||
|
)
|
||||||
|
e._logger.reset_mock()
|
||||||
|
|
||||||
|
# Test the error for omitting the Cryptographic Usage Mask
|
||||||
|
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||||
|
template_attribute = objects.TemplateAttribute(
|
||||||
|
attributes=[
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'Test Symmetric Key',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||||
|
enums.CryptographicAlgorithm.AES
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||||
|
256
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
payload = create.CreateRequestPayload(
|
||||||
|
object_type,
|
||||||
|
template_attribute
|
||||||
|
)
|
||||||
|
|
||||||
|
args = (payload, )
|
||||||
|
regex = (
|
||||||
|
"The cryptographic usage mask must be specified as an attribute."
|
||||||
|
)
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
exceptions.InvalidField,
|
||||||
|
regex,
|
||||||
|
e._process_create,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Create"
|
||||||
|
)
|
||||||
|
e._logger.reset_mock()
|
||||||
|
|
||||||
def test_register(self):
|
def test_register(self):
|
||||||
"""
|
"""
|
||||||
Test that a Register request can be processed correctly.
|
Test that a Register request can be processed correctly.
|
||||||
@ -1912,23 +2177,27 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||||
self.assertIsInstance(result, query.QueryResponsePayload)
|
self.assertIsInstance(result, query.QueryResponsePayload)
|
||||||
self.assertIsNotNone(result.operations)
|
self.assertIsNotNone(result.operations)
|
||||||
self.assertEqual(4, len(result.operations))
|
self.assertEqual(5, len(result.operations))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
enums.Operation.REGISTER,
|
enums.Operation.CREATE,
|
||||||
result.operations[0].value
|
result.operations[0].value
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
enums.Operation.GET,
|
enums.Operation.REGISTER,
|
||||||
result.operations[1].value
|
result.operations[1].value
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
enums.Operation.DESTROY,
|
enums.Operation.GET,
|
||||||
result.operations[2].value
|
result.operations[2].value
|
||||||
)
|
)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
enums.Operation.QUERY,
|
enums.Operation.DESTROY,
|
||||||
result.operations[3].value
|
result.operations[3].value
|
||||||
)
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
enums.Operation.QUERY,
|
||||||
|
result.operations[4].value
|
||||||
|
)
|
||||||
self.assertEqual(list(), result.object_types)
|
self.assertEqual(list(), result.object_types)
|
||||||
self.assertIsNotNone(result.vendor_identification)
|
self.assertIsNotNone(result.vendor_identification)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
@ -1947,7 +2216,7 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
|
|
||||||
e._logger.info.assert_called_once_with("Processing operation: Query")
|
e._logger.info.assert_called_once_with("Processing operation: Query")
|
||||||
self.assertIsNotNone(result.operations)
|
self.assertIsNotNone(result.operations)
|
||||||
self.assertEqual(5, len(result.operations))
|
self.assertEqual(6, len(result.operations))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
enums.Operation.DISCOVER_VERSIONS,
|
enums.Operation.DISCOVER_VERSIONS,
|
||||||
result.operations[-1].value
|
result.operations[-1].value
|
||||||
@ -2019,6 +2288,129 @@ class TestKmipEngine(testtools.TestCase):
|
|||||||
)
|
)
|
||||||
self.assertEqual([], result.protocol_versions)
|
self.assertEqual([], result.protocol_versions)
|
||||||
|
|
||||||
|
def test_create_get_destroy(self):
|
||||||
|
"""
|
||||||
|
Test that a managed object can be created, retrieved, and destroyed
|
||||||
|
without error.
|
||||||
|
"""
|
||||||
|
e = engine.KmipEngine()
|
||||||
|
e._data_store = self.engine
|
||||||
|
e._data_store_session_factory = self.session_factory
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
e._logger = mock.MagicMock()
|
||||||
|
|
||||||
|
attribute_factory = factory.AttributeFactory()
|
||||||
|
|
||||||
|
# Build a SymmetricKey for registration.
|
||||||
|
object_type = attributes.ObjectType(enums.ObjectType.SYMMETRIC_KEY)
|
||||||
|
template_attribute = objects.TemplateAttribute(
|
||||||
|
attributes=[
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.NAME,
|
||||||
|
attributes.Name.create(
|
||||||
|
'Test Symmetric Key',
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||||
|
enums.CryptographicAlgorithm.AES
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||||
|
256
|
||||||
|
),
|
||||||
|
attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[
|
||||||
|
enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT
|
||||||
|
]
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create the symmetric key with the corresponding attributes
|
||||||
|
payload = create.CreateRequestPayload(
|
||||||
|
object_type=object_type,
|
||||||
|
template_attribute=template_attribute
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_create(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Create"
|
||||||
|
)
|
||||||
|
|
||||||
|
uid = response_payload.unique_identifier.value
|
||||||
|
self.assertEqual('1', uid)
|
||||||
|
|
||||||
|
e._logger.reset_mock()
|
||||||
|
|
||||||
|
# Retrieve the created key using Get and verify all fields set
|
||||||
|
payload = get.GetRequestPayload(
|
||||||
|
unique_identifier=attributes.UniqueIdentifier(uid)
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_get(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Get"
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY,
|
||||||
|
response_payload.object_type.value
|
||||||
|
)
|
||||||
|
self.assertEqual(str(uid), response_payload.unique_identifier.value)
|
||||||
|
self.assertIsInstance(response_payload.secret, secrets.SymmetricKey)
|
||||||
|
|
||||||
|
key_block = response_payload.secret.key_block
|
||||||
|
self.assertEqual(
|
||||||
|
256,
|
||||||
|
len(key_block.key_value.key_material.value) * 8
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
enums.KeyFormatType.RAW,
|
||||||
|
key_block.key_format_type.value
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
enums.CryptographicAlgorithm.AES,
|
||||||
|
key_block.cryptographic_algorithm.value
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
256,
|
||||||
|
key_block.cryptographic_length.value
|
||||||
|
)
|
||||||
|
|
||||||
|
e._logger.reset_mock()
|
||||||
|
|
||||||
|
# Destroy the symmetric key and verify it cannot be accessed again
|
||||||
|
payload = destroy.DestroyRequestPayload(
|
||||||
|
unique_identifier=attributes.UniqueIdentifier(uid)
|
||||||
|
)
|
||||||
|
|
||||||
|
response_payload = e._process_destroy(payload)
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_session = e._data_store_session_factory()
|
||||||
|
|
||||||
|
e._logger.info.assert_called_once_with(
|
||||||
|
"Processing operation: Destroy"
|
||||||
|
)
|
||||||
|
self.assertEqual(str(uid), response_payload.unique_identifier.value)
|
||||||
|
self.assertRaises(
|
||||||
|
exc.NoResultFound,
|
||||||
|
e._data_session.query(pie_objects.OpaqueObject).filter(
|
||||||
|
pie_objects.ManagedObject.unique_identifier == uid
|
||||||
|
).one
|
||||||
|
)
|
||||||
|
|
||||||
|
e._data_session.commit()
|
||||||
|
e._data_store_session_factory()
|
||||||
|
|
||||||
def test_register_get_destroy(self):
|
def test_register_get_destroy(self):
|
||||||
"""
|
"""
|
||||||
Test that a managed object can be registered, retrieved, and destroyed
|
Test that a managed object can be registered, retrieved, and destroyed
|
||||||
|
Loading…
x
Reference in New Issue
Block a user