mirror of https://github.com/OpenKMIP/PyKMIP.git
Adding KMIPProxy support for the GetAttributeList operation
This change adds support for the GetAttributeList operation to the KMIPProxy client. It adds a new result object for the operation along with an integration test demonstrating how the operation can be used. Client unit test cases are also included.
This commit is contained in:
parent
c21fe63b00
commit
c763b69af1
|
@ -19,6 +19,7 @@ from kmip.services.results import CreateKeyPairResult
|
|||
from kmip.services.results import DestroyResult
|
||||
from kmip.services.results import DiscoverVersionsResult
|
||||
from kmip.services.results import GetResult
|
||||
from kmip.services.results import GetAttributeListResult
|
||||
from kmip.services.results import LocateResult
|
||||
from kmip.services.results import QueryResult
|
||||
from kmip.services.results import RegisterResult
|
||||
|
@ -50,6 +51,7 @@ from kmip.core.messages.payloads import create_key_pair
|
|||
from kmip.core.messages.payloads import destroy
|
||||
from kmip.core.messages.payloads import discover_versions
|
||||
from kmip.core.messages.payloads import get
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads import locate
|
||||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
|
@ -280,6 +282,25 @@ class KMIPProxy(KMIP):
|
|||
key_wrapping_specification=key_wrapping_specification,
|
||||
credential=credential)
|
||||
|
||||
def get_attribute_list(self, uid=None):
|
||||
"""
|
||||
Send a GetAttributeList request to the server.
|
||||
|
||||
Args:
|
||||
uid (string): The ID of the managed object with which the retrieved
|
||||
attribute names should be associated.
|
||||
|
||||
Returns:
|
||||
result (GetAttributeListResult): A structure containing the results
|
||||
of the operation.
|
||||
"""
|
||||
batch_item = self._build_get_attribute_list_batch_item(uid)
|
||||
|
||||
request = self._build_request_message(None, [batch_item])
|
||||
response = self._send_and_receive_message(request)
|
||||
results = self._process_batch_items(response)
|
||||
return results[0]
|
||||
|
||||
def revoke(self, uuid, reason, message=None, credential=None):
|
||||
return self._revoke(unique_identifier=uuid,
|
||||
revocation_code=reason,
|
||||
|
@ -434,6 +455,13 @@ class KMIPProxy(KMIP):
|
|||
operation=operation, request_payload=payload)
|
||||
return batch_item
|
||||
|
||||
def _build_get_attribute_list_batch_item(self, uid=None):
|
||||
operation = Operation(OperationEnum.GET_ATTRIBUTE_LIST)
|
||||
payload = get_attribute_list.GetAttributeListRequestPayload(uid)
|
||||
batch_item = messages.RequestBatchItem(
|
||||
operation=operation, request_payload=payload)
|
||||
return batch_item
|
||||
|
||||
def _build_discover_versions_batch_item(self, protocol_versions=None):
|
||||
operation = Operation(OperationEnum.DISCOVER_VERSIONS)
|
||||
|
||||
|
@ -456,6 +484,8 @@ class KMIPProxy(KMIP):
|
|||
def _get_batch_item_processor(self, operation):
|
||||
if operation == OperationEnum.CREATE_KEY_PAIR:
|
||||
return self._process_create_key_pair_batch_item
|
||||
elif operation == OperationEnum.GET_ATTRIBUTE_LIST:
|
||||
return self._process_get_attribute_list_batch_item
|
||||
elif operation == OperationEnum.REKEY_KEY_PAIR:
|
||||
return self._process_rekey_key_pair_batch_item
|
||||
elif operation == OperationEnum.QUERY:
|
||||
|
@ -466,6 +496,23 @@ class KMIPProxy(KMIP):
|
|||
raise ValueError("no processor for operation: {0}".format(
|
||||
operation))
|
||||
|
||||
def _process_get_attribute_list_batch_item(self, batch_item):
|
||||
payload = batch_item.response_payload
|
||||
|
||||
uid = None
|
||||
names = None
|
||||
|
||||
if payload:
|
||||
uid = payload.uid
|
||||
names = payload.attribute_names
|
||||
|
||||
return GetAttributeListResult(
|
||||
batch_item.result_status,
|
||||
batch_item.result_reason,
|
||||
batch_item.result_message,
|
||||
uid,
|
||||
names)
|
||||
|
||||
def _process_key_pair_batch_item(self, batch_item, result):
|
||||
payload = batch_item.response_payload
|
||||
|
||||
|
|
|
@ -158,6 +158,21 @@ class GetResult(OperationResult):
|
|||
self.secret = None
|
||||
|
||||
|
||||
class GetAttributeListResult(OperationResult):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
result_status,
|
||||
result_reason=None,
|
||||
result_message=None,
|
||||
uid=None,
|
||||
names=None):
|
||||
super(GetAttributeListResult, self).__init__(
|
||||
result_status, result_reason, result_message)
|
||||
self.uid = uid
|
||||
self.names = names
|
||||
|
||||
|
||||
class DestroyResult(OperationResult):
|
||||
|
||||
def __init__(self,
|
||||
|
|
|
@ -1152,3 +1152,43 @@ class TestIntegration(TestCase):
|
|||
type(opaque_obj_result_destroyed_result.result_reason.enum)
|
||||
|
||||
self.assertEqual(expected, opaque_obj_observed)
|
||||
|
||||
def test_symmetric_key_create_getattributelist_destroy(self):
|
||||
"""
|
||||
Test that the GetAttributeList operation works for a newly created key.
|
||||
"""
|
||||
key_name = 'Integration Test - Create-GetAttributeList-Destroy Key'
|
||||
result = self._create_symmetric_key(key_name=key_name)
|
||||
uid = result.uuid.value
|
||||
|
||||
self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum)
|
||||
self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type.enum)
|
||||
self.assertIsInstance(uid, str)
|
||||
|
||||
try:
|
||||
result = self.client.get_attribute_list(uid)
|
||||
|
||||
self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum)
|
||||
self.assertIsInstance(result.uid, str)
|
||||
self.assertIsInstance(result.names, list)
|
||||
|
||||
for name in result.names:
|
||||
self.assertIsInstance(name, str)
|
||||
|
||||
expected = [
|
||||
'Cryptographic Algorithm',
|
||||
'Cryptographic Length',
|
||||
'Cryptographic Usage Mask',
|
||||
'Unique Identifier',
|
||||
'Object Type']
|
||||
for name in expected:
|
||||
self.assertIn(name, result.names)
|
||||
|
||||
finally:
|
||||
result = self.client.destroy(uid)
|
||||
self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum)
|
||||
|
||||
result = self.client.get(uuid=result.uuid.value, credential=None)
|
||||
|
||||
self.assertEqual(
|
||||
ResultStatus.OPERATION_FAILED, result.result_status.enum)
|
||||
|
|
|
@ -36,6 +36,7 @@ from kmip.core.messages.payloads.create_key_pair import \
|
|||
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
|
||||
from kmip.core.messages.payloads.discover_versions import \
|
||||
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
|
||||
from kmip.core.messages.payloads import get_attribute_list
|
||||
from kmip.core.messages.payloads.query import \
|
||||
QueryRequestPayload, QueryResponsePayload
|
||||
from kmip.core.messages.payloads.rekey_key_pair import \
|
||||
|
@ -54,6 +55,7 @@ from kmip.services.kmip_client import KMIPProxy
|
|||
|
||||
from kmip.services.results import CreateKeyPairResult
|
||||
from kmip.services.results import DiscoverVersionsResult
|
||||
from kmip.services.results import GetAttributeListResult
|
||||
from kmip.services.results import QueryResult
|
||||
from kmip.services.results import RekeyKeyPairResult
|
||||
|
||||
|
@ -314,6 +316,19 @@ class TestKMIPClient(TestCase):
|
|||
protocol_versions = None
|
||||
self._test_build_discover_versions_batch_item(protocol_versions)
|
||||
|
||||
def test_build_get_attribute_list_batch_item(self):
|
||||
uid = '00000000-1111-2222-3333-444444444444'
|
||||
batch_item = self.client._build_get_attribute_list_batch_item(uid)
|
||||
|
||||
self.assertIsInstance(batch_item, RequestBatchItem)
|
||||
self.assertIsInstance(batch_item.operation, Operation)
|
||||
self.assertEqual(
|
||||
OperationEnum.GET_ATTRIBUTE_LIST, batch_item.operation.enum)
|
||||
self.assertIsInstance(
|
||||
batch_item.request_payload,
|
||||
get_attribute_list.GetAttributeListRequestPayload)
|
||||
self.assertEqual(uid, batch_item.request_payload.uid)
|
||||
|
||||
def test_process_batch_items(self):
|
||||
batch_item = ResponseBatchItem(
|
||||
operation=Operation(OperationEnum.CREATE_KEY_PAIR),
|
||||
|
@ -361,6 +376,11 @@ class TestKMIPClient(TestCase):
|
|||
self.assertRaisesRegexp(ValueError, "no processor for operation",
|
||||
self.client._get_batch_item_processor, None)
|
||||
|
||||
expected = self.client._process_get_attribute_list_batch_item
|
||||
observed = self.client._get_batch_item_processor(
|
||||
OperationEnum.GET_ATTRIBUTE_LIST)
|
||||
self.assertEqual(expected, observed)
|
||||
|
||||
def _test_equality(self, expected, observed):
|
||||
msg = "expected {0}, observed {1}".format(expected, observed)
|
||||
self.assertEqual(expected, observed, msg)
|
||||
|
@ -467,6 +487,20 @@ class TestKMIPClient(TestCase):
|
|||
protocol_versions = None
|
||||
self._test_process_discover_versions_batch_item(protocol_versions)
|
||||
|
||||
def test_process_get_attribute_list_batch_item(self):
|
||||
uid = '00000000-1111-2222-3333-444444444444'
|
||||
names = ['Cryptographic Algorithm', 'Cryptographic Length']
|
||||
payload = get_attribute_list.GetAttributeListResponsePayload(
|
||||
uid=uid, attribute_names=names)
|
||||
batch_item = ResponseBatchItem(
|
||||
operation=Operation(OperationEnum.GET_ATTRIBUTE_LIST),
|
||||
response_payload=payload)
|
||||
result = self.client._process_get_attribute_list_batch_item(batch_item)
|
||||
|
||||
self.assertIsInstance(result, GetAttributeListResult)
|
||||
self.assertEqual(uid, result.uid)
|
||||
self.assertEqual(names, result.names)
|
||||
|
||||
|
||||
class TestClientProfileInformation(TestCase):
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue