diff --git a/kmip/pie/api.py b/kmip/pie/api.py index d35b91e..07ed3f1 100644 --- a/kmip/pie/api.py +++ b/kmip/pie/api.py @@ -110,6 +110,25 @@ class KmipClient: """ pass + @abc.abstractmethod + def revoke(self, revocation_reason, uid, revocation_message, + compromise_occurrence_date): + """ + Revoke a managed object stored by a KMIP appliance. + + Args: + revocation_reason (RevocationReasonCode): An enumeration indicating + the revocation reason. + uid (string): The unique ID of the managed object to revoke. + Optional, defaults to None. + revocation_message (string): A message regarding the revocation. + Optional, defaults to None. + compromise_occurrence_date (int): A integer which will be converted + to the Datetime when the managed object was firstly believed to + be compromised. Optional, defaults to None. + """ + pass + @abc.abstractmethod def destroy(self, uid): """ diff --git a/kmip/pie/client.py b/kmip/pie/client.py index c36d4b2..c61ec82 100644 --- a/kmip/pie/client.py +++ b/kmip/pie/client.py @@ -17,6 +17,7 @@ import logging import six from kmip.core import enums +from kmip.core import primitives from kmip.core import objects as cobjects from kmip.core.factories import attributes @@ -513,6 +514,7 @@ class ProxyKmipClient(api.KmipClient): Args: uid (string): The unique ID of the managed object to activate. + Optional, defaults to None. Returns: None @@ -542,6 +544,66 @@ class ProxyKmipClient(api.KmipClient): message = result.result_message.value raise exceptions.KmipOperationFailure(status, reason, message) + def revoke(self, revocation_reason, uid=None, revocation_message=None, + compromise_occurrence_date=None): + """ + Revoke a managed object stored by a KMIP appliance. + + Args: + revocation_reason (RevocationReasonCode): An enumeration indicating + the revocation reason. + uid (string): The unique ID of the managed object to revoke. + Optional, defaults to None. + revocation_message (string): A message regarding the revocation. + Optional, defaults to None. + compromise_occurrence_date (int): An integer, the number of seconds + since the epoch, which will be converted to the Datetime when + the managed object was firstly believed to be compromised. + Optional, defaults to None. + + Returns: + None + + Raises: + ClientConnectionNotOpen: if the client connection is unusable + KmipOperationFailure: if the operation result is a failure + TypeError: if the input argument is invalid + """ + # Check input + if not isinstance(revocation_reason, enums.RevocationReasonCode): + raise TypeError( + "revocation_reason must be a RevocationReasonCode enumeration") + if uid is not None: + if not isinstance(uid, six.string_types): + raise TypeError("uid must be a string") + if revocation_message is not None: + if not isinstance(revocation_message, six.string_types): + raise TypeError("revocation_message must be a string") + if compromise_occurrence_date is not None: + if not isinstance(compromise_occurrence_date, six.integer_types): + raise TypeError( + "compromise_occurrence_date must be an integer") + + compromise_occurrence_date = primitives.DateTime( + compromise_occurrence_date, + enums.Tags.COMPROMISE_OCCURRENCE_DATE) + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + # revoke the managed object and handle the results + result = self.proxy.revoke(revocation_reason, uid, revocation_message, + compromise_occurrence_date) + + status = result.result_status.value + if status == enums.ResultStatus.SUCCESS: + return + else: + reason = result.result_reason.value + message = result.result_message.value + raise exceptions.KmipOperationFailure(status, reason, message) + def destroy(self, uid=None): """ Destroy a managed object stored by a KMIP appliance. diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index a9edb11..7621d87 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -351,11 +351,14 @@ class KMIPProxy(KMIP): results = self._process_batch_items(response) return results[0] - def revoke(self, uuid, reason, message=None, credential=None): - return self._revoke(unique_identifier=uuid, - revocation_code=reason, - revocation_message=message, - credential=credential) + def revoke(self, revocation_reason, uuid=None, revocation_message=None, + compromise_occurrence_date=None, credential=None): + return self._revoke( + unique_identifier=uuid, + revocation_reason=revocation_reason, + revocation_message=revocation_message, + compromise_occurrence_date=compromise_occurrence_date, + credential=credential) def destroy(self, uuid=None, credential=None): return self._destroy(unique_identifier=uuid, @@ -805,11 +808,12 @@ class KMIPProxy(KMIP): payload_unique_identifier) return result - def _revoke(self, unique_identifier=None, revocation_code=None, - revocation_message=None, credential=None): + def _revoke(self, unique_identifier=None, revocation_reason=None, + revocation_message=None, compromise_occurrence_date=None, + credential=None): operation = Operation(OperationEnum.REVOKE) - reason = objects.RevocationReason(code=revocation_code, + reason = objects.RevocationReason(code=revocation_reason, message=revocation_message) uuid = None if unique_identifier is not None: @@ -818,7 +822,7 @@ class KMIPProxy(KMIP): payload = revoke.RevokeRequestPayload( unique_identifier=uuid, revocation_reason=reason, - compromise_date=None) # TODO(tim-kelsey): sort out date handling + compromise_date=compromise_occurrence_date) batch_item = messages.RequestBatchItem(operation=operation, request_payload=payload) diff --git a/kmip/tests/unit/pie/test_api.py b/kmip/tests/unit/pie/test_api.py index 28150e4..07f49ad 100644 --- a/kmip/tests/unit/pie/test_api.py +++ b/kmip/tests/unit/pie/test_api.py @@ -50,6 +50,12 @@ class DummyKmipClient(api.KmipClient): def activate(self, uid): super(DummyKmipClient, self).activate(uid) + def revoke(self, revocation_reason, uid, revocation_message, + compromise_occurrence_date): + super(DummyKmipClient, self).revoke( + revocation_reason, uid, revocation_message, + compromise_occurrence_date) + def destroy(self, uid): super(DummyKmipClient, self).destroy(uid) @@ -127,6 +133,13 @@ class TestKmipClient(testtools.TestCase): dummy = DummyKmipClient() dummy.activate('uid') + def test_revoke(self): + """ + Test that the revoke method can be called without error. + """ + dummy = DummyKmipClient() + dummy.revoke('reason', 'uid', 'message', 'date') + def test_destroy(self): """ Test that the destroy method can be called without error. diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py index a6a0837..b8207c6 100644 --- a/kmip/tests/unit/pie/test_client.py +++ b/kmip/tests/unit/pie/test_client.py @@ -24,6 +24,7 @@ from kmip.core import objects as obj from kmip.core.factories import attributes from kmip.core.messages import contents +from kmip.core.primitives import DateTime from kmip.services.kmip_client import KMIPProxy from kmip.services import results @@ -949,6 +950,135 @@ class TestProxyKmipClient(testtools.TestCase): self.assertRaisesRegexp( KmipOperationFailure, error_msg, client.activate, *args) + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_revoke(self): + """ + Test that the client can revoke a secret. + """ + revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE + uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff' + revocation_message = 'Key compromised!' + compromise_occurrence_date = 1 + + status = enums.ResultStatus.SUCCESS + result = results.OperationResult(contents.ResultStatus(status)) + + with ProxyKmipClient() as client: + client.proxy.revoke.return_value = result + result = client.revoke( + revocation_reason, uuid, revocation_message, + compromise_occurrence_date) + client.proxy.revoke.assert_called_with( + revocation_reason, uuid, revocation_message, + DateTime(compromise_occurrence_date, + enums.Tags.COMPROMISE_OCCURRENCE_DATE)) + self.assertEqual(None, result) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_revoke_on_invalid_inputs(self): + """ + Test that a TypeError exception is raised when trying to revoke a + secret with invalid inputs. + """ + revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE + revocation_reason_invalid = "key compromise" + + uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff' + uuid_invalid = 123 + + revocation_message = 'Key compromised!' + revocation_message_invalid = 123 + + compromise_occurrence_date = 1 + compromise_occurrence_date_invalid = '1' + + args = [revocation_reason_invalid, uuid, revocation_message, + compromise_occurrence_date] + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "revocation_reason must be a RevocationReasonCode enumeration", + client.revoke, + *args) + + args = [revocation_reason, uuid_invalid, revocation_message, + compromise_occurrence_date] + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "uid must be a string", + client.revoke, + *args) + + args = [revocation_reason, uuid, revocation_message_invalid, + compromise_occurrence_date] + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "revocation_message must be a string", + client.revoke, + *args) + + args = [revocation_reason, uuid, revocation_message, + compromise_occurrence_date_invalid] + with ProxyKmipClient() as client: + self.assertRaisesRegexp( + TypeError, + "compromise_occurrence_date must be an integer", + client.revoke, + *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_revoke_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to revoke a secret on an unopened client connection. + """ + client = ProxyKmipClient() + revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE + uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff' + revocation_message = 'Key compromised!' + compromise_occurrence_date = 1 + args = [revocation_reason, uuid, revocation_message, + compromise_occurrence_date] + self.assertRaises( + ClientConnectionNotOpen, client.revoke, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_revoke_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to revoke a secret. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + revocation_message = "Test failure message" + + result = results.OperationResult( + contents.ResultStatus(status), + contents.ResultReason(reason), + contents.ResultMessage(revocation_message)) + error_msg = str(KmipOperationFailure(status, reason, + revocation_message)) + + client = ProxyKmipClient() + client.open() + client.proxy.revoke.return_value = result + + revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE + uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff' + revocation_message = 'Key compromised!' + compromise_occurrence_date = 1 + args = [revocation_reason, uuid, revocation_message, + compromise_occurrence_date] + + self.assertRaisesRegexp( + KmipOperationFailure, error_msg, client.revoke, *args) + @mock.patch('kmip.pie.client.KMIPProxy', mock.MagicMock(spec_set=KMIPProxy)) def test_destroy(self):