Adding payload factories

This change adds a set of payload factories to the factory suite, which
handle the construction of default request/response payload objects for
KMIP operations. These factories replace the use of payload operation
dictionaries for dynamic payload object lookup. A payload factory test
suite and minor logging tweaks are included.
This commit is contained in:
Peter Hamilton 2014-12-04 08:31:09 -05:00
parent 3918c320fd
commit 6cbb3159ca
11 changed files with 771 additions and 31 deletions

View File

@ -0,0 +1,174 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.enums import Operation
class PayloadFactory():
def create(self, operation):
# Switch on Operation enum
if operation is Operation.CREATE:
return self._create_create_payload()
elif operation is Operation.CREATE_KEY_PAIR:
return self._create_create_key_pair_payload()
elif operation is Operation.REGISTER:
return self._create_register_payload()
elif operation is Operation.REKEY:
return self._create_rekey_payload()
elif operation is Operation.DERIVE_KEY:
return self._create_derive_key_payload()
elif operation is Operation.CERTIFY:
return self._create_certify_payload()
elif operation is Operation.RECERTIFY:
return self._create_recertify_payload()
elif operation is Operation.LOCATE:
return self._create_locate_payload()
elif operation is Operation.CHECK:
return self._create_check_payload()
elif operation is Operation.GET:
return self._create_get_payload()
elif operation is Operation.GET_ATTRIBUTES:
return self._create_get_attributes_payload()
elif operation is Operation.GET_ATTRIBUTE_LIST:
return self._create_get_attribute_list_payload()
elif operation is Operation.ADD_ATTRIBUTE:
return self._create_add_attribute_payload()
elif operation is Operation.MODIFY_ATTRIBUTE:
return self._create_modify_attribute_payload()
elif operation is Operation.DELETE_ATTRIBUTE:
return self._create_delete_attribute_payload()
elif operation is Operation.OBTAIN_LEASE:
return self._create_obtain_lease_payload()
elif operation is Operation.GET_USAGE_ALLOCATION:
return self._create_get_usage_allocation_payload()
elif operation is Operation.ACTIVATE:
return self._create_activate_payload()
elif operation is Operation.REVOKE:
return self._create_revoke_payload()
elif operation is Operation.DESTROY:
return self._create_destroy_payload()
elif operation is Operation.ARCHIVE:
return self._create_archive_payload()
elif operation is Operation.RECOVER:
return self._create_recover_payload()
elif operation is Operation.VALIDATE:
return self._create_validate_payload()
elif operation is Operation.QUERY:
return self._create_query_payload()
elif operation is Operation.CANCEL:
return self._create_cancel_payload()
elif operation is Operation.POLL:
return self._create_poll_payload()
elif operation is Operation.NOTIFY:
return self._create_notify_payload()
elif operation is Operation.PUT:
return self._create_put_payload()
elif operation is Operation.REKEY_KEY_PAIR:
return self._create_rekey_key_pair_payload()
elif operation is Operation.DISCOVER_VERSIONS:
return self._create_discover_versions_payload()
else:
raise ValueError('unsupported operation: {0}'.format(operation))
def _create_create_payload(self):
raise NotImplementedError()
def _create_create_key_pair_payload(self):
raise NotImplementedError()
def _create_register_payload(self):
raise NotImplementedError()
def _create_rekey_payload(self):
raise NotImplementedError()
def _create_derive_key_payload(self):
raise NotImplementedError()
def _create_certify_payload(self):
raise NotImplementedError()
def _create_recertify_payload(self):
raise NotImplementedError()
def _create_locate_payload(self):
raise NotImplementedError()
def _create_check_payload(self):
raise NotImplementedError()
def _create_get_payload(self):
raise NotImplementedError()
def _create_get_attributes_payload(self):
raise NotImplementedError()
def _create_get_attribute_list_payload(self):
raise NotImplementedError()
def _create_add_attribute_payload(self):
raise NotImplementedError()
def _create_modify_attribute_payload(self):
raise NotImplementedError()
def _create_delete_attribute_payload(self):
raise NotImplementedError()
def _create_obtain_lease_payload(self):
raise NotImplementedError()
def _create_get_usage_allocation_payload(self):
raise NotImplementedError()
def _create_activate_payload(self):
raise NotImplementedError()
def _create_revoke_payload(self):
raise NotImplementedError()
def _create_destroy_payload(self):
raise NotImplementedError()
def _create_archive_payload(self):
raise NotImplementedError()
def _create_recover_payload(self):
raise NotImplementedError()
def _create_validate_payload(self):
raise NotImplementedError()
def _create_query_payload(self):
raise NotImplementedError()
def _create_cancel_payload(self):
raise NotImplementedError()
def _create_poll_payload(self):
raise NotImplementedError()
def _create_notify_payload(self):
raise NotImplementedError()
def _create_put_payload(self):
raise NotImplementedError()
def _create_rekey_key_pair_payload(self):
raise NotImplementedError()
def _create_discover_versions_payload(self):
raise NotImplementedError()

View File

@ -0,0 +1,40 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.factories.payloads import PayloadFactory
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import register
class RequestPayloadFactory(PayloadFactory):
def _create_create_payload(self):
return create.CreateRequestPayload()
def _create_register_payload(self):
return register.RegisterRequestPayload()
def _create_locate_payload(self):
return locate.LocateRequestPayload()
def _create_get_payload(self):
return get.GetRequestPayload()
def _create_destroy_payload(self):
return destroy.DestroyRequestPayload()

View File

@ -0,0 +1,40 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kmip.core.factories.payloads import PayloadFactory
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import register
class ResponsePayloadFactory(PayloadFactory):
def _create_create_payload(self):
return create.CreateResponsePayload()
def _create_register_payload(self):
return register.RegisterResponsePayload()
def _create_locate_payload(self):
return locate.LocateResponsePayload()
def _create_get_payload(self):
return get.GetResponsePayload()
def _create_destroy_payload(self):
return destroy.DestroyResponsePayload()

View File

@ -19,7 +19,8 @@ from kmip.core.messages import contents
from kmip.core.messages.contents import AsynchronousCorrelationValue from kmip.core.messages.contents import AsynchronousCorrelationValue
from kmip.core.messages.contents import BatchErrorContinuationOption from kmip.core.messages.contents import BatchErrorContinuationOption
from kmip.core.messages import payloads from kmip.core.factories.payloads.request import RequestPayloadFactory
from kmip.core.factories.payloads.response import ResponsePayloadFactory
from kmip.core.primitives import Struct from kmip.core.primitives import Struct
@ -175,6 +176,9 @@ class RequestBatchItem(Struct):
request_payload=None, request_payload=None,
message_extension=None): message_extension=None):
super(self.__class__, self).__init__(tag=Tags.REQUEST_BATCH_ITEM) super(self.__class__, self).__init__(tag=Tags.REQUEST_BATCH_ITEM)
self.payload_factory = RequestPayloadFactory()
self.operation = operation self.operation = operation
self.unique_batch_item_id = unique_batch_item_id self.unique_batch_item_id = unique_batch_item_id
self.request_payload = request_payload self.request_payload = request_payload
@ -193,9 +197,10 @@ class RequestBatchItem(Struct):
self.unique_batch_item_id = contents.UniqueBatchItemID() self.unique_batch_item_id = contents.UniqueBatchItemID()
self.unique_batch_item_id.read(tstream) self.unique_batch_item_id.read(tstream)
# Lookup the response payload class that belongs to the operation # Dynamically create the response payload class that belongs to the
cls = payloads.REQUEST_MAP.get(self.operation.enum) # operation
self.request_payload = cls() self.request_payload = self.payload_factory.create(
self.operation.enum)
self.request_payload.read(tstream) self.request_payload.read(tstream)
# Read the message extension if it is present # Read the message extension if it is present
@ -237,6 +242,9 @@ class ResponseBatchItem(Struct):
response_payload=None, response_payload=None,
message_extension=None): message_extension=None):
super(self.__class__, self).__init__(tag=Tags.RESPONSE_BATCH_ITEM) super(self.__class__, self).__init__(tag=Tags.RESPONSE_BATCH_ITEM)
self.payload_factory = ResponsePayloadFactory()
self.operation = operation self.operation = operation
self.unique_batch_item_id = unique_batch_item_id self.unique_batch_item_id = unique_batch_item_id
self.result_status = result_status self.result_status = result_status
@ -280,11 +288,11 @@ class ResponseBatchItem(Struct):
self.async_correlation_value = AsynchronousCorrelationValue() self.async_correlation_value = AsynchronousCorrelationValue()
self.async_correlation_value.read(tstream) self.async_correlation_value.read(tstream)
# Lookup the response payload class that belongs to the operation # Dynamically create the response payload class that belongs to the
cls = payloads.RESPONSE_MAP.get(self.operation.enum) # operation
expected = cls() expected = self.payload_factory.create(self.operation.enum)
if self.is_tag_next(expected.tag, tstream): if self.is_tag_next(expected.tag, tstream):
self.response_payload = cls() self.response_payload = expected
self.response_payload.read(tstream) self.response_payload.read(tstream)
# Read the message extension if it is present # Read the message extension if it is present

View File

@ -14,24 +14,3 @@
# under the License. # under the License.
__all__ = ['create', 'destroy', 'get', 'locate', 'register'] __all__ = ['create', 'destroy', 'get', 'locate', 'register']
from kmip.core import enums
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import register
from kmip.core.messages.payloads import locate
# TODO (peter-hamilton) Replace with PayloadFactories
REQUEST_MAP = {enums.Operation.CREATE: create.CreateRequestPayload,
enums.Operation.GET: get.GetRequestPayload,
enums.Operation.DESTROY: destroy.DestroyRequestPayload,
enums.Operation.REGISTER: register.RegisterRequestPayload,
enums.Operation.LOCATE: locate.LocateRequestPayload}
RESPONSE_MAP = {enums.Operation.CREATE: create.CreateResponsePayload,
enums.Operation.GET: get.GetResponsePayload,
enums.Operation.REGISTER: register.RegisterResponsePayload,
enums.Operation.DESTROY: destroy.DestroyResponsePayload,
enums.Operation.LOCATE: locate.LocateResponsePayload}

View File

@ -31,14 +31,18 @@ class KMIPProtocol(object):
def write(self, data): def write(self, data):
if len(data) > 0: if len(data) > 0:
sbuffer = bytes(data) sbuffer = bytes(data)
self.logger.debug('buffer: {0}'.format(binascii.hexlify(sbuffer))) self.logger.debug('KMIPProtocol.write: {0}'.format(
binascii.hexlify(sbuffer)))
self.socket.sendall(sbuffer) self.socket.sendall(sbuffer)
def read(self): def read(self):
header = self._recv_all(self.HEADER_SIZE) header = self._recv_all(self.HEADER_SIZE)
msg_size = unpack('!I', header[4:])[0] msg_size = unpack('!I', header[4:])[0]
payload = self._recv_all(msg_size) payload = self._recv_all(msg_size)
return BytearrayStream(header + payload) data = BytearrayStream(header + payload)
self.logger.debug('KMIPProtocol.read: {0}'.format(
binascii.hexlify(bytes(data.buffer))))
return data
def _recv_all(self, total_bytes_to_be_read): def _recv_all(self, total_bytes_to_be_read):
bytes_read = 0 bytes_read = 0

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import logging
import time import time
from kmip.core.messages.messages import RequestMessage from kmip.core.messages.messages import RequestMessage
@ -43,6 +44,7 @@ from kmip.core.utils import BytearrayStream
class Processor(object): class Processor(object):
def __init__(self, handler): def __init__(self, handler):
self.logger = logging.getLogger(__name__)
self._handler = handler self._handler = handler
def process(self, istream, ostream): def process(self, istream, ostream):

View File

@ -0,0 +1,14 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,155 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from kmip.core.enums import Operation
from kmip.core.factories.payloads import PayloadFactory
class TestPayloadFactory(testtools.TestCase):
def setUp(self):
super(TestPayloadFactory, self).setUp()
self.factory = PayloadFactory()
def tearDown(self):
super(TestPayloadFactory, self).tearDown()
def _test_not_implemented(self, func, args):
self.assertRaises(NotImplementedError, func, args)
def test_create_create_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CREATE)
def test_create_create_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CREATE_KEY_PAIR)
def test_create_register_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REGISTER)
def test_create_rekey_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY)
def test_create_derive_key_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DERIVE_KEY)
def test_create_certify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CERTIFY)
def test_create_recertify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.RECERTIFY)
def test_create_locate_payload(self):
self._test_not_implemented(
self.factory.create, Operation.LOCATE)
def test_create_check_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CHECK)
def test_create_get_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET)
def test_create_get_attributes_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_ATTRIBUTES)
def test_create_get_attributes_list_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_ATTRIBUTE_LIST)
def test_create_add_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ADD_ATTRIBUTE)
def test_create_modify_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.MODIFY_ATTRIBUTE)
def test_create_delete_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DELETE_ATTRIBUTE)
def test_create_obtain_lease_payload(self):
self._test_not_implemented(
self.factory.create, Operation.OBTAIN_LEASE)
def test_create_get_usage_allocation_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_USAGE_ALLOCATION)
def test_create_activate_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ACTIVATE)
def test_create_revoke_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REVOKE)
def test_create_destroy_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DESTROY)
def test_create_archive_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ARCHIVE)
def test_create_recover_payload(self):
self._test_not_implemented(
self.factory.create, Operation.RECOVER)
def test_create_validate_payload(self):
self._test_not_implemented(
self.factory.create, Operation.VALIDATE)
def test_create_query_payload(self):
self._test_not_implemented(
self.factory.create, Operation.QUERY)
def test_create_cancel_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CANCEL)
def test_create_poll_payload(self):
self._test_not_implemented(
self.factory.create, Operation.POLL)
def test_create_notify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.NOTIFY)
def test_create_put_payload(self):
self._test_not_implemented(
self.factory.create, Operation.PUT)
def test_create_rekey_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY_KEY_PAIR)
def test_create_discover_versions_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DISCOVER_VERSIONS)
def test_invalid_operation(self):
self.assertRaises(ValueError, self.factory.create, None)

View File

@ -0,0 +1,162 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from kmip.core.enums import Operation
from kmip.core.factories.payloads.request import RequestPayloadFactory
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import register
class TestPayloadFactory(testtools.TestCase):
def setUp(self):
super(TestPayloadFactory, self).setUp()
self.factory = RequestPayloadFactory()
def tearDown(self):
super(TestPayloadFactory, self).tearDown()
def _test_not_implemented(self, func, args):
self.assertRaises(NotImplementedError, func, args)
def _test_payload_type(self, payload, payload_type):
msg = "expected {0}, received {1}".format(payload_type, payload)
self.assertIsInstance(payload, payload_type, msg)
def test_create_create_payload(self):
payload = self.factory.create(Operation.CREATE)
self._test_payload_type(payload, create.CreateRequestPayload)
def test_create_create_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CREATE_KEY_PAIR)
def test_create_register_payload(self):
payload = self.factory.create(Operation.REGISTER)
self._test_payload_type(payload, register.RegisterRequestPayload)
def test_create_rekey_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY)
def test_create_derive_key_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DERIVE_KEY)
def test_create_certify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CERTIFY)
def test_create_recertify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.RECERTIFY)
def test_create_locate_payload(self):
payload = self.factory.create(Operation.LOCATE)
self._test_payload_type(payload, locate.LocateRequestPayload)
def test_create_check_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CHECK)
def test_create_get_payload(self):
payload = self.factory.create(Operation.GET)
self._test_payload_type(payload, get.GetRequestPayload)
def test_create_get_attributes_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_ATTRIBUTES)
def test_create_get_attributes_list_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_ATTRIBUTE_LIST)
def test_create_add_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ADD_ATTRIBUTE)
def test_create_modify_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.MODIFY_ATTRIBUTE)
def test_create_delete_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DELETE_ATTRIBUTE)
def test_create_obtain_lease_payload(self):
self._test_not_implemented(
self.factory.create, Operation.OBTAIN_LEASE)
def test_create_get_usage_allocation_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_USAGE_ALLOCATION)
def test_create_activate_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ACTIVATE)
def test_create_revoke_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REVOKE)
def test_create_destroy_payload(self):
payload = self.factory.create(Operation.DESTROY)
self._test_payload_type(payload, destroy.DestroyRequestPayload)
def test_create_archive_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ARCHIVE)
def test_create_recover_payload(self):
self._test_not_implemented(
self.factory.create, Operation.RECOVER)
def test_create_validate_payload(self):
self._test_not_implemented(
self.factory.create, Operation.VALIDATE)
def test_create_query_payload(self):
self._test_not_implemented(
self.factory.create, Operation.QUERY)
def test_create_cancel_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CANCEL)
def test_create_poll_payload(self):
self._test_not_implemented(
self.factory.create, Operation.POLL)
def test_create_notify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.NOTIFY)
def test_create_put_payload(self):
self._test_not_implemented(
self.factory.create, Operation.PUT)
def test_create_rekey_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY_KEY_PAIR)
def test_create_discover_versions_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DISCOVER_VERSIONS)

View File

@ -0,0 +1,162 @@
# Copyright (c) 2014 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from kmip.core.enums import Operation
from kmip.core.factories.payloads.response import ResponsePayloadFactory
from kmip.core.messages.payloads import create
from kmip.core.messages.payloads import destroy
from kmip.core.messages.payloads import get
from kmip.core.messages.payloads import locate
from kmip.core.messages.payloads import register
class TestPayloadFactory(testtools.TestCase):
def setUp(self):
super(TestPayloadFactory, self).setUp()
self.factory = ResponsePayloadFactory()
def tearDown(self):
super(TestPayloadFactory, self).tearDown()
def _test_not_implemented(self, func, args):
self.assertRaises(NotImplementedError, func, args)
def _test_payload_type(self, payload, payload_type):
msg = "expected {0}, received {1}".format(payload_type, payload)
self.assertIsInstance(payload, payload_type, msg)
def test_create_create_payload(self):
payload = self.factory.create(Operation.CREATE)
self._test_payload_type(payload, create.CreateResponsePayload)
def test_create_create_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CREATE_KEY_PAIR)
def test_create_register_payload(self):
payload = self.factory.create(Operation.REGISTER)
self._test_payload_type(payload, register.RegisterResponsePayload)
def test_create_rekey_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY)
def test_create_derive_key_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DERIVE_KEY)
def test_create_certify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CERTIFY)
def test_create_recertify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.RECERTIFY)
def test_create_locate_payload(self):
payload = self.factory.create(Operation.LOCATE)
self._test_payload_type(payload, locate.LocateResponsePayload)
def test_create_check_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CHECK)
def test_create_get_payload(self):
payload = self.factory.create(Operation.GET)
self._test_payload_type(payload, get.GetResponsePayload)
def test_create_get_attributes_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_ATTRIBUTES)
def test_create_get_attributes_list_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_ATTRIBUTE_LIST)
def test_create_add_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ADD_ATTRIBUTE)
def test_create_modify_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.MODIFY_ATTRIBUTE)
def test_create_delete_attribute_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DELETE_ATTRIBUTE)
def test_create_obtain_lease_payload(self):
self._test_not_implemented(
self.factory.create, Operation.OBTAIN_LEASE)
def test_create_get_usage_allocation_payload(self):
self._test_not_implemented(
self.factory.create, Operation.GET_USAGE_ALLOCATION)
def test_create_activate_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ACTIVATE)
def test_create_revoke_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REVOKE)
def test_create_destroy_payload(self):
payload = self.factory.create(Operation.DESTROY)
self._test_payload_type(payload, destroy.DestroyResponsePayload)
def test_create_archive_payload(self):
self._test_not_implemented(
self.factory.create, Operation.ARCHIVE)
def test_create_recover_payload(self):
self._test_not_implemented(
self.factory.create, Operation.RECOVER)
def test_create_validate_payload(self):
self._test_not_implemented(
self.factory.create, Operation.VALIDATE)
def test_create_query_payload(self):
self._test_not_implemented(
self.factory.create, Operation.QUERY)
def test_create_cancel_payload(self):
self._test_not_implemented(
self.factory.create, Operation.CANCEL)
def test_create_poll_payload(self):
self._test_not_implemented(
self.factory.create, Operation.POLL)
def test_create_notify_payload(self):
self._test_not_implemented(
self.factory.create, Operation.NOTIFY)
def test_create_put_payload(self):
self._test_not_implemented(
self.factory.create, Operation.PUT)
def test_create_rekey_key_pair_payload(self):
self._test_not_implemented(
self.factory.create, Operation.REKEY_KEY_PAIR)
def test_create_discover_versions_payload(self):
self._test_not_implemented(
self.factory.create, Operation.DISCOVER_VERSIONS)