PyKMIP/kmip/tests/unit/services/server/test_server.py
Peter Hamilton e77dcadf41 Fixing violations of E722 for flake8 checks
This change fixes violations of E722, the use of except without
specifying an exception type. For now the high-level Exception
class is used as a generic catchall. In the future these cases
will be updated to handle the specific exceptions expected.
2017-10-23 08:43:36 -04:00

536 lines
19 KiB
Python

# Copyright (c) 2016 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
try:
import unittest.mock as mock
except Exception:
import mock
import signal
import socket
import testtools
from kmip.core import exceptions
from kmip.services import auth
from kmip.services.server import server
class TestKmipServer(testtools.TestCase):
    """
    A test suite for the KmipServer.

    The server's collaborators (logger, socket, TLS wrapper, threads and
    sessions) are swapped for mocks in each test so that the assertions can
    inspect the exact calls and log messages the server produces.
    """

    def setUp(self):
        """Run the base test case setup; no extra fixtures are needed."""
        super(TestKmipServer, self).setUp()

    def tearDown(self):
        """Run the base test case teardown; there is nothing to release."""
        super(TestKmipServer, self).tearDown()

    @mock.patch('kmip.services.server.server.KmipServer._setup_logging')
    @mock.patch('kmip.services.server.server.KmipServer._setup_configuration')
    def test_init(self, config_mock, logging_mock):
        """
        Test that a KmipServer can be instantiated without error.
        """
        s = server.KmipServer()

        # Both setup hooks are patched out; construction must still call them.
        self.assertTrue(config_mock.called)
        self.assertTrue(logging_mock.called)

        self.assertIsInstance(s.auth_suite, auth.BasicAuthenticationSuite)
        self.assertIsNotNone(s._engine)
        self.assertEqual(1, s._session_id)
        self.assertFalse(s._is_serving)

    @mock.patch('logging.getLogger', side_effect=mock.MagicMock())
    @mock.patch('logging.handlers.RotatingFileHandler')
    @mock.patch('kmip.services.server.server.KmipServer._setup_configuration')
    @mock.patch('os.path.exists')
    @mock.patch('os.path.isdir')
    @mock.patch('os.makedirs')
    def test_setup_logging(
            self,
            makedirs_mock,
            isdir_mock,
            path_mock,
            config_mock,
            handler_mock,
            logging_mock):
        """
        Verify that the server logger is setup correctly.
        """
        # Pretend that neither the log file nor its directory exists yet so
        # that the creation paths are exercised.
        path_mock.return_value = False
        isdir_mock.return_value = False
        open_mock = mock.mock_open()

        # Dynamically mock out the built-in open function. Approach changes
        # across Python versions.
        try:
            # For Python3+
            import builtins  # NOQA
            module = 'builtins'
        except ImportError:
            # For Python2+
            module = '__builtin__'

        with mock.patch('{0}.open'.format(module), open_mock):
            s = server.KmipServer(log_path='/test/path/server.log')

        path_mock.assert_called_once_with('/test/path/server.log')
        isdir_mock.assert_called_once_with('/test/path')
        makedirs_mock.assert_called_once_with('/test/path')
        open_mock.assert_called_once_with('/test/path/server.log', 'w')

        self.assertTrue(s._logger.addHandler.called)
        s._logger.setLevel.assert_any_call(logging.DEBUG)

    @mock.patch('kmip.services.server.engine.KmipEngine')
    @mock.patch('kmip.services.auth.TLS12AuthenticationSuite')
    @mock.patch('kmip.services.server.server.KmipServer._setup_logging')
    def test_setup_configuration(self, logging_mock, auth_mock, engine_mock):
        """
        Test that the server setup configuration works without error.
        """
        s = server.KmipServer(
            config_path=None,
            policy_path=None
        )
        # Replace the config object so every setting write can be inspected.
        s.config = mock.MagicMock()

        # Test the right calls are made when reinvoking config setup
        s._setup_configuration(
            '/etc/pykmip/server.conf',
            '127.0.0.1',
            5696,
            '/etc/pykmip/certs/server.crt',
            '/etc/pykmip/certs/server.key',
            '/etc/pykmip/certs/ca.crt',
            'Basic',
            '/etc/pykmip/policies',
            False,
            'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA',
            'DEBUG'
        )

        s.config.load_settings.assert_called_with('/etc/pykmip/server.conf')
        s.config.set_setting.assert_any_call('hostname', '127.0.0.1')
        s.config.set_setting.assert_any_call('port', 5696)
        s.config.set_setting.assert_any_call(
            'certificate_path',
            '/etc/pykmip/certs/server.crt'
        )
        s.config.set_setting.assert_any_call(
            'key_path',
            '/etc/pykmip/certs/server.key'
        )
        s.config.set_setting.assert_any_call(
            'ca_path',
            '/etc/pykmip/certs/ca.crt'
        )
        s.config.set_setting.assert_any_call('auth_suite', 'Basic')
        s.config.set_setting.assert_any_call(
            'policy_path',
            '/etc/pykmip/policies'
        )
        s.config.set_setting.assert_any_call(
            'enable_tls_client_auth',
            False
        )
        # The comma-separated cipher string is expected to be stored as a
        # list of individual suite names.
        s.config.set_setting.assert_any_call(
            'tls_cipher_suites',
            [
                'TLS_RSA_WITH_AES_128_CBC_SHA',
                'TLS_RSA_WITH_AES_256_CBC_SHA'
            ]
        )
        s.config.set_setting.assert_any_call('logging_level', 'DEBUG')

        # Test that an attempt is made to instantiate the TLS 1.2 auth suite
        s = server.KmipServer(
            auth_suite='TLS1.2',
            config_path=None,
            policy_path=None
        )
        self.assertEqual('TLS1.2', s.config.settings.get('auth_suite'))
        self.assertIsNotNone(s.auth_suite)

    @mock.patch('kmip.services.server.engine.KmipEngine')
    @mock.patch('kmip.services.server.server.KmipServer._setup_logging')
    def test_start(self, logging_mock, engine_mock):
        """
        Test that starting the KmipServer either runs as expected or generates
        the expected error.
        """
        # a_mock stands in for the raw socket, b_mock for the TLS-wrapped one.
        a_mock = mock.MagicMock()
        b_mock = mock.MagicMock()

        s = server.KmipServer(
            hostname='127.0.0.1',
            port=5696,
            auth_suite='Basic',
            config_path=None,
            policy_path=None,
            tls_cipher_suites='TLS_RSA_WITH_AES_128_CBC_SHA'
        )
        s._logger = mock.MagicMock()

        self.assertFalse(s._is_serving)

        # Test that in ideal cases no errors are generated and the right
        # log messages are.
        with mock.patch('socket.socket') as socket_mock:
            with mock.patch('ssl.wrap_socket') as ssl_mock:
                socket_mock.return_value = a_mock
                ssl_mock.return_value = b_mock

                s.start()

                s._logger.info.assert_any_call(
                    "Starting server socket handler."
                )
                s._logger.debug.assert_any_call("Configured cipher suites: 1")
                s._logger.debug.assert_any_call("TLS_RSA_WITH_AES_128_CBC_SHA")
                s._logger.debug.assert_any_call(
                    "Authentication suite ciphers to use: 1"
                )
                s._logger.debug.assert_any_call("AES128-SHA")
                socket_mock.assert_called_once_with(
                    socket.AF_INET,
                    socket.SOCK_STREAM
                )
                a_mock.setsockopt.assert_called_once_with(
                    socket.SOL_SOCKET,
                    socket.SO_REUSEADDR,
                    1
                )
                self.assertTrue(ssl_mock.called)
                b_mock.bind.assert_called_once_with(('127.0.0.1', 5696))
                s._logger.info.assert_called_with(
                    "Server successfully bound socket handler to "
                    "127.0.0.1:5696"
                )
                self.assertTrue(s._is_serving)

        a_mock.reset_mock()
        b_mock.reset_mock()

        # Test that a NetworkingError is generated if the socket bind fails.
        with mock.patch('socket.socket') as socket_mock:
            with mock.patch('ssl.wrap_socket') as ssl_mock:
                socket_mock.return_value = a_mock
                ssl_mock.return_value = b_mock

                test_exception = Exception()
                b_mock.bind.side_effect = test_exception

                regex = (
                    "Server failed to bind socket handler to 127.0.0.1:5696"
                )
                # assertRaisesRegex is the supported spelling (and the one
                # test_serve already uses); the "Regexp" alias is deprecated.
                self.assertRaisesRegex(
                    exceptions.NetworkingError,
                    regex,
                    s.start
                )
                s._logger.info.assert_any_call(
                    "Starting server socket handler."
                )
                s._logger.exception.assert_called_once_with(test_exception)

    @mock.patch('kmip.services.server.engine.KmipEngine')
    @mock.patch('kmip.services.server.server.KmipServer._setup_logging')
    def test_stop(self, logging_mock, engine_mock):
        """
        Test that the right calls and log messages are triggered while
        cleaning up the server and any remaining sessions.
        """
        s = server.KmipServer(
            hostname='127.0.0.1',
            port=5696,
            config_path=None,
            policy_path=None
        )
        s._logger = mock.MagicMock()
        s._socket = mock.MagicMock()

        # Test the expected behavior for a normal server stop sequence
        thread_mock = mock.MagicMock()
        thread_mock.join = mock.MagicMock()
        thread_mock.is_alive = mock.MagicMock(return_value=False)
        thread_mock.name = 'TestThread'

        with mock.patch('threading.enumerate') as threading_mock:
            threading_mock.return_value = [thread_mock]
            s.stop()

        s._logger.info.assert_any_call(
            "Cleaning up remaining connection threads."
        )
        self.assertTrue(threading_mock.called)
        thread_mock.join.assert_called_once_with(10.0)
        s._logger.info.assert_any_call(
            "Cleanup succeeded for thread: TestThread"
        )
        s._logger.info.assert_any_call(
            "Shutting down server socket handler."
        )
        s._socket.shutdown.assert_called_once_with(socket.SHUT_RDWR)
        s._socket.close.assert_called_once_with()

        # Test the expected behavior when stopping multiple server session
        # threads goes wrong
        thread_mock.reset_mock()
        test_exception = Exception()
        thread_mock.join = mock.MagicMock(side_effect=test_exception)

        s._logger.reset_mock()
        s._socket.reset_mock()

        with mock.patch('threading.enumerate') as threading_mock:
            threading_mock.return_value = [thread_mock]
            s.stop()

        s._logger.info.assert_any_call(
            "Cleaning up remaining connection threads."
        )
        self.assertTrue(threading_mock.called)
        thread_mock.join.assert_called_once_with(10.0)
        s._logger.info.assert_any_call(
            "Error occurred while attempting to cleanup thread: TestThread"
        )
        s._logger.exception.assert_called_once_with(test_exception)
        s._logger.info.assert_any_call(
            "Shutting down server socket handler."
        )
        s._socket.shutdown.assert_called_once_with(socket.SHUT_RDWR)
        s._socket.close.assert_called_once_with()

        # Test the expected behavior when a thread is still alive after the
        # join attempt returns
        thread_mock.reset_mock()
        thread_mock.join = mock.MagicMock()
        thread_mock.is_alive = mock.MagicMock(return_value=True)

        s._logger.reset_mock()
        s._socket.reset_mock()

        with mock.patch('threading.enumerate') as threading_mock:
            threading_mock.return_value = [thread_mock]
            s.stop()

        s._logger.info.assert_any_call(
            "Cleaning up remaining connection threads."
        )
        self.assertTrue(threading_mock.called)
        thread_mock.join.assert_called_once_with(10.0)
        s._logger.warning.assert_any_call(
            "Cleanup failed for thread: TestThread. Thread is still alive"
        )
        s._logger.info.assert_any_call(
            "Shutting down server socket handler."
        )
        s._socket.shutdown.assert_called_once_with(socket.SHUT_RDWR)
        s._socket.close.assert_called_once_with()

        # Test that the right errors and log messages are generated when
        # stopping the server goes wrong
        s._logger.reset_mock()
        s._socket.reset_mock()

        test_exception = Exception()
        s._socket.close = mock.MagicMock(side_effect=test_exception)

        regex = "Server failed to shutdown socket handler."
        # assertRaisesRegex is the supported spelling (and the one
        # test_serve already uses); the "Regexp" alias is deprecated.
        self.assertRaisesRegex(
            exceptions.NetworkingError,
            regex,
            s.stop
        )
        s._logger.info.assert_any_call(
            "Cleaning up remaining connection threads."
        )
        s._logger.info.assert_any_call(
            "Shutting down server socket handler."
        )
        s._socket.shutdown.assert_called_once_with(socket.SHUT_RDWR)
        s._socket.close.assert_called_once_with()
        # Assert on the mock rather than calling it; a bare call would always
        # pass. threading.enumerate is not patched in this section, so only
        # the most recent exception log (the socket failure) is pinned.
        s._logger.exception.assert_called_with(test_exception)

    @mock.patch('kmip.services.server.engine.KmipEngine')
    @mock.patch('kmip.services.server.server.KmipServer._setup_logging')
    def test_serve(self, logging_mock, engine_mock):
        """
        Test that the right calls and log messages are triggered while
        serving connections.
        """
        s = server.KmipServer(
            hostname='127.0.0.1',
            port=5696,
            config_path=None,
            policy_path=None
        )
        s._is_serving = True
        s._logger = mock.MagicMock()
        s._socket = mock.MagicMock()
        s._setup_connection_handler = mock.MagicMock()

        # serve() replaces the process-wide SIGINT/SIGTERM handlers (they are
        # read back and invoked below). Put the previous handlers back after
        # the test so later tests and the runner keep their normal signal
        # behavior. getsignal() returns None for handlers that were not
        # installed from Python; those cannot be re-installed, so skip them.
        for signum in (signal.SIGINT, signal.SIGTERM):
            previous_handler = signal.getsignal(signum)
            if previous_handler is not None:
                self.addCleanup(signal.signal, signum, previous_handler)

        # The interrupt is what ends each serve() loop below.
        expected_error = KeyboardInterrupt

        # Test the expected behavior for a normal server/interrupt sequence
        s._socket.accept = mock.MagicMock(
            side_effect=[('connection', 'address'), expected_error]
        )

        s.serve()

        s._socket.listen.assert_called_once_with(5)
        s._socket.accept.assert_any_call()
        s._setup_connection_handler.assert_called_once_with(
            'connection',
            'address'
        )
        s._logger.warning.assert_called_with(
            "Interrupting connection service."
        )
        s._logger.info.assert_called_with("Stopping connection service.")

        # Test the behavior for an unexpected socket error.
        unexpected_error = socket.error()
        s._is_serving = True
        s._logger.reset_mock()
        s._socket.accept = mock.MagicMock(
            side_effect=[unexpected_error, expected_error]
        )

        s.serve()

        s._socket.accept.assert_any_call()
        s._logger.warning.assert_any_call(
            "Error detected while establishing new connection."
        )
        s._logger.exception.assert_called_with(unexpected_error)
        s._logger.info.assert_called_with("Stopping connection service.")

        # Test the behavior for an unexpected error.
        unexpected_error = Exception()
        s._is_serving = True
        s._logger.reset_mock()
        s._socket.accept = mock.MagicMock(
            side_effect=[unexpected_error, expected_error]
        )

        s.serve()

        s._socket.accept.assert_any_call()
        s._logger.warning.assert_any_call(
            "Error detected while establishing new connection."
        )
        s._logger.exception.assert_called_with(unexpected_error)
        s._logger.info.assert_called_with("Stopping connection service.")

        # Test the signal handler for each expected signal
        s._is_serving = True
        handler = signal.getsignal(signal.SIGINT)
        args = (signal.SIGINT, None)
        self.assertRaisesRegex(
            KeyboardInterrupt,
            "SIGINT received",
            handler,
            *args
        )
        self.assertFalse(s._is_serving)

        s._is_serving = True
        handler = signal.getsignal(signal.SIGTERM)
        handler(None, None)
        self.assertFalse(s._is_serving)

    @mock.patch('kmip.services.server.engine.KmipEngine')
    @mock.patch('kmip.services.server.server.KmipServer._setup_logging')
    def test_setup_connection_handler(self, logging_mock, engine_mock):
        """
        Test that a KmipSession can be successfully created and spun off from
        the KmipServer.
        """
        s = server.KmipServer(
            hostname='127.0.0.1',
            port=5696,
            config_path=None,
            policy_path=None
        )
        s._logger = mock.MagicMock()

        # Test that the right calls and log messages are made when
        # starting a new session.
        with mock.patch(
            'kmip.services.server.session.KmipSession.start'
        ) as session_mock:
            address = ('127.0.0.1', 5696)
            s._setup_connection_handler(None, address)

            s._logger.info.assert_any_call(
                "Receiving incoming connection from: 127.0.0.1:5696"
            )
            s._logger.info.assert_any_call(
                "Dedicating session 00000001 to 127.0.0.1:5696"
            )
            session_mock.assert_called_once_with()
            self.assertEqual(2, s._session_id)

        # Test that the right error messages are logged when the session
        # fails to start.
        # Drop the calls recorded above so the checks below can only be
        # satisfied by this second connection attempt.
        s._logger.reset_mock()

        test_exception = Exception()
        with mock.patch(
            'kmip.services.server.session.KmipSession.start',
            side_effect=test_exception
        ) as session_mock:
            address = ('127.0.0.1', 5696)
            s._setup_connection_handler(None, address)

            s._logger.info.assert_any_call(
                "Receiving incoming connection from: 127.0.0.1:5696"
            )
            # This is the second session, matching the session name expected
            # in the failure warning below.
            s._logger.info.assert_any_call(
                "Dedicating session 00000002 to 127.0.0.1:5696"
            )
            session_mock.assert_called_once_with()
            s._logger.warning.assert_called_once_with(
                "Failure occurred while starting session: 00000002"
            )
            s._logger.exception.assert_called_once_with(test_exception)
            # The session counter advances even when the session fails.
            self.assertEqual(3, s._session_id)

    @mock.patch('kmip.services.server.engine.KmipEngine')
    @mock.patch('kmip.services.server.server.KmipServer._setup_logging')
    def test_as_context_manager(self, logging_mock, engine_mock):
        """
        Test that the right methods are called when the KmipServer is used
        as a context manager.
        """
        s = server.KmipServer(
            hostname='127.0.0.1',
            port=5696,
            config_path=None,
            policy_path=None
        )
        s._logger = mock.MagicMock()
        # Stub out start/stop so entering and leaving the context can be
        # observed without touching a socket.
        s.start = mock.MagicMock()
        s.stop = mock.MagicMock()

        with s:
            pass

        self.assertTrue(s.start.called)
        self.assertTrue(s.stop.called)