From 69a7b49d3278474330e4a46573491c94bef555a6 Mon Sep 17 00:00:00 2001 From: Peter Hamilton Date: Wed, 27 Sep 2017 17:41:14 -0400 Subject: [PATCH] Add a logging level configuration option This change adds a logging level configuration option for the server, allowing the admin to control what server activity gets collected for logging. Unit tests have been added and updated to cover this new configuration setting. --- examples/server.conf | 1 + kmip/services/server/config.py | 41 ++++++++++- kmip/services/server/server.py | 32 ++++++++- .../tests/unit/services/server/test_config.py | 70 +++++++++++++++++++ .../tests/unit/services/server/test_server.py | 6 +- 5 files changed, 143 insertions(+), 7 deletions(-) diff --git a/examples/server.conf b/examples/server.conf index 5ea0f41..6801f94 100644 --- a/examples/server.conf +++ b/examples/server.conf @@ -11,3 +11,4 @@ tls_cipher_suites= EXAMPLE_CIPHER_SUITE_1 EXAMPLE_CIPHER_SUITE_2 EXAMPLE_CIPHER_SUITE_3 +logging_level=INFO diff --git a/kmip/services/server/config.py b/kmip/services/server/config.py index c243c74..6230c48 100644 --- a/kmip/services/server/config.py +++ b/kmip/services/server/config.py @@ -36,6 +36,7 @@ class KmipServerConfig(object): self.settings = dict() self.settings['enable_tls_client_auth'] = True self.settings['tls_cipher_suites'] = [] + self.settings['logging_level'] = logging.INFO self._expected_settings = [ 'hostname', @@ -48,7 +49,8 @@ class KmipServerConfig(object): self._optional_settings = [ 'policy_path', 'enable_tls_client_auth', - 'tls_cipher_suites' + 'tls_cipher_suites', + 'logging_level' ] def set_setting(self, setting, value): @@ -88,8 +90,10 @@ class KmipServerConfig(object): self._set_policy_path(value) elif setting == 'enable_tls_client_auth': self._set_enable_tls_client_auth(value) - else: + elif setting == 'tls_cipher_suites': self._set_tls_cipher_suites(value) + else: + self._set_logging_level(value) def load_settings(self, path): """ @@ -164,6 +168,10 @@ class KmipServerConfig(object): self._set_tls_cipher_suites( parser.get('server', 'tls_cipher_suites') ) + if parser.has_option('server', 'logging_level'): + self._set_logging_level( + parser.get('server', 'logging_level') + ) def _set_hostname(self, value): if isinstance(value, six.string_types): @@ -290,3 +298,32 @@ class KmipServerConfig(object): "The TLS cipher suites must be a set of strings representing " "cipher suite names." ) + + def _set_logging_level(self, value): + if value is None: + self.settings['logging_level'] = logging.INFO + return + + logging_levels = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL + } + if value in logging_levels.values(): + self.settings['logging_level'] = value + elif isinstance(value, six.string_types): + level = logging_levels.get(value.upper()) + if level: + self.settings['logging_level'] = level + else: + raise exceptions.ConfigurationError( + "The logging level must be a string representing a valid " + "logging level." + ) + else: + raise exceptions.ConfigurationError( + "The logging level must be a string representing a valid " + "logging level." + ) diff --git a/kmip/services/server/server.py b/kmip/services/server/server.py index a0cdf26..a020db3 100644 --- a/kmip/services/server/server.py +++ b/kmip/services/server/server.py @@ -52,7 +52,8 @@ class KmipServer(object): log_path='/var/log/pykmip/server.log', policy_path=None, enable_tls_client_auth=None, - tls_cipher_suites=None + tls_cipher_suites=None, + logging_level=None ): """ Create a KmipServer. @@ -107,6 +108,11 @@ class KmipServer(object): be used by the server when establishing a TLS connection with a client. Optional, defaults to None. If None, the default set of TLS cipher suites will be used. + logging_level (string): A logging level enumeration defined by the + logging package (e.g., DEBUG, INFO). Sets the base logging + level for the server. All log messages logged at this level or + higher in criticality will be logged. All log messages lower + in criticality will not be logged. Optional, defaults to None. """ self._logger = logging.getLogger('kmip.server') self._setup_logging(log_path) @@ -122,9 +128,12 @@ class KmipServer(object): auth_suite, policy_path, enable_tls_client_auth, - tls_cipher_suites + tls_cipher_suites, + logging_level ) + self._logger.setLevel(self.config.settings.get('logging_level')) + cipher_suites = self.config.settings.get('tls_cipher_suites') if self.config.settings.get('auth_suite') == 'TLS1.2': self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites) @@ -169,7 +178,8 @@ class KmipServer(object): auth_suite=None, policy_path=None, enable_tls_client_auth=None, - tls_cipher_suites=None + tls_cipher_suites=None, + logging_level=None ): if path: self.config.load_settings(path) @@ -198,6 +208,8 @@ class KmipServer(object): 'tls_cipher_suites', tls_cipher_suites.split(',') ) + if logging_level: + self.config.set_setting('logging_level', logging_level) def start(self): """ @@ -536,6 +548,18 @@ def build_argument_parser(): "Optional, defaults to None." ) ) + parser.add_option( + "-v", + "--logging_level", + action="store", + type="str", + default=None, + dest="logging_level", + help=( + "A string representing the logging level for the server (e.g., " + "DEBUG, INFO). Optional, defaults to None." + ) + ) return parser @@ -566,6 +590,8 @@ def main(args=None): kwargs['policy_path'] = opts.policy_path if opts.ignore_tls_client_auth: kwargs['enable_tls_client_auth'] = False + if opts.logging_level: + kwargs['logging_level'] = opts.logging_level # Create and start the server. s = KmipServer(**kwargs) diff --git a/kmip/tests/unit/services/server/test_config.py b/kmip/tests/unit/services/server/test_config.py index 90c669e..2cf669f 100644 --- a/kmip/tests/unit/services/server/test_config.py +++ b/kmip/tests/unit/services/server/test_config.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import logging import mock from six.moves import configparser @@ -65,6 +66,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_policy_path = mock.MagicMock() c._set_enable_tls_client_auth = mock.MagicMock() c._set_tls_cipher_suites = mock.MagicMock() + c._set_logging_level = mock.MagicMock() # Test the right error is generated when setting an unsupported # setting. @@ -109,6 +111,9 @@ class TestKmipServerConfig(testtools.TestCase): c.set_setting('tls_cipher_suites', []) c._set_tls_cipher_suites.assert_called_once_with([]) + c.set_setting('logging_level', 'WARNING') + c._set_logging_level.assert_called_once_with('WARNING') + def test_load_settings(self): """ Test that the right calls are made and the right errors generated when @@ -163,6 +168,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_policy_path = mock.MagicMock() c._set_enable_tls_client_auth = mock.MagicMock() c._set_tls_cipher_suites = mock.MagicMock() + c._set_logging_level = mock.MagicMock() # Test that the right calls are made when correctly parsing settings. parser = configparser.SafeConfigParser() @@ -180,6 +186,7 @@ class TestKmipServerConfig(testtools.TestCase): 'tls_cipher_suites', "\n TLS_RSA_WITH_AES_256_CBC_SHA256" ) + parser.set('server', 'logging_level', 'ERROR') c._parse_settings(parser) @@ -196,6 +203,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_tls_cipher_suites.assert_called_once_with( "\n TLS_RSA_WITH_AES_256_CBC_SHA256" ) + c._set_logging_level.assert_called_once_with('ERROR') # Test that a ConfigurationError is generated when the expected # section is missing. @@ -686,3 +694,65 @@ class TestKmipServerConfig(testtools.TestCase): c._set_tls_cipher_suites, *args ) + + def test_set_logging_level(self): + """ + Test that the logging_level configuration property can be set + correctly with a value expected from the config file. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_logging_level("DEBUG") + self.assertEqual(logging.DEBUG, c.settings.get('logging_level')) + + def test_set_logging_level_enum(self): + """ + Test that the logging_level configuration property can be set + correctly with a value expected from the server constructor. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_logging_level(logging.DEBUG) + self.assertEqual(logging.DEBUG, c.settings.get('logging_level')) + + def test_set_logging_level_default(self): + """ + Test that the logging_level configuration property can be set + correctly without specifying a value. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_logging_level(logging.DEBUG) + self.assertEqual(logging.DEBUG, c.settings.get('logging_level')) + + c._set_logging_level(None) + self.assertEqual(logging.INFO, c.settings.get('logging_level')) + + def test_set_logging_level_invalid_value(self): + """ + Test that the right error is raised when an invalid value is used to + set the tls_cipher_suites configuration property. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + args = (0,) + self.assertRaisesRegexp( + exceptions.ConfigurationError, + "The logging level must be a string representing a valid logging " + "level.", + c._set_logging_level, + *args + ) + + args = ('invalid',) + self.assertRaisesRegexp( + exceptions.ConfigurationError, + "The logging level must be a string representing a valid logging " + "level.", + c._set_logging_level, + *args + ) diff --git a/kmip/tests/unit/services/server/test_server.py b/kmip/tests/unit/services/server/test_server.py index 993175a..878711c 100644 --- a/kmip/tests/unit/services/server/test_server.py +++ b/kmip/tests/unit/services/server/test_server.py @@ -95,7 +95,7 @@ class TestKmipServer(testtools.TestCase): open_mock.assert_called_once_with('/test/path/server.log', 'w') self.assertTrue(s._logger.addHandler.called) - s._logger.setLevel.assert_called_once_with(logging.DEBUG) + s._logger.setLevel.assert_any_call(logging.DEBUG) @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.auth.TLS12AuthenticationSuite') @@ -121,7 +121,8 @@ class TestKmipServer(testtools.TestCase): 'Basic', '/etc/pykmip/policies', False, - 'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA' + 'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA', + 'DEBUG' ) s.config.load_settings.assert_called_with('/etc/pykmip/server.conf') @@ -155,6 +156,7 @@ class TestKmipServer(testtools.TestCase): 'TLS_RSA_WITH_AES_256_CBC_SHA' ] ) + s.config.set_setting.assert_any_call('logging_level', 'DEBUG') # Test that an attempt is made to instantiate the TLS 1.2 auth suite s = server.KmipServer(