mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-07-20 12:34:22 +02:00
Merge pull request #115 from hadesto/dev/hadi/unit_tests/kmip_client/server_failover_release
Server Failover Unit Tests
This commit is contained in:
commit
a0423352ab
@ -5,7 +5,7 @@ keyfile=None
|
|||||||
certfile=None
|
certfile=None
|
||||||
cert_reqs=CERT_REQUIRED
|
cert_reqs=CERT_REQUIRED
|
||||||
ssl_version=PROTOCOL_SSLv23
|
ssl_version=PROTOCOL_SSLv23
|
||||||
ca_certs=./demos/certs/server.crt
|
ca_certs=./kmip/demos/certs/server.crt
|
||||||
do_handshake_on_connect=True
|
do_handshake_on_connect=True
|
||||||
suppress_ragged_eofs=True
|
suppress_ragged_eofs=True
|
||||||
username=None
|
username=None
|
||||||
|
@ -220,9 +220,10 @@ class KMIPProxy(KMIP):
|
|||||||
self.logger.error("An error occurred while connecting to "
|
self.logger.error("An error occurred while connecting to "
|
||||||
"appliance " + self.host)
|
"appliance " + self.host)
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
self.socket = None
|
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
self.socket = None
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
def _create_socket(self, sock):
|
def _create_socket(self, sock):
|
||||||
|
@ -62,8 +62,9 @@ from kmip.services.results import RekeyKeyPairResult
|
|||||||
import kmip.core.utils as utils
|
import kmip.core.utils as utils
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
import os
|
||||||
import socket
|
import socket
|
||||||
|
import ssl
|
||||||
|
|
||||||
|
|
||||||
class TestKMIPClient(TestCase):
|
class TestKMIPClient(TestCase):
|
||||||
@ -77,6 +78,16 @@ class TestKMIPClient(TestCase):
|
|||||||
|
|
||||||
self.client = KMIPProxy()
|
self.client = KMIPProxy()
|
||||||
|
|
||||||
|
KMIP_PORT = 9090
|
||||||
|
CA_CERTS_PATH = os.path.normpath(os.path.join(os.path.dirname(
|
||||||
|
os.path.abspath(__file__)), '../utils/certs/server.crt'))
|
||||||
|
|
||||||
|
self.mock_client = KMIPProxy(host="IP_ADDR_1, IP_ADDR_2",
|
||||||
|
port=KMIP_PORT, ca_certs=CA_CERTS_PATH)
|
||||||
|
self.mock_client.socket = mock.MagicMock()
|
||||||
|
self.mock_client.socket.connect = mock.MagicMock()
|
||||||
|
self.mock_client.socket.close = mock.MagicMock()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
super(TestKMIPClient, self).tearDown()
|
super(TestKMIPClient, self).tearDown()
|
||||||
|
|
||||||
@ -541,25 +552,42 @@ class TestKMIPClient(TestCase):
|
|||||||
self.assertRaises(expected_error, self.client._set_variables,
|
self.assertRaises(expected_error, self.client._set_variables,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
|
|
||||||
@mock.patch('socket.socket.connect')
|
@mock.patch.object(KMIPProxy, '_create_socket')
|
||||||
@mock.patch('ssl.SSLSocket.gettimeout')
|
def test_open_server_conn_failover_fail(self, mock_create_socket):
|
||||||
def test_timeout_all_hosts(self, mock_ssl_timeout, mock_connect_return):
|
|
||||||
"""
|
"""
|
||||||
This test verifies that the client will throw an exception if no
|
This test verifies that the KMIP client throws an exception if no
|
||||||
hosts are available for connection.
|
servers are available for connection
|
||||||
"""
|
"""
|
||||||
|
mock_create_socket.return_value = mock.MagicMock()
|
||||||
|
|
||||||
mock_ssl_timeout.return_value = 1
|
# Assumes both IP addresses fail connection attempts
|
||||||
mock_connect_return.return_value = socket.timeout
|
self.mock_client.socket.connect.side_effect = [Exception, Exception]
|
||||||
try:
|
|
||||||
self.client.open()
|
self.assertRaises(Exception, self.mock_client.open)
|
||||||
except Exception as e:
|
|
||||||
# TODO: once the exception is properly defined in the
|
@mock.patch.object(KMIPProxy, '_create_socket')
|
||||||
# kmip_client.py file this test needs to change to reflect that.
|
def test_open_server_conn_failover_succeed(self, mock_create_socket):
|
||||||
self.assertIsInstance(e, Exception)
|
"""
|
||||||
self.client.close()
|
This test verifies that the KMIP client can setup a connection if at
|
||||||
else:
|
least one connection is established
|
||||||
self.client.close()
|
"""
|
||||||
|
mock_create_socket.return_value = mock.MagicMock()
|
||||||
|
|
||||||
|
# Assumes IP_ADDR_1 is a bad address and IP_ADDR_2 is a good address
|
||||||
|
self.mock_client.socket.connect.side_effect = [Exception, None]
|
||||||
|
|
||||||
|
self.mock_client.open()
|
||||||
|
|
||||||
|
self.assertEqual('IP_ADDR_2', self.mock_client.host)
|
||||||
|
|
||||||
|
def test_socket_ssl_wrap(self):
|
||||||
|
"""
|
||||||
|
This test tests that the KMIP socket is successfully wrapped into an
|
||||||
|
ssl socket
|
||||||
|
"""
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
self.client._create_socket(sock)
|
||||||
|
self.assertEqual(ssl.SSLSocket, type(self.client.socket))
|
||||||
|
|
||||||
|
|
||||||
class TestClientProfileInformation(TestCase):
|
class TestClientProfileInformation(TestCase):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user