From c763b69af15914be600f139dc2c8678cdbd54a60 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Wed, 5 Aug 2015 07:59:05 -0400 Subject: [PATCH] Adding KMIPProxy support for the GetAttributeList operation This change adds support for the GetAttributeList operation to the KMIPProxy client. It adds a new result object for the operation along with an integration test demonstrating how the operation can be used. Client unit test cases are also included. --- kmip/services/kmip_client.py | 47 +++++++++++++++++++ kmip/services/results.py | 15 ++++++ .../integration/services/test_integration.py | 40 ++++++++++++++++ kmip/tests/unit/services/test_kmip_client.py | 34 ++++++++++++++ 4 files changed, 136 insertions(+) diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index eacabe3..73634ba 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -19,6 +19,7 @@ from kmip.services.results import CreateKeyPairResult from kmip.services.results import DestroyResult from kmip.services.results import DiscoverVersionsResult from kmip.services.results import GetResult +from kmip.services.results import GetAttributeListResult from kmip.services.results import LocateResult from kmip.services.results import QueryResult from kmip.services.results import RegisterResult @@ -50,6 +51,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import query from kmip.core.messages.payloads import rekey_key_pair @@ -280,6 +282,25 @@ class KMIPProxy(KMIP): key_wrapping_specification=key_wrapping_specification, credential=credential) + def get_attribute_list(self, uid=None): + """ + Send a GetAttributeList request to the server. + + Args: + uid (string): The ID of the managed object with which the retrieved + attribute names should be associated. + + Returns: + result (GetAttributeListResult): A structure containing the results + of the operation. + """ + batch_item = self._build_get_attribute_list_batch_item(uid) + + request = self._build_request_message(None, [batch_item]) + response = self._send_and_receive_message(request) + results = self._process_batch_items(response) + return results[0] + def revoke(self, uuid, reason, message=None, credential=None): return self._revoke(unique_identifier=uuid, revocation_code=reason, @@ -434,6 +455,13 @@ class KMIPProxy(KMIP): operation=operation, request_payload=payload) return batch_item + def _build_get_attribute_list_batch_item(self, uid=None): + operation = Operation(OperationEnum.GET_ATTRIBUTE_LIST) + payload = get_attribute_list.GetAttributeListRequestPayload(uid) + batch_item = messages.RequestBatchItem( + operation=operation, request_payload=payload) + return batch_item + def _build_discover_versions_batch_item(self, protocol_versions=None): operation = Operation(OperationEnum.DISCOVER_VERSIONS) @@ -456,6 +484,8 @@ class KMIPProxy(KMIP): def _get_batch_item_processor(self, operation): if operation == OperationEnum.CREATE_KEY_PAIR: return self._process_create_key_pair_batch_item + elif operation == OperationEnum.GET_ATTRIBUTE_LIST: + return self._process_get_attribute_list_batch_item elif operation == OperationEnum.REKEY_KEY_PAIR: return self._process_rekey_key_pair_batch_item elif operation == OperationEnum.QUERY: @@ -466,6 +496,23 @@ class KMIPProxy(KMIP): raise ValueError("no processor for operation: {0}".format( operation)) + def _process_get_attribute_list_batch_item(self, batch_item): + payload = batch_item.response_payload + + uid = None + names = None + + if payload: + uid = payload.uid + names = payload.attribute_names + + return GetAttributeListResult( + batch_item.result_status, + batch_item.result_reason, + batch_item.result_message, + uid, + names) + def _process_key_pair_batch_item(self, batch_item, result): payload = batch_item.response_payload diff --git a/kmip/services/results.py b/kmip/services/results.py index 477aae7..89bff41 100644 --- a/kmip/services/results.py +++ b/kmip/services/results.py @@ -158,6 +158,21 @@ class GetResult(OperationResult): self.secret = None +class GetAttributeListResult(OperationResult): + + def __init__( + self, + result_status, + result_reason=None, + result_message=None, + uid=None, + names=None): + super(GetAttributeListResult, self).__init__( + result_status, result_reason, result_message) + self.uid = uid + self.names = names + + class DestroyResult(OperationResult): def __init__(self, diff --git a/kmip/tests/integration/services/test_integration.py b/kmip/tests/integration/services/test_integration.py index 4b94fde..9a15a3a 100644 --- a/kmip/tests/integration/services/test_integration.py +++ b/kmip/tests/integration/services/test_integration.py @@ -1152,3 +1152,43 @@ class TestIntegration(TestCase): type(opaque_obj_result_destroyed_result.result_reason.enum) self.assertEqual(expected, opaque_obj_observed) + + def test_symmetric_key_create_getattributelist_destroy(self): + """ + Test that the GetAttributeList operation works for a newly created key. + """ + key_name = 'Integration Test - Create-GetAttributeList-Destroy Key' + result = self._create_symmetric_key(key_name=key_name) + uid = result.uuid.value + + self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum) + self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type.enum) + self.assertIsInstance(uid, str) + + try: + result = self.client.get_attribute_list(uid) + + self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum) + self.assertIsInstance(result.uid, str) + self.assertIsInstance(result.names, list) + + for name in result.names: + self.assertIsInstance(name, str) + + expected = [ + 'Cryptographic Algorithm', + 'Cryptographic Length', + 'Cryptographic Usage Mask', + 'Unique Identifier', + 'Object Type'] + for name in expected: + self.assertIn(name, result.names) + + finally: + result = self.client.destroy(uid) + self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum) + + result = self.client.get(uuid=result.uuid.value, credential=None) + + self.assertEqual( + ResultStatus.OPERATION_FAILED, result.result_status.enum) diff --git a/kmip/tests/unit/services/test_kmip_client.py b/kmip/tests/unit/services/test_kmip_client.py index fceafa7..4016f93 100644 --- a/kmip/tests/unit/services/test_kmip_client.py +++ b/kmip/tests/unit/services/test_kmip_client.py @@ -36,6 +36,7 @@ from kmip.core.messages.payloads.create_key_pair import \ CreateKeyPairRequestPayload, CreateKeyPairResponsePayload from kmip.core.messages.payloads.discover_versions import \ DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload +from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads.query import \ QueryRequestPayload, QueryResponsePayload from kmip.core.messages.payloads.rekey_key_pair import \ @@ -54,6 +55,7 @@ from kmip.services.kmip_client import KMIPProxy from kmip.services.results import CreateKeyPairResult from kmip.services.results import DiscoverVersionsResult +from kmip.services.results import GetAttributeListResult from kmip.services.results import QueryResult from kmip.services.results import RekeyKeyPairResult @@ -314,6 +316,19 @@ class TestKMIPClient(TestCase): protocol_versions = None self._test_build_discover_versions_batch_item(protocol_versions) + def test_build_get_attribute_list_batch_item(self): + uid = '00000000-1111-2222-3333-444444444444' + batch_item = self.client._build_get_attribute_list_batch_item(uid) + + self.assertIsInstance(batch_item, RequestBatchItem) + self.assertIsInstance(batch_item.operation, Operation) + self.assertEqual( + OperationEnum.GET_ATTRIBUTE_LIST, batch_item.operation.enum) + self.assertIsInstance( + batch_item.request_payload, + get_attribute_list.GetAttributeListRequestPayload) + self.assertEqual(uid, batch_item.request_payload.uid) + def test_process_batch_items(self): batch_item = ResponseBatchItem( operation=Operation(OperationEnum.CREATE_KEY_PAIR), @@ -361,6 +376,11 @@ class TestKMIPClient(TestCase): self.assertRaisesRegexp(ValueError, "no processor for operation", self.client._get_batch_item_processor, None) + expected = self.client._process_get_attribute_list_batch_item + observed = self.client._get_batch_item_processor( + OperationEnum.GET_ATTRIBUTE_LIST) + self.assertEqual(expected, observed) + def _test_equality(self, expected, observed): msg = "expected {0}, observed {1}".format(expected, observed) self.assertEqual(expected, observed, msg) @@ -467,6 +487,20 @@ class TestKMIPClient(TestCase): protocol_versions = None self._test_process_discover_versions_batch_item(protocol_versions) + def test_process_get_attribute_list_batch_item(self): + uid = '00000000-1111-2222-3333-444444444444' + names = ['Cryptographic Algorithm', 'Cryptographic Length'] + payload = get_attribute_list.GetAttributeListResponsePayload( + uid=uid, attribute_names=names) + batch_item = ResponseBatchItem( + operation=Operation(OperationEnum.GET_ATTRIBUTE_LIST), + response_payload=payload) + result = self.client._process_get_attribute_list_batch_item(batch_item) + + self.assertIsInstance(result, GetAttributeListResult) + self.assertEqual(uid, result.uid) + self.assertEqual(names, result.names) + class TestClientProfileInformation(TestCase): """