Merge pull request #345 from OpenKMIP/feat/add-log-level-config

Add a logging level configuration option
This commit is contained in:
Peter Hamilton 2017-09-27 18:07:59 -04:00 committed by GitHub
commit aa798d939c
5 changed files with 143 additions and 7 deletions

View File

@ -11,3 +11,4 @@ tls_cipher_suites=
EXAMPLE_CIPHER_SUITE_1
EXAMPLE_CIPHER_SUITE_2
EXAMPLE_CIPHER_SUITE_3
logging_level=INFO

View File

@ -36,6 +36,7 @@ class KmipServerConfig(object):
self.settings = dict()
self.settings['enable_tls_client_auth'] = True
self.settings['tls_cipher_suites'] = []
self.settings['logging_level'] = logging.INFO
self._expected_settings = [
'hostname',
@ -48,7 +49,8 @@ class KmipServerConfig(object):
self._optional_settings = [
'policy_path',
'enable_tls_client_auth',
'tls_cipher_suites'
'tls_cipher_suites',
'logging_level'
]
def set_setting(self, setting, value):
@ -88,8 +90,10 @@ class KmipServerConfig(object):
self._set_policy_path(value)
elif setting == 'enable_tls_client_auth':
self._set_enable_tls_client_auth(value)
else:
elif setting == 'tls_cipher_suites':
self._set_tls_cipher_suites(value)
else:
self._set_logging_level(value)
def load_settings(self, path):
"""
@ -164,6 +168,10 @@ class KmipServerConfig(object):
self._set_tls_cipher_suites(
parser.get('server', 'tls_cipher_suites')
)
if parser.has_option('server', 'logging_level'):
self._set_logging_level(
parser.get('server', 'logging_level')
)
def _set_hostname(self, value):
if isinstance(value, six.string_types):
@ -290,3 +298,32 @@ class KmipServerConfig(object):
"The TLS cipher suites must be a set of strings representing "
"cipher suite names."
)
def _set_logging_level(self, value):
if value is None:
self.settings['logging_level'] = logging.INFO
return
logging_levels = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL
}
if value in logging_levels.values():
self.settings['logging_level'] = value
elif isinstance(value, six.string_types):
level = logging_levels.get(value.upper())
if level:
self.settings['logging_level'] = level
else:
raise exceptions.ConfigurationError(
"The logging level must be a string representing a valid "
"logging level."
)
else:
raise exceptions.ConfigurationError(
"The logging level must be a string representing a valid "
"logging level."
)

View File

@ -52,7 +52,8 @@ class KmipServer(object):
log_path='/var/log/pykmip/server.log',
policy_path=None,
enable_tls_client_auth=None,
tls_cipher_suites=None
tls_cipher_suites=None,
logging_level=None
):
"""
Create a KmipServer.
@ -107,6 +108,11 @@ class KmipServer(object):
be used by the server when establishing a TLS connection with
a client. Optional, defaults to None. If None, the default set
of TLS cipher suites will be used.
logging_level (string): A logging level enumeration defined by the
logging package (e.g., DEBUG, INFO). Sets the base logging
level for the server. All log messages logged at this level or
higher in criticality will be logged. All log messages lower
in criticality will not be logged. Optional, defaults to None.
"""
self._logger = logging.getLogger('kmip.server')
self._setup_logging(log_path)
@ -122,9 +128,12 @@ class KmipServer(object):
auth_suite,
policy_path,
enable_tls_client_auth,
tls_cipher_suites
tls_cipher_suites,
logging_level
)
self._logger.setLevel(self.config.settings.get('logging_level'))
cipher_suites = self.config.settings.get('tls_cipher_suites')
if self.config.settings.get('auth_suite') == 'TLS1.2':
self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites)
@ -169,7 +178,8 @@ class KmipServer(object):
auth_suite=None,
policy_path=None,
enable_tls_client_auth=None,
tls_cipher_suites=None
tls_cipher_suites=None,
logging_level=None
):
if path:
self.config.load_settings(path)
@ -198,6 +208,8 @@ class KmipServer(object):
'tls_cipher_suites',
tls_cipher_suites.split(',')
)
if logging_level:
self.config.set_setting('logging_level', logging_level)
def start(self):
"""
@ -536,6 +548,18 @@ def build_argument_parser():
"Optional, defaults to None."
)
)
parser.add_option(
"-v",
"--logging_level",
action="store",
type="str",
default=None,
dest="logging_level",
help=(
"A string representing the logging level for the server (e.g., "
"DEBUG, INFO). Optional, defaults to None."
)
)
return parser
@ -566,6 +590,8 @@ def main(args=None):
kwargs['policy_path'] = opts.policy_path
if opts.ignore_tls_client_auth:
kwargs['enable_tls_client_auth'] = False
if opts.logging_level:
kwargs['logging_level'] = opts.logging_level
# Create and start the server.
s = KmipServer(**kwargs)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mock
from six.moves import configparser
@ -65,6 +66,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
c._set_tls_cipher_suites = mock.MagicMock()
c._set_logging_level = mock.MagicMock()
# Test the right error is generated when setting an unsupported
# setting.
@ -109,6 +111,9 @@ class TestKmipServerConfig(testtools.TestCase):
c.set_setting('tls_cipher_suites', [])
c._set_tls_cipher_suites.assert_called_once_with([])
c.set_setting('logging_level', 'WARNING')
c._set_logging_level.assert_called_once_with('WARNING')
def test_load_settings(self):
"""
Test that the right calls are made and the right errors generated when
@ -163,6 +168,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_policy_path = mock.MagicMock()
c._set_enable_tls_client_auth = mock.MagicMock()
c._set_tls_cipher_suites = mock.MagicMock()
c._set_logging_level = mock.MagicMock()
# Test that the right calls are made when correctly parsing settings.
parser = configparser.SafeConfigParser()
@ -180,6 +186,7 @@ class TestKmipServerConfig(testtools.TestCase):
'tls_cipher_suites',
"\n TLS_RSA_WITH_AES_256_CBC_SHA256"
)
parser.set('server', 'logging_level', 'ERROR')
c._parse_settings(parser)
@ -196,6 +203,7 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_tls_cipher_suites.assert_called_once_with(
"\n TLS_RSA_WITH_AES_256_CBC_SHA256"
)
c._set_logging_level.assert_called_once_with('ERROR')
# Test that a ConfigurationError is generated when the expected
# section is missing.
@ -686,3 +694,65 @@ class TestKmipServerConfig(testtools.TestCase):
c._set_tls_cipher_suites,
*args
)
def test_set_logging_level(self):
"""
Test that the logging_level configuration property can be set
correctly with a value expected from the config file.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
c._set_logging_level("DEBUG")
self.assertEqual(logging.DEBUG, c.settings.get('logging_level'))
def test_set_logging_level_enum(self):
"""
Test that the logging_level configuration property can be set
correctly with a value expected from the server constructor.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
c._set_logging_level(logging.DEBUG)
self.assertEqual(logging.DEBUG, c.settings.get('logging_level'))
def test_set_logging_level_default(self):
"""
Test that the logging_level configuration property can be set
correctly without specifying a value.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
c._set_logging_level(logging.DEBUG)
self.assertEqual(logging.DEBUG, c.settings.get('logging_level'))
c._set_logging_level(None)
self.assertEqual(logging.INFO, c.settings.get('logging_level'))
def test_set_logging_level_invalid_value(self):
"""
Test that the right error is raised when an invalid value is used to
set the tls_cipher_suites configuration property.
"""
c = config.KmipServerConfig()
c._logger = mock.MagicMock()
args = (0,)
self.assertRaisesRegexp(
exceptions.ConfigurationError,
"The logging level must be a string representing a valid logging "
"level.",
c._set_logging_level,
*args
)
args = ('invalid',)
self.assertRaisesRegexp(
exceptions.ConfigurationError,
"The logging level must be a string representing a valid logging "
"level.",
c._set_logging_level,
*args
)

View File

@ -95,7 +95,7 @@ class TestKmipServer(testtools.TestCase):
open_mock.assert_called_once_with('/test/path/server.log', 'w')
self.assertTrue(s._logger.addHandler.called)
s._logger.setLevel.assert_called_once_with(logging.DEBUG)
s._logger.setLevel.assert_any_call(logging.DEBUG)
@mock.patch('kmip.services.server.engine.KmipEngine')
@mock.patch('kmip.services.auth.TLS12AuthenticationSuite')
@ -121,7 +121,8 @@ class TestKmipServer(testtools.TestCase):
'Basic',
'/etc/pykmip/policies',
False,
'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA'
'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA',
'DEBUG'
)
s.config.load_settings.assert_called_with('/etc/pykmip/server.conf')
@ -155,6 +156,7 @@ class TestKmipServer(testtools.TestCase):
'TLS_RSA_WITH_AES_256_CBC_SHA'
]
)
s.config.set_setting.assert_any_call('logging_level', 'DEBUG')
# Test that an attempt is made to instantiate the TLS 1.2 auth suite
s = server.KmipServer(