From 7baa31ea510b518d7632e95006b156f42c11224c Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Mon, 27 Jul 2015 13:09:45 -0400 Subject: [PATCH] Adding the Pie ProxyKmipClient This change adds the Pie ProxyKmipClient, which provides a simplified version of the original KMIP interface and which wraps the original client. The ProxyKmipClient supports basic CRUD operations for symmetric, public, and private keys. A unit test suite for the client is included. --- kmip/pie/client.py | 366 +++++++++++++++ kmip/pie/exceptions.py | 50 +++ kmip/tests/unit/pie/test_client.py | 599 +++++++++++++++++++++++++ kmip/tests/unit/pie/test_exceptions.py | 106 +++++ 4 files changed, 1121 insertions(+) create mode 100644 kmip/pie/client.py create mode 100644 kmip/pie/exceptions.py create mode 100644 kmip/tests/unit/pie/test_client.py create mode 100644 kmip/tests/unit/pie/test_exceptions.py diff --git a/kmip/pie/client.py b/kmip/pie/client.py new file mode 100644 index 0000000..0c8bb00 --- /dev/null +++ b/kmip/pie/client.py @@ -0,0 +1,366 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import six + +from kmip.core import enums +from kmip.core import objects as cobjects + +from kmip.core.factories import attributes + +from kmip.pie import api +from kmip.pie import exceptions +from kmip.pie import factory +from kmip.pie import objects as pobjects + +from kmip.services.kmip_client import KMIPProxy + + +class ProxyKmipClient(api.KmipClient): + """ + A simplified KMIP client for conducting KMIP operations. + + The ProxyKmipClient is a simpler KMIP client supporting various KMIP + operations. It wraps the original KMIPProxy, reducing the boilerplate + needed to deploy PyKMIP in client applications. The underlying proxy + client is responsible for setting up the underlying socket connection + and for writing/reading data to/from the socket. + + Like the KMIPProxy, the ProxyKmipClient is not thread-safe. + """ + def __init__(self, + hostname=None, + port=None, + cert=None, + key=None, + ca=None, + ssl_version=None, + username=None, + password=None, + config='client'): + """ + Construct a ProxyKmipClient. + + Args: + hostname (string): The host or IP address of a KMIP appliance. + Optional, defaults to None. + port (int): The port number used to establish a connection to a + KMIP appliance. Usually 5696 for KMIP applications. Optional, + defaults to None. + cert (string): The path to the client's certificate. Optional, + defaults to None. + key (string): The path to the key for the client's certificate. + Optional, defaults to None. + ca (string): The path to the CA certificate used to verify the + server's certificate. Optional, defaults to None. + ssl_version (string): The name of the ssl version to use for the + connection. Example: 'PROTOCOL_SSLv23'. Optional, defaults to + None. + username (string): The username of the KMIP appliance account to + use for operations. Optional, defaults to None. + password (string): The password of the KMIP appliance account to + use for operations. Optional, defaults to None. + config (string): The name of a section in the PyKMIP configuration + file. Use to load a specific set of configuration settings from + the configuration file, instead of specifying them manually. + Optional, defaults to the default client section, 'client'. + """ + self.logger = logging.getLogger() + + self.attribute_factory = attributes.AttributeFactory() + self.object_factory = factory.ObjectFactory() + + # TODO (peter-hamilton) Consider adding validation checks for inputs. + self.proxy = KMIPProxy( + host=hostname, + port=port, + certfile=cert, + keyfile=key, + ca_certs=ca, + ssl_version=ssl_version, + username=username, + password=password, + config=config) + + # TODO (peter-hamilton) Add a multiprocessing lock for synchronization. + self._is_open = False + + def open(self): + """ + Open the client connection. + + Raises: + ClientConnectionFailure: if the client connection is already open + Exception: if an error occurs while trying to open the connection + """ + if self._is_open: + raise exceptions.ClientConnectionFailure( + "client connection already open") + else: + try: + self.proxy.open() + self._is_open = True + except Exception as e: + self.logger.exception("could not open client connection", e) + raise e + + def close(self): + """ + Close the client connection. + + Raises: + ClientConnectionNotOpen: if the client connection is not open + Exception: if an error occurs while trying to close the connection + """ + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + else: + try: + self.proxy.close() + self._is_open = False + except Exception as e: + self.logger.exception("could not close client connection", e) + raise e + + def create(self, algorithm, length): + """ + Create a symmetric key on a KMIP appliance. + + Args: + algorithm (CryptographicAlgorithm): An enumeration defining the + algorithm to use to generate the symmetric key. + length (int): The length in bits for the symmetric key. + + Returns: + string: The uid of the newly created symmetric key. + + Raises: + ClientConnectionNotOpen: if the client connection is unusable + KmipOperationFailure: if the operation result is a failure + TypeError: if the input arguments are invalid + """ + # Check inputs + if not isinstance(algorithm, enums.CryptographicAlgorithm): + raise TypeError( + "algorithm must be a CryptographicAlgorithm enumeration") + elif not isinstance(length, six.integer_types) or length <= 0: + raise TypeError("length must be a positive integer") + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + # Create the template containing the attributes + attributes = self._build_key_attributes(algorithm, length) + template = cobjects.TemplateAttribute(attributes=attributes) + + # Create the symmetric key and handle the results + result = self.proxy.create(enums.ObjectType.SYMMETRIC_KEY, template) + + status = result.result_status.enum + if status == enums.ResultStatus.SUCCESS: + uid = result.uuid.value + return uid + else: + reason = result.result_reason.enum + message = result.result_message.value + raise exceptions.KmipOperationFailure(status, reason, message) + + def create_key_pair(self, algorithm, length): + """ + Create an asymmetric key pair on a KMIP appliance. + + Args: + algorithm (CryptographicAlgorithm): An enumeration defining the + algorithm to use to generate the key pair. + length (int): The length in bits for the key pair. + + Returns: + string: The uid of the newly created public key. + string: The uid of the newly created private key. + + Raises: + ClientConnectionNotOpen: if the client connection is unusable + KmipOperationFailure: if the operation result is a failure + TypeError: if the input arguments are invalid + """ + # Check inputs + if not isinstance(algorithm, enums.CryptographicAlgorithm): + raise TypeError( + "algorithm must be a CryptographicAlgorithm enumeration") + elif not isinstance(length, six.integer_types) or length <= 0: + raise TypeError("length must be a positive integer") + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + # Create the template containing the attributes + attributes = self._build_key_attributes(algorithm, length) + template = cobjects.CommonTemplateAttribute(attributes=attributes) + + # Create the asymmetric key pair and handle the results + result = self.proxy.create_key_pair(common_template_attribute=template) + + status = result.result_status.enum + if status == enums.ResultStatus.SUCCESS: + public_uid = result.public_key_uuid.value + private_uid = result.private_key_uuid.value + return public_uid, private_uid + else: + reason = result.result_reason.enum + message = result.result_message.value + raise exceptions.KmipOperationFailure(status, reason, message) + + def register(self, managed_object): + """ + Register a managed object with a KMIP appliance. + + Args: + managed_object (ManagedObject): A managed object to register. An + instantiatable subclass of ManagedObject from the Pie API. + + Returns: + string: The uid of the newly registered managed object. + + Raises: + ClientConnectionNotOpen: if the client connection is unusable + KmipOperationFailure: if the operation result is a failure + TypeError: if the input argument is invalid + """ + # Check input + if not isinstance(managed_object, pobjects.ManagedObject): + raise TypeError("managed object must be a Pie ManagedObject") + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + # Extract and create attributes + attributes = list() + if hasattr(managed_object, 'cryptographic_usage_masks'): + mask_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + managed_object.cryptographic_usage_masks) + + attributes.append(mask_attribute) + + template = cobjects.TemplateAttribute(attributes=attributes) + object_type = managed_object.object_type + + # Register the managed object and handle the results + secret = self.object_factory.convert(managed_object) + result = self.proxy.register(object_type, template, secret) + + status = result.result_status.enum + if status == enums.ResultStatus.SUCCESS: + uid = result.uuid.value + return uid + else: + reason = result.result_reason.enum + message = result.result_message.value + raise exceptions.KmipOperationFailure(status, reason, message) + + def get(self, uid): + """ + Get a managed object from a KMIP appliance. + + Args: + uid (string): The unique ID of the managed object to retrieve. + + Returns: + ManagedObject: The retrieved managed object object. + + Raises: + ClientConnectionNotOpen: if the client connection is unusable + KmipOperationFailure: if the operation result is a failure + TypeError: if the input argument is invalid + """ + # Check input + if not isinstance(uid, six.string_types): + raise TypeError("uid must be a string") + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + # Get the managed object and handle the results + result = self.proxy.get(uid) + + status = result.result_status.enum + if status == enums.ResultStatus.SUCCESS: + managed_object = self.object_factory.convert(result.secret) + return managed_object + else: + reason = result.result_reason.enum + message = result.result_message.value + raise exceptions.KmipOperationFailure(status, reason, message) + + def destroy(self, uid): + """ + Destroy a managed object stored by a KMIP appliance. + + Args: + uid (string): The unique ID of the managed object to destroy. + + Returns: + None + + Raises: + ClientConnectionNotOpen: if the client connection is unusable + KmipOperationFailure: if the operation result is a failure + TypeError: if the input argument is invalid + """ + # Check input + if not isinstance(uid, six.string_types): + raise TypeError("uid must be a string") + + # Verify that operations can be given at this time + if not self._is_open: + raise exceptions.ClientConnectionNotOpen() + + # Destroy the managed object and handle the results + result = self.proxy.destroy(uid) + + status = result.result_status.enum + if status == enums.ResultStatus.SUCCESS: + return + else: + reason = result.result_reason.enum + message = result.result_message.value + raise exceptions.KmipOperationFailure(status, reason, message) + + def _build_key_attributes(self, algorithm, length): + # Build a list of core key attributes. + algorithm_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, + algorithm) + length_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, + length) + mask_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT]) + + return [algorithm_attribute, length_attribute, mask_attribute] + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() diff --git a/kmip/pie/exceptions.py b/kmip/pie/exceptions.py new file mode 100644 index 0000000..3516047 --- /dev/null +++ b/kmip/pie/exceptions.py @@ -0,0 +1,50 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class ClientConnectionFailure(Exception): + """ + An exception raised for errors with the client socket connection. + """ + pass + + +class ClientConnectionNotOpen(Exception): + """ + An exception raised when operations are issued to a closed connection. + """ + def __init__(self): + """ + Construct the closed client connection error message. + """ + super(ClientConnectionNotOpen, self).__init__( + "client connection not open") + + +class KmipOperationFailure(Exception): + """ + An exception raised upon the failure of a KMIP appliance operation. + """ + def __init__(self, status, reason, message): + """ + Construct the error message for the KMIP operation failure. + + Args: + status: a ResultStatus enumeration + reason: a ResultReason enumeration + message: a string providing additional error information + """ + msg = "{0}: {1} - {2}".format(status.name, reason.name, message) + super(KmipOperationFailure, self).__init__(msg) diff --git a/kmip/tests/unit/pie/test_client.py b/kmip/tests/unit/pie/test_client.py new file mode 100644 index 0000000..92fce56 --- /dev/null +++ b/kmip/tests/unit/pie/test_client.py @@ -0,0 +1,599 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import six +import ssl +import testtools + +from kmip.core import attributes as attr +from kmip.core import enums +from kmip.core import objects as obj + +from kmip.core.factories import attributes +from kmip.core.messages import contents + +from kmip.services.kmip_client import KMIPProxy +from kmip.services import results + +from kmip.pie.client import ProxyKmipClient + +from kmip.pie.exceptions import ClientConnectionFailure +from kmip.pie.exceptions import ClientConnectionNotOpen +from kmip.pie.exceptions import KmipOperationFailure + +from kmip.pie import factory +from kmip.pie import objects + + +class TestProxyKmipClient(testtools.TestCase): + """ + Test suite for the ProxyKmipClient. + """ + + def setUp(self): + super(TestProxyKmipClient, self).setUp() + self.attribute_factory = attributes.AttributeFactory() + + def tearDown(self): + super(TestProxyKmipClient, self).tearDown() + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_init(self): + """ + Test that a ProxyKmipClient can be constructed with valid arguments. + """ + ProxyKmipClient( + hostname='127.0.0.1', + port=5696, + cert='/example/path/to/cert', + key='/example/path/to/key', + ca='/example/path/to/ca', + ssl_version=ssl.PROTOCOL_TLSv1, + username='username', + password='password', + config='test') + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_open(self): + """ + Test that the client can open a connection. + """ + client = ProxyKmipClient() + client.open() + client.proxy.open.assert_called_with() + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_open_on_open(self): + """ + Test that a ClientConnectionFailure exception is raised when trying + to open an opened client connection. + """ + client = ProxyKmipClient() + client.open() + self.assertRaises(ClientConnectionFailure, client.open) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_open_on_proxy_failure(self): + """ + Test that an Exception is raised when an error occurs while opening + the client proxy connection. + """ + client = ProxyKmipClient() + client.proxy.open.side_effect = Exception + self.assertRaises(Exception, client.open) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_close(self): + """ + Test that the client can close an open connection. + """ + client = ProxyKmipClient() + client.open() + client.close() + client.proxy.close.assert_called_with() + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_close_on_close(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to close a closed client connection. + """ + client = ProxyKmipClient() + self.assertRaises(ClientConnectionNotOpen, client.close) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_close_on_proxy_failure(self): + """ + Test that an Exception is raised when an error occurs while closing + the client proxy connection. + """ + client = ProxyKmipClient() + client._is_open = True + client.proxy.close.side_effect = Exception + self.assertRaises(Exception, client.close) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_enter(self): + """ + Test the result and effect of the enter method for the context + manager. + """ + client = ProxyKmipClient() + + self.assertFalse(client._is_open) + result = client.__enter__() + self.assertEqual(result, client) + self.assertTrue(client._is_open) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_exit(self): + """ + Test the result and effect of the exit method for the context + manager. + """ + client = ProxyKmipClient() + client.__enter__() + + self.assertTrue(client._is_open) + client.__exit__(None, None, None) + self.assertFalse(client._is_open) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_context_manager(self): + """ + Test that the KmipClient can be used by the with-statement as a + context manager. + """ + with ProxyKmipClient() as client: + self.assertTrue(client._is_open) + client.proxy.open.assert_called_with() + self.assertFalse(client._is_open) + client.proxy.close.assert_called_with() + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create(self): + """ + Test that a symmetric key can be created with proper inputs and that + its UID is returned properly. + """ + # Create the template to test the create call + algorithm = enums.CryptographicAlgorithm.AES + length = 256 + algorithm_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm) + length_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length) + mask_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT]) + + attributes = [algorithm_attribute, length_attribute, mask_attribute] + template = obj.TemplateAttribute(attributes=attributes) + + key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff' + status = enums.ResultStatus.SUCCESS + result = results.CreateResult( + contents.ResultStatus(status), + uuid=attr.UniqueIdentifier(key_id)) + + with ProxyKmipClient() as client: + client.proxy.create.return_value = result + + uid = client.create(algorithm, length) + client.proxy.create.assert_called_with( + enums.ObjectType.SYMMETRIC_KEY, template) + self.assertIsInstance(uid, six.string_types) + self.assertEqual(uid, key_id) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_on_invalid_algorithm(self): + """ + Test that a TypeError exception is raised when trying to create a + symmetric key with an invalid algorithm. + """ + args = ['invalid', 256] + with ProxyKmipClient() as client: + self.assertRaises(TypeError, client.create, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_on_invalid_length(self): + """ + Test that a TypeError exception is raised when trying to create a + symmetric key with an invalid length. + """ + args = [enums.CryptographicAlgorithm.AES, 'invalid'] + with ProxyKmipClient() as client: + self.assertRaises(TypeError, client.create, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to create a symmetric key on an unopened client connection. + """ + client = ProxyKmipClient() + args = [enums.CryptographicAlgorithm.AES, 256] + self.assertRaises( + ClientConnectionNotOpen, client.create, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + the backend fails to create a symmetric key. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = results.OperationResult( + contents.ResultStatus(status), + contents.ResultReason(reason), + contents.ResultMessage(message)) + error_msg = str(KmipOperationFailure(status, reason, message)) + + client = ProxyKmipClient() + client.open() + client.proxy.create.return_value = result + args = [enums.CryptographicAlgorithm.AES, 256] + + self.assertRaisesRegexp( + KmipOperationFailure, error_msg, client.create, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_key_pair(self): + """ + Test that an asymmetric key pair can be created with proper inputs + and that the UIDs of the public and private keys are returned + properly. + """ + # Create the template to test the create key pair call + algorithm = enums.CryptographicAlgorithm.RSA + length = 2048 + algorithm_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm) + length_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length) + mask_attribute = self.attribute_factory.create_attribute( + enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK, + [enums.CryptographicUsageMask.ENCRYPT, + enums.CryptographicUsageMask.DECRYPT]) + + attributes = [algorithm_attribute, length_attribute, mask_attribute] + template = obj.CommonTemplateAttribute(attributes=attributes) + + status = enums.ResultStatus.SUCCESS + result = results.CreateKeyPairResult( + contents.ResultStatus(status), + public_key_uuid=attr.PublicKeyUniqueIdentifier( + 'aaaaaaaa-1111-2222-3333-ffffffffffff'), + private_key_uuid=attr.PrivateKeyUniqueIdentifier( + 'ffffffff-3333-2222-1111-aaaaaaaaaaaa')) + + with ProxyKmipClient() as client: + client.proxy.create_key_pair.return_value = result + + public_uid, private_uid = client.create_key_pair( + enums.CryptographicAlgorithm.RSA, 2048) + + kwargs = {'common_template_attribute': template} + client.proxy.create_key_pair.assert_called_with(**kwargs) + self.assertIsInstance(public_uid, six.string_types) + self.assertIsInstance(private_uid, six.string_types) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_key_pair_on_invalid_algorithm(self): + """ + Test that a TypeError exception is raised when trying to create an + asymmetric key pair with an invalid algorithm. + """ + args = ['invalid', 256] + with ProxyKmipClient() as client: + self.assertRaises( + TypeError, client.create_key_pair, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_key_pair_on_invalid_length(self): + """ + Test that a TypeError exception is raised when trying to create an + asymmetric key pair with an invalid length. + """ + args = [enums.CryptographicAlgorithm.AES, 'invalid'] + with ProxyKmipClient() as client: + self.assertRaises( + TypeError, client.create_key_pair, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_key_pair_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to create an asymmetric key pair on an unopened client connection. + """ + client = ProxyKmipClient() + args = [enums.CryptographicAlgorithm.RSA, 2048] + self.assertRaises( + ClientConnectionNotOpen, client.create_key_pair, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_create_key_pair_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to create an asymmetric key pair. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = results.OperationResult( + contents.ResultStatus(status), + contents.ResultReason(reason), + contents.ResultMessage(message)) + error_msg = str(KmipOperationFailure(status, reason, message)) + + client = ProxyKmipClient() + client.open() + client.proxy.create_key_pair.return_value = result + args = [enums.CryptographicAlgorithm.RSA, 2048] + + self.assertRaisesRegexp( + KmipOperationFailure, error_msg, + client.create_key_pair, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get(self): + """ + Test that a secret can be retrieved with proper input. + """ + # Key encoding obtained from Section 14.2 of the KMIP 1.1 test + # documentation. + secret = objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E' + b'\x0F')) + fact = factory.ObjectFactory() + + result = results.GetResult( + contents.ResultStatus(enums.ResultStatus.SUCCESS), + uuid=attr.PublicKeyUniqueIdentifier( + 'aaaaaaaa-1111-2222-3333-ffffffffffff'), + secret=fact.convert(secret)) + + with ProxyKmipClient() as client: + client.proxy.get.return_value = result + + result = client.get('aaaaaaaa-1111-2222-3333-ffffffffffff') + client.proxy.get.assert_called_with( + 'aaaaaaaa-1111-2222-3333-ffffffffffff') + self.assertIsInstance(result, objects.SymmetricKey) + self.assertEqual(result, secret) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_on_invalid_uid(self): + """ + Test that a TypeError exception is raised when trying to retrieve a + secret with an invalid ID. + """ + args = [0] + with ProxyKmipClient() as client: + self.assertRaises(TypeError, client.get, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to retrieve a secret on an unopened client connection. + """ + client = ProxyKmipClient() + args = ['aaaaaaaa-1111-2222-3333-ffffffffffff'] + self.assertRaises(ClientConnectionNotOpen, client.get, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_get_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to retrieve a secret. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = results.OperationResult( + contents.ResultStatus(status), + contents.ResultReason(reason), + contents.ResultMessage(message)) + error_msg = str(KmipOperationFailure(status, reason, message)) + + client = ProxyKmipClient() + client.open() + client.proxy.get.return_value = result + args = ['id'] + + self.assertRaisesRegexp( + KmipOperationFailure, error_msg, client.get, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_destroy(self): + """ + Test that the client can destroy a secret. + """ + status = enums.ResultStatus.SUCCESS + result = results.OperationResult(contents.ResultStatus(status)) + + with ProxyKmipClient() as client: + client.proxy.destroy.return_value = result + result = client.destroy( + 'aaaaaaaa-1111-2222-3333-ffffffffffff') + client.proxy.destroy.assert_called_with( + 'aaaaaaaa-1111-2222-3333-ffffffffffff') + self.assertEqual(None, result) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_destroy_on_invalid_uid(self): + """ + Test that a TypeError exception is raised when trying to destroy a + secret with an invalid ID. + """ + args = [0] + with ProxyKmipClient() as client: + self.assertRaises(TypeError, client.destroy, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_destroy_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to destroy a secret on an unopened client connection. + """ + client = ProxyKmipClient() + args = ['aaaaaaaa-1111-2222-3333-ffffffffffff'] + self.assertRaises( + ClientConnectionNotOpen, client.destroy, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_destroy_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to destroy a secret. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = results.OperationResult( + contents.ResultStatus(status), + contents.ResultReason(reason), + contents.ResultMessage(message)) + error_msg = str(KmipOperationFailure(status, reason, message)) + + client = ProxyKmipClient() + client.open() + client.proxy.destroy.return_value = result + args = ['id'] + + self.assertRaisesRegexp( + KmipOperationFailure, error_msg, client.destroy, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_register(self): + """ + Test that the client can register a key. + """ + key = objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E' + b'\x0F')) + + result = results.RegisterResult( + contents.ResultStatus(enums.ResultStatus.SUCCESS), + uuid=attr.PublicKeyUniqueIdentifier( + 'aaaaaaaa-1111-2222-3333-ffffffffffff')) + + with ProxyKmipClient() as client: + client.proxy.register.return_value = result + uid = client.register(key) + self.assertTrue(client.proxy.register.called) + self.assertIsInstance(uid, six.string_types) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_register_on_invalid_uid(self): + """ + Test that a TypeError exception is raised when trying to register a + key with an invalid key object. + """ + args = ['invalid'] + with ProxyKmipClient() as client: + self.assertRaises(TypeError, client.register, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_register_on_closed(self): + """ + Test that a ClientConnectionNotOpen exception is raised when trying + to register a key on an unopened client connection. + """ + key = objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, + 128, + (b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E' + b'\x0F')) + client = ProxyKmipClient() + args = [key] + self.assertRaises(ClientConnectionNotOpen, client.register, *args) + + @mock.patch('kmip.pie.client.KMIPProxy', + mock.MagicMock(spec_set=KMIPProxy)) + def test_register_on_operation_failure(self): + """ + Test that a KmipOperationFailure exception is raised when the + backend fails to register a key. + """ + status = enums.ResultStatus.OPERATION_FAILED + reason = enums.ResultReason.GENERAL_FAILURE + message = "Test failure message" + + result = results.OperationResult( + contents.ResultStatus(status), + contents.ResultReason(reason), + contents.ResultMessage(message)) + error_msg = str(KmipOperationFailure(status, reason, message)) + + # Key encoding obtained from Section 14.2 of the KMIP 1.1 test + # documentation. + key_value = ( + b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E' + b'\x0F') + key = objects.SymmetricKey( + enums.CryptographicAlgorithm.AES, 128, key_value) + + client = ProxyKmipClient() + client.open() + client.proxy.register.return_value = result + args = [key] + + self.assertRaisesRegexp( + KmipOperationFailure, error_msg, client.register, *args) diff --git a/kmip/tests/unit/pie/test_exceptions.py b/kmip/tests/unit/pie/test_exceptions.py new file mode 100644 index 0000000..ea56e26 --- /dev/null +++ b/kmip/tests/unit/pie/test_exceptions.py @@ -0,0 +1,106 @@ +# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from testtools import TestCase + +from kmip.core.enums import ResultStatus +from kmip.core.enums import ResultReason + +from kmip.pie.exceptions import ClientConnectionFailure +from kmip.pie.exceptions import ClientConnectionNotOpen +from kmip.pie.exceptions import KmipOperationFailure + + +class TestClientConnectionFailure(TestCase): + """ + Test suite for ClientConnectionFailure. + """ + + def setUp(self): + super(TestClientConnectionFailure, self).setUp() + + def tearDown(self): + super(TestClientConnectionFailure, self).tearDown() + + def test_init(self): + """ + Test that a ClientConnectionFailure exception can be instantiated. + """ + exc = ClientConnectionFailure() + self.assertIsInstance(exc, Exception) + + def test_message(self): + """ + Test that a ClientConnectionFailure exception message can be set + properly. + """ + exc = ClientConnectionFailure("test message") + self.assertEqual("test message", str(exc)) + + +class TestClientConnectionNotOpen(TestCase): + """ + Test suite for ClientConnectionNotOpen. + """ + + def setUp(self): + super(TestClientConnectionNotOpen, self).setUp() + + def tearDown(self): + super(TestClientConnectionNotOpen, self).tearDown() + + def test_init(self): + """ + Test that a ClientConnectionNotOpen exception can be instantiated. + """ + exc = ClientConnectionNotOpen() + self.assertIsInstance(exc, Exception) + self.assertEqual("client connection not open", str(exc)) + + +class TestKmipOperationFailure(TestCase): + """ + Test suite for KmipOperationFailure. + """ + + def setUp(self): + super(TestKmipOperationFailure, self).setUp() + + def tearDown(self): + super(TestKmipOperationFailure, self).tearDown() + + def test_init(self): + """ + Test that a KmipOperationFailure exception can be instantiated. + """ + exc = KmipOperationFailure( + ResultStatus.OPERATION_FAILED, + ResultReason.GENERAL_FAILURE, + "Test error message.") + self.assertIsInstance(exc, Exception) + + def test_message(self): + """ + Test that a KmipOperationFailure exception message can be set + properly. + """ + status = ResultStatus.OPERATION_FAILED + reason = ResultReason.GENERAL_FAILURE + exc = KmipOperationFailure(status, reason, "Test error message.") + + msg = "{0}: {1} - {2}".format( + status.name, reason.name, "Test error message.") + + self.assertEqual(msg, str(exc))