mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-07-01 11:14:25 +02:00
Merge pull request #228 from OpenKMIP/feat/add-get-attributes-to-client
Adding GetAttributes support to the PyKMIP clients
This commit is contained in:
commit
bd76dc280b
@ -360,6 +360,53 @@ class ProxyKmipClient(api.KmipClient):
|
|||||||
message = result.result_message.value
|
message = result.result_message.value
|
||||||
raise exceptions.KmipOperationFailure(status, reason, message)
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
|
def get_attributes(self, uid=None, attribute_names=None):
|
||||||
|
"""
|
||||||
|
Get the attributes associated with a managed object.
|
||||||
|
|
||||||
|
If the uid is not specified, the appliance will use the ID placeholder
|
||||||
|
by default.
|
||||||
|
|
||||||
|
If the attribute_names list is not specified, the appliance will
|
||||||
|
return all viable attributes for the managed object.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (string): The unique ID of the managed object with which the
|
||||||
|
retrieved attributes should be associated. Optional, defaults
|
||||||
|
to None.
|
||||||
|
attribute_names (list): A list of string attribute names
|
||||||
|
indicating which attributes should be retrieved. Optional,
|
||||||
|
defaults to None.
|
||||||
|
"""
|
||||||
|
# Check input
|
||||||
|
if uid is not None:
|
||||||
|
if not isinstance(uid, six.string_types):
|
||||||
|
raise TypeError("uid must be a string")
|
||||||
|
if attribute_names is not None:
|
||||||
|
if not isinstance(attribute_names, list):
|
||||||
|
raise TypeError("attribute_names must be a list of strings")
|
||||||
|
else:
|
||||||
|
for attribute_name in attribute_names:
|
||||||
|
if not isinstance(attribute_name, six.string_types):
|
||||||
|
raise TypeError(
|
||||||
|
"attribute_names must be a list of strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify that operations can be given at this time
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
|
# Get the list of attributes for a managed object
|
||||||
|
result = self.proxy.get_attributes(uid, attribute_names)
|
||||||
|
|
||||||
|
status = result.result_status.value
|
||||||
|
if status == enums.ResultStatus.SUCCESS:
|
||||||
|
return result.uuid, result.attributes
|
||||||
|
else:
|
||||||
|
reason = result.result_reason.value
|
||||||
|
message = result.result_message.value
|
||||||
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
def get_attribute_list(self, uid=None):
|
def get_attribute_list(self, uid=None):
|
||||||
"""
|
"""
|
||||||
Get the names of the attributes associated with a managed object.
|
Get the names of the attributes associated with a managed object.
|
||||||
|
@ -19,6 +19,7 @@ from kmip.services.results import CreateKeyPairResult
|
|||||||
from kmip.services.results import DestroyResult
|
from kmip.services.results import DestroyResult
|
||||||
from kmip.services.results import DiscoverVersionsResult
|
from kmip.services.results import DiscoverVersionsResult
|
||||||
from kmip.services.results import GetResult
|
from kmip.services.results import GetResult
|
||||||
|
from kmip.services.results import GetAttributesResult
|
||||||
from kmip.services.results import GetAttributeListResult
|
from kmip.services.results import GetAttributeListResult
|
||||||
from kmip.services.results import LocateResult
|
from kmip.services.results import LocateResult
|
||||||
from kmip.services.results import OperationResult
|
from kmip.services.results import OperationResult
|
||||||
@ -52,6 +53,7 @@ from kmip.core.messages.payloads import create_key_pair
|
|||||||
from kmip.core.messages.payloads import destroy
|
from kmip.core.messages.payloads import destroy
|
||||||
from kmip.core.messages.payloads import discover_versions
|
from kmip.core.messages.payloads import discover_versions
|
||||||
from kmip.core.messages.payloads import get
|
from kmip.core.messages.payloads import get
|
||||||
|
from kmip.core.messages.payloads import get_attributes
|
||||||
from kmip.core.messages.payloads import get_attribute_list
|
from kmip.core.messages.payloads import get_attribute_list
|
||||||
from kmip.core.messages.payloads import locate
|
from kmip.core.messages.payloads import locate
|
||||||
from kmip.core.messages.payloads import query
|
from kmip.core.messages.payloads import query
|
||||||
@ -302,6 +304,32 @@ class KMIPProxy(KMIP):
|
|||||||
key_wrapping_specification=key_wrapping_specification,
|
key_wrapping_specification=key_wrapping_specification,
|
||||||
credential=credential)
|
credential=credential)
|
||||||
|
|
||||||
|
def get_attributes(self, uuid=None, attribute_names=None):
|
||||||
|
"""
|
||||||
|
Send a GetAttributes request to the server.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uuid (string): The ID of the managed object with which the
|
||||||
|
retrieved attributes should be associated. Optional, defaults
|
||||||
|
to None.
|
||||||
|
attribute_names (list): A list of AttributeName values indicating
|
||||||
|
what object attributes the client wants from the server.
|
||||||
|
Optional, defaults to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
result (GetAttributesResult): A structure containing the results
|
||||||
|
of the operation.
|
||||||
|
"""
|
||||||
|
batch_item = self._build_get_attributes_batch_item(
|
||||||
|
uuid,
|
||||||
|
attribute_names
|
||||||
|
)
|
||||||
|
|
||||||
|
request = self._build_request_message(None, [batch_item])
|
||||||
|
response = self._send_and_receive_message(request)
|
||||||
|
results = self._process_batch_items(response)
|
||||||
|
return results[0]
|
||||||
|
|
||||||
def get_attribute_list(self, uid=None):
|
def get_attribute_list(self, uid=None):
|
||||||
"""
|
"""
|
||||||
Send a GetAttributeList request to the server.
|
Send a GetAttributeList request to the server.
|
||||||
@ -475,6 +503,22 @@ class KMIPProxy(KMIP):
|
|||||||
operation=operation, request_payload=payload)
|
operation=operation, request_payload=payload)
|
||||||
return batch_item
|
return batch_item
|
||||||
|
|
||||||
|
def _build_get_attributes_batch_item(
|
||||||
|
self,
|
||||||
|
uuid=None,
|
||||||
|
attribute_names=None
|
||||||
|
):
|
||||||
|
operation = Operation(OperationEnum.GET_ATTRIBUTES)
|
||||||
|
payload = get_attributes.GetAttributesRequestPayload(
|
||||||
|
uuid,
|
||||||
|
attribute_names
|
||||||
|
)
|
||||||
|
batch_item = messages.RequestBatchItem(
|
||||||
|
operation=operation,
|
||||||
|
request_payload=payload
|
||||||
|
)
|
||||||
|
return batch_item
|
||||||
|
|
||||||
def _build_get_attribute_list_batch_item(self, uid=None):
|
def _build_get_attribute_list_batch_item(self, uid=None):
|
||||||
operation = Operation(OperationEnum.GET_ATTRIBUTE_LIST)
|
operation = Operation(OperationEnum.GET_ATTRIBUTE_LIST)
|
||||||
payload = get_attribute_list.GetAttributeListRequestPayload(uid)
|
payload = get_attribute_list.GetAttributeListRequestPayload(uid)
|
||||||
@ -508,6 +552,8 @@ class KMIPProxy(KMIP):
|
|||||||
return self._process_response_error
|
return self._process_response_error
|
||||||
elif operation == OperationEnum.CREATE_KEY_PAIR:
|
elif operation == OperationEnum.CREATE_KEY_PAIR:
|
||||||
return self._process_create_key_pair_batch_item
|
return self._process_create_key_pair_batch_item
|
||||||
|
elif operation == OperationEnum.GET_ATTRIBUTES:
|
||||||
|
return self._process_get_attributes_batch_item
|
||||||
elif operation == OperationEnum.GET_ATTRIBUTE_LIST:
|
elif operation == OperationEnum.GET_ATTRIBUTE_LIST:
|
||||||
return self._process_get_attribute_list_batch_item
|
return self._process_get_attribute_list_batch_item
|
||||||
elif operation == OperationEnum.REKEY_KEY_PAIR:
|
elif operation == OperationEnum.REKEY_KEY_PAIR:
|
||||||
@ -520,6 +566,24 @@ class KMIPProxy(KMIP):
|
|||||||
raise ValueError("no processor for operation: {0}".format(
|
raise ValueError("no processor for operation: {0}".format(
|
||||||
operation))
|
operation))
|
||||||
|
|
||||||
|
def _process_get_attributes_batch_item(self, batch_item):
|
||||||
|
payload = batch_item.response_payload
|
||||||
|
|
||||||
|
uuid = None
|
||||||
|
attributes = None
|
||||||
|
|
||||||
|
if payload:
|
||||||
|
uuid = payload.unique_identifier
|
||||||
|
attributes = payload.attributes
|
||||||
|
|
||||||
|
return GetAttributesResult(
|
||||||
|
batch_item.result_status,
|
||||||
|
batch_item.result_reason,
|
||||||
|
batch_item.result_message,
|
||||||
|
uuid,
|
||||||
|
attributes
|
||||||
|
)
|
||||||
|
|
||||||
def _process_get_attribute_list_batch_item(self, batch_item):
|
def _process_get_attribute_list_batch_item(self, batch_item):
|
||||||
payload = batch_item.response_payload
|
payload = batch_item.response_payload
|
||||||
|
|
||||||
|
@ -158,6 +158,25 @@ class GetResult(OperationResult):
|
|||||||
self.secret = None
|
self.secret = None
|
||||||
|
|
||||||
|
|
||||||
|
class GetAttributesResult(OperationResult):
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
result_status,
|
||||||
|
result_reason=None,
|
||||||
|
result_message=None,
|
||||||
|
uuid=None,
|
||||||
|
attributes=None
|
||||||
|
):
|
||||||
|
super(GetAttributesResult, self).__init__(
|
||||||
|
result_status,
|
||||||
|
result_reason,
|
||||||
|
result_message
|
||||||
|
)
|
||||||
|
self.uuid = uuid
|
||||||
|
self.attributes = attributes
|
||||||
|
|
||||||
|
|
||||||
class GetAttributeListResult(OperationResult):
|
class GetAttributeListResult(OperationResult):
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
|
@ -665,6 +665,142 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||||||
self.assertRaisesRegexp(
|
self.assertRaisesRegexp(
|
||||||
KmipOperationFailure, error_msg, client.get, *args)
|
KmipOperationFailure, error_msg, client.get, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_attributes(self):
|
||||||
|
"""
|
||||||
|
Test that a secret's attributes can be retrieved with proper input.
|
||||||
|
"""
|
||||||
|
result = results.GetAttributesResult(
|
||||||
|
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||||
|
uuid='aaaaaaaa-1111-2222-3333-ffffffffffff',
|
||||||
|
attributes=[
|
||||||
|
obj.Attribute(
|
||||||
|
attribute_name=obj.Attribute.AttributeName('Name'),
|
||||||
|
attribute_index=obj.Attribute.AttributeIndex(0),
|
||||||
|
attribute_value=attr.Name(
|
||||||
|
name_value=attr.Name.NameValue('Test Name'),
|
||||||
|
name_type=attr.Name.NameType(
|
||||||
|
enums.NameType.UNINTERPRETED_TEXT_STRING
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
obj.Attribute(
|
||||||
|
attribute_name=obj.Attribute.AttributeName('Object Type'),
|
||||||
|
attribute_value=attr.ObjectType(
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.get_attributes.return_value = result
|
||||||
|
|
||||||
|
result = client.get_attributes(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff',
|
||||||
|
['Name', 'Object Type']
|
||||||
|
)
|
||||||
|
client.proxy.get_attributes.assert_called_with(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff',
|
||||||
|
['Name', 'Object Type']
|
||||||
|
)
|
||||||
|
self.assertIsInstance(result[0], six.string_types)
|
||||||
|
self.assertIsInstance(result[1], list)
|
||||||
|
for r in result[1]:
|
||||||
|
self.assertIsInstance(r, obj.Attribute)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_attributes_on_invalid_uid(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to retrieve a
|
||||||
|
secret's attributes with an invalid ID.
|
||||||
|
"""
|
||||||
|
args = [0]
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"uid must be a string",
|
||||||
|
client.get_attributes,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_attributes_on_invalid_attribute_names(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to retrieve a
|
||||||
|
secret's attributes with an invalid attribute name set.
|
||||||
|
"""
|
||||||
|
args = [None, 0]
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"attribute_names must be a list of strings",
|
||||||
|
client.get_attributes,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_attributes_on_invalid_attribute_name(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to retrieve a
|
||||||
|
secret's attributes with an invalid attribute name.
|
||||||
|
"""
|
||||||
|
args = [None, [0]]
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
TypeError,
|
||||||
|
"attribute_names must be a list of strings",
|
||||||
|
client.get_attributes,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_attributes_on_closed(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to retrieve a secret's attributes on an unopened client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
args = [
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff',
|
||||||
|
['Name', 'Object Type']
|
||||||
|
]
|
||||||
|
self.assertRaises(
|
||||||
|
ClientConnectionNotOpen,
|
||||||
|
client.get_attributes,
|
||||||
|
*args
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_attributes_on_operation_failure(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception is raised when the
|
||||||
|
backend fails to retrieve a secret's attributes.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.OPERATION_FAILED
|
||||||
|
reason = enums.ResultReason.GENERAL_FAILURE
|
||||||
|
message = "Test failure message"
|
||||||
|
|
||||||
|
result = results.OperationResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
contents.ResultReason(reason),
|
||||||
|
contents.ResultMessage(message))
|
||||||
|
error_msg = str(KmipOperationFailure(status, reason, message))
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.get_attributes.return_value = result
|
||||||
|
args = ['id', []]
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
KmipOperationFailure, error_msg, client.get_attributes, *args)
|
||||||
|
|
||||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
mock.MagicMock(spec_set=KMIPProxy))
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
def test_get_attribute_list(self):
|
def test_get_attribute_list(self):
|
||||||
|
@ -41,6 +41,7 @@ from kmip.core.messages.payloads.create_key_pair import \
|
|||||||
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
|
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
|
||||||
from kmip.core.messages.payloads.discover_versions import \
|
from kmip.core.messages.payloads.discover_versions import \
|
||||||
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
|
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
|
||||||
|
from kmip.core.messages.payloads import get_attributes
|
||||||
from kmip.core.messages.payloads import get_attribute_list
|
from kmip.core.messages.payloads import get_attribute_list
|
||||||
from kmip.core.messages.payloads.query import \
|
from kmip.core.messages.payloads.query import \
|
||||||
QueryRequestPayload, QueryResponsePayload
|
QueryRequestPayload, QueryResponsePayload
|
||||||
@ -60,6 +61,7 @@ from kmip.services.kmip_client import KMIPProxy
|
|||||||
|
|
||||||
from kmip.services.results import CreateKeyPairResult
|
from kmip.services.results import CreateKeyPairResult
|
||||||
from kmip.services.results import DiscoverVersionsResult
|
from kmip.services.results import DiscoverVersionsResult
|
||||||
|
from kmip.services.results import GetAttributesResult
|
||||||
from kmip.services.results import GetAttributeListResult
|
from kmip.services.results import GetAttributeListResult
|
||||||
from kmip.services.results import OperationResult
|
from kmip.services.results import OperationResult
|
||||||
from kmip.services.results import QueryResult
|
from kmip.services.results import QueryResult
|
||||||
@ -381,6 +383,33 @@ class TestKMIPClient(TestCase):
|
|||||||
protocol_versions = None
|
protocol_versions = None
|
||||||
self._test_build_discover_versions_batch_item(protocol_versions)
|
self._test_build_discover_versions_batch_item(protocol_versions)
|
||||||
|
|
||||||
|
def test_build_get_attributes_batch_item(self):
|
||||||
|
uuid = '00000000-1111-2222-3333-444444444444'
|
||||||
|
attribute_names = [
|
||||||
|
'Name',
|
||||||
|
'Object Type'
|
||||||
|
]
|
||||||
|
batch_item = self.client._build_get_attributes_batch_item(
|
||||||
|
uuid,
|
||||||
|
attribute_names
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIsInstance(batch_item, RequestBatchItem)
|
||||||
|
self.assertIsInstance(batch_item.operation, Operation)
|
||||||
|
self.assertEqual(
|
||||||
|
OperationEnum.GET_ATTRIBUTES,
|
||||||
|
batch_item.operation.value
|
||||||
|
)
|
||||||
|
self.assertIsInstance(
|
||||||
|
batch_item.request_payload,
|
||||||
|
get_attributes.GetAttributesRequestPayload
|
||||||
|
)
|
||||||
|
self.assertEqual(uuid, batch_item.request_payload.unique_identifier)
|
||||||
|
self.assertEqual(
|
||||||
|
attribute_names,
|
||||||
|
batch_item.request_payload.attribute_names
|
||||||
|
)
|
||||||
|
|
||||||
def test_build_get_attribute_list_batch_item(self):
|
def test_build_get_attribute_list_batch_item(self):
|
||||||
uid = '00000000-1111-2222-3333-444444444444'
|
uid = '00000000-1111-2222-3333-444444444444'
|
||||||
batch_item = self.client._build_get_attribute_list_batch_item(uid)
|
batch_item = self.client._build_get_attribute_list_batch_item(uid)
|
||||||
@ -464,6 +493,12 @@ class TestKMIPClient(TestCase):
|
|||||||
self.client._get_batch_item_processor,
|
self.client._get_batch_item_processor,
|
||||||
0xA5A5A5A5)
|
0xA5A5A5A5)
|
||||||
|
|
||||||
|
expected = self.client._process_get_attributes_batch_item
|
||||||
|
observed = self.client._get_batch_item_processor(
|
||||||
|
OperationEnum.GET_ATTRIBUTES
|
||||||
|
)
|
||||||
|
self.assertEqual(expected, observed)
|
||||||
|
|
||||||
expected = self.client._process_get_attribute_list_batch_item
|
expected = self.client._process_get_attribute_list_batch_item
|
||||||
observed = self.client._get_batch_item_processor(
|
observed = self.client._get_batch_item_processor(
|
||||||
OperationEnum.GET_ATTRIBUTE_LIST)
|
OperationEnum.GET_ATTRIBUTE_LIST)
|
||||||
@ -575,6 +610,23 @@ class TestKMIPClient(TestCase):
|
|||||||
protocol_versions = None
|
protocol_versions = None
|
||||||
self._test_process_discover_versions_batch_item(protocol_versions)
|
self._test_process_discover_versions_batch_item(protocol_versions)
|
||||||
|
|
||||||
|
def test_process_get_attributes_batch_item(self):
|
||||||
|
uuid = '00000000-1111-2222-3333-444444444444'
|
||||||
|
attributes = []
|
||||||
|
payload = get_attributes.GetAttributesResponsePayload(
|
||||||
|
unique_identifier=uuid,
|
||||||
|
attributes=attributes
|
||||||
|
)
|
||||||
|
batch_item = ResponseBatchItem(
|
||||||
|
operation=Operation(OperationEnum.GET_ATTRIBUTES),
|
||||||
|
response_payload=payload
|
||||||
|
)
|
||||||
|
result = self.client._process_get_attributes_batch_item(batch_item)
|
||||||
|
|
||||||
|
self.assertIsInstance(result, GetAttributesResult)
|
||||||
|
self.assertEqual(uuid, result.uuid)
|
||||||
|
self.assertEqual(attributes, result.attributes)
|
||||||
|
|
||||||
def test_process_get_attribute_list_batch_item(self):
|
def test_process_get_attribute_list_batch_item(self):
|
||||||
uid = '00000000-1111-2222-3333-444444444444'
|
uid = '00000000-1111-2222-3333-444444444444'
|
||||||
names = ['Cryptographic Algorithm', 'Cryptographic Length']
|
names = ['Cryptographic Algorithm', 'Cryptographic Length']
|
||||||
|
Loading…
x
Reference in New Issue
Block a user