mirror of https://github.com/OpenKMIP/PyKMIP.git
Adding the Pie ProxyKmipClient
This change adds the Pie ProxyKmipClient, which provides a simplified version of the original KMIP interface and which wraps the original client. The ProxyKmipClient supports basic CRUD operations for symmetric, public, and private keys. A unit test suite for the client is included.
This commit is contained in:
parent
66cb329f4a
commit
7baa31ea51
|
@ -0,0 +1,366 @@
|
||||||
|
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import six
|
||||||
|
|
||||||
|
from kmip.core import enums
|
||||||
|
from kmip.core import objects as cobjects
|
||||||
|
|
||||||
|
from kmip.core.factories import attributes
|
||||||
|
|
||||||
|
from kmip.pie import api
|
||||||
|
from kmip.pie import exceptions
|
||||||
|
from kmip.pie import factory
|
||||||
|
from kmip.pie import objects as pobjects
|
||||||
|
|
||||||
|
from kmip.services.kmip_client import KMIPProxy
|
||||||
|
|
||||||
|
|
||||||
|
class ProxyKmipClient(api.KmipClient):
|
||||||
|
"""
|
||||||
|
A simplified KMIP client for conducting KMIP operations.
|
||||||
|
|
||||||
|
The ProxyKmipClient is a simpler KMIP client supporting various KMIP
|
||||||
|
operations. It wraps the original KMIPProxy, reducing the boilerplate
|
||||||
|
needed to deploy PyKMIP in client applications. The underlying proxy
|
||||||
|
client is responsible for setting up the underlying socket connection
|
||||||
|
and for writing/reading data to/from the socket.
|
||||||
|
|
||||||
|
Like the KMIPProxy, the ProxyKmipClient is not thread-safe.
|
||||||
|
"""
|
||||||
|
def __init__(self,
|
||||||
|
hostname=None,
|
||||||
|
port=None,
|
||||||
|
cert=None,
|
||||||
|
key=None,
|
||||||
|
ca=None,
|
||||||
|
ssl_version=None,
|
||||||
|
username=None,
|
||||||
|
password=None,
|
||||||
|
config='client'):
|
||||||
|
"""
|
||||||
|
Construct a ProxyKmipClient.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
hostname (string): The host or IP address of a KMIP appliance.
|
||||||
|
Optional, defaults to None.
|
||||||
|
port (int): The port number used to establish a connection to a
|
||||||
|
KMIP appliance. Usually 5696 for KMIP applications. Optional,
|
||||||
|
defaults to None.
|
||||||
|
cert (string): The path to the client's certificate. Optional,
|
||||||
|
defaults to None.
|
||||||
|
key (string): The path to the key for the client's certificate.
|
||||||
|
Optional, defaults to None.
|
||||||
|
ca (string): The path to the CA certificate used to verify the
|
||||||
|
server's certificate. Optional, defaults to None.
|
||||||
|
ssl_version (string): The name of the ssl version to use for the
|
||||||
|
connection. Example: 'PROTOCOL_SSLv23'. Optional, defaults to
|
||||||
|
None.
|
||||||
|
username (string): The username of the KMIP appliance account to
|
||||||
|
use for operations. Optional, defaults to None.
|
||||||
|
password (string): The password of the KMIP appliance account to
|
||||||
|
use for operations. Optional, defaults to None.
|
||||||
|
config (string): The name of a section in the PyKMIP configuration
|
||||||
|
file. Use to load a specific set of configuration settings from
|
||||||
|
the configuration file, instead of specifying them manually.
|
||||||
|
Optional, defaults to the default client section, 'client'.
|
||||||
|
"""
|
||||||
|
self.logger = logging.getLogger()
|
||||||
|
|
||||||
|
self.attribute_factory = attributes.AttributeFactory()
|
||||||
|
self.object_factory = factory.ObjectFactory()
|
||||||
|
|
||||||
|
# TODO (peter-hamilton) Consider adding validation checks for inputs.
|
||||||
|
self.proxy = KMIPProxy(
|
||||||
|
host=hostname,
|
||||||
|
port=port,
|
||||||
|
certfile=cert,
|
||||||
|
keyfile=key,
|
||||||
|
ca_certs=ca,
|
||||||
|
ssl_version=ssl_version,
|
||||||
|
username=username,
|
||||||
|
password=password,
|
||||||
|
config=config)
|
||||||
|
|
||||||
|
# TODO (peter-hamilton) Add a multiprocessing lock for synchronization.
|
||||||
|
self._is_open = False
|
||||||
|
|
||||||
|
def open(self):
|
||||||
|
"""
|
||||||
|
Open the client connection.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionFailure: if the client connection is already open
|
||||||
|
Exception: if an error occurs while trying to open the connection
|
||||||
|
"""
|
||||||
|
if self._is_open:
|
||||||
|
raise exceptions.ClientConnectionFailure(
|
||||||
|
"client connection already open")
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
self.proxy.open()
|
||||||
|
self._is_open = True
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.exception("could not open client connection", e)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""
|
||||||
|
Close the client connection.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionNotOpen: if the client connection is not open
|
||||||
|
Exception: if an error occurs while trying to close the connection
|
||||||
|
"""
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
self.proxy.close()
|
||||||
|
self._is_open = False
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.exception("could not close client connection", e)
|
||||||
|
raise e
|
||||||
|
|
||||||
|
def create(self, algorithm, length):
|
||||||
|
"""
|
||||||
|
Create a symmetric key on a KMIP appliance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
algorithm (CryptographicAlgorithm): An enumeration defining the
|
||||||
|
algorithm to use to generate the symmetric key.
|
||||||
|
length (int): The length in bits for the symmetric key.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
string: The uid of the newly created symmetric key.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionNotOpen: if the client connection is unusable
|
||||||
|
KmipOperationFailure: if the operation result is a failure
|
||||||
|
TypeError: if the input arguments are invalid
|
||||||
|
"""
|
||||||
|
# Check inputs
|
||||||
|
if not isinstance(algorithm, enums.CryptographicAlgorithm):
|
||||||
|
raise TypeError(
|
||||||
|
"algorithm must be a CryptographicAlgorithm enumeration")
|
||||||
|
elif not isinstance(length, six.integer_types) or length <= 0:
|
||||||
|
raise TypeError("length must be a positive integer")
|
||||||
|
|
||||||
|
# Verify that operations can be given at this time
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
|
# Create the template containing the attributes
|
||||||
|
attributes = self._build_key_attributes(algorithm, length)
|
||||||
|
template = cobjects.TemplateAttribute(attributes=attributes)
|
||||||
|
|
||||||
|
# Create the symmetric key and handle the results
|
||||||
|
result = self.proxy.create(enums.ObjectType.SYMMETRIC_KEY, template)
|
||||||
|
|
||||||
|
status = result.result_status.enum
|
||||||
|
if status == enums.ResultStatus.SUCCESS:
|
||||||
|
uid = result.uuid.value
|
||||||
|
return uid
|
||||||
|
else:
|
||||||
|
reason = result.result_reason.enum
|
||||||
|
message = result.result_message.value
|
||||||
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
|
def create_key_pair(self, algorithm, length):
|
||||||
|
"""
|
||||||
|
Create an asymmetric key pair on a KMIP appliance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
algorithm (CryptographicAlgorithm): An enumeration defining the
|
||||||
|
algorithm to use to generate the key pair.
|
||||||
|
length (int): The length in bits for the key pair.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
string: The uid of the newly created public key.
|
||||||
|
string: The uid of the newly created private key.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionNotOpen: if the client connection is unusable
|
||||||
|
KmipOperationFailure: if the operation result is a failure
|
||||||
|
TypeError: if the input arguments are invalid
|
||||||
|
"""
|
||||||
|
# Check inputs
|
||||||
|
if not isinstance(algorithm, enums.CryptographicAlgorithm):
|
||||||
|
raise TypeError(
|
||||||
|
"algorithm must be a CryptographicAlgorithm enumeration")
|
||||||
|
elif not isinstance(length, six.integer_types) or length <= 0:
|
||||||
|
raise TypeError("length must be a positive integer")
|
||||||
|
|
||||||
|
# Verify that operations can be given at this time
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
|
# Create the template containing the attributes
|
||||||
|
attributes = self._build_key_attributes(algorithm, length)
|
||||||
|
template = cobjects.CommonTemplateAttribute(attributes=attributes)
|
||||||
|
|
||||||
|
# Create the asymmetric key pair and handle the results
|
||||||
|
result = self.proxy.create_key_pair(common_template_attribute=template)
|
||||||
|
|
||||||
|
status = result.result_status.enum
|
||||||
|
if status == enums.ResultStatus.SUCCESS:
|
||||||
|
public_uid = result.public_key_uuid.value
|
||||||
|
private_uid = result.private_key_uuid.value
|
||||||
|
return public_uid, private_uid
|
||||||
|
else:
|
||||||
|
reason = result.result_reason.enum
|
||||||
|
message = result.result_message.value
|
||||||
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
|
def register(self, managed_object):
|
||||||
|
"""
|
||||||
|
Register a managed object with a KMIP appliance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
managed_object (ManagedObject): A managed object to register. An
|
||||||
|
instantiatable subclass of ManagedObject from the Pie API.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
string: The uid of the newly registered managed object.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionNotOpen: if the client connection is unusable
|
||||||
|
KmipOperationFailure: if the operation result is a failure
|
||||||
|
TypeError: if the input argument is invalid
|
||||||
|
"""
|
||||||
|
# Check input
|
||||||
|
if not isinstance(managed_object, pobjects.ManagedObject):
|
||||||
|
raise TypeError("managed object must be a Pie ManagedObject")
|
||||||
|
|
||||||
|
# Verify that operations can be given at this time
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
|
# Extract and create attributes
|
||||||
|
attributes = list()
|
||||||
|
if hasattr(managed_object, 'cryptographic_usage_masks'):
|
||||||
|
mask_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
managed_object.cryptographic_usage_masks)
|
||||||
|
|
||||||
|
attributes.append(mask_attribute)
|
||||||
|
|
||||||
|
template = cobjects.TemplateAttribute(attributes=attributes)
|
||||||
|
object_type = managed_object.object_type
|
||||||
|
|
||||||
|
# Register the managed object and handle the results
|
||||||
|
secret = self.object_factory.convert(managed_object)
|
||||||
|
result = self.proxy.register(object_type, template, secret)
|
||||||
|
|
||||||
|
status = result.result_status.enum
|
||||||
|
if status == enums.ResultStatus.SUCCESS:
|
||||||
|
uid = result.uuid.value
|
||||||
|
return uid
|
||||||
|
else:
|
||||||
|
reason = result.result_reason.enum
|
||||||
|
message = result.result_message.value
|
||||||
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
|
def get(self, uid):
|
||||||
|
"""
|
||||||
|
Get a managed object from a KMIP appliance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (string): The unique ID of the managed object to retrieve.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ManagedObject: The retrieved managed object object.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionNotOpen: if the client connection is unusable
|
||||||
|
KmipOperationFailure: if the operation result is a failure
|
||||||
|
TypeError: if the input argument is invalid
|
||||||
|
"""
|
||||||
|
# Check input
|
||||||
|
if not isinstance(uid, six.string_types):
|
||||||
|
raise TypeError("uid must be a string")
|
||||||
|
|
||||||
|
# Verify that operations can be given at this time
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
|
# Get the managed object and handle the results
|
||||||
|
result = self.proxy.get(uid)
|
||||||
|
|
||||||
|
status = result.result_status.enum
|
||||||
|
if status == enums.ResultStatus.SUCCESS:
|
||||||
|
managed_object = self.object_factory.convert(result.secret)
|
||||||
|
return managed_object
|
||||||
|
else:
|
||||||
|
reason = result.result_reason.enum
|
||||||
|
message = result.result_message.value
|
||||||
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
|
def destroy(self, uid):
|
||||||
|
"""
|
||||||
|
Destroy a managed object stored by a KMIP appliance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uid (string): The unique ID of the managed object to destroy.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ClientConnectionNotOpen: if the client connection is unusable
|
||||||
|
KmipOperationFailure: if the operation result is a failure
|
||||||
|
TypeError: if the input argument is invalid
|
||||||
|
"""
|
||||||
|
# Check input
|
||||||
|
if not isinstance(uid, six.string_types):
|
||||||
|
raise TypeError("uid must be a string")
|
||||||
|
|
||||||
|
# Verify that operations can be given at this time
|
||||||
|
if not self._is_open:
|
||||||
|
raise exceptions.ClientConnectionNotOpen()
|
||||||
|
|
||||||
|
# Destroy the managed object and handle the results
|
||||||
|
result = self.proxy.destroy(uid)
|
||||||
|
|
||||||
|
status = result.result_status.enum
|
||||||
|
if status == enums.ResultStatus.SUCCESS:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
reason = result.result_reason.enum
|
||||||
|
message = result.result_message.value
|
||||||
|
raise exceptions.KmipOperationFailure(status, reason, message)
|
||||||
|
|
||||||
|
def _build_key_attributes(self, algorithm, length):
|
||||||
|
# Build a list of core key attributes.
|
||||||
|
algorithm_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM,
|
||||||
|
algorithm)
|
||||||
|
length_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH,
|
||||||
|
length)
|
||||||
|
mask_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT])
|
||||||
|
|
||||||
|
return [algorithm_attribute, length_attribute, mask_attribute]
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.open()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
self.close()
|
|
@ -0,0 +1,50 @@
|
||||||
|
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
class ClientConnectionFailure(Exception):
|
||||||
|
"""
|
||||||
|
An exception raised for errors with the client socket connection.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ClientConnectionNotOpen(Exception):
|
||||||
|
"""
|
||||||
|
An exception raised when operations are issued to a closed connection.
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
"""
|
||||||
|
Construct the closed client connection error message.
|
||||||
|
"""
|
||||||
|
super(ClientConnectionNotOpen, self).__init__(
|
||||||
|
"client connection not open")
|
||||||
|
|
||||||
|
|
||||||
|
class KmipOperationFailure(Exception):
|
||||||
|
"""
|
||||||
|
An exception raised upon the failure of a KMIP appliance operation.
|
||||||
|
"""
|
||||||
|
def __init__(self, status, reason, message):
|
||||||
|
"""
|
||||||
|
Construct the error message for the KMIP operation failure.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status: a ResultStatus enumeration
|
||||||
|
reason: a ResultReason enumeration
|
||||||
|
message: a string providing additional error information
|
||||||
|
"""
|
||||||
|
msg = "{0}: {1} - {2}".format(status.name, reason.name, message)
|
||||||
|
super(KmipOperationFailure, self).__init__(msg)
|
|
@ -0,0 +1,599 @@
|
||||||
|
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
import six
|
||||||
|
import ssl
|
||||||
|
import testtools
|
||||||
|
|
||||||
|
from kmip.core import attributes as attr
|
||||||
|
from kmip.core import enums
|
||||||
|
from kmip.core import objects as obj
|
||||||
|
|
||||||
|
from kmip.core.factories import attributes
|
||||||
|
from kmip.core.messages import contents
|
||||||
|
|
||||||
|
from kmip.services.kmip_client import KMIPProxy
|
||||||
|
from kmip.services import results
|
||||||
|
|
||||||
|
from kmip.pie.client import ProxyKmipClient
|
||||||
|
|
||||||
|
from kmip.pie.exceptions import ClientConnectionFailure
|
||||||
|
from kmip.pie.exceptions import ClientConnectionNotOpen
|
||||||
|
from kmip.pie.exceptions import KmipOperationFailure
|
||||||
|
|
||||||
|
from kmip.pie import factory
|
||||||
|
from kmip.pie import objects
|
||||||
|
|
||||||
|
|
||||||
|
class TestProxyKmipClient(testtools.TestCase):
|
||||||
|
"""
|
||||||
|
Test suite for the ProxyKmipClient.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestProxyKmipClient, self).setUp()
|
||||||
|
self.attribute_factory = attributes.AttributeFactory()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(TestProxyKmipClient, self).tearDown()
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_init(self):
|
||||||
|
"""
|
||||||
|
Test that a ProxyKmipClient can be constructed with valid arguments.
|
||||||
|
"""
|
||||||
|
ProxyKmipClient(
|
||||||
|
hostname='127.0.0.1',
|
||||||
|
port=5696,
|
||||||
|
cert='/example/path/to/cert',
|
||||||
|
key='/example/path/to/key',
|
||||||
|
ca='/example/path/to/ca',
|
||||||
|
ssl_version=ssl.PROTOCOL_TLSv1,
|
||||||
|
username='username',
|
||||||
|
password='password',
|
||||||
|
config='test')
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_open(self):
|
||||||
|
"""
|
||||||
|
Test that the client can open a connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.open.assert_called_with()
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_open_on_open(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionFailure exception is raised when trying
|
||||||
|
to open an opened client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
self.assertRaises(ClientConnectionFailure, client.open)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_open_on_proxy_failure(self):
|
||||||
|
"""
|
||||||
|
Test that an Exception is raised when an error occurs while opening
|
||||||
|
the client proxy connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.proxy.open.side_effect = Exception
|
||||||
|
self.assertRaises(Exception, client.open)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_close(self):
|
||||||
|
"""
|
||||||
|
Test that the client can close an open connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.close()
|
||||||
|
client.proxy.close.assert_called_with()
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_close_on_close(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to close a closed client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
self.assertRaises(ClientConnectionNotOpen, client.close)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_close_on_proxy_failure(self):
|
||||||
|
"""
|
||||||
|
Test that an Exception is raised when an error occurs while closing
|
||||||
|
the client proxy connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client._is_open = True
|
||||||
|
client.proxy.close.side_effect = Exception
|
||||||
|
self.assertRaises(Exception, client.close)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_enter(self):
|
||||||
|
"""
|
||||||
|
Test the result and effect of the enter method for the context
|
||||||
|
manager.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
|
||||||
|
self.assertFalse(client._is_open)
|
||||||
|
result = client.__enter__()
|
||||||
|
self.assertEqual(result, client)
|
||||||
|
self.assertTrue(client._is_open)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_exit(self):
|
||||||
|
"""
|
||||||
|
Test the result and effect of the exit method for the context
|
||||||
|
manager.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.__enter__()
|
||||||
|
|
||||||
|
self.assertTrue(client._is_open)
|
||||||
|
client.__exit__(None, None, None)
|
||||||
|
self.assertFalse(client._is_open)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_context_manager(self):
|
||||||
|
"""
|
||||||
|
Test that the KmipClient can be used by the with-statement as a
|
||||||
|
context manager.
|
||||||
|
"""
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertTrue(client._is_open)
|
||||||
|
client.proxy.open.assert_called_with()
|
||||||
|
self.assertFalse(client._is_open)
|
||||||
|
client.proxy.close.assert_called_with()
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create(self):
|
||||||
|
"""
|
||||||
|
Test that a symmetric key can be created with proper inputs and that
|
||||||
|
its UID is returned properly.
|
||||||
|
"""
|
||||||
|
# Create the template to test the create call
|
||||||
|
algorithm = enums.CryptographicAlgorithm.AES
|
||||||
|
length = 256
|
||||||
|
algorithm_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
|
||||||
|
length_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
|
||||||
|
mask_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT])
|
||||||
|
|
||||||
|
attributes = [algorithm_attribute, length_attribute, mask_attribute]
|
||||||
|
template = obj.TemplateAttribute(attributes=attributes)
|
||||||
|
|
||||||
|
key_id = 'aaaaaaaa-1111-2222-3333-ffffffffffff'
|
||||||
|
status = enums.ResultStatus.SUCCESS
|
||||||
|
result = results.CreateResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
uuid=attr.UniqueIdentifier(key_id))
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.create.return_value = result
|
||||||
|
|
||||||
|
uid = client.create(algorithm, length)
|
||||||
|
client.proxy.create.assert_called_with(
|
||||||
|
enums.ObjectType.SYMMETRIC_KEY, template)
|
||||||
|
self.assertIsInstance(uid, six.string_types)
|
||||||
|
self.assertEqual(uid, key_id)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_on_invalid_algorithm(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to create a
|
||||||
|
symmetric key with an invalid algorithm.
|
||||||
|
"""
|
||||||
|
args = ['invalid', 256]
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaises(TypeError, client.create, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_on_invalid_length(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to create a
|
||||||
|
symmetric key with an invalid length.
|
||||||
|
"""
|
||||||
|
args = [enums.CryptographicAlgorithm.AES, 'invalid']
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaises(TypeError, client.create, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_on_closed(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to create a symmetric key on an unopened client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
args = [enums.CryptographicAlgorithm.AES, 256]
|
||||||
|
self.assertRaises(
|
||||||
|
ClientConnectionNotOpen, client.create, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_on_operation_failure(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception is raised when the
|
||||||
|
the backend fails to create a symmetric key.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.OPERATION_FAILED
|
||||||
|
reason = enums.ResultReason.GENERAL_FAILURE
|
||||||
|
message = "Test failure message"
|
||||||
|
|
||||||
|
result = results.OperationResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
contents.ResultReason(reason),
|
||||||
|
contents.ResultMessage(message))
|
||||||
|
error_msg = str(KmipOperationFailure(status, reason, message))
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.create.return_value = result
|
||||||
|
args = [enums.CryptographicAlgorithm.AES, 256]
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
KmipOperationFailure, error_msg, client.create, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_key_pair(self):
|
||||||
|
"""
|
||||||
|
Test that an asymmetric key pair can be created with proper inputs
|
||||||
|
and that the UIDs of the public and private keys are returned
|
||||||
|
properly.
|
||||||
|
"""
|
||||||
|
# Create the template to test the create key pair call
|
||||||
|
algorithm = enums.CryptographicAlgorithm.RSA
|
||||||
|
length = 2048
|
||||||
|
algorithm_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_ALGORITHM, algorithm)
|
||||||
|
length_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_LENGTH, length)
|
||||||
|
mask_attribute = self.attribute_factory.create_attribute(
|
||||||
|
enums.AttributeType.CRYPTOGRAPHIC_USAGE_MASK,
|
||||||
|
[enums.CryptographicUsageMask.ENCRYPT,
|
||||||
|
enums.CryptographicUsageMask.DECRYPT])
|
||||||
|
|
||||||
|
attributes = [algorithm_attribute, length_attribute, mask_attribute]
|
||||||
|
template = obj.CommonTemplateAttribute(attributes=attributes)
|
||||||
|
|
||||||
|
status = enums.ResultStatus.SUCCESS
|
||||||
|
result = results.CreateKeyPairResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
public_key_uuid=attr.PublicKeyUniqueIdentifier(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff'),
|
||||||
|
private_key_uuid=attr.PrivateKeyUniqueIdentifier(
|
||||||
|
'ffffffff-3333-2222-1111-aaaaaaaaaaaa'))
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.create_key_pair.return_value = result
|
||||||
|
|
||||||
|
public_uid, private_uid = client.create_key_pair(
|
||||||
|
enums.CryptographicAlgorithm.RSA, 2048)
|
||||||
|
|
||||||
|
kwargs = {'common_template_attribute': template}
|
||||||
|
client.proxy.create_key_pair.assert_called_with(**kwargs)
|
||||||
|
self.assertIsInstance(public_uid, six.string_types)
|
||||||
|
self.assertIsInstance(private_uid, six.string_types)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_key_pair_on_invalid_algorithm(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to create an
|
||||||
|
asymmetric key pair with an invalid algorithm.
|
||||||
|
"""
|
||||||
|
args = ['invalid', 256]
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaises(
|
||||||
|
TypeError, client.create_key_pair, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_key_pair_on_invalid_length(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to create an
|
||||||
|
asymmetric key pair with an invalid length.
|
||||||
|
"""
|
||||||
|
args = [enums.CryptographicAlgorithm.AES, 'invalid']
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaises(
|
||||||
|
TypeError, client.create_key_pair, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_key_pair_on_closed(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to create an asymmetric key pair on an unopened client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
args = [enums.CryptographicAlgorithm.RSA, 2048]
|
||||||
|
self.assertRaises(
|
||||||
|
ClientConnectionNotOpen, client.create_key_pair, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_create_key_pair_on_operation_failure(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception is raised when the
|
||||||
|
backend fails to create an asymmetric key pair.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.OPERATION_FAILED
|
||||||
|
reason = enums.ResultReason.GENERAL_FAILURE
|
||||||
|
message = "Test failure message"
|
||||||
|
|
||||||
|
result = results.OperationResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
contents.ResultReason(reason),
|
||||||
|
contents.ResultMessage(message))
|
||||||
|
error_msg = str(KmipOperationFailure(status, reason, message))
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.create_key_pair.return_value = result
|
||||||
|
args = [enums.CryptographicAlgorithm.RSA, 2048]
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
KmipOperationFailure, error_msg,
|
||||||
|
client.create_key_pair, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get(self):
|
||||||
|
"""
|
||||||
|
Test that a secret can be retrieved with proper input.
|
||||||
|
"""
|
||||||
|
# Key encoding obtained from Section 14.2 of the KMIP 1.1 test
|
||||||
|
# documentation.
|
||||||
|
secret = objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.AES,
|
||||||
|
128,
|
||||||
|
(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E'
|
||||||
|
b'\x0F'))
|
||||||
|
fact = factory.ObjectFactory()
|
||||||
|
|
||||||
|
result = results.GetResult(
|
||||||
|
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||||
|
uuid=attr.PublicKeyUniqueIdentifier(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff'),
|
||||||
|
secret=fact.convert(secret))
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.get.return_value = result
|
||||||
|
|
||||||
|
result = client.get('aaaaaaaa-1111-2222-3333-ffffffffffff')
|
||||||
|
client.proxy.get.assert_called_with(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff')
|
||||||
|
self.assertIsInstance(result, objects.SymmetricKey)
|
||||||
|
self.assertEqual(result, secret)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_on_invalid_uid(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to retrieve a
|
||||||
|
secret with an invalid ID.
|
||||||
|
"""
|
||||||
|
args = [0]
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaises(TypeError, client.get, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_on_closed(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to retrieve a secret on an unopened client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
args = ['aaaaaaaa-1111-2222-3333-ffffffffffff']
|
||||||
|
self.assertRaises(ClientConnectionNotOpen, client.get, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_get_on_operation_failure(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception is raised when the
|
||||||
|
backend fails to retrieve a secret.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.OPERATION_FAILED
|
||||||
|
reason = enums.ResultReason.GENERAL_FAILURE
|
||||||
|
message = "Test failure message"
|
||||||
|
|
||||||
|
result = results.OperationResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
contents.ResultReason(reason),
|
||||||
|
contents.ResultMessage(message))
|
||||||
|
error_msg = str(KmipOperationFailure(status, reason, message))
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.get.return_value = result
|
||||||
|
args = ['id']
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
KmipOperationFailure, error_msg, client.get, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_destroy(self):
|
||||||
|
"""
|
||||||
|
Test that the client can destroy a secret.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.SUCCESS
|
||||||
|
result = results.OperationResult(contents.ResultStatus(status))
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.destroy.return_value = result
|
||||||
|
result = client.destroy(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff')
|
||||||
|
client.proxy.destroy.assert_called_with(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff')
|
||||||
|
self.assertEqual(None, result)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_destroy_on_invalid_uid(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to destroy a
|
||||||
|
secret with an invalid ID.
|
||||||
|
"""
|
||||||
|
args = [0]
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaises(TypeError, client.destroy, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_destroy_on_closed(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to destroy a secret on an unopened client connection.
|
||||||
|
"""
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
args = ['aaaaaaaa-1111-2222-3333-ffffffffffff']
|
||||||
|
self.assertRaises(
|
||||||
|
ClientConnectionNotOpen, client.destroy, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_destroy_on_operation_failure(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception is raised when the
|
||||||
|
backend fails to destroy a secret.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.OPERATION_FAILED
|
||||||
|
reason = enums.ResultReason.GENERAL_FAILURE
|
||||||
|
message = "Test failure message"
|
||||||
|
|
||||||
|
result = results.OperationResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
contents.ResultReason(reason),
|
||||||
|
contents.ResultMessage(message))
|
||||||
|
error_msg = str(KmipOperationFailure(status, reason, message))
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.destroy.return_value = result
|
||||||
|
args = ['id']
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
KmipOperationFailure, error_msg, client.destroy, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_register(self):
|
||||||
|
"""
|
||||||
|
Test that the client can register a key.
|
||||||
|
"""
|
||||||
|
key = objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.AES,
|
||||||
|
128,
|
||||||
|
(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E'
|
||||||
|
b'\x0F'))
|
||||||
|
|
||||||
|
result = results.RegisterResult(
|
||||||
|
contents.ResultStatus(enums.ResultStatus.SUCCESS),
|
||||||
|
uuid=attr.PublicKeyUniqueIdentifier(
|
||||||
|
'aaaaaaaa-1111-2222-3333-ffffffffffff'))
|
||||||
|
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
client.proxy.register.return_value = result
|
||||||
|
uid = client.register(key)
|
||||||
|
self.assertTrue(client.proxy.register.called)
|
||||||
|
self.assertIsInstance(uid, six.string_types)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_register_on_invalid_uid(self):
|
||||||
|
"""
|
||||||
|
Test that a TypeError exception is raised when trying to register a
|
||||||
|
key with an invalid key object.
|
||||||
|
"""
|
||||||
|
args = ['invalid']
|
||||||
|
with ProxyKmipClient() as client:
|
||||||
|
self.assertRaises(TypeError, client.register, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_register_on_closed(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception is raised when trying
|
||||||
|
to register a key on an unopened client connection.
|
||||||
|
"""
|
||||||
|
key = objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.AES,
|
||||||
|
128,
|
||||||
|
(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E'
|
||||||
|
b'\x0F'))
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
args = [key]
|
||||||
|
self.assertRaises(ClientConnectionNotOpen, client.register, *args)
|
||||||
|
|
||||||
|
@mock.patch('kmip.pie.client.KMIPProxy',
|
||||||
|
mock.MagicMock(spec_set=KMIPProxy))
|
||||||
|
def test_register_on_operation_failure(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception is raised when the
|
||||||
|
backend fails to register a key.
|
||||||
|
"""
|
||||||
|
status = enums.ResultStatus.OPERATION_FAILED
|
||||||
|
reason = enums.ResultReason.GENERAL_FAILURE
|
||||||
|
message = "Test failure message"
|
||||||
|
|
||||||
|
result = results.OperationResult(
|
||||||
|
contents.ResultStatus(status),
|
||||||
|
contents.ResultReason(reason),
|
||||||
|
contents.ResultMessage(message))
|
||||||
|
error_msg = str(KmipOperationFailure(status, reason, message))
|
||||||
|
|
||||||
|
# Key encoding obtained from Section 14.2 of the KMIP 1.1 test
|
||||||
|
# documentation.
|
||||||
|
key_value = (
|
||||||
|
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E'
|
||||||
|
b'\x0F')
|
||||||
|
key = objects.SymmetricKey(
|
||||||
|
enums.CryptographicAlgorithm.AES, 128, key_value)
|
||||||
|
|
||||||
|
client = ProxyKmipClient()
|
||||||
|
client.open()
|
||||||
|
client.proxy.register.return_value = result
|
||||||
|
args = [key]
|
||||||
|
|
||||||
|
self.assertRaisesRegexp(
|
||||||
|
KmipOperationFailure, error_msg, client.register, *args)
|
|
@ -0,0 +1,106 @@
|
||||||
|
# Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from testtools import TestCase
|
||||||
|
|
||||||
|
from kmip.core.enums import ResultStatus
|
||||||
|
from kmip.core.enums import ResultReason
|
||||||
|
|
||||||
|
from kmip.pie.exceptions import ClientConnectionFailure
|
||||||
|
from kmip.pie.exceptions import ClientConnectionNotOpen
|
||||||
|
from kmip.pie.exceptions import KmipOperationFailure
|
||||||
|
|
||||||
|
|
||||||
|
class TestClientConnectionFailure(TestCase):
|
||||||
|
"""
|
||||||
|
Test suite for ClientConnectionFailure.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestClientConnectionFailure, self).setUp()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(TestClientConnectionFailure, self).tearDown()
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionFailure exception can be instantiated.
|
||||||
|
"""
|
||||||
|
exc = ClientConnectionFailure()
|
||||||
|
self.assertIsInstance(exc, Exception)
|
||||||
|
|
||||||
|
def test_message(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionFailure exception message can be set
|
||||||
|
properly.
|
||||||
|
"""
|
||||||
|
exc = ClientConnectionFailure("test message")
|
||||||
|
self.assertEqual("test message", str(exc))
|
||||||
|
|
||||||
|
|
||||||
|
class TestClientConnectionNotOpen(TestCase):
|
||||||
|
"""
|
||||||
|
Test suite for ClientConnectionNotOpen.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestClientConnectionNotOpen, self).setUp()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(TestClientConnectionNotOpen, self).tearDown()
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
"""
|
||||||
|
Test that a ClientConnectionNotOpen exception can be instantiated.
|
||||||
|
"""
|
||||||
|
exc = ClientConnectionNotOpen()
|
||||||
|
self.assertIsInstance(exc, Exception)
|
||||||
|
self.assertEqual("client connection not open", str(exc))
|
||||||
|
|
||||||
|
|
||||||
|
class TestKmipOperationFailure(TestCase):
|
||||||
|
"""
|
||||||
|
Test suite for KmipOperationFailure.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestKmipOperationFailure, self).setUp()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(TestKmipOperationFailure, self).tearDown()
|
||||||
|
|
||||||
|
def test_init(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception can be instantiated.
|
||||||
|
"""
|
||||||
|
exc = KmipOperationFailure(
|
||||||
|
ResultStatus.OPERATION_FAILED,
|
||||||
|
ResultReason.GENERAL_FAILURE,
|
||||||
|
"Test error message.")
|
||||||
|
self.assertIsInstance(exc, Exception)
|
||||||
|
|
||||||
|
def test_message(self):
|
||||||
|
"""
|
||||||
|
Test that a KmipOperationFailure exception message can be set
|
||||||
|
properly.
|
||||||
|
"""
|
||||||
|
status = ResultStatus.OPERATION_FAILED
|
||||||
|
reason = ResultReason.GENERAL_FAILURE
|
||||||
|
exc = KmipOperationFailure(status, reason, "Test error message.")
|
||||||
|
|
||||||
|
msg = "{0}: {1} - {2}".format(
|
||||||
|
status.name, reason.name, "Test error message.")
|
||||||
|
|
||||||
|
self.assertEqual(msg, str(exc))
|
Loading…
Reference in New Issue