Add minimum Locate operation support for server.

Currently it will return all the existing managed objects with the restriction of operation
policy only. No other filterings.

Closes #258
This commit is contained in:
Hao Shen 2017-02-09 09:27:06 -08:00
parent d9cf4c148a
commit b8b2d43347
2 changed files with 132 additions and 0 deletions

View File

@ -49,6 +49,7 @@ from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads import query from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.core import misc from kmip.core import misc
@ -931,6 +932,28 @@ class KmipEngine(object):
return managed_object return managed_object
def _list_objects_with_access_controls(
self,
operation
):
managed_objects = None
managed_objects_allowed = list()
managed_objects = self._data_session.query(objects.ManagedObject).all()
for managed_object in managed_objects:
is_allowed = self._is_allowed_by_operation_policy(
managed_object.operation_policy_name,
self._client_identity,
managed_object._owner,
managed_object.object_type,
operation
)
if is_allowed is True:
managed_objects_allowed.append(managed_object)
return managed_objects_allowed
def _process_operation(self, operation, payload): def _process_operation(self, operation, payload):
if operation == enums.Operation.CREATE: if operation == enums.Operation.CREATE:
return self._process_create(payload) return self._process_create(payload)
@ -938,6 +961,8 @@ class KmipEngine(object):
return self._process_create_key_pair(payload) return self._process_create_key_pair(payload)
elif operation == enums.Operation.REGISTER: elif operation == enums.Operation.REGISTER:
return self._process_register(payload) return self._process_register(payload)
elif operation == enums.Operation.LOCATE:
return self._process_locate(payload)
elif operation == enums.Operation.GET: elif operation == enums.Operation.GET:
return self._process_get(payload) return self._process_get(payload)
elif operation == enums.Operation.GET_ATTRIBUTES: elif operation == enums.Operation.GET_ATTRIBUTES:
@ -1299,6 +1324,27 @@ class KmipEngine(object):
return response_payload return response_payload
@_kmip_version_supported('1.0')
def _process_locate(self, payload):
# TODO Currently the Locate operation will just return all the
# existing managed objects with the restriction of operation
# policy only. Need to implement other filtering logics in the
# SPEC (e.g., attribute, storage status mask and etc..)
self._logger.info("Processing operation: Locate")
managed_objects = self._list_objects_with_access_controls(
enums.Operation.LOCATE)
unique_identifiers = [attributes.UniqueIdentifier(
str(managed_object.unique_identifier))
for managed_object in managed_objects]
response_payload = locate.LocateResponsePayload(
unique_identifiers=unique_identifiers
)
return response_payload
@_kmip_version_supported('1.0') @_kmip_version_supported('1.0')
def _process_get(self, payload): def _process_get(self, payload):
self._logger.info("Processing operation: Get") self._logger.info("Processing operation: Get")

View File

@ -49,6 +49,7 @@ from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import query from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import register from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import mac from kmip.core.messages.payloads import mac
from kmip.core.messages.payloads import locate
from kmip.pie import objects as pie_objects from kmip.pie import objects as pie_objects
from kmip.pie import sqltypes from kmip.pie import sqltypes
@ -3461,6 +3462,91 @@ class TestKmipEngine(testtools.TestCase):
*args *args
) )
def test_locate(self):
"""
Test that a Locate request can be processed correctly.
"""
# TODO Need add more extensive tests after locate operaton is
# fully supported
e = engine.KmipEngine()
e._data_store = self.engine
e._data_store_session_factory = self.session_factory
e._data_session = e._data_store_session_factory()
e._logger = mock.MagicMock()
obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE)
# locate should return nothing at beginning
payload = locate.LocateRequestPayload()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
)
self.assertEqual(
len(response_payload.unique_identifiers),
0
)
# Add the first obj and test the locate
e._data_session.add(obj_a)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_a = str(obj_a.unique_identifier)
payload = locate.LocateRequestPayload()
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
)
self.assertEqual(
len(response_payload.unique_identifiers),
1
)
self.assertEqual(
id_a,
response_payload.unique_identifiers[0].value
)
# Add the second obj and test the locate
e._data_session.add(obj_b)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
id_b = str(obj_b.unique_identifier)
payload = locate.LocateRequestPayload()
e._logger.reset_mock()
response_payload = e._process_locate(payload)
e._data_session.commit()
e._data_session = e._data_store_session_factory()
e._logger.info.assert_any_call(
"Processing operation: Locate"
)
self.assertEqual(
len(response_payload.unique_identifiers),
2
)
self.assertIn(
id_a,
[uid.value for uid in response_payload.unique_identifiers]
)
self.assertIn(
id_b,
[uid.value for uid in response_payload.unique_identifiers]
)
def test_get(self): def test_get(self):
""" """
Test that a Get request can be processed correctly. Test that a Get request can be processed correctly.