mirror of https://github.com/OpenKMIP/PyKMIP.git
Adding revoke operation
This commit is contained in:
parent
a7e2084385
commit
30ff7e78bf
|
@ -620,3 +620,12 @@ class ObjectGroupMember(Enum):
|
|||
class StorageStatusMask(Enum):
|
||||
ONLINE_STORAGE = 0x00000001
|
||||
ARCHIVAL_STORAGE = 0x00000002
|
||||
|
||||
class RevocationReasonCode(Enum):
|
||||
UNSPECIFIED = 0x00000001
|
||||
KEY_COMPROMISE = 0x00000002
|
||||
CA_COMPROMISE = 0x00000003
|
||||
AFFILIATION_CHANGED = 0x00000004
|
||||
SUPERSEDED = 0x00000005
|
||||
CESSATION_OF_OPERATION = 0x00000006
|
||||
PRIVILEGE_WITHDRAWN = 0x00000007
|
||||
|
|
|
@ -25,6 +25,7 @@ from kmip.core.messages.payloads import locate
|
|||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
from kmip.core.messages.payloads import register
|
||||
from kmip.core.messages.payloads import revoke
|
||||
|
||||
|
||||
class RequestPayloadFactory(PayloadFactory):
|
||||
|
@ -58,3 +59,6 @@ class RequestPayloadFactory(PayloadFactory):
|
|||
|
||||
def _create_activate_payload(self):
|
||||
return activate.ActivateRequestPayload()
|
||||
|
||||
def _create_revoke_payload(self):
|
||||
return revoke.RevokeRequestPayload()
|
||||
|
|
|
@ -25,6 +25,7 @@ from kmip.core.messages.payloads import locate
|
|||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
from kmip.core.messages.payloads import register
|
||||
from kmip.core.messages.payloads import revoke
|
||||
|
||||
|
||||
class ResponsePayloadFactory(PayloadFactory):
|
||||
|
@ -58,3 +59,6 @@ class ResponsePayloadFactory(PayloadFactory):
|
|||
|
||||
def _create_activate_payload(self):
|
||||
return activate.ActivateResponsePayload()
|
||||
|
||||
def _create_revoke_payload(self):
|
||||
return revoke.RevokeResponsePayload()
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
# Copyright (c) 2015 Hewlett Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kmip.core import attributes
|
||||
from kmip.core import enums
|
||||
from kmip.core import objects
|
||||
from kmip.core import primitives
|
||||
|
||||
from kmip.core.primitives import Struct
|
||||
|
||||
from kmip.core.utils import BytearrayStream
|
||||
|
||||
|
||||
class RevokeRequestPayload(Struct):
|
||||
"""
|
||||
A request payload for the Revoke operation.
|
||||
|
||||
The payload contains a UUID of a cryptographic object that that server
|
||||
should revoke. See Section 4.20 of the KMIP 1.1 specification for more
|
||||
information.
|
||||
|
||||
Attributes:
|
||||
unique_identifier: The UUID of a managed cryptographic object
|
||||
revocation_reason: The reason why the object was revoked
|
||||
compromised_date: The date of compromise if the object was compromised
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
unique_identifier=None,
|
||||
revocation_reason=None,
|
||||
compromise_date=None):
|
||||
"""
|
||||
Construct a RevokeRequestPayload object.
|
||||
Args:
|
||||
unique_identifier (UniqueIdentifier): The UUID of a managed
|
||||
cryptographic object.
|
||||
revocation_reason (RevocationReason): The reason why the object was
|
||||
revoked.
|
||||
compromise_date (DateTime): the date of compromise if the object
|
||||
was compromised.
|
||||
"""
|
||||
super(RevokeRequestPayload, self).__init__(
|
||||
tag=enums.Tags.REQUEST_PAYLOAD)
|
||||
self.unique_identifier = unique_identifier
|
||||
self.compromise_date = compromise_date
|
||||
self.revocation_reason = revocation_reason
|
||||
if self.revocation_reason is None:
|
||||
self.revocation_reason = objects.RevocationReason()
|
||||
self.validate()
|
||||
|
||||
def read(self, istream):
|
||||
"""
|
||||
Read the data encoding the RevokeRequestPayload object and decode it
|
||||
into its constituent parts.
|
||||
Args:
|
||||
istream (Stream): A data stream containing encoded object data,
|
||||
supporting a read method; usually a BytearrayStream object.
|
||||
"""
|
||||
super(RevokeRequestPayload, self).read(istream)
|
||||
tstream = BytearrayStream(istream.read(self.length))
|
||||
|
||||
self.unique_identifier = attributes.UniqueIdentifier()
|
||||
self.unique_identifier.read(tstream)
|
||||
|
||||
self.revocation_reason = objects.RevocationReason()
|
||||
self.revocation_reason.read(tstream)
|
||||
|
||||
if self.is_tag_next(enums.Tags.COMPROMISE_OCCURRENCE_DATE, tstream):
|
||||
self.compromise_date = primitives.DateTime(
|
||||
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE)
|
||||
self.compromise_date.read(tstream)
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
|
||||
def write(self, ostream):
|
||||
"""
|
||||
Write the data encoding the RevokeRequestPayload object to a stream.
|
||||
Args:
|
||||
ostream (Stream): A data stream in which to encode object data,
|
||||
supporting a write method; usually a BytearrayStream object.
|
||||
"""
|
||||
tstream = BytearrayStream()
|
||||
|
||||
# Write the contents of the request payload
|
||||
if self.unique_identifier is not None:
|
||||
self.unique_identifier.write(tstream)
|
||||
|
||||
self.revocation_reason.write(tstream)
|
||||
|
||||
if self.compromise_date is not None:
|
||||
self.compromise_date.write(tstream)
|
||||
|
||||
# Write the length and value of the request payload
|
||||
self.length = tstream.length()
|
||||
super(RevokeRequestPayload, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
Error check the attributes of the ActivateRequestPayload object.
|
||||
"""
|
||||
if self.unique_identifier is not None:
|
||||
if not isinstance(self.unique_identifier,
|
||||
attributes.UniqueIdentifier):
|
||||
msg = "invalid unique identifier"
|
||||
raise TypeError(msg)
|
||||
if self.compromise_date is not None:
|
||||
if not isinstance(self.compromise_date, primitives.DateTime):
|
||||
msg = "invalid compromise time"
|
||||
raise TypeError(msg)
|
||||
if not isinstance(self.revocation_reason, objects.RevocationReason):
|
||||
msg = "invalid revocation reason"
|
||||
raise TypeError(msg)
|
||||
|
||||
|
||||
class RevokeResponsePayload(Struct):
|
||||
"""
|
||||
A response payload for the Revoke operation.
|
||||
The payload contains the server response to the initial Revoke request.
|
||||
See Section 4.20 of the KMIP 1.1 specification for more information.
|
||||
Attributes:
|
||||
unique_identifier: The UUID of a managed cryptographic object.
|
||||
"""
|
||||
def __init__(self,
|
||||
unique_identifier=None):
|
||||
"""
|
||||
Construct a RevokeResponsePayload object.
|
||||
Args:
|
||||
unique_identifier (UniqueIdentifier): The UUID of a managed
|
||||
cryptographic object.
|
||||
"""
|
||||
super(RevokeResponsePayload, self).__init__(
|
||||
tag=enums.Tags.RESPONSE_PAYLOAD)
|
||||
if unique_identifier is None:
|
||||
self.unique_identifier = attributes.UniqueIdentifier()
|
||||
else:
|
||||
self.unique_identifier = unique_identifier
|
||||
self.validate()
|
||||
|
||||
def read(self, istream):
|
||||
"""
|
||||
Read the data encoding the RevokeResponsePayload object and decode it
|
||||
into its constituent parts.
|
||||
Args:
|
||||
istream (Stream): A data stream containing encoded object data,
|
||||
supporting a read method; usually a BytearrayStream object.
|
||||
"""
|
||||
super(RevokeResponsePayload, self).read(istream)
|
||||
tstream = BytearrayStream(istream.read(self.length))
|
||||
|
||||
self.unique_identifier = attributes.UniqueIdentifier()
|
||||
self.unique_identifier.read(tstream)
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
|
||||
def write(self, ostream):
|
||||
"""
|
||||
Write the data encoding the RevokeResponsePayload object to a stream.
|
||||
Args:
|
||||
ostream (Stream): A data stream in which to encode object data,
|
||||
supporting a write method; usually a BytearrayStream object.
|
||||
"""
|
||||
tstream = BytearrayStream()
|
||||
|
||||
# Write the contents of the response payload
|
||||
self.unique_identifier.write(tstream)
|
||||
|
||||
# Write the length and value of the request payload
|
||||
self.length = tstream.length()
|
||||
super(RevokeResponsePayload, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
Error check the attributes of the RevokeRequestPayload object.
|
||||
"""
|
||||
if not isinstance(self.unique_identifier, attributes.UniqueIdentifier):
|
||||
msg = "invalid unique identifier"
|
||||
raise TypeError(msg)
|
|
@ -25,6 +25,7 @@ from kmip.core.enums import AttributeType
|
|||
from kmip.core.enums import Tags
|
||||
from kmip.core.enums import Types
|
||||
from kmip.core.enums import CredentialType
|
||||
from kmip.core.enums import RevocationReasonCode as RevocationReasonCodeEnum
|
||||
|
||||
from kmip.core.errors import ErrorStrings
|
||||
from kmip.core.misc import KeyFormatType
|
||||
|
@ -1177,3 +1178,102 @@ class ExtensionInformation(Struct):
|
|||
extension_name=extension_name,
|
||||
extension_tag=extension_tag,
|
||||
extension_type=extension_type)
|
||||
|
||||
|
||||
# 3.31, 9.1.3.2.19
|
||||
class RevocationReasonCode(Enumeration):
|
||||
ENUM_TYPE = RevocationReasonCodeEnum
|
||||
|
||||
def __init__(self, value=RevocationReasonCodeEnum.UNSPECIFIED):
|
||||
super(RevocationReasonCode, self).__init__(
|
||||
value=value, tag=Tags.REVOCATION_REASON_CODE)
|
||||
|
||||
|
||||
# 3.31
|
||||
class RevocationReason(Struct):
|
||||
"""
|
||||
A structure describing the reason for a revocation operation.
|
||||
|
||||
See Sections 2.1.9 and 4.25 of the KMIP 1.1 specification for
|
||||
more information.
|
||||
|
||||
Attributes:
|
||||
code: The revocation reason code enumeration
|
||||
message: An optional revocation message
|
||||
"""
|
||||
|
||||
def __init__(self, code=None, message=None):
|
||||
"""
|
||||
Construct a RevocationReason object.
|
||||
|
||||
Parameters:
|
||||
code(RevocationReasonCode): revocation reason code
|
||||
message(string): An optional revocation message
|
||||
"""
|
||||
super(RevocationReason, self).__init__(tag=Tags.REVOCATION_REASON)
|
||||
if code is not None:
|
||||
self.revocation_code = RevocationReasonCode(value=code)
|
||||
else:
|
||||
self.revocation_code = RevocationReasonCode()
|
||||
|
||||
if message is not None:
|
||||
self.revocation_message = TextString(
|
||||
value=message,
|
||||
tag=Tags.REVOCATION_MESSAGE)
|
||||
else:
|
||||
self.revocation_message = None
|
||||
|
||||
self.validate()
|
||||
|
||||
def read(self, istream):
|
||||
"""
|
||||
Read the data encoding the RevocationReason object and decode it
|
||||
into its constituent parts.
|
||||
|
||||
Args:
|
||||
istream (Stream): A data stream containing encoded object data,
|
||||
supporting a read method; usually a BytearrayStream object.
|
||||
"""
|
||||
super(RevocationReason, self).read(istream)
|
||||
tstream = BytearrayStream(istream.read(self.length))
|
||||
|
||||
self.revocation_code = RevocationReasonCode()
|
||||
self.revocation_code.read(tstream)
|
||||
|
||||
if self.is_tag_next(Tags.REVOCATION_MESSAGE, tstream):
|
||||
self.revocation_message = TextString()
|
||||
self.revocation_message.read(tstream)
|
||||
|
||||
self.is_oversized(tstream)
|
||||
self.validate()
|
||||
|
||||
def write(self, ostream):
|
||||
"""
|
||||
Write the data encoding the RevocationReason object to a stream.
|
||||
|
||||
Args:
|
||||
ostream (Stream): A data stream in which to encode object data,
|
||||
supporting a write method; usually a BytearrayStream object.
|
||||
"""
|
||||
tstream = BytearrayStream()
|
||||
|
||||
self.revocation_code.write(tstream)
|
||||
if self.revocation_message is not None:
|
||||
self.revocation_message.write(tstream)
|
||||
|
||||
# Write the length and value
|
||||
self.length = tstream.length()
|
||||
super(RevocationReason, self).write(ostream)
|
||||
ostream.write(tstream.buffer)
|
||||
|
||||
def validate(self):
|
||||
"""
|
||||
validate the RevocationReason object
|
||||
"""
|
||||
if not isinstance(self.revocation_code, RevocationReasonCode):
|
||||
msg = "RevocationReaonCode expected"
|
||||
raise TypeError(msg)
|
||||
if self.revocation_message is not None:
|
||||
if not isinstance(self.revocation_message, TextString):
|
||||
msg = "TextString expect"
|
||||
raise TypeError(msg)
|
||||
|
|
|
@ -23,6 +23,7 @@ from kmip.services.results import LocateResult
|
|||
from kmip.services.results import QueryResult
|
||||
from kmip.services.results import RegisterResult
|
||||
from kmip.services.results import RekeyKeyPairResult
|
||||
from kmip.services.results import RevokeResult
|
||||
|
||||
from kmip.core import attributes as attr
|
||||
|
||||
|
@ -53,6 +54,7 @@ from kmip.core.messages.payloads import locate
|
|||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
from kmip.core.messages.payloads import register
|
||||
from kmip.core.messages.payloads import revoke
|
||||
|
||||
from kmip.services.kmip_protocol import KMIPProtocol
|
||||
|
||||
|
@ -263,6 +265,12 @@ class KMIPProxy(KMIP):
|
|||
key_wrapping_specification=key_wrapping_specification,
|
||||
credential=credential)
|
||||
|
||||
def revoke(self, uuid, reason, message=None, credential=None):
|
||||
return self._revoke(unique_identifier=uuid,
|
||||
revocation_code=reason,
|
||||
revocation_message=message,
|
||||
credential=credential)
|
||||
|
||||
def destroy(self, uuid, credential=None):
|
||||
return self._destroy(unique_identifier=uuid,
|
||||
credential=credential)
|
||||
|
@ -631,6 +639,43 @@ class KMIPProxy(KMIP):
|
|||
payload_unique_identifier)
|
||||
return result
|
||||
|
||||
def _revoke(self, unique_identifier=None, revocation_code=None,
|
||||
revocation_message=None, credential=None):
|
||||
operation = Operation(OperationEnum.REVOKE)
|
||||
|
||||
reason = objects.RevocationReason(code=revocation_code,
|
||||
message=revocation_message)
|
||||
uuid = None
|
||||
if unique_identifier is not None:
|
||||
uuid = attr.UniqueIdentifier(unique_identifier)
|
||||
|
||||
payload = revoke.RevokeRequestPayload(
|
||||
unique_identifier=uuid,
|
||||
revocation_reason=reason,
|
||||
compromise_date=None) # TODO(tim-kelsey): sort out date handling
|
||||
|
||||
batch_item = messages.RequestBatchItem(operation=operation,
|
||||
request_payload=payload)
|
||||
message = self._build_request_message(credential, [batch_item])
|
||||
self._send_message(message)
|
||||
message = messages.ResponseMessage()
|
||||
data = self._receive_message()
|
||||
message.read(data)
|
||||
batch_items = message.batch_items
|
||||
batch_item = batch_items[0]
|
||||
payload = batch_item.response_payload
|
||||
|
||||
if payload is None:
|
||||
payload_unique_identifier = None
|
||||
else:
|
||||
payload_unique_identifier = payload.unique_identifier
|
||||
|
||||
result = RevokeResult(batch_item.result_status,
|
||||
batch_item.result_reason,
|
||||
batch_item.result_message,
|
||||
payload_unique_identifier)
|
||||
return result
|
||||
|
||||
def _register(self,
|
||||
object_type=None,
|
||||
template_attribute=None,
|
||||
|
|
|
@ -249,3 +249,15 @@ class DiscoverVersionsResult(OperationResult):
|
|||
super(DiscoverVersionsResult, self).__init__(
|
||||
result_status, result_reason, result_message)
|
||||
self.protocol_versions = protocol_versions
|
||||
|
||||
|
||||
class RevokeResult(OperationResult):
|
||||
|
||||
def __init__(self,
|
||||
result_status,
|
||||
result_reason=None,
|
||||
result_message=None,
|
||||
unique_identifier=None):
|
||||
super(RevokeResult, self).__init__(
|
||||
result_status, result_reason, result_message)
|
||||
self.unique_identifier = unique_identifier
|
||||
|
|
|
@ -28,6 +28,7 @@ from kmip.core.messages.payloads import locate
|
|||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
from kmip.core.messages.payloads import register
|
||||
from kmip.core.messages.payloads import revoke
|
||||
|
||||
|
||||
class TestRequestPayloadFactory(testtools.TestCase):
|
||||
|
@ -120,8 +121,8 @@ class TestRequestPayloadFactory(testtools.TestCase):
|
|||
self._test_payload_type(payload, activate.ActivateRequestPayload)
|
||||
|
||||
def test_create_revoke_payload(self):
|
||||
self._test_not_implemented(
|
||||
self.factory.create, Operation.REVOKE)
|
||||
payload = self.factory.create(Operation.REVOKE)
|
||||
self._test_payload_type(payload, revoke.RevokeRequestPayload)
|
||||
|
||||
def test_create_destroy_payload(self):
|
||||
payload = self.factory.create(Operation.DESTROY)
|
||||
|
|
|
@ -28,6 +28,7 @@ from kmip.core.messages.payloads import locate
|
|||
from kmip.core.messages.payloads import query
|
||||
from kmip.core.messages.payloads import rekey_key_pair
|
||||
from kmip.core.messages.payloads import register
|
||||
from kmip.core.messages.payloads import revoke
|
||||
|
||||
|
||||
class TestResponsePayloadFactory(testtools.TestCase):
|
||||
|
@ -120,8 +121,8 @@ class TestResponsePayloadFactory(testtools.TestCase):
|
|||
self._test_payload_type(payload, activate.ActivateResponsePayload)
|
||||
|
||||
def test_create_revoke_payload(self):
|
||||
self._test_not_implemented(
|
||||
self.factory.create, Operation.REVOKE)
|
||||
payload = self.factory.create(Operation.REVOKE)
|
||||
self._test_payload_type(payload, revoke.RevokeResponsePayload)
|
||||
|
||||
def test_create_destroy_payload(self):
|
||||
payload = self.factory.create(Operation.DESTROY)
|
||||
|
|
|
@ -0,0 +1,222 @@
|
|||
# Copyright (c) 2015 Hewlett Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from testtools import TestCase
|
||||
|
||||
from kmip.core import attributes
|
||||
from kmip.core import enums
|
||||
from kmip.core import objects
|
||||
from kmip.core import primitives
|
||||
from kmip.core import utils
|
||||
|
||||
from kmip.core.messages.payloads import revoke
|
||||
|
||||
|
||||
class TestRevokeRequestPayload(TestCase):
|
||||
"""
|
||||
Test suite for the RevokeRequestPayload class.
|
||||
|
||||
Test encodings obtained from Sections 4.2 of the KMIP 1.1 Test
|
||||
Cases documentation.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestRevokeRequestPayload, self).setUp()
|
||||
|
||||
self.uuid = attributes.UniqueIdentifier(
|
||||
'668eff89-3010-4258-bc0e-8c402309c746')
|
||||
|
||||
self.encoding_a = utils.BytearrayStream((
|
||||
b'\x42\x00\x79\x01\x00\x00\x00\x58\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x36\x36\x38\x65\x66\x66\x38\x39\x2D\x33\x30\x31\x30\x2D\x34\x32'
|
||||
b'\x35\x38\x2D\x62\x63\x30\x65\x2D\x38\x63\x34\x30\x32\x33\x30\x39'
|
||||
b'\x63\x37\x34\x36\x00\x00\x00\x00\x42\x00\x81\x01\x00\x00\x00\x10'
|
||||
b'\x42\x00\x82\x05\x00\x00\x00\x04\x00\x00\x00\x02\x00\x00\x00\x00'
|
||||
b'\x42\x00\x21\x09\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x06'
|
||||
))
|
||||
|
||||
def tearDown(self):
|
||||
super(TestRevokeRequestPayload, self).tearDown()
|
||||
|
||||
def test_init_with_none(self):
|
||||
"""
|
||||
Test that a RevokeRequestPayload object can be constructed with no
|
||||
specified value.
|
||||
"""
|
||||
revoke.RevokeRequestPayload()
|
||||
|
||||
def test_init_with_args(self):
|
||||
"""
|
||||
Test that a RevokeRequestPayload object can be constructed with valid
|
||||
values.
|
||||
"""
|
||||
revoke.RevokeRequestPayload(unique_identifier=self.uuid)
|
||||
|
||||
def test_validate_with_bad_uuid_type(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid UUID type
|
||||
is used to construct a RevokeRequestPayload object.
|
||||
"""
|
||||
self.assertRaisesRegexp(
|
||||
TypeError, "invalid unique identifier",
|
||||
revoke.RevokeRequestPayload, "not-a-uuid")
|
||||
|
||||
def test_validate_with_bad_date_type(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid UUID type
|
||||
is used to construct a RevokeRequestPayload object.
|
||||
"""
|
||||
reason = objects.RevocationReason()
|
||||
self.assertRaisesRegexp(
|
||||
TypeError, "invalid compromise time",
|
||||
revoke.RevokeRequestPayload, self.uuid, reason, "not-a-date")
|
||||
|
||||
def test_validate_with_bad_reason_type(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid UUID type
|
||||
is used to construct a RevokeRequestPayload object.
|
||||
"""
|
||||
self.assertRaisesRegexp(
|
||||
TypeError, "invalid revocation reason",
|
||||
revoke.RevokeRequestPayload, self.uuid, "not-a-reason")
|
||||
|
||||
def test_read_with_known_uuid(self):
|
||||
"""
|
||||
Test that a RevokeRequestPayload object with known UUID can be read
|
||||
from a data stream.
|
||||
"""
|
||||
payload = revoke.RevokeRequestPayload()
|
||||
payload.read(self.encoding_a)
|
||||
expected = '668eff89-3010-4258-bc0e-8c402309c746'
|
||||
observed = payload.unique_identifier.value
|
||||
|
||||
msg = "Revoke UUID value mismatch"
|
||||
msg += "; expected {0}, received {1}".format(
|
||||
expected, observed)
|
||||
self.assertEqual(expected, observed, msg)
|
||||
|
||||
def test_write_with_known_uuid(self):
|
||||
"""
|
||||
Test that a RevokeRequestPayload object with a known UUID can be
|
||||
written to a data stream.
|
||||
"""
|
||||
reason = objects.RevocationReason(
|
||||
code=enums.RevocationReasonCode.KEY_COMPROMISE)
|
||||
date = primitives.DateTime(
|
||||
tag=enums.Tags.COMPROMISE_OCCURRENCE_DATE, value=6)
|
||||
|
||||
stream = utils.BytearrayStream()
|
||||
payload = revoke.RevokeRequestPayload(
|
||||
unique_identifier=self.uuid,
|
||||
revocation_reason=reason,
|
||||
compromise_date=date)
|
||||
payload.write(stream)
|
||||
|
||||
length_expected = len(self.encoding_a)
|
||||
length_received = len(stream)
|
||||
|
||||
msg = "encoding lengths not equal"
|
||||
msg += "; expected {0}, received {1}".format(
|
||||
length_expected, length_received)
|
||||
self.assertEqual(length_expected, length_received, msg)
|
||||
|
||||
msg = "encoding mismatch"
|
||||
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(self.encoding_a,
|
||||
stream)
|
||||
|
||||
self.assertEqual(self.encoding_a, stream, msg)
|
||||
|
||||
|
||||
class TestRevokeResponsePayload(TestCase):
|
||||
"""
|
||||
Test encodings obtained from Sections 4.2 of the KMIP 1.1 Test
|
||||
Cases documentation.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestRevokeResponsePayload, self).setUp()
|
||||
|
||||
self.uuid = attributes.UniqueIdentifier(
|
||||
'668eff89-3010-4258-bc0e-8c402309c746')
|
||||
|
||||
self.encoding_a = utils.BytearrayStream((
|
||||
b'\x42\x00\x7C\x01\x00\x00\x00\x30\x42\x00\x94\x07\x00\x00\x00\x24'
|
||||
b'\x36\x36\x38\x65\x66\x66\x38\x39\x2D\x33\x30\x31\x30\x2D\x34\x32'
|
||||
b'\x35\x38\x2D\x62\x63\x30\x65\x2D\x38\x63\x34\x30\x32\x33\x30\x39'
|
||||
b'\x63\x37\x34\x36\x00\x00\x00\x00'))
|
||||
|
||||
def tearDown(self):
|
||||
super(TestRevokeResponsePayload, self).tearDown()
|
||||
|
||||
def test_init_with_none(self):
|
||||
"""
|
||||
Test that a RevokeResponsePayload object can be constructed with no
|
||||
specified value.
|
||||
"""
|
||||
revoke.RevokeResponsePayload()
|
||||
|
||||
def test_init_with_args(self):
|
||||
"""
|
||||
Test that a RevokeResponsePayload object can be constructed with
|
||||
valid values.
|
||||
"""
|
||||
revoke.RevokeResponsePayload(unique_identifier=self.uuid)
|
||||
|
||||
def test_validate_with_invalid_uuid(self):
|
||||
"""
|
||||
Test that a TypeError exception is raised when an invalid Operations
|
||||
list is used to construct a RevokeResponsePayload object.
|
||||
"""
|
||||
self.assertRaisesRegexp(
|
||||
TypeError, "invalid unique identifier",
|
||||
revoke.RevokeResponsePayload, "not-a-uuid")
|
||||
|
||||
def test_read_with_known_uuid(self):
|
||||
"""
|
||||
Test that a RevokeResponsePayload object with known UUID can be read
|
||||
from a data stream.
|
||||
"""
|
||||
payload = revoke.RevokeResponsePayload()
|
||||
payload.read(self.encoding_a)
|
||||
expected = '668eff89-3010-4258-bc0e-8c402309c746'
|
||||
observed = payload.unique_identifier.value
|
||||
|
||||
msg = "Revoke UUID value mismatch"
|
||||
msg += "; expected {0}, received {1}".format(
|
||||
expected, observed)
|
||||
self.assertEqual(expected, observed, msg)
|
||||
|
||||
def test_write_with_known_uuid(self):
|
||||
"""
|
||||
Test that a RevokeResponsePayload object with a known UUID can be
|
||||
written to a data stream.
|
||||
"""
|
||||
stream = utils.BytearrayStream()
|
||||
payload = revoke.RevokeResponsePayload(self.uuid)
|
||||
payload.write(stream)
|
||||
|
||||
length_expected = len(self.encoding_a)
|
||||
length_received = len(stream)
|
||||
|
||||
msg = "encoding lengths not equal"
|
||||
msg += "; expected {0}, received {1}".format(
|
||||
length_expected, length_received)
|
||||
self.assertEqual(length_expected, length_received, msg)
|
||||
|
||||
msg = "encoding mismatch"
|
||||
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(self.encoding_a,
|
||||
stream)
|
||||
|
||||
self.assertEqual(self.encoding_a, stream, msg)
|
Loading…
Reference in New Issue