diff --git a/kmip/core/factories/payloads/request.py b/kmip/core/factories/payloads/request.py index a00e473..b437c27 100644 --- a/kmip/core/factories/payloads/request.py +++ b/kmip/core/factories/payloads/request.py @@ -20,6 +20,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -34,6 +35,9 @@ class RequestPayloadFactory(PayloadFactory): def _create_register_payload(self): return register.RegisterRequestPayload() + def _create_rekey_key_pair_payload(self): + return rekey_key_pair.RekeyKeyPairRequestPayload() + def _create_locate_payload(self): return locate.LocateRequestPayload() diff --git a/kmip/core/factories/payloads/response.py b/kmip/core/factories/payloads/response.py index 7d86908..ee40587 100644 --- a/kmip/core/factories/payloads/response.py +++ b/kmip/core/factories/payloads/response.py @@ -20,6 +20,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -34,6 +35,9 @@ class ResponsePayloadFactory(PayloadFactory): def _create_register_payload(self): return register.RegisterResponsePayload() + def _create_rekey_key_pair_payload(self): + return rekey_key_pair.RekeyKeyPairResponsePayload() + def _create_locate_payload(self): return locate.LocateResponsePayload() diff --git a/kmip/core/messages/payloads/create_key_pair.py b/kmip/core/messages/payloads/create_key_pair.py index d03c9f8..0b352ed 100644 --- a/kmip/core/messages/payloads/create_key_pair.py +++ b/kmip/core/messages/payloads/create_key_pair.py @@ -29,7 +29,7 @@ class CreateKeyPairRequestPayload(Struct): common_template_attribute=None, private_key_template_attribute=None, public_key_template_attribute=None): - super(self.__class__, self).__init__(Tags.REQUEST_PAYLOAD) + super(CreateKeyPairRequestPayload, self).__init__(Tags.REQUEST_PAYLOAD) self.common_template_attribute = common_template_attribute self.private_key_template_attribute = private_key_template_attribute @@ -38,7 +38,7 @@ class CreateKeyPairRequestPayload(Struct): self.validate() def read(self, istream): - super(self.__class__, self).read(istream) + super(CreateKeyPairRequestPayload, self).read(istream) tstream = BytearrayStream(istream.read(self.length)) if self.is_tag_next(Tags.COMMON_TEMPLATE_ATTRIBUTE, tstream): @@ -71,7 +71,7 @@ class CreateKeyPairRequestPayload(Struct): self.public_key_template_attribute.write(tstream) self.length = tstream.length() - super(self.__class__, self).write(ostream) + super(CreateKeyPairRequestPayload, self).write(ostream) ostream.write(tstream.buffer) def validate(self): @@ -113,8 +113,10 @@ class CreateKeyPairResponsePayload(Struct): public_key_uuid=None, private_key_template_attribute=None, public_key_template_attribute=None): - super(self.__class__, self).__init__(Tags.RESPONSE_PAYLOAD) + super(CreateKeyPairResponsePayload, self).__init__( + Tags.RESPONSE_PAYLOAD) + # Private and public UUIDs are required so make defaults as backup if private_key_uuid is None: self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier('') else: @@ -131,7 +133,7 @@ class CreateKeyPairResponsePayload(Struct): self.validate() def read(self, istream): - super(self.__class__, self).read(istream) + super(CreateKeyPairResponsePayload, self).read(istream) tstream = BytearrayStream(istream.read(self.length)) self.private_key_uuid.read(tstream) @@ -163,7 +165,7 @@ class CreateKeyPairResponsePayload(Struct): self.public_key_template_attribute.write(tstream) self.length = tstream.length() - super(self.__class__, self).write(ostream) + super(CreateKeyPairResponsePayload, self).write(ostream) ostream.write(tstream.buffer) def validate(self): diff --git a/kmip/core/messages/payloads/rekey_key_pair.py b/kmip/core/messages/payloads/rekey_key_pair.py new file mode 100644 index 0000000..a9fcf7a --- /dev/null +++ b/kmip/core/messages/payloads/rekey_key_pair.py @@ -0,0 +1,153 @@ +# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kmip.core import attributes +from kmip.core import misc +from kmip.core import objects + +from kmip.core.enums import Tags +from kmip.core.messages.payloads.create_key_pair import \ + CreateKeyPairResponsePayload +from kmip.core.primitives import Struct +from kmip.core.utils import BytearrayStream + + +class RekeyKeyPairRequestPayload(Struct): + + def __init__(self, + private_key_uuid=None, + offset=None, + common_template_attribute=None, + private_key_template_attribute=None, + public_key_template_attribute=None): + super(RekeyKeyPairRequestPayload, self).__init__(Tags.REQUEST_PAYLOAD) + + self.private_key_uuid = private_key_uuid + self.offset = offset + self.common_template_attribute = common_template_attribute + self.private_key_template_attribute = private_key_template_attribute + self.public_key_template_attribute = public_key_template_attribute + + self.validate() + + def read(self, istream): + super(RekeyKeyPairRequestPayload, self).read(istream) + tstream = BytearrayStream(istream.read(self.length)) + + if self.is_tag_next(Tags.PRIVATE_KEY_UNIQUE_IDENTIFIER, tstream): + self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier() + self.private_key_uuid.read(tstream) + + if self.is_tag_next(Tags.OFFSET, tstream): + self.offset = misc.Offset() + self.offset.read(tstream) + + if self.is_tag_next(Tags.COMMON_TEMPLATE_ATTRIBUTE, tstream): + self.common_template_attribute = objects.CommonTemplateAttribute() + self.common_template_attribute.read(tstream) + + if self.is_tag_next(Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE, tstream): + self.private_key_template_attribute = \ + objects.PrivateKeyTemplateAttribute() + self.private_key_template_attribute.read(tstream) + + if self.is_tag_next(Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE, tstream): + self.public_key_template_attribute = \ + objects.PublicKeyTemplateAttribute() + self.public_key_template_attribute.read(tstream) + + self.is_oversized(tstream) + self.validate() + + def write(self, ostream): + tstream = BytearrayStream() + + if self.private_key_uuid is not None: + self.private_key_uuid.write(tstream) + + if self.offset is not None: + self.offset.write(tstream) + + if self.common_template_attribute is not None: + self.common_template_attribute.write(tstream) + + if self.private_key_template_attribute is not None: + self.private_key_template_attribute.write(tstream) + + if self.public_key_template_attribute is not None: + self.public_key_template_attribute.write(tstream) + + self.length = tstream.length() + super(RekeyKeyPairRequestPayload, self).write(ostream) + ostream.write(tstream.buffer) + + def validate(self): + self.__validate() + + def __validate(self): + if self.private_key_uuid is not None: + if not isinstance(self.private_key_uuid, + attributes.PrivateKeyUniqueIdentifier): + msg = "invalid private key unique identifier" + msg += "; expected {0}, received {1}".format( + attributes.PrivateKeyUniqueIdentifier, + self.private_key_uuid) + raise TypeError(msg) + + if self.offset is not None: + if not isinstance(self.offset, misc.Offset): + msg = "invalid offset" + msg += "; expected {0}, received {1}".format( + misc.Offset, self.offset) + raise TypeError(msg) + + if self.common_template_attribute is not None: + if not isinstance(self.common_template_attribute, + objects.CommonTemplateAttribute): + msg = "invalid common template attribute" + msg += "; expected {0}, received {1}".format( + objects.CommonTemplateAttribute, + self.common_template_attribute) + raise TypeError(msg) + + if self.private_key_template_attribute is not None: + if not isinstance(self.private_key_template_attribute, + objects.PrivateKeyTemplateAttribute): + msg = "invalid private key template attribute" + msg += "; expected {0}, received {1}".format( + objects.PrivateKeyTemplateAttribute, + self.private_key_template_attribute) + raise TypeError(msg) + + if self.public_key_template_attribute is not None: + if not isinstance(self.public_key_template_attribute, + objects.PublicKeyTemplateAttribute): + msg = "invalid public key template attribute" + msg += "; expected {0}, received {1}".format( + objects.PublicKeyTemplateAttribute, + self.public_key_template_attribute) + raise TypeError(msg) + + +class RekeyKeyPairResponsePayload(CreateKeyPairResponsePayload): + + def __init__(self, + private_key_uuid=None, + public_key_uuid=None, + private_key_template_attribute=None, + public_key_template_attribute=None): + super(RekeyKeyPairResponsePayload, self).__init__( + private_key_uuid, public_key_uuid, private_key_template_attribute, + public_key_template_attribute) diff --git a/kmip/core/misc.py b/kmip/core/misc.py new file mode 100644 index 0000000..3014864 --- /dev/null +++ b/kmip/core/misc.py @@ -0,0 +1,23 @@ +# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kmip.core import enums +from kmip.core import primitives + + +class Offset(primitives.Interval): + + def __init__(self, value=None, tag=enums.Tags.OFFSET): + super(Offset, self).__init__(value, tag) diff --git a/kmip/core/primitives.py b/kmip/core/primitives.py index 5901155..e7a1094 100644 --- a/kmip/core/primitives.py +++ b/kmip/core/primitives.py @@ -175,6 +175,9 @@ class Integer(Base): super(Integer, self).__init__(tag, type=Types.INTEGER) self.value = value + if self.value is None: + self.value = 0 + self.length = self.LENGTH self.padding_length = self.LENGTH @@ -223,6 +226,15 @@ class Integer(Base): def __repr__(self): return '' % (self.value) + def __eq__(self, other): + if isinstance(other, Integer): + return self.value == other.value + else: + return NotImplemented + + def __ne__(self, other): + return not self.__eq__(other) + class LongInteger(Base): LENGTH = 8 diff --git a/kmip/core/server.py b/kmip/core/server.py index 1b92c72..538f1d6 100644 --- a/kmip/core/server.py +++ b/kmip/core/server.py @@ -67,6 +67,12 @@ class KMIP(object): credential=None): raise NotImplementedError() + def rekey_key_pair(self, private_key_unique_identifier, + offset, common_template_attribute, + private_key_template_attribute, + public_key_template_attribute): + raise NotImplementedError() + def get(self, uuid=None, key_format_type=None, key_compression_type=None, key_wrapping_specification=None, credential=None): raise NotImplementedError() @@ -192,6 +198,12 @@ class KMIPImpl(KMIP): uuid=UniqueIdentifier(s_uuid), template_attribute=template_attribute) + def rekey_key_pair(self, private_key_unique_identifier, + offset, common_template_attribute, + private_key_template_attribute, + public_key_template_attribute): + raise NotImplementedError() + def get(self, uuid=None, key_format_type=None, diff --git a/kmip/services/kmip_client.py b/kmip/services/kmip_client.py index 4283169..1d55b8c 100644 --- a/kmip/services/kmip_client.py +++ b/kmip/services/kmip_client.py @@ -15,10 +15,11 @@ from kmip.services.results import CreateResult from kmip.services.results import CreateKeyPairResult -from kmip.services.results import GetResult from kmip.services.results import DestroyResult -from kmip.services.results import RegisterResult +from kmip.services.results import GetResult from kmip.services.results import LocateResult +from kmip.services.results import RegisterResult +from kmip.services.results import RekeyKeyPairResult from kmip.core import attributes as attr @@ -32,17 +33,18 @@ from kmip.core.server import KMIP from kmip.core.messages.contents import Authentication from kmip.core.messages.contents import BatchCount -from kmip.core.messages.contents import ProtocolVersion from kmip.core.messages.contents import Operation +from kmip.core.messages.contents import ProtocolVersion from kmip.core.messages import messages from kmip.core.messages.payloads import create from kmip.core.messages.payloads import create_key_pair -from kmip.core.messages.payloads import get -from kmip.core.messages.payloads import register -from kmip.core.messages.payloads import locate from kmip.core.messages.payloads import destroy +from kmip.core.messages.payloads import get +from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import rekey_key_pair +from kmip.core.messages.payloads import register from kmip.services.kmip_protocol import KMIPProtocol @@ -134,6 +136,22 @@ class KMIPProxy(KMIP): secret=secret, credential=credential) + def rekey_key_pair(self, batch=False, private_key_uuid=None, offset=None, + common_template_attribute=None, + private_key_template_attribute=None, + public_key_template_attribute=None, credential=None): + batch_item = self._build_rekey_key_pair_batch_item( + private_key_uuid, offset, common_template_attribute, + private_key_template_attribute, public_key_template_attribute) + + if batch: + self.batch_items.append(batch_item) + else: + request = self._build_request_message(credential, [batch_item]) + response = self._send_and_receive_message(request) + results = self._process_batch_items(response) + return results[0] + def locate(self, maximum_items=None, storage_status_mask=None, object_group_member=None, attributes=None, credential=None): return self._locate(maximum_items=maximum_items, @@ -194,6 +212,21 @@ class KMIPProxy(KMIP): operation=operation, request_payload=payload) return batch_item + def _build_rekey_key_pair_batch_item(self, + private_key_uuid=None, offset=None, + common_template_attribute=None, + private_key_template_attribute=None, + public_key_template_attribute=None): + operation = Operation(OperationEnum.REKEY_KEY_PAIR) + payload = rekey_key_pair.RekeyKeyPairRequestPayload( + private_key_uuid, offset, + common_template_attribute=common_template_attribute, + private_key_template_attribute=private_key_template_attribute, + public_key_template_attribute=public_key_template_attribute) + batch_item = messages.RequestBatchItem( + operation=operation, request_payload=payload) + return batch_item + def _process_batch_items(self, response): results = [] for batch_item in response.batch_items: @@ -206,10 +239,13 @@ class KMIPProxy(KMIP): def _get_batch_item_processor(self, operation): if operation == OperationEnum.CREATE_KEY_PAIR: return self._process_create_key_pair_batch_item + elif operation == OperationEnum.REKEY_KEY_PAIR: + return self._process_rekey_key_pair_batch_item else: - raise ValueError("no processor for given operation") + raise ValueError("no processor for operation: {0}".format( + operation)) - def _process_create_key_pair_batch_item(self, batch_item): + def _process_key_pair_batch_item(self, batch_item, result): payload = batch_item.response_payload payload_private_key_uuid = None @@ -225,14 +261,19 @@ class KMIPProxy(KMIP): payload_public_key_template_attribute = \ payload.public_key_template_attribute - result = CreateKeyPairResult(batch_item.result_status, - batch_item.result_reason, - batch_item.result_message, - payload_private_key_uuid, - payload_public_key_uuid, - payload_private_key_template_attribute, - payload_public_key_template_attribute) - return result + return result(batch_item.result_status, batch_item.result_reason, + batch_item.result_message, payload_private_key_uuid, + payload_public_key_uuid, + payload_private_key_template_attribute, + payload_public_key_template_attribute) + + def _process_create_key_pair_batch_item(self, batch_item): + return self._process_key_pair_batch_item( + batch_item, CreateKeyPairResult) + + def _process_rekey_key_pair_batch_item(self, batch_item): + return self._process_key_pair_batch_item( + batch_item, RekeyKeyPairResult) def _get(self, unique_identifier=None, diff --git a/kmip/services/results.py b/kmip/services/results.py index a0a9a76..3a2ee7e 100644 --- a/kmip/services/results.py +++ b/kmip/services/results.py @@ -99,6 +99,22 @@ class RegisterResult(OperationResult): self.template_attribute = None +class RekeyKeyPairResult(CreateKeyPairResult): + + def __init__(self, + result_status, + result_reason=None, + result_message=None, + private_key_uuid=None, + public_key_uuid=None, + private_key_template_attribute=None, + public_key_template_attribute=None): + super(RekeyKeyPairResult, self).__init__( + result_status, result_reason, result_message, private_key_uuid, + public_key_uuid, private_key_template_attribute, + public_key_template_attribute) + + class GetResult(OperationResult): def __init__(self, diff --git a/kmip/tests/core/factories/payloads/test_request.py b/kmip/tests/core/factories/payloads/test_request.py index 05b002c..30428d6 100644 --- a/kmip/tests/core/factories/payloads/test_request.py +++ b/kmip/tests/core/factories/payloads/test_request.py @@ -23,6 +23,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -156,8 +157,9 @@ class TestRequestPayloadFactory(testtools.TestCase): self.factory.create, Operation.PUT) def test_create_rekey_key_pair_payload(self): - self._test_not_implemented( - self.factory.create, Operation.REKEY_KEY_PAIR) + payload = self.factory.create(Operation.REKEY_KEY_PAIR) + self._test_payload_type( + payload, rekey_key_pair.RekeyKeyPairRequestPayload) def test_create_discover_versions_payload(self): self._test_not_implemented( diff --git a/kmip/tests/core/factories/payloads/test_response.py b/kmip/tests/core/factories/payloads/test_response.py index cf33641..605d0dd 100644 --- a/kmip/tests/core/factories/payloads/test_response.py +++ b/kmip/tests/core/factories/payloads/test_response.py @@ -23,6 +23,7 @@ from kmip.core.messages.payloads import create_key_pair from kmip.core.messages.payloads import destroy from kmip.core.messages.payloads import get from kmip.core.messages.payloads import locate +from kmip.core.messages.payloads import rekey_key_pair from kmip.core.messages.payloads import register @@ -156,8 +157,9 @@ class TestResponsePayloadFactory(testtools.TestCase): self.factory.create, Operation.PUT) def test_create_rekey_key_pair_payload(self): - self._test_not_implemented( - self.factory.create, Operation.REKEY_KEY_PAIR) + payload = self.factory.create(Operation.REKEY_KEY_PAIR) + self._test_payload_type( + payload, rekey_key_pair.RekeyKeyPairResponsePayload) def test_create_discover_versions_payload(self): self._test_not_implemented( diff --git a/kmip/tests/core/messages/payloads/test_create_key_pair.py b/kmip/tests/core/messages/payloads/test_create_key_pair.py index 4269007..2149f1f 100644 --- a/kmip/tests/core/messages/payloads/test_create_key_pair.py +++ b/kmip/tests/core/messages/payloads/test_create_key_pair.py @@ -192,7 +192,7 @@ class TestCreateKeyPairResponsePayload(TestCase): def test_validate_with_invalid_private_key_unique_identifier(self): kwargs = {'private_key_uuid': 'invalid', - 'public_key_uuid': self.public_key_uuid, + 'public_key_uuid': None, 'private_key_template_attribute': None, 'public_key_template_attribute': None} self.assertRaisesRegexp( @@ -200,7 +200,7 @@ class TestCreateKeyPairResponsePayload(TestCase): create_key_pair.CreateKeyPairResponsePayload, **kwargs) def test_validate_with_invalid_public_key_unique_identifier(self): - kwargs = {'private_key_uuid': self.private_key_uuid, + kwargs = {'private_key_uuid': None, 'public_key_uuid': 'invalid', 'private_key_template_attribute': None, 'public_key_template_attribute': None} diff --git a/kmip/tests/core/messages/payloads/test_rekey_key_pair.py b/kmip/tests/core/messages/payloads/test_rekey_key_pair.py new file mode 100644 index 0000000..ce3dd77 --- /dev/null +++ b/kmip/tests/core/messages/payloads/test_rekey_key_pair.py @@ -0,0 +1,346 @@ +# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import TestCase + +from kmip.core import attributes +from kmip.core import misc +from kmip.core import objects +from kmip.core import utils + +from kmip.core.messages.payloads import rekey_key_pair + + +class TestRekeyKeyPairRequestPayload(TestCase): + + def setUp(self): + super(TestRekeyKeyPairRequestPayload, self).setUp() + + self.uuid = '00000000-0000-0000-0000-000000000000' + self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier( + self.uuid) + self.offset = misc.Offset(0) + self.common_template_attribute = objects.CommonTemplateAttribute() + self.private_key_template_attribute = \ + objects.PrivateKeyTemplateAttribute() + self.public_key_template_attribute = \ + objects.PublicKeyTemplateAttribute() + + self.encoding_empty = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x00')) + self.encoding_full = utils.BytearrayStream(( + b'\x42\x00\x79\x01\x00\x00\x00\x58\x42\x00\x66\x07\x00\x00\x00\x24' + b'\x30\x30\x30\x30\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30' + b'\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x30\x30\x30\x30' + b'\x30\x30\x30\x30\x00\x00\x00\x00\x42\x00\x58\x0A\x00\x00\x00\x04' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x42\x00\x1F\x01\x00\x00\x00\x00' + b'\x42\x00\x65\x01\x00\x00\x00\x00\x42\x00\x6E\x01\x00\x00\x00\x00' + )) + + def tearDown(self): + super(TestRekeyKeyPairRequestPayload, self).tearDown() + + def test_init_with_none(self): + rekey_key_pair.RekeyKeyPairRequestPayload() + + def test_init_with_args(self): + rekey_key_pair.RekeyKeyPairRequestPayload( + self.private_key_uuid, self.offset, self.common_template_attribute, + self.private_key_template_attribute, + self.public_key_template_attribute) + + def test_validate_with_invalid_private_key_unique_identifier(self): + kwargs = {'private_key_uuid': 'invalid', 'offset': None, + 'common_template_attribute': None, + 'private_key_template_attribute': None, + 'public_key_template_attribute': None} + self.assertRaisesRegexp( + TypeError, "invalid private key unique identifier", + rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs) + + def test_validate_with_invalid_offset(self): + kwargs = {'private_key_uuid': None, 'offset': 'invalid', + 'common_template_attribute': None, + 'private_key_template_attribute': None, + 'public_key_template_attribute': None} + self.assertRaisesRegexp( + TypeError, "invalid offset", + rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs) + + def test_validate_with_invalid_common_template_attribute(self): + kwargs = {'private_key_uuid': None, 'offset': None, + 'common_template_attribute': 'invalid', + 'private_key_template_attribute': None, + 'public_key_template_attribute': None} + self.assertRaisesRegexp( + TypeError, "invalid common template attribute", + rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs) + + def test_validate_with_invalid_private_key_template_attribute(self): + kwargs = {'private_key_uuid': None, 'offset': None, + 'common_template_attribute': None, + 'private_key_template_attribute': 'invalid', + 'public_key_template_attribute': None} + self.assertRaisesRegexp( + TypeError, "invalid private key template attribute", + rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs) + + def test_validate_with_invalid_public_key_template_attribute(self): + kwargs = {'private_key_uuid': None, 'offset': None, + 'common_template_attribute': None, + 'private_key_template_attribute': None, + 'public_key_template_attribute': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid public key template attribute", + rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs) + + def _test_read(self, stream, payload, private_key_uuid, offset, + common_template_attribute, private_key_template_attribute, + public_key_template_attribute): + payload.read(stream) + + msg = "private_key_uuid decoding mismatch" + msg += "; expected {0}, received {1}".format( + private_key_uuid, payload.private_key_uuid) + self.assertEqual(private_key_uuid, payload.private_key_uuid, msg) + + msg = "offset decoding mismatch" + msg += "; expected {0}, received {1}".format(offset, payload.offset) + self.assertEqual(offset, payload.offset, msg) + + msg = "common_template_attribute decoding mismatch" + msg += "; expected {0}, received {1}".format( + common_template_attribute, payload.common_template_attribute) + self.assertEqual(common_template_attribute, + payload.common_template_attribute, msg) + + msg = "private_key_template_attribute decoding mismatch" + msg += "; expected {0}, received {1}".format( + private_key_template_attribute, + payload.private_key_template_attribute) + self.assertEqual(private_key_template_attribute, + payload.private_key_template_attribute, msg) + + msg = "public_key_template_attribute decoding mismatch" + msg += "; expected {0}, received {1}".format( + public_key_template_attribute, + payload.public_key_template_attribute) + self.assertEqual(public_key_template_attribute, + payload.public_key_template_attribute, msg) + + def test_read_with_none(self): + stream = self.encoding_empty + payload = rekey_key_pair.RekeyKeyPairRequestPayload() + + self._test_read(stream, payload, None, None, None, None, None) + + def test_read_with_args(self): + stream = self.encoding_full + payload = rekey_key_pair.RekeyKeyPairRequestPayload() + + self._test_read(stream, payload, self.private_key_uuid, self.offset, + self.common_template_attribute, + self.private_key_template_attribute, + self.public_key_template_attribute) + + def _test_write(self, stream, payload, expected): + payload.write(stream) + + length_expected = len(expected) + length_received = len(stream) + + msg = "encoding lengths not equal" + msg += "; expected {0}, received {1}".format( + length_expected, length_received) + self.assertEqual(length_expected, length_received, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(expected, stream) + + self.assertEqual(expected, stream, msg) + + def test_write_with_none(self): + stream = utils.BytearrayStream() + payload = rekey_key_pair.RekeyKeyPairRequestPayload() + + self._test_write(stream, payload, self.encoding_empty) + + def test_write_with_args(self): + stream = utils.BytearrayStream() + payload = rekey_key_pair.RekeyKeyPairRequestPayload( + self.private_key_uuid, self.offset, self.common_template_attribute, + self.private_key_template_attribute, + self.public_key_template_attribute) + + self._test_write(stream, payload, self.encoding_full) + + +class TestRekeyKeyPairResponsePayload(TestCase): + + def setUp(self): + super(TestRekeyKeyPairResponsePayload, self).setUp() + + self.uuid = '00000000-0000-0000-0000-000000000000' + self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier( + self.uuid) + self.public_key_uuid = attributes.PublicKeyUniqueIdentifier( + self.uuid) + self.empty_private_key_uuid = attributes.PrivateKeyUniqueIdentifier('') + self.empty_public_key_uuid = attributes.PublicKeyUniqueIdentifier('') + + self.private_key_template_attribute = \ + objects.PrivateKeyTemplateAttribute() + self.public_key_template_attribute = \ + objects.PublicKeyTemplateAttribute() + + self.encoding_empty = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x00\x10\x42\x00\x66\x07\x00\x00\x00\x00' + b'\x42\x00\x6F\x07\x00\x00\x00\x00')) + self.encoding_full = utils.BytearrayStream(( + b'\x42\x00\x7C\x01\x00\x00\x00\x70\x42\x00\x66\x07\x00\x00\x00\x24' + b'\x30\x30\x30\x30\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30' + b'\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x30\x30\x30\x30' + b'\x30\x30\x30\x30\x00\x00\x00\x00\x42\x00\x6F\x07\x00\x00\x00\x24' + b'\x30\x30\x30\x30\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30' + b'\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x30\x30\x30\x30' + b'\x30\x30\x30\x30\x00\x00\x00\x00\x42\x00\x65\x01\x00\x00\x00\x00' + b'\x42\x00\x6E\x01\x00\x00\x00\x00')) + + def tearDown(self): + super(TestRekeyKeyPairResponsePayload, self).tearDown() + + def test_init_with_none(self): + rekey_key_pair.RekeyKeyPairResponsePayload() + + def test_init_with_args(self): + rekey_key_pair.RekeyKeyPairResponsePayload( + self.private_key_uuid, self.public_key_uuid, + self.private_key_template_attribute, + self.public_key_template_attribute) + + def test_validate_with_invalid_private_key_unique_identifier(self): + kwargs = {'private_key_uuid': 'invalid', + 'public_key_uuid': None, + 'private_key_template_attribute': None, + 'public_key_template_attribute': None} + self.assertRaisesRegexp( + TypeError, "invalid private key unique identifier", + rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs) + + def test_validate_with_invalid_public_key_unique_identifier(self): + kwargs = {'private_key_uuid': None, + 'public_key_uuid': 'invalid', + 'private_key_template_attribute': None, + 'public_key_template_attribute': None} + self.assertRaisesRegexp( + TypeError, "invalid public key unique identifier", + rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs) + + def test_validate_with_invalid_private_key_template_attribute(self): + kwargs = {'private_key_uuid': None, + 'public_key_uuid': None, + 'private_key_template_attribute': 'invalid', + 'public_key_template_attribute': None} + self.assertRaisesRegexp( + TypeError, "invalid private key template attribute", + rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs) + + def test_validate_with_invalid_public_key_template_attribute(self): + kwargs = {'private_key_uuid': None, + 'public_key_uuid': None, + 'private_key_template_attribute': None, + 'public_key_template_attribute': 'invalid'} + self.assertRaisesRegexp( + TypeError, "invalid public key template attribute", + rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs) + + def _test_read(self, stream, payload, private_key_uuid, public_key_uuid, + private_key_template_attribute, + public_key_template_attribute): + payload.read(stream) + + msg = "private_key_uuid decoding mismatch" + msg += "; expected {0}, received {1}".format( + private_key_uuid, payload.private_key_uuid) + self.assertEqual(private_key_uuid, payload.private_key_uuid, msg) + + msg = "public_key_uuid decoding mismatch" + msg += "; expected {0}, received {1}".format( + public_key_uuid, payload.public_key_uuid) + self.assertEqual(public_key_uuid, payload.public_key_uuid, msg) + + msg = "private_key_template_attribute decoding mismatch" + msg += "; expected {0}, received {1}".format( + private_key_template_attribute, + payload.private_key_template_attribute) + self.assertEqual(private_key_template_attribute, + payload.private_key_template_attribute, msg) + + msg = "public_key_template_attribute decoding mismatch" + msg += "; expected {0}, received {1}".format( + public_key_template_attribute, + payload.public_key_template_attribute) + self.assertEqual(public_key_template_attribute, + payload.public_key_template_attribute, msg) + + def test_read_with_none(self): + stream = self.encoding_empty + payload = rekey_key_pair.RekeyKeyPairResponsePayload() + + self._test_read(stream, payload, self.empty_private_key_uuid, + self.empty_public_key_uuid, None, None) + + def test_read_with_args(self): + stream = self.encoding_full + payload = rekey_key_pair.RekeyKeyPairResponsePayload( + self.private_key_uuid, self.public_key_uuid, + self.private_key_template_attribute, + self.public_key_template_attribute) + + self._test_read(stream, payload, self.private_key_uuid, + self.public_key_uuid, + self.private_key_template_attribute, + self.public_key_template_attribute) + + def _test_write(self, stream, payload, expected): + payload.write(stream) + + length_expected = len(expected) + length_received = len(stream) + + msg = "encoding lengths not equal" + msg += "; expected {0}, received {1}".format( + length_expected, length_received) + self.assertEqual(length_expected, length_received, msg) + + msg = "encoding mismatch" + msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(expected, stream) + + self.assertEqual(expected, stream, msg) + + def test_write_with_none(self): + stream = utils.BytearrayStream() + payload = rekey_key_pair.RekeyKeyPairResponsePayload() + + self._test_write(stream, payload, self.encoding_empty) + + def test_write_with_args(self): + stream = utils.BytearrayStream() + payload = rekey_key_pair.RekeyKeyPairResponsePayload( + self.private_key_uuid, self.public_key_uuid, + self.private_key_template_attribute, + self.public_key_template_attribute) + + self._test_write(stream, payload, self.encoding_full) diff --git a/kmip/tests/core/test_primitives.py b/kmip/tests/core/test_primitives.py index b511a53..904ccf4 100644 --- a/kmip/tests/core/test_primitives.py +++ b/kmip/tests/core/test_primitives.py @@ -281,8 +281,8 @@ class TestInteger(TestCase): def test_init_unset(self): i = Integer() - self.assertEqual(None, i.value, - self.bad_value.format('value', None, i.value)) + self.assertEqual(0, i.value, + self.bad_value.format('value', 0, i.value)) self.assertEqual(i.LENGTH, i.length, self.bad_value.format('length', i.LENGTH, i.length)) self.assertEqual(i.LENGTH, i.padding_length, @@ -858,8 +858,8 @@ class TestEnumeration(TestCase): self.assertEqual(None, e.enum, self.bad_value.format('enum', None, e.enum)) - self.assertEqual(None, e.value, - self.bad_value.format('value', None, e.value)) + self.assertEqual(0, e.value, + self.bad_value.format('value', 0, e.value)) def test_validate_on_valid(self): e = Enumeration() diff --git a/kmip/tests/services/test_kmip_client.py b/kmip/tests/services/test_kmip_client.py index 68bcaad..950fd3b 100644 --- a/kmip/tests/services/test_kmip_client.py +++ b/kmip/tests/services/test_kmip_client.py @@ -21,6 +21,8 @@ import os import sys import time +from kmip.core.attributes import PrivateKeyUniqueIdentifier + from kmip.core.enums import AttributeType from kmip.core.enums import CredentialType from kmip.core.enums import CryptographicAlgorithm @@ -44,6 +46,10 @@ from kmip.core.messages.messages import ResponseMessage from kmip.core.messages.contents import Operation from kmip.core.messages.payloads.create_key_pair import \ CreateKeyPairRequestPayload, CreateKeyPairResponsePayload +from kmip.core.messages.payloads.rekey_key_pair import \ + RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload + +from kmip.core.misc import Offset from kmip.core.objects import Attribute from kmip.core.objects import CommonTemplateAttribute @@ -55,10 +61,12 @@ from kmip.core.secrets import SymmetricKey from kmip.services.kmip_client import KMIPProxy from kmip.services.results import CreateKeyPairResult +from kmip.services.results import RekeyKeyPairResult import kmip.core.utils as utils +# TODO (peter-hamilton) Move integration tests into separate module class TestKMIPClientIntegration(TestCase): STARTUP_TIME = 1.0 SHUTDOWN_TIME = 0.1 @@ -520,6 +528,65 @@ class TestKMIPClient(TestCase): def test_build_create_key_pair_batch_item_no_input(self): self._test_build_create_key_pair_batch_item(None, None, None) + def _test_build_rekey_key_pair_batch_item(self, uuid, offset, common, + private, public): + batch_item = self.client._build_rekey_key_pair_batch_item( + private_key_uuid=uuid, offset=offset, + common_template_attribute=common, + private_key_template_attribute=private, + public_key_template_attribute=public) + + base = "expected {0}, received {1}" + msg = base.format(RequestBatchItem, batch_item) + self.assertIsInstance(batch_item, RequestBatchItem, msg) + + operation = batch_item.operation + + msg = base.format(Operation, operation) + self.assertIsInstance(operation, Operation, msg) + + operation_enum = operation.enum + + msg = base.format(OperationEnum.REKEY_KEY_PAIR, operation_enum) + self.assertEqual(OperationEnum.REKEY_KEY_PAIR, operation_enum, msg) + + payload = batch_item.request_payload + + msg = base.format(RekeyKeyPairRequestPayload, payload) + self.assertIsInstance(payload, RekeyKeyPairRequestPayload, msg) + + private_key_uuid_observed = payload.private_key_uuid + offset_observed = payload.offset + common_observed = payload.common_template_attribute + private_observed = payload.private_key_template_attribute + public_observed = payload.public_key_template_attribute + + msg = base.format(uuid, private_key_uuid_observed) + self.assertEqual(uuid, private_key_uuid_observed, msg) + + msg = base.format(offset, offset_observed) + self.assertEqual(offset, offset_observed, msg) + + msg = base.format(common, common_observed) + self.assertEqual(common, common_observed, msg) + + msg = base.format(private, private_observed) + self.assertEqual(private, private_observed, msg) + + msg = base.format(public, public_observed) + self.assertEqual(public, public_observed) + + def test_build_rekey_key_pair_batch_item_with_input(self): + self._test_build_rekey_key_pair_batch_item( + PrivateKeyUniqueIdentifier(), Offset(), + CommonTemplateAttribute(), + PrivateKeyTemplateAttribute(), + PublicKeyTemplateAttribute()) + + def test_build_rekey_key_pair_batch_item_no_input(self): + self._test_build_rekey_key_pair_batch_item( + None, None, None, None, None) + def test_process_batch_items(self): batch_item = ResponseBatchItem( operation=Operation(OperationEnum.CREATE_KEY_PAIR), @@ -549,6 +616,24 @@ class TestKMIPClient(TestCase): msg = "number of results " + base.format(0, len(results)) self.assertEqual(0, len(results), msg) + def test_get_batch_item_processor(self): + base = "expected {0}, received {1}" + + expected = self.client._process_create_key_pair_batch_item + observed = self.client._get_batch_item_processor( + OperationEnum.CREATE_KEY_PAIR) + msg = base.format(expected, observed) + self.assertEqual(expected, observed, msg) + + expected = self.client._process_rekey_key_pair_batch_item + observed = self.client._get_batch_item_processor( + OperationEnum.REKEY_KEY_PAIR) + msg = base.format(expected, observed) + self.assertEqual(expected, observed, msg) + + self.assertRaisesRegexp(ValueError, "no processor for operation", + self.client._get_batch_item_processor, None) + def test_process_create_key_pair_batch_item(self): batch_item = ResponseBatchItem( operation=Operation(OperationEnum.CREATE_KEY_PAIR), @@ -557,3 +642,12 @@ class TestKMIPClient(TestCase): msg = "expected {0}, received {1}".format(CreateKeyPairResult, result) self.assertIsInstance(result, CreateKeyPairResult, msg) + + def test_process_rekey_key_pair_batch_item(self): + batch_item = ResponseBatchItem( + operation=Operation(OperationEnum.REKEY_KEY_PAIR), + response_payload=RekeyKeyPairResponsePayload()) + result = self.client._process_rekey_key_pair_batch_item(batch_item) + + msg = "expected {0}, received {1}".format(RekeyKeyPairResult, result) + self.assertIsInstance(result, RekeyKeyPairResult, msg)