Adding GetAttributes support to the PyKMIP clients

This change adds support for the GetAttributes operation to the
PyKMIP clients. Demo scripts showing how to use the new operation
will be included in a future update. Client test cases have been
added and updated to reflect the new operation.
This commit is contained in:
Peter Hamilton 2016-12-06 10:45:50 -05:00
parent ccd66c5062
commit fc0d95ebb9
5 changed files with 318 additions and 0 deletions

View File

@ -360,6 +360,53 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def get_attributes(self, uid=None, attribute_names=None):
"""
Get the attributes associated with a managed object.
If the uid is not specified, the appliance will use the ID placeholder
by default.
If the attribute_names list is not specified, the appliance will
return all viable attributes for the managed object.
Args:
uid (string): The unique ID of the managed object with which the
retrieved attributes should be associated. Optional, defaults
to None.
attribute_names (list): A list of string attribute names
indicating which attributes should be retrieved. Optional,
defaults to None.
"""
# Check input
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("uid must be a string")
if attribute_names is not None:
if not isinstance(attribute_names, list):
raise TypeError("attribute_names must be a list of strings")
else:
for attribute_name in attribute_names:
if not isinstance(attribute_name, six.string_types):
raise TypeError(
"attribute_names must be a list of strings"
)
# Verify that operations can be given at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
# Get the list of attributes for a managed object
result = self.proxy.get_attributes(uid, attribute_names)
status = result.result_status.value
if status == enums.ResultStatus.SUCCESS:
return result.uuid, result.attributes
else:
reason = result.result_reason.value
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def get_attribute_list(self, uid=None):
"""
Get the names of the attributes associated with a managed object.

View File

@ -19,6 +19,7 @@ from kmip.services.results import CreateKeyPairResult
from kmip.services.results import DestroyResult
from kmip.services.results import DiscoverVersionsResult
from kmip.services.results import GetResult
from kmip.services.results import GetAttributesResult
from kmip.services.results import GetAttributeListResult
from kmip.services.results import LocateResult
from kmip.services.results import OperationResult
@ -52,6 +53,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import query
@ -302,6 +304,32 @@ class KMIPProxy(KMIP):
key_wrapping_specification=key_wrapping_specification,
credential=credential)
def get_attributes(self, uuid=None, attribute_names=None):
"""
Send a GetAttributes request to the server.
Args:
uuid (string): The ID of the managed object with which the
retrieved attributes should be associated. Optional, defaults
to None.
attribute_names (list): A list of AttributeName values indicating
what object attributes the client wants from the server.
Optional, defaults to None.
Returns:
result (GetAttributesResult): A structure containing the results
of the operation.
"""
batch_item = self._build_get_attributes_batch_item(
uuid,
attribute_names
)
request = self._build_request_message(None, [batch_item])
response = self._send_and_receive_message(request)
results = self._process_batch_items(response)
return results[0]
def get_attribute_list(self, uid=None):
"""
Send a GetAttributeList request to the server.
@ -475,6 +503,22 @@ class KMIPProxy(KMIP):
operation=operation, request_payload=payload)
return batch_item
def _build_get_attributes_batch_item(
self,
uuid=None,
attribute_names=None
):
operation = Operation(OperationEnum.GET_ATTRIBUTES)
payload = get_attributes.GetAttributesRequestPayload(
uuid,
attribute_names
)
batch_item = messages.RequestBatchItem(
operation=operation,
request_payload=payload
)
return batch_item
def _build_get_attribute_list_batch_item(self, uid=None):
operation = Operation(OperationEnum.GET_ATTRIBUTE_LIST)
payload = get_attribute_list.GetAttributeListRequestPayload(uid)
@ -508,6 +552,8 @@ class KMIPProxy(KMIP):
return self._process_response_error
elif operation == OperationEnum.CREATE_KEY_PAIR:
return self._process_create_key_pair_batch_item
elif operation == OperationEnum.GET_ATTRIBUTES:
return self._process_get_attributes_batch_item
elif operation == OperationEnum.GET_ATTRIBUTE_LIST:
return self._process_get_attribute_list_batch_item
elif operation == OperationEnum.REKEY_KEY_PAIR:
@ -520,6 +566,24 @@ class KMIPProxy(KMIP):
raise ValueError("no processor for operation: {0}".format(
operation))
def _process_get_attributes_batch_item(self, batch_item):
payload = batch_item.response_payload
uuid = None
attributes = None
if payload:
uuid = payload.unique_identifier
attributes = payload.attributes
return GetAttributesResult(
batch_item.result_status,
batch_item.result_reason,
batch_item.result_message,
uuid,
attributes
)
def _process_get_attribute_list_batch_item(self, batch_item):
payload = batch_item.response_payload

View File

@ -158,6 +158,25 @@ class GetResult(OperationResult):
self.secret = None
class GetAttributesResult(OperationResult):
def __init__(
self,
result_status,
result_reason=None,
result_message=None,
uuid=None,
attributes=None
):
super(GetAttributesResult, self).__init__(
result_status,
result_reason,
result_message
)
self.uuid = uuid
self.attributes = attributes
class GetAttributeListResult(OperationResult):
def __init__(

View File

@ -665,6 +665,142 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.get, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attributes(self):
"""
Test that a secret's attributes can be retrieved with proper input.
"""
result = results.GetAttributesResult(
contents.ResultStatus(enums.ResultStatus.SUCCESS),
uuid='aaaaaaaa-1111-2222-3333-ffffffffffff',
attributes=[
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Name'),
attribute_index=obj.Attribute.AttributeIndex(0),
attribute_value=attr.Name(
name_value=attr.Name.NameValue('Test Name'),
name_type=attr.Name.NameType(
enums.NameType.UNINTERPRETED_TEXT_STRING
)
)
),
obj.Attribute(
attribute_name=obj.Attribute.AttributeName('Object Type'),
attribute_value=attr.ObjectType(
enums.ObjectType.SYMMETRIC_KEY
)
)
]
)
with ProxyKmipClient() as client:
client.proxy.get_attributes.return_value = result
result = client.get_attributes(
'aaaaaaaa-1111-2222-3333-ffffffffffff',
['Name', 'Object Type']
)
client.proxy.get_attributes.assert_called_with(
'aaaaaaaa-1111-2222-3333-ffffffffffff',
['Name', 'Object Type']
)
self.assertIsInstance(result[0], six.string_types)
self.assertIsInstance(result[1], list)
for r in result[1]:
self.assertIsInstance(r, obj.Attribute)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attributes_on_invalid_uid(self):
"""
Test that a TypeError exception is raised when trying to retrieve a
secret's attributes with an invalid ID.
"""
args = [0]
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"uid must be a string",
client.get_attributes,
*args
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attributes_on_invalid_attribute_names(self):
"""
Test that a TypeError exception is raised when trying to retrieve a
secret's attributes with an invalid attribute name set.
"""
args = [None, 0]
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"attribute_names must be a list of strings",
client.get_attributes,
*args
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attributes_on_invalid_attribute_name(self):
"""
Test that a TypeError exception is raised when trying to retrieve a
secret's attributes with an invalid attribute name.
"""
args = [None, [0]]
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"attribute_names must be a list of strings",
client.get_attributes,
*args
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attributes_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to retrieve a secret's attributes on an unopened client connection.
"""
client = ProxyKmipClient()
args = [
'aaaaaaaa-1111-2222-3333-ffffffffffff',
['Name', 'Object Type']
]
self.assertRaises(
ClientConnectionNotOpen,
client.get_attributes,
*args
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attributes_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to retrieve a secret's attributes.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = results.OperationResult(
contents.ResultStatus(status),
contents.ResultReason(reason),
contents.ResultMessage(message))
error_msg = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.get_attributes.return_value = result
args = ['id', []]
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.get_attributes, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get_attribute_list(self):

View File

@ -41,6 +41,7 @@ from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
from kmip.core.messages.payloads.discover_versions import \
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
from kmip.core.messages.payloads import get_attributes
from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads.query import \
QueryRequestPayload, QueryResponsePayload
@ -60,6 +61,7 @@ from kmip.services.kmip_client import KMIPProxy
from kmip.services.results import CreateKeyPairResult
from kmip.services.results import DiscoverVersionsResult
from kmip.services.results import GetAttributesResult
from kmip.services.results import GetAttributeListResult
from kmip.services.results import OperationResult
from kmip.services.results import QueryResult
@ -381,6 +383,33 @@ class TestKMIPClient(TestCase):
protocol_versions = None
self._test_build_discover_versions_batch_item(protocol_versions)
def test_build_get_attributes_batch_item(self):
uuid = '00000000-1111-2222-3333-444444444444'
attribute_names = [
'Name',
'Object Type'
]
batch_item = self.client._build_get_attributes_batch_item(
uuid,
attribute_names
)
self.assertIsInstance(batch_item, RequestBatchItem)
self.assertIsInstance(batch_item.operation, Operation)
self.assertEqual(
OperationEnum.GET_ATTRIBUTES,
batch_item.operation.value
)
self.assertIsInstance(
batch_item.request_payload,
get_attributes.GetAttributesRequestPayload
)
self.assertEqual(uuid, batch_item.request_payload.unique_identifier)
self.assertEqual(
attribute_names,
batch_item.request_payload.attribute_names
)
def test_build_get_attribute_list_batch_item(self):
uid = '00000000-1111-2222-3333-444444444444'
batch_item = self.client._build_get_attribute_list_batch_item(uid)
@ -464,6 +493,12 @@ class TestKMIPClient(TestCase):
self.client._get_batch_item_processor,
0xA5A5A5A5)
expected = self.client._process_get_attributes_batch_item
observed = self.client._get_batch_item_processor(
OperationEnum.GET_ATTRIBUTES
)
self.assertEqual(expected, observed)
expected = self.client._process_get_attribute_list_batch_item
observed = self.client._get_batch_item_processor(
OperationEnum.GET_ATTRIBUTE_LIST)
@ -575,6 +610,23 @@ class TestKMIPClient(TestCase):
protocol_versions = None
self._test_process_discover_versions_batch_item(protocol_versions)
def test_process_get_attributes_batch_item(self):
uuid = '00000000-1111-2222-3333-444444444444'
attributes = []
payload = get_attributes.GetAttributesResponsePayload(
unique_identifier=uuid,
attributes=attributes
)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.GET_ATTRIBUTES),
response_payload=payload
)
result = self.client._process_get_attributes_batch_item(batch_item)
self.assertIsInstance(result, GetAttributesResult)
self.assertEqual(uuid, result.uuid)
self.assertEqual(attributes, result.attributes)
def test_process_get_attribute_list_batch_item(self):
uid = '00000000-1111-2222-3333-444444444444'
names = ['Cryptographic Algorithm', 'Cryptographic Length']