mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-07-19 20:14:27 +02:00
Merge pull request #413 from OpenKMIP/feat/add-check-client
Add Check support to the KMIPProxy client
This commit is contained in:
commit
54efe7b3db
@ -31,6 +31,7 @@ from kmip.services.results import MACResult
|
||||
|
||||
from kmip.core import attributes as attr
|
||||
|
||||
from kmip.core import enums
|
||||
from kmip.core.enums import AuthenticationSuite
|
||||
from kmip.core.enums import ConformanceClause
|
||||
from kmip.core.enums import CredentialType
|
||||
@ -364,6 +365,107 @@ class KMIPProxy:
|
||||
|
||||
return result
|
||||
|
||||
def check(self,
|
||||
uuid=None,
|
||||
usage_limits_count=None,
|
||||
cryptographic_usage_mask=None,
|
||||
lease_time=None,
|
||||
credential=None):
|
||||
"""
|
||||
Check object usage according to specific constraints.
|
||||
|
||||
Args:
|
||||
uuid (string): The unique identifier of a managed cryptographic
|
||||
object that should be checked. Optional, defaults to None.
|
||||
usage_limits_count (int): An integer specifying the number of
|
||||
items that can be secured with the specified cryptographic
|
||||
object. Optional, defaults to None.
|
||||
cryptographic_usage_mask (list): A list of CryptographicUsageMask
|
||||
enumerations specifying the operations possible with the
|
||||
specified cryptographic object. Optional, defaults to None.
|
||||
lease_time (int): The number of seconds that can be leased for the
|
||||
specified cryptographic object. Optional, defaults to None.
|
||||
credential (Credential): A Credential struct containing a set of
|
||||
authorization parameters for the operation. Optional, defaults
|
||||
to None.
|
||||
|
||||
Returns:
|
||||
dict: The results of the check operation, containing the following
|
||||
key/value pairs:
|
||||
|
||||
Key | Value
|
||||
---------------------------|-----------------------------------
|
||||
'unique_identifier' | (string) The unique ID of the
|
||||
| checked cryptographic object.
|
||||
'usage_limits_count' | (int) The value provided as input
|
||||
| if the value exceeds server
|
||||
| constraints.
|
||||
'cryptographic_usage_mask' | (list) The value provided as input
|
||||
| if the value exceeds server
|
||||
| constraints.
|
||||
'lease_time' | (int) The value provided as input
|
||||
| if the value exceeds server
|
||||
| constraints.
|
||||
'result_status' | (ResultStatus) An enumeration
|
||||
| indicating the status of the
|
||||
| operation result.
|
||||
'result_reason' | (ResultReason) An enumeration
|
||||
| providing context for the result
|
||||
| status.
|
||||
'result_message' | (string) A message providing
|
||||
| additional context for the
|
||||
| operation result.
|
||||
"""
|
||||
# TODO (peter-hamilton) Push this into the Check request.
|
||||
mask = 0
|
||||
for m in cryptographic_usage_mask:
|
||||
mask |= m.value
|
||||
|
||||
operation = Operation(OperationEnum.CHECK)
|
||||
request_payload = payloads.CheckRequestPayload(
|
||||
unique_identifier=uuid,
|
||||
usage_limits_count=usage_limits_count,
|
||||
cryptographic_usage_mask=mask,
|
||||
lease_time=lease_time
|
||||
)
|
||||
batch_item = messages.RequestBatchItem(
|
||||
operation=operation,
|
||||
request_payload=request_payload
|
||||
)
|
||||
|
||||
request = self._build_request_message(credential, [batch_item])
|
||||
response = self._send_and_receive_message(request)
|
||||
batch_item = response.batch_items[0]
|
||||
payload = batch_item.response_payload
|
||||
|
||||
result = {}
|
||||
|
||||
if payload:
|
||||
result['unique_identifier'] = payload.unique_identifier
|
||||
if payload.usage_limits_count is not None:
|
||||
result['usage_limits_count'] = payload.usage_limits_count
|
||||
if payload.cryptographic_usage_mask is not None:
|
||||
# TODO (peter-hamilton) Push this into the Check response.
|
||||
masks = []
|
||||
for enumeration in enums.CryptographicUsageMask:
|
||||
if payload.cryptographic_usage_mask & enumeration.value:
|
||||
masks.append(enumeration)
|
||||
result['cryptographic_usage_mask'] = masks
|
||||
if payload.lease_time is not None:
|
||||
result['lease_time'] = payload.lease_time
|
||||
|
||||
result['result_status'] = batch_item.result_status.value
|
||||
try:
|
||||
result['result_reason'] = batch_item.result_reason.value
|
||||
except Exception:
|
||||
result['result_reason'] = batch_item.result_reason
|
||||
try:
|
||||
result['result_message'] = batch_item.result_message.value
|
||||
except Exception:
|
||||
result['result_message'] = batch_item.result_message
|
||||
|
||||
return result
|
||||
|
||||
def get(self, uuid=None, key_format_type=None, key_compression_type=None,
|
||||
key_wrapping_specification=None, credential=None):
|
||||
return self._get(
|
||||
|
@ -707,6 +707,60 @@ class TestKMIPClient(TestCase):
|
||||
self.client._create_socket(sock)
|
||||
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
|
||||
|
||||
@mock.patch(
|
||||
'kmip.services.kmip_client.KMIPProxy._build_request_message'
|
||||
)
|
||||
@mock.patch(
|
||||
'kmip.services.kmip_client.KMIPProxy._send_and_receive_message'
|
||||
)
|
||||
def test_check(self, send_mock, build_mock):
|
||||
"""
|
||||
Test that the client can correctly build, send, and process a Check
|
||||
request.
|
||||
"""
|
||||
payload = payloads.CheckResponsePayload(
|
||||
unique_identifier='1',
|
||||
usage_limits_count=100,
|
||||
cryptographic_usage_mask=12,
|
||||
lease_time=10000
|
||||
)
|
||||
batch_item = ResponseBatchItem(
|
||||
operation=Operation(OperationEnum.CHECK),
|
||||
result_status=ResultStatus(ResultStatusEnum.SUCCESS),
|
||||
response_payload=payload
|
||||
)
|
||||
response = ResponseMessage(batch_items=[batch_item])
|
||||
|
||||
build_mock.return_value = None
|
||||
send_mock.return_value = response
|
||||
|
||||
result = self.client.check(
|
||||
'1',
|
||||
100,
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
],
|
||||
10000
|
||||
)
|
||||
|
||||
self.assertEqual('1', result.get('unique_identifier'))
|
||||
self.assertEqual(100, result.get('usage_limits_count'))
|
||||
self.assertEqual(
|
||||
[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
],
|
||||
result.get('cryptographic_usage_mask')
|
||||
)
|
||||
self.assertEqual(10000, result.get('lease_time'))
|
||||
self.assertEqual(
|
||||
ResultStatusEnum.SUCCESS,
|
||||
result.get('result_status')
|
||||
)
|
||||
self.assertEqual(None, result.get('result_reason'))
|
||||
self.assertEqual(None, result.get('result_message'))
|
||||
|
||||
@mock.patch(
|
||||
'kmip.services.kmip_client.KMIPProxy._build_request_message'
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user