From fc0d95ebb90bf7632a2c1968723b1e0ddd5c54e3 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Tue, 6 Dec 2016 10:45:50 -0500 Subject: [PATCH] Adding GetAttributes support to the PyKMIP clients This change adds support for the GetAttributes operation to the PyKMIP clients. Demo scripts showing how to use the new operation will be included in a future update. Client test cases have been added and updated to reflect the new operation. --- kmip/pie/client.py | 47 +++++++ kmip/services/kmip_client.py | 64 +++++++++ kmip/services/results.py | 19 +++ kmip/tests/unit/pie/test_client.py | 136 +++++++++++++++++++ kmip/tests/unit/services/test_kmip_client.py | 52 +++++++ 5 files changed, 318 insertions(+) diff --git a/kmip/pie/client.py b/kmip/pie/client.py index c628103..fb67ef4 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -360,6 +360,53 @@ class ProxyKmipClient(api.KmipClient): message = result.result_message.value raise exceptions.KmipOperationFailure(status, reason, message) + def get_attributes(self, uid=None, attribute_names=None): + """ + Get the attributes associated with a managed object. + + If the uid is not specified, the appliance will use the ID placeholder + by default. + + If the attribute_names list is not specified, the appliance will + return all viable attributes for the managed object. + + Args: + uid (string): The unique ID of the managed object with which the + retrieved attributes should be associated. Optional, defaults + to None. + attribute_names (list): A list of string attribute names + indicating which attributes should be retrieved. Optional, + defaults to None. + """ + # Check input + if uid is not None: + if not isinstance(uid, six.string_types): + raise TypeError("uid must be a string") + if attribute_names is not None: + if not isinstance(attribute_names, list): + raise TypeError("attribute_names must be a list of strings") + else: + for attribute_name in attribute_names: + if not isinstance(attribute_name, six.string_types): + raise TypeError( + "attribute_names must be a list of strings" + ) + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + # Get the list of attributes for a managed object + result = self.proxy.get_attributes(uid, attribute_names) + + status = result.result_status.value + if status == enums.ResultStatus.SUCCESS: + return result.uuid, result.attributes + else: + reason = result.result_reason.value + message = result.result_message.value + raise exceptions.KmipOperationFailure(status, reason, message) + def get_attribute_list(self, uid=None): """ Get the names of the attributes associated with a managed object. diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 115bc22..d078bfa 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -19,6 +19,7 @@ from kmip.services.results import CreateKeyPairResult from kmip.services.results import DestroyResult from kmip.services.results import DiscoverVersionsResult from kmip.services.results import GetResult +from kmip.services.results import GetAttributesResult from kmip.services.results import GetAttributeListResult from kmip.services.results import LocateResult from kmip.services.results import OperationResult @@ -52,6 +53,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import discover_versions from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import query @@ -302,6 +304,32 @@ class KMIPProxy(KMIP): key_wrapping_specification=key_wrapping_specification, credential=credential) + def get_attributes(self, uuid=None, attribute_names=None): + """ + Send a GetAttributes request to the server. + + Args: + uuid (string): The ID of the managed object with which the + retrieved attributes should be associated. Optional, defaults + to None. + attribute_names (list): A list of AttributeName values indicating + what object attributes the client wants from the server. + Optional, defaults to None. + + Returns: + result (GetAttributesResult): A structure containing the results + of the operation. + """ + batch_item = self._build_get_attributes_batch_item( + uuid, + attribute_names + ) + + request = self._build_request_message(None, [batch_item]) + response = self._send_and_receive_message(request) + results = self._process_batch_items(response) + return results[0] + def get_attribute_list(self, uid=None): """ Send a GetAttributeList request to the server. @@ -475,6 +503,22 @@ class KMIPProxy(KMIP): operation=operation, request_payload=payload) return batch_item + def _build_get_attributes_batch_item( + self, + uuid=None, + attribute_names=None + ): + operation = Operation(OperationEnum.GET_ATTRIBUTES) + payload = get_attributes.GetAttributesRequestPayload( + uuid, + attribute_names + ) + batch_item = messages.RequestBatchItem( + operation=operation, + request_payload=payload + ) + return batch_item + def _build_get_attribute_list_batch_item(self, uid=None): operation = Operation(OperationEnum.GET_ATTRIBUTE_LIST) payload = get_attribute_list.GetAttributeListRequestPayload(uid) @@ -508,6 +552,8 @@ class KMIPProxy(KMIP): return self._process_response_error elif operation == OperationEnum.CREATE_KEY_PAIR: return self._process_create_key_pair_batch_item + elif operation == OperationEnum.GET_ATTRIBUTES: + return self._process_get_attributes_batch_item elif operation == OperationEnum.GET_ATTRIBUTE_LIST: return self._process_get_attribute_list_batch_item elif operation == OperationEnum.REKEY_KEY_PAIR: @@ -520,6 +566,24 @@ class KMIPProxy(KMIP): raise ValueError("no processor for operation: {0}".format( operation)) + def _process_get_attributes_batch_item(self, batch_item): + payload = batch_item.response_payload + + uuid = None + attributes = None + + if payload: + uuid = payload.unique_identifier + attributes = payload.attributes + + return GetAttributesResult( + batch_item.result_status, + batch_item.result_reason, + batch_item.result_message, + uuid, + attributes + ) + def _process_get_attribute_list_batch_item(self, batch_item): payload = batch_item.response_payload diff --git a/kmip/services/results.py b/kmip/services/results.py index 89bff41..73d34e2 100644 --- a/kmip/services/results.py +++ b/kmip/services/results.py @@ -158,6 +158,25 @@ class GetResult(OperationResult): self.secret = None +class GetAttributesResult(OperationResult): + + def __init__( + self, + result_status, + result_reason=None, + result_message=None, + uuid=None, + attributes=None + ): + super(GetAttributesResult, self).__init__( + result_status, + result_reason, + result_message + ) + self.uuid = uuid + self.attributes = attributes + + class GetAttributeListResult(OperationResult): def __init__( diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index 5d59c49..a8fba97 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -665,6 +665,142 @@ class TestProxyKmipClient(testtools.TestCase): self.assertRaisesRegexp( KmipOperationFailure, error_msg, client.get, *args) + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_attributes(self): + """ + Test that a secret's attributes can be retrieved with proper input. + """ + result = results.GetAttributesResult( + contents.ResultStatus(enums.ResultStatus.SUCCESS), + uuid='aaaaaaaa-1111-2222-3333-ffffffffffff', + attributes=[ + obj.Attribute( + attribute_name=obj.Attribute.AttributeName('Name'), + attribute_index=obj.Attribute.AttributeIndex(0), + attribute_value=attr.Name( + name_value=attr.Name.NameValue('Test Name'), + name_type=attr.Name.NameType( + enums.NameType.UNINTERPRETED_TEXT_STRING + ) + ) + ), + obj.Attribute( + attribute_name=obj.Attribute.AttributeName('Object Type'), + attribute_value=attr.ObjectType( + enums.ObjectType.SYMMETRIC_KEY + ) + ) + ] + ) + + with ProxyKmipClient() as client: + client.proxy.get_attributes.return_value = result + + result = client.get_attributes( + 'aaaaaaaa-1111-2222-3333-ffffffffffff', + ['Name', 'Object Type'] + ) + client.proxy.get_attributes.assert_called_with( + 'aaaaaaaa-1111-2222-3333-ffffffffffff', + ['Name', 'Object Type'] + ) + self.assertIsInstance(result[0], six.string_types) + self.assertIsInstance(result[1], list) + for r in result[1]: + self.assertIsInstance(r, obj.Attribute) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_attributes_on_invalid_uid(self): + """ + Test that a TypeError exception is raised when trying to retrieve a + secret's attributes with an invalid ID. + """ + args = [0] + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "uid must be a string", + client.get_attributes, + *args + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_attributes_on_invalid_attribute_names(self): + """ + Test that a TypeError exception is raised when trying to retrieve a + secret's attributes with an invalid attribute name set. + """ + args = [None, 0] + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "attribute_names must be a list of strings", + client.get_attributes, + *args + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_attributes_on_invalid_attribute_name(self): + """ + Test that a TypeError exception is raised when trying to retrieve a + secret's attributes with an invalid attribute name. + """ + args = [None, [0]] + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "attribute_names must be a list of strings", + client.get_attributes, + *args + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_attributes_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to retrieve a secret's attributes on an unopened client connection. + """ + client = ProxyKmipClient() + args = [ + 'aaaaaaaa-1111-2222-3333-ffffffffffff', + ['Name', 'Object Type'] + ] + self.assertRaises( + ClientConnectionNotOpen, + client.get_attributes, + *args + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_attributes_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to retrieve a secret's attributes. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = results.OperationResult( + contents.ResultStatus(status), + contents.ResultReason(reason), + contents.ResultMessage(message)) + error_msg = str(KmipOperationFailure(status, reason, message)) + + client = ProxyKmipClient() + client.open() + client.proxy.get_attributes.return_value = result + args = ['id', []] + + self.assertRaisesRegexp( + KmipOperationFailure, error_msg, client.get_attributes, *args) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_get_attribute_list(self): diff --git a/kmip/tests/unit/services/test_kmip_client.py b/kmip/tests/unit/services/test_kmip_client.py index f4cdecb..1b5e67b 100644 --- a/kmip/tests/unit/services/test_kmip_client.py +++ b/kmip/tests/unit/services/test_kmip_client.py @@ -41,6 +41,7 @@ from kmip.core.messages.payloads.create_key_pair import \ CreateKeyPairRequestPayload, CreateKeyPairResponsePayload from kmip.core.messages.payloads.discover_versions import \ DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload +from kmip.core.messages.payloads import get_attributes from kmip.core.messages.payloads import get_attribute_list from kmip.core.messages.payloads.query import \ QueryRequestPayload, QueryResponsePayload @@ -60,6 +61,7 @@ from kmip.services.kmip_client import KMIPProxy from kmip.services.results import CreateKeyPairResult from kmip.services.results import DiscoverVersionsResult +from kmip.services.results import GetAttributesResult from kmip.services.results import GetAttributeListResult from kmip.services.results import OperationResult from kmip.services.results import QueryResult @@ -381,6 +383,33 @@ class TestKMIPClient(TestCase): protocol_versions = None self._test_build_discover_versions_batch_item(protocol_versions) + def test_build_get_attributes_batch_item(self): + uuid = '00000000-1111-2222-3333-444444444444' + attribute_names = [ + 'Name', + 'Object Type' + ] + batch_item = self.client._build_get_attributes_batch_item( + uuid, + attribute_names + ) + + self.assertIsInstance(batch_item, RequestBatchItem) + self.assertIsInstance(batch_item.operation, Operation) + self.assertEqual( + OperationEnum.GET_ATTRIBUTES, + batch_item.operation.value + ) + self.assertIsInstance( + batch_item.request_payload, + get_attributes.GetAttributesRequestPayload + ) + self.assertEqual(uuid, batch_item.request_payload.unique_identifier) + self.assertEqual( + attribute_names, + batch_item.request_payload.attribute_names + ) + def test_build_get_attribute_list_batch_item(self): uid = '00000000-1111-2222-3333-444444444444' batch_item = self.client._build_get_attribute_list_batch_item(uid) @@ -464,6 +493,12 @@ class TestKMIPClient(TestCase): self.client._get_batch_item_processor, 0xA5A5A5A5) + expected = self.client._process_get_attributes_batch_item + observed = self.client._get_batch_item_processor( + OperationEnum.GET_ATTRIBUTES + ) + self.assertEqual(expected, observed) + expected = self.client._process_get_attribute_list_batch_item observed = self.client._get_batch_item_processor( OperationEnum.GET_ATTRIBUTE_LIST) @@ -575,6 +610,23 @@ class TestKMIPClient(TestCase): protocol_versions = None self._test_process_discover_versions_batch_item(protocol_versions) + def test_process_get_attributes_batch_item(self): + uuid = '00000000-1111-2222-3333-444444444444' + attributes = [] + payload = get_attributes.GetAttributesResponsePayload( + unique_identifier=uuid, + attributes=attributes + ) + batch_item = ResponseBatchItem( + operation=Operation(OperationEnum.GET_ATTRIBUTES), + response_payload=payload + ) + result = self.client._process_get_attributes_batch_item(batch_item) + + self.assertIsInstance(result, GetAttributesResult) + self.assertEqual(uuid, result.uuid) + self.assertEqual(attributes, result.attributes) + def test_process_get_attribute_list_batch_item(self): uid = '00000000-1111-2222-3333-444444444444' names = ['Cryptographic Algorithm', 'Cryptographic Length']