Merge pull request #73 from OpenKMIP/feat/add-get-attribute-list-proxy

Adding KMIPProxy support for the GetAttributeList operation
This commit is contained in:
Peter Hamilton 2015-09-04 08:49:07 -04:00
commit c2e6ffa048
4 changed files with 136 additions and 0 deletions

View File

@ -19,6 +19,7 @@ from kmip.services.results import CreateKeyPairResult
from kmip.services.results import DestroyResult
from kmip.services.results import DiscoverVersionsResult
from kmip.services.results import GetResult
from kmip.services.results import GetAttributeListResult
from kmip.services.results import LocateResult
from kmip.services.results import QueryResult
from kmip.services.results import RegisterResult
@ -50,6 +51,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import discover_versions
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import query
from kmip.core.messages.payloads import rekey_key_pair
@ -280,6 +282,25 @@ class KMIPProxy(KMIP):
key_wrapping_specification=key_wrapping_specification,
credential=credential)
def get_attribute_list(self, uid=None):
"""
Send a GetAttributeList request to the server.
Args:
uid (string): The ID of the managed object with which the retrieved
attribute names should be associated.
Returns:
result (GetAttributeListResult): A structure containing the results
of the operation.
"""
batch_item = self._build_get_attribute_list_batch_item(uid)
request = self._build_request_message(None, [batch_item])
response = self._send_and_receive_message(request)
results = self._process_batch_items(response)
return results[0]
def revoke(self, uuid, reason, message=None, credential=None):
return self._revoke(unique_identifier=uuid,
revocation_code=reason,
@ -434,6 +455,13 @@ class KMIPProxy(KMIP):
operation=operation, request_payload=payload)
return batch_item
def _build_get_attribute_list_batch_item(self, uid=None):
operation = Operation(OperationEnum.GET_ATTRIBUTE_LIST)
payload = get_attribute_list.GetAttributeListRequestPayload(uid)
batch_item = messages.RequestBatchItem(
operation=operation, request_payload=payload)
return batch_item
def _build_discover_versions_batch_item(self, protocol_versions=None):
operation = Operation(OperationEnum.DISCOVER_VERSIONS)
@ -456,6 +484,8 @@ class KMIPProxy(KMIP):
def _get_batch_item_processor(self, operation):
if operation == OperationEnum.CREATE_KEY_PAIR:
return self._process_create_key_pair_batch_item
elif operation == OperationEnum.GET_ATTRIBUTE_LIST:
return self._process_get_attribute_list_batch_item
elif operation == OperationEnum.REKEY_KEY_PAIR:
return self._process_rekey_key_pair_batch_item
elif operation == OperationEnum.QUERY:
@ -466,6 +496,23 @@ class KMIPProxy(KMIP):
raise ValueError("no processor for operation: {0}".format(
operation))
def _process_get_attribute_list_batch_item(self, batch_item):
payload = batch_item.response_payload
uid = None
names = None
if payload:
uid = payload.uid
names = payload.attribute_names
return GetAttributeListResult(
batch_item.result_status,
batch_item.result_reason,
batch_item.result_message,
uid,
names)
def _process_key_pair_batch_item(self, batch_item, result):
payload = batch_item.response_payload

View File

@ -158,6 +158,21 @@ class GetResult(OperationResult):
self.secret = None
class GetAttributeListResult(OperationResult):
def __init__(
self,
result_status,
result_reason=None,
result_message=None,
uid=None,
names=None):
super(GetAttributeListResult, self).__init__(
result_status, result_reason, result_message)
self.uid = uid
self.names = names
class DestroyResult(OperationResult):
def __init__(self,

View File

@ -1152,3 +1152,43 @@ class TestIntegration(TestCase):
type(opaque_obj_result_destroyed_result.result_reason.enum)
self.assertEqual(expected, opaque_obj_observed)
def test_symmetric_key_create_getattributelist_destroy(self):
"""
Test that the GetAttributeList operation works for a newly created key.
"""
key_name = 'Integration Test - Create-GetAttributeList-Destroy Key'
result = self._create_symmetric_key(key_name=key_name)
uid = result.uuid.value
self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum)
self.assertEqual(ObjectType.SYMMETRIC_KEY, result.object_type.enum)
self.assertIsInstance(uid, str)
try:
result = self.client.get_attribute_list(uid)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum)
self.assertIsInstance(result.uid, str)
self.assertIsInstance(result.names, list)
for name in result.names:
self.assertIsInstance(name, str)
expected = [
'Cryptographic Algorithm',
'Cryptographic Length',
'Cryptographic Usage Mask',
'Unique Identifier',
'Object Type']
for name in expected:
self.assertIn(name, result.names)
finally:
result = self.client.destroy(uid)
self.assertEqual(ResultStatus.SUCCESS, result.result_status.enum)
result = self.client.get(uuid=result.uuid.value, credential=None)
self.assertEqual(
ResultStatus.OPERATION_FAILED, result.result_status.enum)

View File

@ -36,6 +36,7 @@ from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
from kmip.core.messages.payloads.discover_versions import \
DiscoverVersionsRequestPayload, DiscoverVersionsResponsePayload
from kmip.core.messages.payloads import get_attribute_list
from kmip.core.messages.payloads.query import \
QueryRequestPayload, QueryResponsePayload
from kmip.core.messages.payloads.rekey_key_pair import \
@ -54,6 +55,7 @@ from kmip.services.kmip_client import KMIPProxy
from kmip.services.results import CreateKeyPairResult
from kmip.services.results import DiscoverVersionsResult
from kmip.services.results import GetAttributeListResult
from kmip.services.results import QueryResult
from kmip.services.results import RekeyKeyPairResult
@ -314,6 +316,19 @@ class TestKMIPClient(TestCase):
protocol_versions = None
self._test_build_discover_versions_batch_item(protocol_versions)
def test_build_get_attribute_list_batch_item(self):
uid = '00000000-1111-2222-3333-444444444444'
batch_item = self.client._build_get_attribute_list_batch_item(uid)
self.assertIsInstance(batch_item, RequestBatchItem)
self.assertIsInstance(batch_item.operation, Operation)
self.assertEqual(
OperationEnum.GET_ATTRIBUTE_LIST, batch_item.operation.enum)
self.assertIsInstance(
batch_item.request_payload,
get_attribute_list.GetAttributeListRequestPayload)
self.assertEqual(uid, batch_item.request_payload.uid)
def test_process_batch_items(self):
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.CREATE_KEY_PAIR),
@ -361,6 +376,11 @@ class TestKMIPClient(TestCase):
self.assertRaisesRegexp(ValueError, "no processor for operation",
self.client._get_batch_item_processor, None)
expected = self.client._process_get_attribute_list_batch_item
observed = self.client._get_batch_item_processor(
OperationEnum.GET_ATTRIBUTE_LIST)
self.assertEqual(expected, observed)
def _test_equality(self, expected, observed):
msg = "expected {0}, observed {1}".format(expected, observed)
self.assertEqual(expected, observed, msg)
@ -467,6 +487,20 @@ class TestKMIPClient(TestCase):
protocol_versions = None
self._test_process_discover_versions_batch_item(protocol_versions)
def test_process_get_attribute_list_batch_item(self):
uid = '00000000-1111-2222-3333-444444444444'
names = ['Cryptographic Algorithm', 'Cryptographic Length']
payload = get_attribute_list.GetAttributeListResponsePayload(
uid=uid, attribute_names=names)
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.GET_ATTRIBUTE_LIST),
response_payload=payload)
result = self.client._process_get_attribute_list_batch_item(batch_item)
self.assertIsInstance(result, GetAttributeListResult)
self.assertEqual(uid, result.uid)
self.assertEqual(names, result.names)
class TestClientProfileInformation(TestCase):
"""