From 804a59d75dfa898960be91ea33ed46137328aa04 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Fri, 6 Apr 2018 00:49:35 -0400 Subject: [PATCH] Add Check support to the ProxyKmipClient This change adds Check operation support to the ProxyKmipClient. The client unit test suite has been updated to cover the new code. Partially addresses #405 --- kmip/pie/client.py | 57 +++++++++++ kmip/tests/unit/pie/test_client.py | 155 +++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+) diff --git a/kmip/pie/client.py b/kmip/pie/client.py index 88d168e..c4296cc 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -565,6 +565,63 @@ class ProxyKmipClient(object): message = result.result_message.value raise exceptions.KmipOperationFailure(status, reason, message) + @is_connected + def check(self, + uid=None, + usage_limits_count=None, + cryptographic_usage_mask=None, + lease_time=None): + """ + Check the constraints for a managed object. + + Args: + uid (string): The unique ID of the managed object to check. + Optional, defaults to None. + usage_limits_count (int): The number of items that can be secured + with the specified managed object. Optional, defaults to None. + cryptographic_usage_mask (list): A list of CryptographicUsageMask + enumerations specifying the operations possible with the + specified managed object. Optional, defaults to None. + lease_time (int): The number of seconds that can be leased for the + specified managed object. Optional, defaults to None. + """ + if uid is not None: + if not isinstance(uid, six.string_types): + raise TypeError("The unique identifier must be a string.") + if usage_limits_count is not None: + if not isinstance(usage_limits_count, six.integer_types): + raise TypeError("The usage limits count must be an integer.") + if cryptographic_usage_mask is not None: + if not isinstance(cryptographic_usage_mask, list) or \ + not all(isinstance( + x, + enums.CryptographicUsageMask + ) for x in cryptographic_usage_mask): + raise TypeError( + "The cryptographic usage mask must be a list of " + "CryptographicUsageMask enumerations." + ) + if lease_time is not None: + if not isinstance(lease_time, six.integer_types): + raise TypeError("The lease time must be an integer.") + + result = self.proxy.check( + uid, + usage_limits_count, + cryptographic_usage_mask, + lease_time + ) + + status = result.get('result_status') + if status == enums.ResultStatus.SUCCESS: + return result.get('unique_identifier') + else: + raise exceptions.KmipOperationFailure( + status, + result.get('result_reason'), + result.get('result_message') + ) + @is_connected def get(self, uid=None, key_wrapping_specification=None): """ diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index b8f999f..e3fb6fc 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -717,6 +717,161 @@ class TestProxyKmipClient(testtools.TestCase): KmipOperationFailure, error_msg, client.create_key_pair, *args) + @mock.patch( + 'kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy) + ) + def test_check(self): + """ + Test that the client can check an object. + """ + result = { + 'unique_identifier': '1', + 'result_status': enums.ResultStatus.SUCCESS + } + + client = ProxyKmipClient() + client.open() + client.proxy.check.return_value = result + + checked_id = client.check( + uid='1', + usage_limits_count=100, + cryptographic_usage_mask=[ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ], + lease_time=10000 + ) + + self.assertEqual('1', checked_id) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_check_on_invalid_unique_identifier(self): + """ + Test that a TypeError exception is raised when trying to check an + object with an invalid unique identifier. + """ + kwargs = {'uid': 0} + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "The unique identifier must be a string.", + client.check, + **kwargs + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_check_on_invalid_usage_limits_count(self): + """ + Test that a TypeError exception is raised when trying to check an + object with an invalid usage limits count. + """ + kwargs = {'usage_limits_count': 'invalid'} + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "The usage limits count must be an integer.", + client.check, + **kwargs + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_check_on_invalid_cryptographic_usage_mask(self): + """ + Test that a TypeError exception is raised when trying to check an + object with an invalid cryptographic usage mask. + """ + kwargs = {'cryptographic_usage_mask': 'invalid'} + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "The cryptographic usage mask must be a list of " + "CryptographicUsageMask enumerations.", + client.check, + **kwargs + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_check_on_invalid_lease_time(self): + """ + Test that a TypeError exception is raised when trying to check an + object with an invalid lease time. + """ + kwargs = {'lease_time': 'invalid'} + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "The lease time must be an integer.", + client.check, + **kwargs + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_check_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to check an object on an unopened client connection. + """ + client = ProxyKmipClient() + kwargs = { + 'uid': '1', + 'usage_limits_count': 100, + 'cryptographic_usage_mask': [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ], + 'lease_time': 10000 + } + + self.assertRaises( + ClientConnectionNotOpen, + client.check, + **kwargs + ) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_check_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to derive a key. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = { + 'result_status': status, + 'result_reason': reason, + 'result_message': message + } + error_message = str(KmipOperationFailure(status, reason, message)) + + client = ProxyKmipClient() + client.open() + client.proxy.check.return_value = result + kwargs = { + 'uid': '1', + 'usage_limits_count': 100, + 'cryptographic_usage_mask': [ + enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT + ], + 'lease_time': 10000 + } + + self.assertRaisesRegexp( + KmipOperationFailure, + error_message, + client.check, + **kwargs + ) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_get(self):