From 1d2a337ee00b610b8afc26064d3cfcb058211b90 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Tue, 11 Apr 2017 16:43:43 -0400 Subject: [PATCH] Update the server Query results to match current features This change updates the server results returned by the Query operation to match the current set of supported features. The tests for Query have been updated to better reflect testing across KMIP versions. --- kmip/services/server/engine.py | 7 +- .../tests/unit/services/server/test_engine.py | 184 ++++++++++++++++-- 2 files changed, 176 insertions(+), 15 deletions(-) diff --git a/kmip/services/server/engine.py b/kmip/services/server/engine.py index edd3cc1..7a15fec 100644 --- a/kmip/services/server/engine.py +++ b/kmip/services/server/engine.py @@ -1552,6 +1552,7 @@ class KmipEngine(object): contents.Operation(enums.Operation.CREATE), contents.Operation(enums.Operation.CREATE_KEY_PAIR), contents.Operation(enums.Operation.REGISTER), + contents.Operation(enums.Operation.LOCATE), contents.Operation(enums.Operation.GET), contents.Operation(enums.Operation.GET_ATTRIBUTES), contents.Operation(enums.Operation.GET_ATTRIBUTE_LIST), @@ -1560,10 +1561,14 @@ class KmipEngine(object): contents.Operation(enums.Operation.QUERY) ]) - if self._protocol_version == contents.ProtocolVersion.create(1, 1): + if self._protocol_version >= contents.ProtocolVersion.create(1, 1): operations.extend([ contents.Operation(enums.Operation.DISCOVER_VERSIONS) ]) + if self._protocol_version >= contents.ProtocolVersion.create(1, 2): + operations.extend([ + contents.Operation(enums.Operation.MAC) + ]) if enums.QueryFunction.QUERY_OBJECTS in queries: objects = list() diff --git a/kmip/tests/unit/services/server/test_engine.py b/kmip/tests/unit/services/server/test_engine.py index 3125122..031c2d7 100644 --- a/kmip/tests/unit/services/server/test_engine.py +++ b/kmip/tests/unit/services/server/test_engine.py @@ -4468,14 +4468,12 @@ class TestKmipEngine(testtools.TestCase): *args ) - def test_query(self): + def test_query_1_0(self): """ - Test that a Query request can be processed correctly, for different - versions of KMIP. + Test that a Query request can be processed correctly, for KMIP 1.0. """ e = engine.KmipEngine() - # Test for KMIP 1.0. e._logger = mock.MagicMock() e._protocol_version = contents.ProtocolVersion.create(1, 0) @@ -4497,7 +4495,7 @@ class TestKmipEngine(testtools.TestCase): e._logger.info.assert_called_once_with("Processing operation: Query") self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(9, len(result.operations)) + self.assertEqual(10, len(result.operations)) self.assertEqual( enums.Operation.CREATE, result.operations[0].value @@ -4511,29 +4509,33 @@ class TestKmipEngine(testtools.TestCase): result.operations[2].value ) self.assertEqual( - enums.Operation.GET, + enums.Operation.LOCATE, result.operations[3].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTES, + enums.Operation.GET, result.operations[4].value ) self.assertEqual( - enums.Operation.GET_ATTRIBUTE_LIST, + enums.Operation.GET_ATTRIBUTES, result.operations[5].value ) self.assertEqual( - enums.Operation.ACTIVATE, + enums.Operation.GET_ATTRIBUTE_LIST, result.operations[6].value ) self.assertEqual( - enums.Operation.DESTROY, + enums.Operation.ACTIVATE, result.operations[7].value ) self.assertEqual( - enums.Operation.QUERY, + enums.Operation.DESTROY, result.operations[8].value ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[9].value + ) self.assertEqual(list(), result.object_types) self.assertIsNotNone(result.vendor_identification) self.assertEqual( @@ -4544,19 +4546,173 @@ class TestKmipEngine(testtools.TestCase): self.assertEqual(list(), result.application_namespaces) self.assertEqual(list(), result.extension_information) - # Test for KMIP 1.1. + def test_query_1_1(self): + """ + Test that a Query request can be processed correctly, for KMIP 1.1. + """ + e = engine.KmipEngine() + e._logger = mock.MagicMock() e._protocol_version = contents.ProtocolVersion.create(1, 1) + payload = query.QueryRequestPayload([ + misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS), + misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS), + misc.QueryFunction( + enums.QueryFunction.QUERY_SERVER_INFORMATION + ), + misc.QueryFunction( + enums.QueryFunction.QUERY_APPLICATION_NAMESPACES + ), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP) + ]) + result = e._process_query(payload) e._logger.info.assert_called_once_with("Processing operation: Query") + self.assertIsInstance(result, query.QueryResponsePayload) self.assertIsNotNone(result.operations) - self.assertEqual(10, len(result.operations)) + self.assertEqual(11, len(result.operations)) + self.assertEqual( + enums.Operation.CREATE, + result.operations[0].value + ) + self.assertEqual( + enums.Operation.CREATE_KEY_PAIR, + result.operations[1].value + ) + self.assertEqual( + enums.Operation.REGISTER, + result.operations[2].value + ) + self.assertEqual( + enums.Operation.LOCATE, + result.operations[3].value + ) + self.assertEqual( + enums.Operation.GET, + result.operations[4].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTES, + result.operations[5].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTE_LIST, + result.operations[6].value + ) + self.assertEqual( + enums.Operation.ACTIVATE, + result.operations[7].value + ) + self.assertEqual( + enums.Operation.DESTROY, + result.operations[8].value + ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[9].value + ) self.assertEqual( enums.Operation.DISCOVER_VERSIONS, - result.operations[-1].value + result.operations[10].value ) + self.assertEqual(list(), result.object_types) + self.assertIsNotNone(result.vendor_identification) + self.assertEqual( + "PyKMIP {0} Software Server".format(kmip.__version__), + result.vendor_identification.value + ) + self.assertIsNone(result.server_information) + self.assertEqual(list(), result.application_namespaces) + self.assertEqual(list(), result.extension_information) + + def test_query_1_2(self): + """ + Test that a Query request can be processed correctly, for KMIP 1.2. + """ + e = engine.KmipEngine() + + e._logger = mock.MagicMock() + e._protocol_version = contents.ProtocolVersion.create(1, 2) + + payload = query.QueryRequestPayload([ + misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS), + misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS), + misc.QueryFunction( + enums.QueryFunction.QUERY_SERVER_INFORMATION + ), + misc.QueryFunction( + enums.QueryFunction.QUERY_APPLICATION_NAMESPACES + ), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST), + misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP) + ]) + + result = e._process_query(payload) + + e._logger.info.assert_called_once_with("Processing operation: Query") + self.assertIsInstance(result, query.QueryResponsePayload) + self.assertIsNotNone(result.operations) + self.assertEqual(12, len(result.operations)) + self.assertEqual( + enums.Operation.CREATE, + result.operations[0].value + ) + self.assertEqual( + enums.Operation.CREATE_KEY_PAIR, + result.operations[1].value + ) + self.assertEqual( + enums.Operation.REGISTER, + result.operations[2].value + ) + self.assertEqual( + enums.Operation.LOCATE, + result.operations[3].value + ) + self.assertEqual( + enums.Operation.GET, + result.operations[4].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTES, + result.operations[5].value + ) + self.assertEqual( + enums.Operation.GET_ATTRIBUTE_LIST, + result.operations[6].value + ) + self.assertEqual( + enums.Operation.ACTIVATE, + result.operations[7].value + ) + self.assertEqual( + enums.Operation.DESTROY, + result.operations[8].value + ) + self.assertEqual( + enums.Operation.QUERY, + result.operations[9].value + ) + self.assertEqual( + enums.Operation.DISCOVER_VERSIONS, + result.operations[10].value + ) + self.assertEqual( + enums.Operation.MAC, + result.operations[11].value + ) + self.assertEqual(list(), result.object_types) + self.assertIsNotNone(result.vendor_identification) + self.assertEqual( + "PyKMIP {0} Software Server".format(kmip.__version__), + result.vendor_identification.value + ) + self.assertIsNone(result.server_information) + self.assertEqual(list(), result.application_namespaces) + self.assertEqual(list(), result.extension_information) def test_discover_versions(self): """