diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index 097300d..edd3cc1 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -49,6 +49,7 @@ from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register from kmip.core.messages.payloads import mac +from kmip.core.messages.payloads import locate from kmip.core import misc @@ -931,6 +932,28 @@ class KmipEngine(object): return managed_object + def _list_objects_with_access_controls( + self, + operation + ): + managed_objects = None + managed_objects_allowed = list() + + managed_objects = self._data_session.query(objects.ManagedObject).all() + + for managed_object in managed_objects: + is_allowed = self._is_allowed_by_operation_policy( + managed_object.operation_policy_name, + self._client_identity, + managed_object._owner, + managed_object.object_type, + operation + ) + if is_allowed is True: + managed_objects_allowed.append(managed_object) + + return managed_objects_allowed + def _process_operation(self, operation, payload): if operation == enums.Operation.CREATE: return self._process_create(payload) @@ -938,6 +961,8 @@ class KmipEngine(object): return self._process_create_key_pair(payload) elif operation == enums.Operation.REGISTER: return self._process_register(payload) + elif operation == enums.Operation.LOCATE: + return self._process_locate(payload) elif operation == enums.Operation.GET: return self._process_get(payload) elif operation == enums.Operation.GET_ATTRIBUTES: @@ -1299,6 +1324,27 @@ class KmipEngine(object): return response_payload + @_kmip_version_supported('1.0') + def _process_locate(self, payload): + # TODO Currently the Locate operation will just return all the + # existing managed objects with the restriction of operation + # policy only. Need to implement other filtering logics in the + # SPEC (e.g., attribute, storage status mask and etc..) + self._logger.info("Processing operation: Locate") + + managed_objects = self._list_objects_with_access_controls( + enums.Operation.LOCATE) + + unique_identifiers = [attributes.UniqueIdentifier( + str(managed_object.unique_identifier)) + for managed_object in managed_objects] + + response_payload = locate.LocateResponsePayload( + unique_identifiers=unique_identifiers + ) + + return response_payload + @_kmip_version_supported('1.0') def _process_get(self, payload): self._logger.info("Processing operation: Get") diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 791fb45..3125122 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -49,6 +49,7 @@ from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import query from kmip.core.messages.payloads import register from kmip.core.messages.payloads import mac +from kmip.core.messages.payloads import locate from kmip.pie import objects as pie_objects from kmip.pie import sqltypes @@ -3461,6 +3462,91 @@ class TestKmipEngine(testtools.TestCase): *args ) + def test_locate(self): + """ + Test that a Locate request can be processed correctly. + """ + # TODO Need add more extensive tests after locate operaton is + # fully supported + e = engine.KmipEngine() + e._data_store = self.engine + e._data_store_session_factory = self.session_factory + e._data_session = e._data_store_session_factory() + e._logger = mock.MagicMock() + + obj_a = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + obj_b = pie_objects.OpaqueObject(b'', enums.OpaqueDataType.NONE) + + # locate should return nothing at beginning + payload = locate.LocateRequestPayload() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: Locate" + ) + self.assertEqual( + len(response_payload.unique_identifiers), + 0 + ) + + # Add the first obj and test the locate + e._data_session.add(obj_a) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_a = str(obj_a.unique_identifier) + + payload = locate.LocateRequestPayload() + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: Locate" + ) + + self.assertEqual( + len(response_payload.unique_identifiers), + 1 + ) + self.assertEqual( + id_a, + response_payload.unique_identifiers[0].value + ) + + # Add the second obj and test the locate + e._data_session.add(obj_b) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + id_b = str(obj_b.unique_identifier) + + payload = locate.LocateRequestPayload() + e._logger.reset_mock() + response_payload = e._process_locate(payload) + e._data_session.commit() + e._data_session = e._data_store_session_factory() + + e._logger.info.assert_any_call( + "Processing operation: Locate" + ) + + self.assertEqual( + len(response_payload.unique_identifiers), + 2 + ) + self.assertIn( + id_a, + [uid.value for uid in response_payload.unique_identifiers] + ) + self.assertIn( + id_b, + [uid.value for uid in response_payload.unique_identifiers] + ) + def test_get(self): """ Test that a Get request can be processed correctly.