mirror of https://github.com/OpenKMIP/PyKMIP.git
Merge pull request #414 from OpenKMIP/feat/add-check-pie
Add Check support to the ProxyKmipClient
This commit is contained in:
commit
6122579844
|
@ -565,6 +565,63 @@ class ProxyKmipClient(object):
|
|||
message = result.result_message.value
|
||||
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||
|
||||
@is_connected
|
||||
def check(self,
|
||||
uid=None,
|
||||
usage_limits_count=None,
|
||||
cryptographic_usage_mask=None,
|
||||
lease_time=None):
|
||||
"""
|
||||
Check the constraints for a managed object.
|
||||
|
||||
Args:
|
||||
uid (string): The unique ID of the managed object to check.
|
||||
Optional, defaults to None.
|
||||
usage_limits_count (int): The number of items that can be secured
|
||||
with the specified managed object. Optional, defaults to None.
|
||||
cryptographic_usage_mask (list): A list of CryptographicUsageMask
|
||||
enumerations specifying the operations possible with the
|
||||
specified managed object. Optional, defaults to None.
|
||||
lease_time (int): The number of seconds that can be leased for the
|
||||
specified managed object. Optional, defaults to None.
|
||||
"""
|
||||
if uid is not None:
|
||||
if not isinstance(uid, six.string_types):
|
||||
raise TypeError("The unique identifier must be a string.")
|
||||
if usage_limits_count is not None:
|
||||
if not isinstance(usage_limits_count, six.integer_types):
|
||||
raise TypeError("The usage limits count must be an integer.")
|
||||
if cryptographic_usage_mask is not None:
|
||||
if not isinstance(cryptographic_usage_mask, list) or \
|
||||
not all(isinstance(
|
||||
x,
|
||||
enums.CryptographicUsageMask
|
||||
) for x in cryptographic_usage_mask):
|
||||
raise TypeError(
|
||||
"The cryptographic usage mask must be a list of "
|
||||
"CryptographicUsageMask enumerations."
|
||||
)
|
||||
if lease_time is not None:
|
||||
if not isinstance(lease_time, six.integer_types):
|
||||
raise TypeError("The lease time must be an integer.")
|
||||
|
||||
result = self.proxy.check(
|
||||
uid,
|
||||
usage_limits_count,
|
||||
cryptographic_usage_mask,
|
||||
lease_time
|
||||
)
|
||||
|
||||
status = result.get('result_status')
|
||||
if status == enums.ResultStatus.SUCCESS:
|
||||
return result.get('unique_identifier')
|
||||
else:
|
||||
raise exceptions.KmipOperationFailure(
|
||||
status,
|
||||
result.get('result_reason'),
|
||||
result.get('result_message')
|
||||
)
|
||||
|
||||
@is_connected
|
||||
def get(self, uid=None, key_wrapping_specification=None):
|
||||
"""
|
||||
|
|
|
@ -717,6 +717,161 @@ class TestProxyKmipClient(testtools.TestCase):
|
|||
KmipOperationFailure, error_msg,
|
||||
client.create_key_pair, *args)
|
||||
|
||||
@mock.patch(
|
||||
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
|
||||
)
|
||||
def test_check(self):
|
||||
"""
|
||||
Test that the client can check an object.
|
||||
"""
|
||||
result = {
|
||||
'unique_identifier': '1',
|
||||
'result_status': enums.ResultStatus.SUCCESS
|
||||
}
|
||||
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.check.return_value = result
|
||||
|
||||
checked_id = client.check(
|
||||
uid='1',
|
||||
usage_limits_count=100,
|
||||
cryptographic_usage_mask=[
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
],
|
||||
lease_time=10000
|
||||
)
|
||||
|
||||
self.assertEqual('1', checked_id)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_check_on_invalid_unique_identifier(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when trying to check an
|
||||
object with an invalid unique identifier.
|
||||
"""
|
||||
kwargs = {'uid': 0}
|
||||
with ProxyKmipClient() as client:
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"The unique identifier must be a string.",
|
||||
client.check,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_check_on_invalid_usage_limits_count(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when trying to check an
|
||||
object with an invalid usage limits count.
|
||||
"""
|
||||
kwargs = {'usage_limits_count': 'invalid'}
|
||||
with ProxyKmipClient() as client:
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"The usage limits count must be an integer.",
|
||||
client.check,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_check_on_invalid_cryptographic_usage_mask(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when trying to check an
|
||||
object with an invalid cryptographic usage mask.
|
||||
"""
|
||||
kwargs = {'cryptographic_usage_mask': 'invalid'}
|
||||
with ProxyKmipClient() as client:
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"The cryptographic usage mask must be a list of "
|
||||
"CryptographicUsageMask enumerations.",
|
||||
client.check,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_check_on_invalid_lease_time(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when trying to check an
|
||||
object with an invalid lease time.
|
||||
"""
|
||||
kwargs = {'lease_time': 'invalid'}
|
||||
with ProxyKmipClient() as client:
|
||||
self.assertRaisesRegexp(
|
||||
TypeError,
|
||||
"The lease time must be an integer.",
|
||||
client.check,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_check_on_closed(self):
|
||||
"""
|
||||
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||
to check an object on an unopened client connection.
|
||||
"""
|
||||
client = ProxyKmipClient()
|
||||
kwargs = {
|
||||
'uid': '1',
|
||||
'usage_limits_count': 100,
|
||||
'cryptographic_usage_mask': [
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
],
|
||||
'lease_time': 10000
|
||||
}
|
||||
|
||||
self.assertRaises(
|
||||
ClientConnectionNotOpen,
|
||||
client.check,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_check_on_operation_failure(self):
|
||||
"""
|
||||
Test that a KmipOperationFailure exception is raised when the
|
||||
backend fails to derive a key.
|
||||
"""
|
||||
status = enums.ResultStatus.OPERATION_FAILED
|
||||
reason = enums.ResultReason.GENERAL_FAILURE
|
||||
message = "Test failure message"
|
||||
|
||||
result = {
|
||||
'result_status': status,
|
||||
'result_reason': reason,
|
||||
'result_message': message
|
||||
}
|
||||
error_message = str(KmipOperationFailure(status, reason, message))
|
||||
|
||||
client = ProxyKmipClient()
|
||||
client.open()
|
||||
client.proxy.check.return_value = result
|
||||
kwargs = {
|
||||
'uid': '1',
|
||||
'usage_limits_count': 100,
|
||||
'cryptographic_usage_mask': [
|
||||
enums.CryptographicUsageMask.ENCRYPT,
|
||||
enums.CryptographicUsageMask.DECRYPT
|
||||
],
|
||||
'lease_time': 10000
|
||||
}
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
KmipOperationFailure,
|
||||
error_message,
|
||||
client.check,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||
mock.MagicMock(spec_set=KMIPProxy))
|
||||
def test_get(self):
|
||||
|
|
Loading…
Reference in New Issue