Add Check support to the ProxyKmipClient

This change adds Check operation support to the ProxyKmipClient.
The client unit test suite has been updated to cover the new code.

Partially addresses #405
This commit is contained in:
Peter Hamilton 2018-04-06 00:49:35 -04:00
parent 54efe7b3db
commit 804a59d75d
2 changed files with 212 additions and 0 deletions

View File

@ -565,6 +565,63 @@ class ProxyKmipClient(object):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
@is_connected
def check(self,
uid=None,
usage_limits_count=None,
cryptographic_usage_mask=None,
lease_time=None):
"""
Check the constraints for a managed object.
Args:
uid (string): The unique ID of the managed object to check.
Optional, defaults to None.
usage_limits_count (int): The number of items that can be secured
with the specified managed object. Optional, defaults to None.
cryptographic_usage_mask (list): A list of CryptographicUsageMask
enumerations specifying the operations possible with the
specified managed object. Optional, defaults to None.
lease_time (int): The number of seconds that can be leased for the
specified managed object. Optional, defaults to None.
"""
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("The unique identifier must be a string.")
if usage_limits_count is not None:
if not isinstance(usage_limits_count, six.integer_types):
raise TypeError("The usage limits count must be an integer.")
if cryptographic_usage_mask is not None:
if not isinstance(cryptographic_usage_mask, list) or \
not all(isinstance(
x,
enums.CryptographicUsageMask
) for x in cryptographic_usage_mask):
raise TypeError(
"The cryptographic usage mask must be a list of "
"CryptographicUsageMask enumerations."
)
if lease_time is not None:
if not isinstance(lease_time, six.integer_types):
raise TypeError("The lease time must be an integer.")
result = self.proxy.check(
uid,
usage_limits_count,
cryptographic_usage_mask,
lease_time
)
status = result.get('result_status')
if status == enums.ResultStatus.SUCCESS:
return result.get('unique_identifier')
else:
raise exceptions.KmipOperationFailure(
status,
result.get('result_reason'),
result.get('result_message')
)
@is_connected
def get(self, uid=None, key_wrapping_specification=None):
"""

View File

@ -717,6 +717,161 @@ class TestProxyKmipClient(testtools.TestCase):
KmipOperationFailure, error_msg,
client.create_key_pair, *args)
@mock.patch(
'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)
)
def test_check(self):
"""
Test that the client can check an object.
"""
result = {
'unique_identifier': '1',
'result_status': enums.ResultStatus.SUCCESS
}
client = ProxyKmipClient()
client.open()
client.proxy.check.return_value = result
checked_id = client.check(
uid='1',
usage_limits_count=100,
cryptographic_usage_mask=[
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
],
lease_time=10000
)
self.assertEqual('1', checked_id)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_check_on_invalid_unique_identifier(self):
"""
Test that a TypeError exception is raised when trying to check an
object with an invalid unique identifier.
"""
kwargs = {'uid': 0}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"The unique identifier must be a string.",
client.check,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_check_on_invalid_usage_limits_count(self):
"""
Test that a TypeError exception is raised when trying to check an
object with an invalid usage limits count.
"""
kwargs = {'usage_limits_count': 'invalid'}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"The usage limits count must be an integer.",
client.check,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_check_on_invalid_cryptographic_usage_mask(self):
"""
Test that a TypeError exception is raised when trying to check an
object with an invalid cryptographic usage mask.
"""
kwargs = {'cryptographic_usage_mask': 'invalid'}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"The cryptographic usage mask must be a list of "
"CryptographicUsageMask enumerations.",
client.check,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_check_on_invalid_lease_time(self):
"""
Test that a TypeError exception is raised when trying to check an
object with an invalid lease time.
"""
kwargs = {'lease_time': 'invalid'}
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"The lease time must be an integer.",
client.check,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_check_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to check an object on an unopened client connection.
"""
client = ProxyKmipClient()
kwargs = {
'uid': '1',
'usage_limits_count': 100,
'cryptographic_usage_mask': [
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
],
'lease_time': 10000
}
self.assertRaises(
ClientConnectionNotOpen,
client.check,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_check_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to derive a key.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
message = "Test failure message"
result = {
'result_status': status,
'result_reason': reason,
'result_message': message
}
error_message = str(KmipOperationFailure(status, reason, message))
client = ProxyKmipClient()
client.open()
client.proxy.check.return_value = result
kwargs = {
'uid': '1',
'usage_limits_count': 100,
'cryptographic_usage_mask': [
enums.CryptographicUsageMask.ENCRYPT,
enums.CryptographicUsageMask.DECRYPT
],
'lease_time': 10000
}
self.assertRaisesRegexp(
KmipOperationFailure,
error_message,
client.check,
**kwargs
)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_get(self):