Update the server Query results to match current features

This change updates the server results returned by the Query
operation to match the current set of supported features. The tests
for Query have been updated to better reflect testing across KMIP
versions.
This commit is contained in:
Peter Hamilton 2017-04-11 16:43:43 -04:00
parent 28e1e809d1
commit 1d2a337ee0
2 changed files with 176 additions and 15 deletions

View File

@ -1552,6 +1552,7 @@ class KmipEngine(object):
contents.Operation(enums.Operation.CREATE),
contents.Operation(enums.Operation.CREATE_KEY_PAIR),
contents.Operation(enums.Operation.REGISTER),
contents.Operation(enums.Operation.LOCATE),
contents.Operation(enums.Operation.GET),
contents.Operation(enums.Operation.GET_ATTRIBUTES),
contents.Operation(enums.Operation.GET_ATTRIBUTE_LIST),
@ -1560,10 +1561,14 @@ class KmipEngine(object):
contents.Operation(enums.Operation.QUERY)
])
if self._protocol_version == contents.ProtocolVersion.create(1, 1):
if self._protocol_version >= contents.ProtocolVersion.create(1, 1):
operations.extend([
contents.Operation(enums.Operation.DISCOVER_VERSIONS)
])
if self._protocol_version >= contents.ProtocolVersion.create(1, 2):
operations.extend([
contents.Operation(enums.Operation.MAC)
])
if enums.QueryFunction.QUERY_OBJECTS in queries:
objects = list()

View File

@ -4468,14 +4468,12 @@ class TestKmipEngine(testtools.TestCase):
*args
)
def test_query(self):
def test_query_1_0(self):
"""
Test that a Query request can be processed correctly, for different
versions of KMIP.
Test that a Query request can be processed correctly, for KMIP 1.0.
"""
e = engine.KmipEngine()
# Test for KMIP 1.0.
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion.create(1, 0)
@ -4497,7 +4495,7 @@ class TestKmipEngine(testtools.TestCase):
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(9, len(result.operations))
self.assertEqual(10, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
@ -4511,29 +4509,33 @@ class TestKmipEngine(testtools.TestCase):
result.operations[2].value
)
self.assertEqual(
enums.Operation.GET,
enums.Operation.LOCATE,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
enums.Operation.GET,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
enums.Operation.GET_ATTRIBUTES,
result.operations[5].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[6].value
)
self.assertEqual(
enums.Operation.DESTROY,
enums.Operation.ACTIVATE,
result.operations[7].value
)
self.assertEqual(
enums.Operation.QUERY,
enums.Operation.DESTROY,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[9].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
@ -4544,19 +4546,173 @@ class TestKmipEngine(testtools.TestCase):
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
# Test for KMIP 1.1.
def test_query_1_1(self):
"""
Test that a Query request can be processed correctly, for KMIP 1.1.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion.create(1, 1)
payload = query.QueryRequestPayload([
misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS),
misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS),
misc.QueryFunction(
enums.QueryFunction.QUERY_SERVER_INFORMATION
),
misc.QueryFunction(
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES
),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP)
])
result = e._process_query(payload)
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(10, len(result.operations))
self.assertEqual(11, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
)
self.assertEqual(
enums.Operation.CREATE_KEY_PAIR,
result.operations[1].value
)
self.assertEqual(
enums.Operation.REGISTER,
result.operations[2].value
)
self.assertEqual(
enums.Operation.LOCATE,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
result.operations[5].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[6].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
result.operations[7].value
)
self.assertEqual(
enums.Operation.DESTROY,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[9].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[-1].value
result.operations[10].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
"PyKMIP {0} Software Server".format(kmip.__version__),
result.vendor_identification.value
)
self.assertIsNone(result.server_information)
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
def test_query_1_2(self):
"""
Test that a Query request can be processed correctly, for KMIP 1.2.
"""
e = engine.KmipEngine()
e._logger = mock.MagicMock()
e._protocol_version = contents.ProtocolVersion.create(1, 2)
payload = query.QueryRequestPayload([
misc.QueryFunction(enums.QueryFunction.QUERY_OPERATIONS),
misc.QueryFunction(enums.QueryFunction.QUERY_OBJECTS),
misc.QueryFunction(
enums.QueryFunction.QUERY_SERVER_INFORMATION
),
misc.QueryFunction(
enums.QueryFunction.QUERY_APPLICATION_NAMESPACES
),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_LIST),
misc.QueryFunction(enums.QueryFunction.QUERY_EXTENSION_MAP)
])
result = e._process_query(payload)
e._logger.info.assert_called_once_with("Processing operation: Query")
self.assertIsInstance(result, query.QueryResponsePayload)
self.assertIsNotNone(result.operations)
self.assertEqual(12, len(result.operations))
self.assertEqual(
enums.Operation.CREATE,
result.operations[0].value
)
self.assertEqual(
enums.Operation.CREATE_KEY_PAIR,
result.operations[1].value
)
self.assertEqual(
enums.Operation.REGISTER,
result.operations[2].value
)
self.assertEqual(
enums.Operation.LOCATE,
result.operations[3].value
)
self.assertEqual(
enums.Operation.GET,
result.operations[4].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTES,
result.operations[5].value
)
self.assertEqual(
enums.Operation.GET_ATTRIBUTE_LIST,
result.operations[6].value
)
self.assertEqual(
enums.Operation.ACTIVATE,
result.operations[7].value
)
self.assertEqual(
enums.Operation.DESTROY,
result.operations[8].value
)
self.assertEqual(
enums.Operation.QUERY,
result.operations[9].value
)
self.assertEqual(
enums.Operation.DISCOVER_VERSIONS,
result.operations[10].value
)
self.assertEqual(
enums.Operation.MAC,
result.operations[11].value
)
self.assertEqual(list(), result.object_types)
self.assertIsNotNone(result.vendor_identification)
self.assertEqual(
"PyKMIP {0} Software Server".format(kmip.__version__),
result.vendor_identification.value
)
self.assertIsNone(result.server_information)
self.assertEqual(list(), result.application_namespaces)
self.assertEqual(list(), result.extension_information)
def test_discover_versions(self):
"""