Merge pull request #278 from OpenKMIP/feat/sync-server-query

Update the server Query results to match current features
This commit is contained in:
Peter Hamilton 2017-04-12 12:37:24 -04:00 committed by GitHub
commit 0faf1e5f43
2 changed files with 176 additions and 15 deletions

View File

@ -1552,6 +1552,7 @@ class KmipEngine(object):
contents.Operation(enums.Operation.CREATE),
contents.Operation(enums.Operation.CREATE_KEY_PAIR),
contents.Operation(enums.Operation.REGISTER),
contents.Operation(enums.Operation.LOCATE),
contents.Operation(enums.Operation.GET),
contents.Operation(enums.Operation.GET_ATTRIBUTES),
contents.Operation(enums.Operation.GET_ATTRIBUTE_LIST),
@ -1560,10 +1561,14 @@ class KmipEngine(object):
contents.Operation(enums.Operation.QUERY)
])
if self._protocol_version == contents.ProtocolVersion.create(1, 1):
if self._protocol_version >= contents.ProtocolVersion.create(1, 1):
operations.extend([
contents.Operation(enums.Operation.DISCOVER_VERSIONS)
])
if self._protocol_version >= contents.ProtocolVersion.create(1, 2):
operations.extend([
contents.Operation(enums.Operation.MAC)
])
if enums.QueryFunction.QUERY_OBJECTS in queries:
objects = list()

View File

@ -4468,14 +4468,12 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_query(self):
def test_query_1_0(self):
"""
Test that a Query request can be processed correctly, for different
versions of KMIP.
Test that a Query request can be processed correctly, for KMIP 1.0.
"""
e = engine.KmipEngine()
# Test for KMIP 1.0.
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion.create(1, 0)
@ -4497,7 +4495,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(9, len(result.operations))
self.assertEqual(10, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -4511,29 +4509,33 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value
)
self.assertEqual(
enums.Operation.GET,
enums.Operation.LOCATE,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
enums.Operation.GET,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
enums.Operation.GET_ATTRIBUTES,
result.operations[5].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[6].value
)
self.assertEqual(
enums.Operation.DESTROY,
enums.Operation.ACTIVATE,
result.operations[7].value
)
self.assertEqual(
enums.Operation.QUERY,
enums.Operation.DESTROY,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[9].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -4544,19 +4546,173 @@ class TestKmipEngine(testtools.TestCase):
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
# Test for KMIP 1.1.
def test_query_1_1(self):
"""
Test that a Query request can be processed correctly, for KMIP 1.1.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion.create(1, 1)
payload = query.QueryRequestPayload([
misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS),
misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS),
misc.QueryFunction(
enums.QueryFunction.QUERY_SERVER_INFORMATION
),
misc.QueryFunction(
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES
),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP)
])
result = e._process_query(payload)
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(10, len(result.operations))
self.assertEqual(11, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
)
self.assertEqual(
enums.Operation.CREATE_KEY_PAIR,
result.operations[1].value
)
self.assertEqual(
enums.Operation.REGISTER,
result.operations[2].value
)
self.assertEqual(
enums.Operation.LOCATE,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
result.operations[5].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[6].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
result.operations[7].value
)
self.assertEqual(
enums.Operation.DESTROY,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[9].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[-1].value
result.operations[10].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
"PyKMIP {0} Software Server".format(kmip.__version__),
result.vendor_identification.value
)
self.assertIsNone(result.server_information)
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
def test_query_1_2(self):
"""
Test that a Query request can be processed correctly, for KMIP 1.2.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion.create(1, 2)
payload = query.QueryRequestPayload([
misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS),
misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS),
misc.QueryFunction(
enums.QueryFunction.QUERY_SERVER_INFORMATION
),
misc.QueryFunction(
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES
),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP)
])
result = e._process_query(payload)
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(12, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
)
self.assertEqual(
enums.Operation.CREATE_KEY_PAIR,
result.operations[1].value
)
self.assertEqual(
enums.Operation.REGISTER,
result.operations[2].value
)
self.assertEqual(
enums.Operation.LOCATE,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
result.operations[5].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[6].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
result.operations[7].value
)
self.assertEqual(
enums.Operation.DESTROY,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[9].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[10].value
)
self.assertEqual(
enums.Operation.MAC,
result.operations[11].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
"PyKMIP {0} Software Server".format(kmip.__version__),
result.vendor_identification.value
)
self.assertIsNone(result.server_information)
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
def test_discover_versions(self):
"""