Add ApplicationSpecificInformation support to the server

This change adds ApplicationSpecificInformation attribute support
to the server, allowing for the storage and retrieval of the new
attribute in addition to object filtering based on its value. New
unit tests have been added to cover the new changes.
This commit is contained in:
Peter Hamilton 2019-10-03 15:34:38 -04:00 committed by Peter Hamilton
parent 3f84a20a0c
commit 01eb144243
7 changed files with 238 additions and 29 deletions

View File

@ -1248,10 +1248,10 @@ class ApplicationSpecificInformation(primitives.Struct):
def __str__(self):
value = ", ".join(
[
'"application_namespace": {}'.format(
'"application_namespace": "{}"'.format(
self.application_namespace
),
'"application_data": {}'.format(self.application_data)
'"application_data": "{}"'.format(self.application_data)
]
)
return "{" + value + "}"

View File

@ -277,28 +277,13 @@ class AttributeValueFactory(object):
return attributes.ObjectGroup(group)
def _create_application_specific_information(self, info):
if info is None:
return attributes.ApplicationSpecificInformation()
if info:
return attributes.ApplicationSpecificInformation(
application_namespace=info.get("application_namespace"),
application_data=info.get("application_data")
)
else:
application_namespace = info.get('application_namespace')
application_data = info.get('application_data')
if not isinstance(application_namespace, str):
msg = utils.build_er_error(
attributes.ApplicationSpecificInformation,
'constructor argument type',
str, type(application_namespace))
raise TypeError(msg)
if not isinstance(application_data, str):
msg = utils.build_er_error(
attributes.ApplicationSpecificInformation,
'constructor argument type',
str, type(application_data))
raise TypeError(msg)
return attributes.ApplicationSpecificInformation.create(
application_namespace, application_data)
return attributes.ApplicationSpecificInformation()
def _create_contact_information(self, info):
if info is None:

View File

@ -730,8 +730,16 @@ class KmipEngine(object):
return None
elif attr_name == 'Link':
return None
elif attr_name == 'Application Specific Information':
return None
elif attr_name == "Application Specific Information":
values = []
for info in managed_object.app_specific_info:
values.append(
{
"application_namespace": info.application_namespace,
"application_data": info.application_data
}
)
return values
elif attr_name == 'Contact Information':
return None
elif attr_name == 'Last Change Date':
@ -781,6 +789,14 @@ class KmipEngine(object):
raise exceptions.InvalidField(
"Cannot set duplicate name values."
)
elif attribute_name == "Application Specific Information":
for value in attribute_value:
managed_object.app_specific_info.append(
objects.ApplicationSpecificInformation(
application_namespace=value.application_namespace,
application_data=value.application_data
)
)
else:
# TODO (peterhamilton) Remove when all attributes are supported
raise exceptions.InvalidField(
@ -1662,6 +1678,26 @@ class KmipEngine(object):
)
if attribute is None:
continue
elif name == "Application Specific Information":
application_namespace = value.application_namespace
application_data = value.application_data
v = {
"application_namespace": application_namespace,
"application_data": application_data
}
if v not in attribute:
self._logger.debug(
"Failed match: "
"the specified application specific "
"information ('{}', '{}') does not match any "
"of the object's associated application "
"specific information attributes.".format(
v.get("application_namespace"),
v.get("application_data")
)
)
add_object = False
break
elif name == "Name":
if value not in attribute:
self._logger.debug(

View File

@ -278,7 +278,7 @@ class TestApplicationSpecificInformation(testtools.TestCase):
("application_data", "www.example.com")
]
value = "{}".format(
", ".join(['"{}": {}'.format(arg[0], arg[1]) for arg in args])
", ".join(['"{}": "{}"'.format(arg[0], arg[1]) for arg in args])
)
self.assertEqual(
"{" + value + "}",

View File

@ -418,7 +418,55 @@ class TestAttributeValueFactory(testtools.TestCase):
"""
Test that an ApplicationSpecificInformation attribute can be created.
"""
self.skipTest('')
attribute = self.factory.create_attribute_value(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertEqual("ssl", attribute.application_namespace)
self.assertEqual("www.example.com", attribute.application_data)
attribute = self.factory.create_attribute_value(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
None
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertIsNone(attribute.application_namespace)
self.assertIsNone(attribute.application_data)
attribute = self.factory.create_attribute_value_by_enum(
enums.Tags.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertEqual("ssl", attribute.application_namespace)
self.assertEqual("www.example.com", attribute.application_data)
attribute = self.factory.create_attribute_value_by_enum(
enums.Tags.APPLICATION_SPECIFIC_INFORMATION,
None
)
self.assertIsInstance(
attribute,
attributes.ApplicationSpecificInformation
)
self.assertIsNone(attribute.application_namespace)
self.assertIsNone(attribute.application_data)
def test_create_contact_information(self):
"""

View File

@ -247,11 +247,15 @@ class TestApplicationSpecificInformation(testtools.TestCase):
session.add(app_specific_info)
session.commit()
# Grab the ID now before making a new session to avoid a Detached error
# See http://sqlalche.me/e/bhk3 for more info.
app_specific_info_id = app_specific_info.id
session = sqlalchemy.orm.sessionmaker(bind=engine)()
retrieved_info = session.query(
objects.ApplicationSpecificInformation
).filter(
objects.ApplicationSpecificInformation.id == app_specific_info.id
objects.ApplicationSpecificInformation.id == app_specific_info_id
).one()
session.commit()

View File

@ -1602,7 +1602,7 @@ class TestKmipEngine(testtools.TestCase):
symmetric_key,
'Application Specific Information'
)
self.assertEqual(None, result)
self.assertEqual([], result)
result = e._get_attribute_from_managed_object(
symmetric_key,
@ -2423,6 +2423,13 @@ class TestKmipEngine(testtools.TestCase):
attribute_factory.create_attribute(
enums.AttributeType.OPERATION_POLICY_NAME,
'test'
),
attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
]
)
@ -2473,6 +2480,15 @@ class TestKmipEngine(testtools.TestCase):
self.assertEqual('test', symmetric_key.operation_policy_name)
self.assertIsNotNone(symmetric_key.initial_date)
self.assertNotEqual(0, symmetric_key.initial_date)
self.assertEqual(1, len(symmetric_key.app_specific_info))
self.assertEqual(
"ssl",
symmetric_key.app_specific_info[0].application_namespace
)
self.assertEqual(
"www.example.com",
symmetric_key.app_specific_info[0].application_data
)
self.assertEqual(uid, e._id_placeholder)
@ -5599,6 +5615,126 @@ class TestKmipEngine(testtools.TestCase):
)
self.assertEqual(0, len(response_payload.unique_identifiers))
def test_locate_with_application_specific_information(self):
"""
Test the Locate operation when the 'Application Specific Information'
attribute is given.
"""
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._is_allowed_by_operation_policy = mock.Mock(return_value=True)
e._logger = mock.MagicMock()
key = (
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
)
app_specific_info_a = pie_objects.ApplicationSpecificInformation(
application_namespace="ssl",
application_data="www.example.com"
)
app_specific_info_b = pie_objects.ApplicationSpecificInformation(
application_namespace="ssl",
application_data="www.test.com"
)
obj_a = pie_objects.SymmetricKey(
enums.CryptographicAlgorithm.AES,
128,
key,
name='name1'
)
obj_a.app_specific_info.append(app_specific_info_a)
obj_a.app_specific_info.append(app_specific_info_b)
obj_b = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)
obj_b.app_specific_info.append(app_specific_info_a)
obj_c = pie_objects.SecretData(
key,
enums.SecretDataType.PASSWORD
)
e._data_session.add(obj_a)
e._data_session.add(obj_b)
e._data_session.add(obj_c)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
id_b = str(obj_b.unique_identifier)
attribute_factory = factory.AttributeFactory()
# Locate the symmetric key objects based on their shared application
# specific information attribute.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.example.com"
}
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_b)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified application specific "
"information ('ssl', 'www.example.com') does not match any "
"of the object's associated application "
"specific information attributes."
)
self.assertEqual(2, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
self.assertIn(id_b, response_payload.unique_identifiers)
# Locate a single symmetric key object based on its unique application
# specific information attribute.
attrs = [
attribute_factory.create_attribute(
enums.AttributeType.APPLICATION_SPECIFIC_INFORMATION,
{
"application_namespace": "ssl",
"application_data": "www.test.com"
}
)
]
payload = payloads.LocateRequestPayload(attributes=attrs)
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call("Processing operation: Locate")
e._logger.debug.assert_any_call(
"Locate filter matched object: {}".format(id_a)
)
e._logger.debug.assert_any_call(
"Failed match: "
"the specified application specific "
"information ('ssl', 'www.test.com') does not match any "
"of the object's associated application "
"specific information attributes."
)
self.assertEqual(1, len(response_payload.unique_identifiers))
self.assertIn(id_a, response_payload.unique_identifiers)
def test_get(self):
"""
Test that a Get request can be processed correctly.