Adding support for the RekeyKeyPair operation

This change adds support for the ReKeyKeyPair operation, adding in
required KMIP objects and updating the KMIP client. Minor changes to the
server are included in preparation for future changes. The unit test
suite has been updated accordingly.
This commit is contained in:
Peter Hamilton 2015-01-21 11:41:08 -05:00
parent c6d6db3dfe
commit ff533ff4bb
15 changed files with 743 additions and 32 deletions

View File

@ -20,6 +20,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -34,6 +35,9 @@ class RequestPayloadFactory(PayloadFactory):
def _create_register_payload(self):
return register.RegisterRequestPayload()
def _create_rekey_key_pair_payload(self):
return rekey_key_pair.RekeyKeyPairRequestPayload()
def _create_locate_payload(self):
return locate.LocateRequestPayload()

View File

@ -20,6 +20,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -34,6 +35,9 @@ class ResponsePayloadFactory(PayloadFactory):
def _create_register_payload(self):
return register.RegisterResponsePayload()
def _create_rekey_key_pair_payload(self):
return rekey_key_pair.RekeyKeyPairResponsePayload()
def _create_locate_payload(self):
return locate.LocateResponsePayload()

View File

@ -29,7 +29,7 @@ class CreateKeyPairRequestPayload(Struct):
common_template_attribute=None,
private_key_template_attribute=None,
public_key_template_attribute=None):
super(self.__class__, self).__init__(Tags.REQUEST_PAYLOAD)
super(CreateKeyPairRequestPayload, self).__init__(Tags.REQUEST_PAYLOAD)
self.common_template_attribute = common_template_attribute
self.private_key_template_attribute = private_key_template_attribute
@ -38,7 +38,7 @@ class CreateKeyPairRequestPayload(Struct):
self.validate()
def read(self, istream):
super(self.__class__, self).read(istream)
super(CreateKeyPairRequestPayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
if self.is_tag_next(Tags.COMMON_TEMPLATE_ATTRIBUTE, tstream):
@ -71,7 +71,7 @@ class CreateKeyPairRequestPayload(Struct):
self.public_key_template_attribute.write(tstream)
self.length = tstream.length()
super(self.__class__, self).write(ostream)
super(CreateKeyPairRequestPayload, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
@ -113,8 +113,10 @@ class CreateKeyPairResponsePayload(Struct):
public_key_uuid=None,
private_key_template_attribute=None,
public_key_template_attribute=None):
super(self.__class__, self).__init__(Tags.RESPONSE_PAYLOAD)
super(CreateKeyPairResponsePayload, self).__init__(
Tags.RESPONSE_PAYLOAD)
# Private and public UUIDs are required so make defaults as backup
if private_key_uuid is None:
self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier('')
else:
@ -131,7 +133,7 @@ class CreateKeyPairResponsePayload(Struct):
self.validate()
def read(self, istream):
super(self.__class__, self).read(istream)
super(CreateKeyPairResponsePayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
self.private_key_uuid.read(tstream)
@ -163,7 +165,7 @@ class CreateKeyPairResponsePayload(Struct):
self.public_key_template_attribute.write(tstream)
self.length = tstream.length()
super(self.__class__, self).write(ostream)
super(CreateKeyPairResponsePayload, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):

View File

@ -0,0 +1,153 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core import attributes
from kmip.core import misc
from kmip.core import objects
from kmip.core.enums import Tags
from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairResponsePayload
from kmip.core.primitives import Struct
from kmip.core.utils import BytearrayStream
class RekeyKeyPairRequestPayload(Struct):
def __init__(self,
private_key_uuid=None,
offset=None,
common_template_attribute=None,
private_key_template_attribute=None,
public_key_template_attribute=None):
super(RekeyKeyPairRequestPayload, self).__init__(Tags.REQUEST_PAYLOAD)
self.private_key_uuid = private_key_uuid
self.offset = offset
self.common_template_attribute = common_template_attribute
self.private_key_template_attribute = private_key_template_attribute
self.public_key_template_attribute = public_key_template_attribute
self.validate()
def read(self, istream):
super(RekeyKeyPairRequestPayload, self).read(istream)
tstream = BytearrayStream(istream.read(self.length))
if self.is_tag_next(Tags.PRIVATE_KEY_UNIQUE_IDENTIFIER, tstream):
self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier()
self.private_key_uuid.read(tstream)
if self.is_tag_next(Tags.OFFSET, tstream):
self.offset = misc.Offset()
self.offset.read(tstream)
if self.is_tag_next(Tags.COMMON_TEMPLATE_ATTRIBUTE, tstream):
self.common_template_attribute = objects.CommonTemplateAttribute()
self.common_template_attribute.read(tstream)
if self.is_tag_next(Tags.PRIVATE_KEY_TEMPLATE_ATTRIBUTE, tstream):
self.private_key_template_attribute = \
objects.PrivateKeyTemplateAttribute()
self.private_key_template_attribute.read(tstream)
if self.is_tag_next(Tags.PUBLIC_KEY_TEMPLATE_ATTRIBUTE, tstream):
self.public_key_template_attribute = \
objects.PublicKeyTemplateAttribute()
self.public_key_template_attribute.read(tstream)
self.is_oversized(tstream)
self.validate()
def write(self, ostream):
tstream = BytearrayStream()
if self.private_key_uuid is not None:
self.private_key_uuid.write(tstream)
if self.offset is not None:
self.offset.write(tstream)
if self.common_template_attribute is not None:
self.common_template_attribute.write(tstream)
if self.private_key_template_attribute is not None:
self.private_key_template_attribute.write(tstream)
if self.public_key_template_attribute is not None:
self.public_key_template_attribute.write(tstream)
self.length = tstream.length()
super(RekeyKeyPairRequestPayload, self).write(ostream)
ostream.write(tstream.buffer)
def validate(self):
self.__validate()
def __validate(self):
if self.private_key_uuid is not None:
if not isinstance(self.private_key_uuid,
attributes.PrivateKeyUniqueIdentifier):
msg = "invalid private key unique identifier"
msg += "; expected {0}, received {1}".format(
attributes.PrivateKeyUniqueIdentifier,
self.private_key_uuid)
raise TypeError(msg)
if self.offset is not None:
if not isinstance(self.offset, misc.Offset):
msg = "invalid offset"
msg += "; expected {0}, received {1}".format(
misc.Offset, self.offset)
raise TypeError(msg)
if self.common_template_attribute is not None:
if not isinstance(self.common_template_attribute,
objects.CommonTemplateAttribute):
msg = "invalid common template attribute"
msg += "; expected {0}, received {1}".format(
objects.CommonTemplateAttribute,
self.common_template_attribute)
raise TypeError(msg)
if self.private_key_template_attribute is not None:
if not isinstance(self.private_key_template_attribute,
objects.PrivateKeyTemplateAttribute):
msg = "invalid private key template attribute"
msg += "; expected {0}, received {1}".format(
objects.PrivateKeyTemplateAttribute,
self.private_key_template_attribute)
raise TypeError(msg)
if self.public_key_template_attribute is not None:
if not isinstance(self.public_key_template_attribute,
objects.PublicKeyTemplateAttribute):
msg = "invalid public key template attribute"
msg += "; expected {0}, received {1}".format(
objects.PublicKeyTemplateAttribute,
self.public_key_template_attribute)
raise TypeError(msg)
class RekeyKeyPairResponsePayload(CreateKeyPairResponsePayload):
def __init__(self,
private_key_uuid=None,
public_key_uuid=None,
private_key_template_attribute=None,
public_key_template_attribute=None):
super(RekeyKeyPairResponsePayload, self).__init__(
private_key_uuid, public_key_uuid, private_key_template_attribute,
public_key_template_attribute)

23
kmip/core/misc.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core import enums
from kmip.core import primitives
class Offset(primitives.Interval):
def __init__(self, value=None, tag=enums.Tags.OFFSET):
super(Offset, self).__init__(value, tag)

View File

@ -175,6 +175,9 @@ class Integer(Base):
super(Integer, self).__init__(tag, type=Types.INTEGER)
self.value = value
if self.value is None:
self.value = 0
self.length = self.LENGTH
self.padding_length = self.LENGTH
@ -223,6 +226,15 @@ class Integer(Base):
def __repr__(self):
return '<Integer, %d>' % (self.value)
def __eq__(self, other):
if isinstance(other, Integer):
return self.value == other.value
else:
return NotImplemented
def __ne__(self, other):
return not self.__eq__(other)
class LongInteger(Base):
LENGTH = 8

View File

@ -67,6 +67,12 @@ class KMIP(object):
credential=None):
raise NotImplementedError()
def rekey_key_pair(self, private_key_unique_identifier,
offset, common_template_attribute,
private_key_template_attribute,
public_key_template_attribute):
raise NotImplementedError()
def get(self, uuid=None, key_format_type=None, key_compression_type=None,
key_wrapping_specification=None, credential=None):
raise NotImplementedError()
@ -192,6 +198,12 @@ class KMIPImpl(KMIP):
uuid=UniqueIdentifier(s_uuid),
template_attribute=template_attribute)
def rekey_key_pair(self, private_key_unique_identifier,
offset, common_template_attribute,
private_key_template_attribute,
public_key_template_attribute):
raise NotImplementedError()
def get(self,
uuid=None,
key_format_type=None,

View File

@ -15,10 +15,11 @@
from kmip.services.results import CreateResult
from kmip.services.results import CreateKeyPairResult
from kmip.services.results import GetResult
from kmip.services.results import DestroyResult
from kmip.services.results import RegisterResult
from kmip.services.results import GetResult
from kmip.services.results import LocateResult
from kmip.services.results import RegisterResult
from kmip.services.results import RekeyKeyPairResult
from kmip.core import attributes as attr
@ -32,17 +33,18 @@ from kmip.core.server import KMIP
from kmip.core.messages.contents import Authentication
from kmip.core.messages.contents import BatchCount
from kmip.core.messages.contents import ProtocolVersion
from kmip.core.messages.contents import Operation
from kmip.core.messages.contents import ProtocolVersion
from kmip.core.messages import messages
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
from kmip.services.kmip_protocol import KMIPProtocol
@ -134,6 +136,22 @@ class KMIPProxy(KMIP):
secret=secret,
credential=credential)
def rekey_key_pair(self, batch=False, private_key_uuid=None, offset=None,
common_template_attribute=None,
private_key_template_attribute=None,
public_key_template_attribute=None, credential=None):
batch_item = self._build_rekey_key_pair_batch_item(
private_key_uuid, offset, common_template_attribute,
private_key_template_attribute, public_key_template_attribute)
if batch:
self.batch_items.append(batch_item)
else:
request = self._build_request_message(credential, [batch_item])
response = self._send_and_receive_message(request)
results = self._process_batch_items(response)
return results[0]
def locate(self, maximum_items=None, storage_status_mask=None,
object_group_member=None, attributes=None, credential=None):
return self._locate(maximum_items=maximum_items,
@ -194,6 +212,21 @@ class KMIPProxy(KMIP):
operation=operation, request_payload=payload)
return batch_item
def _build_rekey_key_pair_batch_item(self,
private_key_uuid=None, offset=None,
common_template_attribute=None,
private_key_template_attribute=None,
public_key_template_attribute=None):
operation = Operation(OperationEnum.REKEY_KEY_PAIR)
payload = rekey_key_pair.RekeyKeyPairRequestPayload(
private_key_uuid, offset,
common_template_attribute=common_template_attribute,
private_key_template_attribute=private_key_template_attribute,
public_key_template_attribute=public_key_template_attribute)
batch_item = messages.RequestBatchItem(
operation=operation, request_payload=payload)
return batch_item
def _process_batch_items(self, response):
results = []
for batch_item in response.batch_items:
@ -206,10 +239,13 @@ class KMIPProxy(KMIP):
def _get_batch_item_processor(self, operation):
if operation == OperationEnum.CREATE_KEY_PAIR:
return self._process_create_key_pair_batch_item
elif operation == OperationEnum.REKEY_KEY_PAIR:
return self._process_rekey_key_pair_batch_item
else:
raise ValueError("no processor for given operation")
raise ValueError("no processor for operation: {0}".format(
operation))
def _process_create_key_pair_batch_item(self, batch_item):
def _process_key_pair_batch_item(self, batch_item, result):
payload = batch_item.response_payload
payload_private_key_uuid = None
@ -225,14 +261,19 @@ class KMIPProxy(KMIP):
payload_public_key_template_attribute = \
payload.public_key_template_attribute
result = CreateKeyPairResult(batch_item.result_status,
batch_item.result_reason,
batch_item.result_message,
payload_private_key_uuid,
payload_public_key_uuid,
payload_private_key_template_attribute,
payload_public_key_template_attribute)
return result
return result(batch_item.result_status, batch_item.result_reason,
batch_item.result_message, payload_private_key_uuid,
payload_public_key_uuid,
payload_private_key_template_attribute,
payload_public_key_template_attribute)
def _process_create_key_pair_batch_item(self, batch_item):
return self._process_key_pair_batch_item(
batch_item, CreateKeyPairResult)
def _process_rekey_key_pair_batch_item(self, batch_item):
return self._process_key_pair_batch_item(
batch_item, RekeyKeyPairResult)
def _get(self,
unique_identifier=None,

View File

@ -99,6 +99,22 @@ class RegisterResult(OperationResult):
self.template_attribute = None
class RekeyKeyPairResult(CreateKeyPairResult):
def __init__(self,
result_status,
result_reason=None,
result_message=None,
private_key_uuid=None,
public_key_uuid=None,
private_key_template_attribute=None,
public_key_template_attribute=None):
super(RekeyKeyPairResult, self).__init__(
result_status, result_reason, result_message, private_key_uuid,
public_key_uuid, private_key_template_attribute,
public_key_template_attribute)
class GetResult(OperationResult):
def __init__(self,

View File

@ -23,6 +23,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -156,8 +157,9 @@ class TestRequestPayloadFactory(testtools.TestCase):
self.factory.create, Operation.PUT)
def test_create_rekey_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY_KEY_PAIR)
payload = self.factory.create(Operation.REKEY_KEY_PAIR)
self._test_payload_type(
payload, rekey_key_pair.RekeyKeyPairRequestPayload)
def test_create_discover_versions_payload(self):
self._test_not_implemented(

View File

@ -23,6 +23,7 @@ from kmip.core.messages.payloads import create_key_pair
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import rekey_key_pair
from kmip.core.messages.payloads import register
@ -156,8 +157,9 @@ class TestResponsePayloadFactory(testtools.TestCase):
self.factory.create, Operation.PUT)
def test_create_rekey_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY_KEY_PAIR)
payload = self.factory.create(Operation.REKEY_KEY_PAIR)
self._test_payload_type(
payload, rekey_key_pair.RekeyKeyPairResponsePayload)
def test_create_discover_versions_payload(self):
self._test_not_implemented(

View File

@ -192,7 +192,7 @@ class TestCreateKeyPairResponsePayload(TestCase):
def test_validate_with_invalid_private_key_unique_identifier(self):
kwargs = {'private_key_uuid': 'invalid',
'public_key_uuid': self.public_key_uuid,
'public_key_uuid': None,
'private_key_template_attribute': None,
'public_key_template_attribute': None}
self.assertRaisesRegexp(
@ -200,7 +200,7 @@ class TestCreateKeyPairResponsePayload(TestCase):
create_key_pair.CreateKeyPairResponsePayload, **kwargs)
def test_validate_with_invalid_public_key_unique_identifier(self):
kwargs = {'private_key_uuid': self.private_key_uuid,
kwargs = {'private_key_uuid': None,
'public_key_uuid': 'invalid',
'private_key_template_attribute': None,
'public_key_template_attribute': None}

View File

@ -0,0 +1,346 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import TestCase
from kmip.core import attributes
from kmip.core import misc
from kmip.core import objects
from kmip.core import utils
from kmip.core.messages.payloads import rekey_key_pair
class TestRekeyKeyPairRequestPayload(TestCase):
def setUp(self):
super(TestRekeyKeyPairRequestPayload, self).setUp()
self.uuid = '00000000-0000-0000-0000-000000000000'
self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier(
self.uuid)
self.offset = misc.Offset(0)
self.common_template_attribute = objects.CommonTemplateAttribute()
self.private_key_template_attribute = \
objects.PrivateKeyTemplateAttribute()
self.public_key_template_attribute = \
objects.PublicKeyTemplateAttribute()
self.encoding_empty = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x00'))
self.encoding_full = utils.BytearrayStream((
b'\x42\x00\x79\x01\x00\x00\x00\x58\x42\x00\x66\x07\x00\x00\x00\x24'
b'\x30\x30\x30\x30\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30'
b'\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x30\x30\x30\x30'
b'\x30\x30\x30\x30\x00\x00\x00\x00\x42\x00\x58\x0A\x00\x00\x00\x04'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x42\x00\x1F\x01\x00\x00\x00\x00'
b'\x42\x00\x65\x01\x00\x00\x00\x00\x42\x00\x6E\x01\x00\x00\x00\x00'
))
def tearDown(self):
super(TestRekeyKeyPairRequestPayload, self).tearDown()
def test_init_with_none(self):
rekey_key_pair.RekeyKeyPairRequestPayload()
def test_init_with_args(self):
rekey_key_pair.RekeyKeyPairRequestPayload(
self.private_key_uuid, self.offset, self.common_template_attribute,
self.private_key_template_attribute,
self.public_key_template_attribute)
def test_validate_with_invalid_private_key_unique_identifier(self):
kwargs = {'private_key_uuid': 'invalid', 'offset': None,
'common_template_attribute': None,
'private_key_template_attribute': None,
'public_key_template_attribute': None}
self.assertRaisesRegexp(
TypeError, "invalid private key unique identifier",
rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs)
def test_validate_with_invalid_offset(self):
kwargs = {'private_key_uuid': None, 'offset': 'invalid',
'common_template_attribute': None,
'private_key_template_attribute': None,
'public_key_template_attribute': None}
self.assertRaisesRegexp(
TypeError, "invalid offset",
rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs)
def test_validate_with_invalid_common_template_attribute(self):
kwargs = {'private_key_uuid': None, 'offset': None,
'common_template_attribute': 'invalid',
'private_key_template_attribute': None,
'public_key_template_attribute': None}
self.assertRaisesRegexp(
TypeError, "invalid common template attribute",
rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs)
def test_validate_with_invalid_private_key_template_attribute(self):
kwargs = {'private_key_uuid': None, 'offset': None,
'common_template_attribute': None,
'private_key_template_attribute': 'invalid',
'public_key_template_attribute': None}
self.assertRaisesRegexp(
TypeError, "invalid private key template attribute",
rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs)
def test_validate_with_invalid_public_key_template_attribute(self):
kwargs = {'private_key_uuid': None, 'offset': None,
'common_template_attribute': None,
'private_key_template_attribute': None,
'public_key_template_attribute': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid public key template attribute",
rekey_key_pair.RekeyKeyPairRequestPayload, **kwargs)
def _test_read(self, stream, payload, private_key_uuid, offset,
common_template_attribute, private_key_template_attribute,
public_key_template_attribute):
payload.read(stream)
msg = "private_key_uuid decoding mismatch"
msg += "; expected {0}, received {1}".format(
private_key_uuid, payload.private_key_uuid)
self.assertEqual(private_key_uuid, payload.private_key_uuid, msg)
msg = "offset decoding mismatch"
msg += "; expected {0}, received {1}".format(offset, payload.offset)
self.assertEqual(offset, payload.offset, msg)
msg = "common_template_attribute decoding mismatch"
msg += "; expected {0}, received {1}".format(
common_template_attribute, payload.common_template_attribute)
self.assertEqual(common_template_attribute,
payload.common_template_attribute, msg)
msg = "private_key_template_attribute decoding mismatch"
msg += "; expected {0}, received {1}".format(
private_key_template_attribute,
payload.private_key_template_attribute)
self.assertEqual(private_key_template_attribute,
payload.private_key_template_attribute, msg)
msg = "public_key_template_attribute decoding mismatch"
msg += "; expected {0}, received {1}".format(
public_key_template_attribute,
payload.public_key_template_attribute)
self.assertEqual(public_key_template_attribute,
payload.public_key_template_attribute, msg)
def test_read_with_none(self):
stream = self.encoding_empty
payload = rekey_key_pair.RekeyKeyPairRequestPayload()
self._test_read(stream, payload, None, None, None, None, None)
def test_read_with_args(self):
stream = self.encoding_full
payload = rekey_key_pair.RekeyKeyPairRequestPayload()
self._test_read(stream, payload, self.private_key_uuid, self.offset,
self.common_template_attribute,
self.private_key_template_attribute,
self.public_key_template_attribute)
def _test_write(self, stream, payload, expected):
payload.write(stream)
length_expected = len(expected)
length_received = len(stream)
msg = "encoding lengths not equal"
msg += "; expected {0}, received {1}".format(
length_expected, length_received)
self.assertEqual(length_expected, length_received, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(expected, stream)
self.assertEqual(expected, stream, msg)
def test_write_with_none(self):
stream = utils.BytearrayStream()
payload = rekey_key_pair.RekeyKeyPairRequestPayload()
self._test_write(stream, payload, self.encoding_empty)
def test_write_with_args(self):
stream = utils.BytearrayStream()
payload = rekey_key_pair.RekeyKeyPairRequestPayload(
self.private_key_uuid, self.offset, self.common_template_attribute,
self.private_key_template_attribute,
self.public_key_template_attribute)
self._test_write(stream, payload, self.encoding_full)
class TestRekeyKeyPairResponsePayload(TestCase):
def setUp(self):
super(TestRekeyKeyPairResponsePayload, self).setUp()
self.uuid = '00000000-0000-0000-0000-000000000000'
self.private_key_uuid = attributes.PrivateKeyUniqueIdentifier(
self.uuid)
self.public_key_uuid = attributes.PublicKeyUniqueIdentifier(
self.uuid)
self.empty_private_key_uuid = attributes.PrivateKeyUniqueIdentifier('')
self.empty_public_key_uuid = attributes.PublicKeyUniqueIdentifier('')
self.private_key_template_attribute = \
objects.PrivateKeyTemplateAttribute()
self.public_key_template_attribute = \
objects.PublicKeyTemplateAttribute()
self.encoding_empty = utils.BytearrayStream((
b'\x42\x00\x7C\x01\x00\x00\x00\x10\x42\x00\x66\x07\x00\x00\x00\x00'
b'\x42\x00\x6F\x07\x00\x00\x00\x00'))
self.encoding_full = utils.BytearrayStream((
b'\x42\x00\x7C\x01\x00\x00\x00\x70\x42\x00\x66\x07\x00\x00\x00\x24'
b'\x30\x30\x30\x30\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30'
b'\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x30\x30\x30\x30'
b'\x30\x30\x30\x30\x00\x00\x00\x00\x42\x00\x6F\x07\x00\x00\x00\x24'
b'\x30\x30\x30\x30\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30'
b'\x30\x30\x2d\x30\x30\x30\x30\x2d\x30\x30\x30\x30\x30\x30\x30\x30'
b'\x30\x30\x30\x30\x00\x00\x00\x00\x42\x00\x65\x01\x00\x00\x00\x00'
b'\x42\x00\x6E\x01\x00\x00\x00\x00'))
def tearDown(self):
super(TestRekeyKeyPairResponsePayload, self).tearDown()
def test_init_with_none(self):
rekey_key_pair.RekeyKeyPairResponsePayload()
def test_init_with_args(self):
rekey_key_pair.RekeyKeyPairResponsePayload(
self.private_key_uuid, self.public_key_uuid,
self.private_key_template_attribute,
self.public_key_template_attribute)
def test_validate_with_invalid_private_key_unique_identifier(self):
kwargs = {'private_key_uuid': 'invalid',
'public_key_uuid': None,
'private_key_template_attribute': None,
'public_key_template_attribute': None}
self.assertRaisesRegexp(
TypeError, "invalid private key unique identifier",
rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs)
def test_validate_with_invalid_public_key_unique_identifier(self):
kwargs = {'private_key_uuid': None,
'public_key_uuid': 'invalid',
'private_key_template_attribute': None,
'public_key_template_attribute': None}
self.assertRaisesRegexp(
TypeError, "invalid public key unique identifier",
rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs)
def test_validate_with_invalid_private_key_template_attribute(self):
kwargs = {'private_key_uuid': None,
'public_key_uuid': None,
'private_key_template_attribute': 'invalid',
'public_key_template_attribute': None}
self.assertRaisesRegexp(
TypeError, "invalid private key template attribute",
rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs)
def test_validate_with_invalid_public_key_template_attribute(self):
kwargs = {'private_key_uuid': None,
'public_key_uuid': None,
'private_key_template_attribute': None,
'public_key_template_attribute': 'invalid'}
self.assertRaisesRegexp(
TypeError, "invalid public key template attribute",
rekey_key_pair.RekeyKeyPairResponsePayload, **kwargs)
def _test_read(self, stream, payload, private_key_uuid, public_key_uuid,
private_key_template_attribute,
public_key_template_attribute):
payload.read(stream)
msg = "private_key_uuid decoding mismatch"
msg += "; expected {0}, received {1}".format(
private_key_uuid, payload.private_key_uuid)
self.assertEqual(private_key_uuid, payload.private_key_uuid, msg)
msg = "public_key_uuid decoding mismatch"
msg += "; expected {0}, received {1}".format(
public_key_uuid, payload.public_key_uuid)
self.assertEqual(public_key_uuid, payload.public_key_uuid, msg)
msg = "private_key_template_attribute decoding mismatch"
msg += "; expected {0}, received {1}".format(
private_key_template_attribute,
payload.private_key_template_attribute)
self.assertEqual(private_key_template_attribute,
payload.private_key_template_attribute, msg)
msg = "public_key_template_attribute decoding mismatch"
msg += "; expected {0}, received {1}".format(
public_key_template_attribute,
payload.public_key_template_attribute)
self.assertEqual(public_key_template_attribute,
payload.public_key_template_attribute, msg)
def test_read_with_none(self):
stream = self.encoding_empty
payload = rekey_key_pair.RekeyKeyPairResponsePayload()
self._test_read(stream, payload, self.empty_private_key_uuid,
self.empty_public_key_uuid, None, None)
def test_read_with_args(self):
stream = self.encoding_full
payload = rekey_key_pair.RekeyKeyPairResponsePayload(
self.private_key_uuid, self.public_key_uuid,
self.private_key_template_attribute,
self.public_key_template_attribute)
self._test_read(stream, payload, self.private_key_uuid,
self.public_key_uuid,
self.private_key_template_attribute,
self.public_key_template_attribute)
def _test_write(self, stream, payload, expected):
payload.write(stream)
length_expected = len(expected)
length_received = len(stream)
msg = "encoding lengths not equal"
msg += "; expected {0}, received {1}".format(
length_expected, length_received)
self.assertEqual(length_expected, length_received, msg)
msg = "encoding mismatch"
msg += ";\nexpected:\n{0}\nreceived:\n{1}".format(expected, stream)
self.assertEqual(expected, stream, msg)
def test_write_with_none(self):
stream = utils.BytearrayStream()
payload = rekey_key_pair.RekeyKeyPairResponsePayload()
self._test_write(stream, payload, self.encoding_empty)
def test_write_with_args(self):
stream = utils.BytearrayStream()
payload = rekey_key_pair.RekeyKeyPairResponsePayload(
self.private_key_uuid, self.public_key_uuid,
self.private_key_template_attribute,
self.public_key_template_attribute)
self._test_write(stream, payload, self.encoding_full)

View File

@ -281,8 +281,8 @@ class TestInteger(TestCase):
def test_init_unset(self):
i = Integer()
self.assertEqual(None, i.value,
self.bad_value.format('value', None, i.value))
self.assertEqual(0, i.value,
self.bad_value.format('value', 0, i.value))
self.assertEqual(i.LENGTH, i.length,
self.bad_value.format('length', i.LENGTH, i.length))
self.assertEqual(i.LENGTH, i.padding_length,
@ -858,8 +858,8 @@ class TestEnumeration(TestCase):
self.assertEqual(None, e.enum,
self.bad_value.format('enum', None, e.enum))
self.assertEqual(None, e.value,
self.bad_value.format('value', None, e.value))
self.assertEqual(0, e.value,
self.bad_value.format('value', 0, e.value))
def test_validate_on_valid(self):
e = Enumeration()

View File

@ -21,6 +21,8 @@ import os
import sys
import time
from kmip.core.attributes import PrivateKeyUniqueIdentifier
from kmip.core.enums import AttributeType
from kmip.core.enums import CredentialType
from kmip.core.enums import CryptographicAlgorithm
@ -44,6 +46,10 @@ from kmip.core.messages.messages import ResponseMessage
from kmip.core.messages.contents import Operation
from kmip.core.messages.payloads.create_key_pair import \
CreateKeyPairRequestPayload, CreateKeyPairResponsePayload
from kmip.core.messages.payloads.rekey_key_pair import \
RekeyKeyPairRequestPayload, RekeyKeyPairResponsePayload
from kmip.core.misc import Offset
from kmip.core.objects import Attribute
from kmip.core.objects import CommonTemplateAttribute
@ -55,10 +61,12 @@ from kmip.core.secrets import SymmetricKey
from kmip.services.kmip_client import KMIPProxy
from kmip.services.results import CreateKeyPairResult
from kmip.services.results import RekeyKeyPairResult
import kmip.core.utils as utils
# TODO (peter-hamilton) Move integration tests into separate module
class TestKMIPClientIntegration(TestCase):
STARTUP_TIME = 1.0
SHUTDOWN_TIME = 0.1
@ -520,6 +528,65 @@ class TestKMIPClient(TestCase):
def test_build_create_key_pair_batch_item_no_input(self):
self._test_build_create_key_pair_batch_item(None, None, None)
def _test_build_rekey_key_pair_batch_item(self, uuid, offset, common,
private, public):
batch_item = self.client._build_rekey_key_pair_batch_item(
private_key_uuid=uuid, offset=offset,
common_template_attribute=common,
private_key_template_attribute=private,
public_key_template_attribute=public)
base = "expected {0}, received {1}"
msg = base.format(RequestBatchItem, batch_item)
self.assertIsInstance(batch_item, RequestBatchItem, msg)
operation = batch_item.operation
msg = base.format(Operation, operation)
self.assertIsInstance(operation, Operation, msg)
operation_enum = operation.enum
msg = base.format(OperationEnum.REKEY_KEY_PAIR, operation_enum)
self.assertEqual(OperationEnum.REKEY_KEY_PAIR, operation_enum, msg)
payload = batch_item.request_payload
msg = base.format(RekeyKeyPairRequestPayload, payload)
self.assertIsInstance(payload, RekeyKeyPairRequestPayload, msg)
private_key_uuid_observed = payload.private_key_uuid
offset_observed = payload.offset
common_observed = payload.common_template_attribute
private_observed = payload.private_key_template_attribute
public_observed = payload.public_key_template_attribute
msg = base.format(uuid, private_key_uuid_observed)
self.assertEqual(uuid, private_key_uuid_observed, msg)
msg = base.format(offset, offset_observed)
self.assertEqual(offset, offset_observed, msg)
msg = base.format(common, common_observed)
self.assertEqual(common, common_observed, msg)
msg = base.format(private, private_observed)
self.assertEqual(private, private_observed, msg)
msg = base.format(public, public_observed)
self.assertEqual(public, public_observed)
def test_build_rekey_key_pair_batch_item_with_input(self):
self._test_build_rekey_key_pair_batch_item(
PrivateKeyUniqueIdentifier(), Offset(),
CommonTemplateAttribute(),
PrivateKeyTemplateAttribute(),
PublicKeyTemplateAttribute())
def test_build_rekey_key_pair_batch_item_no_input(self):
self._test_build_rekey_key_pair_batch_item(
None, None, None, None, None)
def test_process_batch_items(self):
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.CREATE_KEY_PAIR),
@ -549,6 +616,24 @@ class TestKMIPClient(TestCase):
msg = "number of results " + base.format(0, len(results))
self.assertEqual(0, len(results), msg)
def test_get_batch_item_processor(self):
base = "expected {0}, received {1}"
expected = self.client._process_create_key_pair_batch_item
observed = self.client._get_batch_item_processor(
OperationEnum.CREATE_KEY_PAIR)
msg = base.format(expected, observed)
self.assertEqual(expected, observed, msg)
expected = self.client._process_rekey_key_pair_batch_item
observed = self.client._get_batch_item_processor(
OperationEnum.REKEY_KEY_PAIR)
msg = base.format(expected, observed)
self.assertEqual(expected, observed, msg)
self.assertRaisesRegexp(ValueError, "no processor for operation",
self.client._get_batch_item_processor, None)
def test_process_create_key_pair_batch_item(self):
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.CREATE_KEY_PAIR),
@ -557,3 +642,12 @@ class TestKMIPClient(TestCase):
msg = "expected {0}, received {1}".format(CreateKeyPairResult, result)
self.assertIsInstance(result, CreateKeyPairResult, msg)
def test_process_rekey_key_pair_batch_item(self):
batch_item = ResponseBatchItem(
operation=Operation(OperationEnum.REKEY_KEY_PAIR),
response_payload=RekeyKeyPairResponsePayload())
result = self.client._process_rekey_key_pair_batch_item(batch_item)
msg = "expected {0}, received {1}".format(RekeyKeyPairResult, result)
self.assertIsInstance(result, RekeyKeyPairResult, msg)