Merge pull request #269 from vbnmmnbv/revoke_pie

Add Revoke operation support for pie client
This commit is contained in:
Peter Hamilton 2017-04-21 10:26:26 -04:00 committed by GitHub
commit 2891476568
5 changed files with 237 additions and 9 deletions

View File

@ -110,6 +110,25 @@ class KmipClient:
"""
pass
@abc.abstractmethod
def revoke(self, revocation_reason, uid, revocation_message,
compromise_occurrence_date):
"""
Revoke a managed object stored by a KMIP appliance.
Args:
revocation_reason (RevocationReasonCode): An enumeration indicating
the revocation reason.
uid (string): The unique ID of the managed object to revoke.
Optional, defaults to None.
revocation_message (string): A message regarding the revocation.
Optional, defaults to None.
compromise_occurrence_date (int): A integer which will be converted
to the Datetime when the managed object was firstly believed to
be compromised. Optional, defaults to None.
"""
pass
@abc.abstractmethod
def destroy(self, uid):
"""

View File

@ -17,6 +17,7 @@ import logging
import six
from kmip.core import enums
from kmip.core import primitives
from kmip.core import objects as cobjects
from kmip.core.factories import attributes
@ -513,6 +514,7 @@ class ProxyKmipClient(api.KmipClient):
Args:
uid (string): The unique ID of the managed object to activate.
Optional, defaults to None.
Returns:
None
@ -542,6 +544,66 @@ class ProxyKmipClient(api.KmipClient):
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def revoke(self, revocation_reason, uid=None, revocation_message=None,
compromise_occurrence_date=None):
"""
Revoke a managed object stored by a KMIP appliance.
Args:
revocation_reason (RevocationReasonCode): An enumeration indicating
the revocation reason.
uid (string): The unique ID of the managed object to revoke.
Optional, defaults to None.
revocation_message (string): A message regarding the revocation.
Optional, defaults to None.
compromise_occurrence_date (int): An integer, the number of seconds
since the epoch, which will be converted to the Datetime when
the managed object was firstly believed to be compromised.
Optional, defaults to None.
Returns:
None
Raises:
ClientConnectionNotOpen: if the client connection is unusable
KmipOperationFailure: if the operation result is a failure
TypeError: if the input argument is invalid
"""
# Check input
if not isinstance(revocation_reason, enums.RevocationReasonCode):
raise TypeError(
"revocation_reason must be a RevocationReasonCode enumeration")
if uid is not None:
if not isinstance(uid, six.string_types):
raise TypeError("uid must be a string")
if revocation_message is not None:
if not isinstance(revocation_message, six.string_types):
raise TypeError("revocation_message must be a string")
if compromise_occurrence_date is not None:
if not isinstance(compromise_occurrence_date, six.integer_types):
raise TypeError(
"compromise_occurrence_date must be an integer")
compromise_occurrence_date = primitives.DateTime(
compromise_occurrence_date,
enums.Tags.COMPROMISE_OCCURRENCE_DATE)
# Verify that operations can be given at this time
if not self._is_open:
raise exceptions.ClientConnectionNotOpen()
# revoke the managed object and handle the results
result = self.proxy.revoke(revocation_reason, uid, revocation_message,
compromise_occurrence_date)
status = result.result_status.value
if status == enums.ResultStatus.SUCCESS:
return
else:
reason = result.result_reason.value
message = result.result_message.value
raise exceptions.KmipOperationFailure(status, reason, message)
def destroy(self, uid=None):
"""
Destroy a managed object stored by a KMIP appliance.

View File

@ -351,11 +351,14 @@ class KMIPProxy(KMIP):
results = self._process_batch_items(response)
return results[0]
def revoke(self, uuid, reason, message=None, credential=None):
return self._revoke(unique_identifier=uuid,
revocation_code=reason,
revocation_message=message,
credential=credential)
def revoke(self, revocation_reason, uuid=None, revocation_message=None,
compromise_occurrence_date=None, credential=None):
return self._revoke(
unique_identifier=uuid,
revocation_reason=revocation_reason,
revocation_message=revocation_message,
compromise_occurrence_date=compromise_occurrence_date,
credential=credential)
def destroy(self, uuid=None, credential=None):
return self._destroy(unique_identifier=uuid,
@ -805,11 +808,12 @@ class KMIPProxy(KMIP):
payload_unique_identifier)
return result
def _revoke(self, unique_identifier=None, revocation_code=None,
revocation_message=None, credential=None):
def _revoke(self, unique_identifier=None, revocation_reason=None,
revocation_message=None, compromise_occurrence_date=None,
credential=None):
operation = Operation(OperationEnum.REVOKE)
reason = objects.RevocationReason(code=revocation_code,
reason = objects.RevocationReason(code=revocation_reason,
message=revocation_message)
uuid = None
if unique_identifier is not None:
@ -818,7 +822,7 @@ class KMIPProxy(KMIP):
payload = revoke.RevokeRequestPayload(
unique_identifier=uuid,
revocation_reason=reason,
compromise_date=None) # TODO(tim-kelsey): sort out date handling
compromise_date=compromise_occurrence_date)
batch_item = messages.RequestBatchItem(operation=operation,
request_payload=payload)

View File

@ -50,6 +50,12 @@ class DummyKmipClient(api.KmipClient):
def activate(self, uid):
super(DummyKmipClient, self).activate(uid)
def revoke(self, revocation_reason, uid, revocation_message,
compromise_occurrence_date):
super(DummyKmipClient, self).revoke(
revocation_reason, uid, revocation_message,
compromise_occurrence_date)
def destroy(self, uid):
super(DummyKmipClient, self).destroy(uid)
@ -127,6 +133,13 @@ class TestKmipClient(testtools.TestCase):
dummy = DummyKmipClient()
dummy.activate('uid')
def test_revoke(self):
"""
Test that the revoke method can be called without error.
"""
dummy = DummyKmipClient()
dummy.revoke('reason', 'uid', 'message', 'date')
def test_destroy(self):
"""
Test that the destroy method can be called without error.

View File

@ -24,6 +24,7 @@ from kmip.core import objects as obj
from kmip.core.factories import attributes
from kmip.core.messages import contents
from kmip.core.primitives import DateTime
from kmip.services.kmip_client import KMIPProxy
from kmip.services import results
@ -949,6 +950,135 @@ class TestProxyKmipClient(testtools.TestCase):
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.activate, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_revoke(self):
"""
Test that the client can revoke a secret.
"""
revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE
uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
revocation_message = 'Key compromised!'
compromise_occurrence_date = 1
status = enums.ResultStatus.SUCCESS
result = results.OperationResult(contents.ResultStatus(status))
with ProxyKmipClient() as client:
client.proxy.revoke.return_value = result
result = client.revoke(
revocation_reason, uuid, revocation_message,
compromise_occurrence_date)
client.proxy.revoke.assert_called_with(
revocation_reason, uuid, revocation_message,
DateTime(compromise_occurrence_date,
enums.Tags.COMPROMISE_OCCURRENCE_DATE))
self.assertEqual(None, result)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_revoke_on_invalid_inputs(self):
"""
Test that a TypeError exception is raised when trying to revoke a
secret with invalid inputs.
"""
revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE
revocation_reason_invalid = "key compromise"
uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
uuid_invalid = 123
revocation_message = 'Key compromised!'
revocation_message_invalid = 123
compromise_occurrence_date = 1
compromise_occurrence_date_invalid = '1'
args = [revocation_reason_invalid, uuid, revocation_message,
compromise_occurrence_date]
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"revocation_reason must be a RevocationReasonCode enumeration",
client.revoke,
*args)
args = [revocation_reason, uuid_invalid, revocation_message,
compromise_occurrence_date]
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"uid must be a string",
client.revoke,
*args)
args = [revocation_reason, uuid, revocation_message_invalid,
compromise_occurrence_date]
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"revocation_message must be a string",
client.revoke,
*args)
args = [revocation_reason, uuid, revocation_message,
compromise_occurrence_date_invalid]
with ProxyKmipClient() as client:
self.assertRaisesRegexp(
TypeError,
"compromise_occurrence_date must be an integer",
client.revoke,
*args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_revoke_on_closed(self):
"""
Test that a ClientConnectionNotOpen exception is raised when trying
to revoke a secret on an unopened client connection.
"""
client = ProxyKmipClient()
revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE
uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
revocation_message = 'Key compromised!'
compromise_occurrence_date = 1
args = [revocation_reason, uuid, revocation_message,
compromise_occurrence_date]
self.assertRaises(
ClientConnectionNotOpen, client.revoke, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_revoke_on_operation_failure(self):
"""
Test that a KmipOperationFailure exception is raised when the
backend fails to revoke a secret.
"""
status = enums.ResultStatus.OPERATION_FAILED
reason = enums.ResultReason.GENERAL_FAILURE
revocation_message = "Test failure message"
result = results.OperationResult(
contents.ResultStatus(status),
contents.ResultReason(reason),
contents.ResultMessage(revocation_message))
error_msg = str(KmipOperationFailure(status, reason,
revocation_message))
client = ProxyKmipClient()
client.open()
client.proxy.revoke.return_value = result
revocation_reason = enums.RevocationReasonCode.KEY_COMPROMISE
uuid = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
revocation_message = 'Key compromised!'
compromise_occurrence_date = 1
args = [revocation_reason, uuid, revocation_message,
compromise_occurrence_date]
self.assertRaisesRegexp(
KmipOperationFailure, error_msg, client.revoke, *args)
@mock.patch('kmip.pie.client.KMIPProxy',
mock.MagicMock(spec_set=KMIPProxy))
def test_destroy(self):