mirror of https://github.com/OpenKMIP/PyKMIP.git
Add a logging level configuration option
This change adds a logging level configuration option for the server, allowing the admin to control what server activity gets collected for logging. Unit tests have been added and updated to cover this new configuration setting.
This commit is contained in:
parent
23e6e89ec7
commit
69a7b49d32
|
@ -11,3 +11,4 @@ tls_cipher_suites=
|
|||
EXAMPLE_CIPHER_SUITE_1
|
||||
EXAMPLE_CIPHER_SUITE_2
|
||||
EXAMPLE_CIPHER_SUITE_3
|
||||
logging_level=INFO
|
||||
|
|
|
@ -36,6 +36,7 @@ class KmipServerConfig(object):
|
|||
self.settings = dict()
|
||||
self.settings['enable_tls_client_auth'] = True
|
||||
self.settings['tls_cipher_suites'] = []
|
||||
self.settings['logging_level'] = logging.INFO
|
||||
|
||||
self._expected_settings = [
|
||||
'hostname',
|
||||
|
@ -48,7 +49,8 @@ class KmipServerConfig(object):
|
|||
self._optional_settings = [
|
||||
'policy_path',
|
||||
'enable_tls_client_auth',
|
||||
'tls_cipher_suites'
|
||||
'tls_cipher_suites',
|
||||
'logging_level'
|
||||
]
|
||||
|
||||
def set_setting(self, setting, value):
|
||||
|
@ -88,8 +90,10 @@ class KmipServerConfig(object):
|
|||
self._set_policy_path(value)
|
||||
elif setting == 'enable_tls_client_auth':
|
||||
self._set_enable_tls_client_auth(value)
|
||||
else:
|
||||
elif setting == 'tls_cipher_suites':
|
||||
self._set_tls_cipher_suites(value)
|
||||
else:
|
||||
self._set_logging_level(value)
|
||||
|
||||
def load_settings(self, path):
|
||||
"""
|
||||
|
@ -164,6 +168,10 @@ class KmipServerConfig(object):
|
|||
self._set_tls_cipher_suites(
|
||||
parser.get('server', 'tls_cipher_suites')
|
||||
)
|
||||
if parser.has_option('server', 'logging_level'):
|
||||
self._set_logging_level(
|
||||
parser.get('server', 'logging_level')
|
||||
)
|
||||
|
||||
def _set_hostname(self, value):
|
||||
if isinstance(value, six.string_types):
|
||||
|
@ -290,3 +298,32 @@ class KmipServerConfig(object):
|
|||
"The TLS cipher suites must be a set of strings representing "
|
||||
"cipher suite names."
|
||||
)
|
||||
|
||||
def _set_logging_level(self, value):
|
||||
if value is None:
|
||||
self.settings['logging_level'] = logging.INFO
|
||||
return
|
||||
|
||||
logging_levels = {
|
||||
"DEBUG": logging.DEBUG,
|
||||
"INFO": logging.INFO,
|
||||
"WARNING": logging.WARNING,
|
||||
"ERROR": logging.ERROR,
|
||||
"CRITICAL": logging.CRITICAL
|
||||
}
|
||||
if value in logging_levels.values():
|
||||
self.settings['logging_level'] = value
|
||||
elif isinstance(value, six.string_types):
|
||||
level = logging_levels.get(value.upper())
|
||||
if level:
|
||||
self.settings['logging_level'] = level
|
||||
else:
|
||||
raise exceptions.ConfigurationError(
|
||||
"The logging level must be a string representing a valid "
|
||||
"logging level."
|
||||
)
|
||||
else:
|
||||
raise exceptions.ConfigurationError(
|
||||
"The logging level must be a string representing a valid "
|
||||
"logging level."
|
||||
)
|
||||
|
|
|
@ -52,7 +52,8 @@ class KmipServer(object):
|
|||
log_path='/var/log/pykmip/server.log',
|
||||
policy_path=None,
|
||||
enable_tls_client_auth=None,
|
||||
tls_cipher_suites=None
|
||||
tls_cipher_suites=None,
|
||||
logging_level=None
|
||||
):
|
||||
"""
|
||||
Create a KmipServer.
|
||||
|
@ -107,6 +108,11 @@ class KmipServer(object):
|
|||
be used by the server when establishing a TLS connection with
|
||||
a client. Optional, defaults to None. If None, the default set
|
||||
of TLS cipher suites will be used.
|
||||
logging_level (string): A logging level enumeration defined by the
|
||||
logging package (e.g., DEBUG, INFO). Sets the base logging
|
||||
level for the server. All log messages logged at this level or
|
||||
higher in criticality will be logged. All log messages lower
|
||||
in criticality will not be logged. Optional, defaults to None.
|
||||
"""
|
||||
self._logger = logging.getLogger('kmip.server')
|
||||
self._setup_logging(log_path)
|
||||
|
@ -122,9 +128,12 @@ class KmipServer(object):
|
|||
auth_suite,
|
||||
policy_path,
|
||||
enable_tls_client_auth,
|
||||
tls_cipher_suites
|
||||
tls_cipher_suites,
|
||||
logging_level
|
||||
)
|
||||
|
||||
self._logger.setLevel(self.config.settings.get('logging_level'))
|
||||
|
||||
cipher_suites = self.config.settings.get('tls_cipher_suites')
|
||||
if self.config.settings.get('auth_suite') == 'TLS1.2':
|
||||
self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites)
|
||||
|
@ -169,7 +178,8 @@ class KmipServer(object):
|
|||
auth_suite=None,
|
||||
policy_path=None,
|
||||
enable_tls_client_auth=None,
|
||||
tls_cipher_suites=None
|
||||
tls_cipher_suites=None,
|
||||
logging_level=None
|
||||
):
|
||||
if path:
|
||||
self.config.load_settings(path)
|
||||
|
@ -198,6 +208,8 @@ class KmipServer(object):
|
|||
'tls_cipher_suites',
|
||||
tls_cipher_suites.split(',')
|
||||
)
|
||||
if logging_level:
|
||||
self.config.set_setting('logging_level', logging_level)
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
|
@ -536,6 +548,18 @@ def build_argument_parser():
|
|||
"Optional, defaults to None."
|
||||
)
|
||||
)
|
||||
parser.add_option(
|
||||
"-v",
|
||||
"--logging_level",
|
||||
action="store",
|
||||
type="str",
|
||||
default=None,
|
||||
dest="logging_level",
|
||||
help=(
|
||||
"A string representing the logging level for the server (e.g., "
|
||||
"DEBUG, INFO). Optional, defaults to None."
|
||||
)
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
|
@ -566,6 +590,8 @@ def main(args=None):
|
|||
kwargs['policy_path'] = opts.policy_path
|
||||
if opts.ignore_tls_client_auth:
|
||||
kwargs['enable_tls_client_auth'] = False
|
||||
if opts.logging_level:
|
||||
kwargs['logging_level'] = opts.logging_level
|
||||
|
||||
# Create and start the server.
|
||||
s = KmipServer(**kwargs)
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import mock
|
||||
|
||||
from six.moves import configparser
|
||||
|
@ -65,6 +66,7 @@ class TestKmipServerConfig(testtools.TestCase):
|
|||
c._set_policy_path = mock.MagicMock()
|
||||
c._set_enable_tls_client_auth = mock.MagicMock()
|
||||
c._set_tls_cipher_suites = mock.MagicMock()
|
||||
c._set_logging_level = mock.MagicMock()
|
||||
|
||||
# Test the right error is generated when setting an unsupported
|
||||
# setting.
|
||||
|
@ -109,6 +111,9 @@ class TestKmipServerConfig(testtools.TestCase):
|
|||
c.set_setting('tls_cipher_suites', [])
|
||||
c._set_tls_cipher_suites.assert_called_once_with([])
|
||||
|
||||
c.set_setting('logging_level', 'WARNING')
|
||||
c._set_logging_level.assert_called_once_with('WARNING')
|
||||
|
||||
def test_load_settings(self):
|
||||
"""
|
||||
Test that the right calls are made and the right errors generated when
|
||||
|
@ -163,6 +168,7 @@ class TestKmipServerConfig(testtools.TestCase):
|
|||
c._set_policy_path = mock.MagicMock()
|
||||
c._set_enable_tls_client_auth = mock.MagicMock()
|
||||
c._set_tls_cipher_suites = mock.MagicMock()
|
||||
c._set_logging_level = mock.MagicMock()
|
||||
|
||||
# Test that the right calls are made when correctly parsing settings.
|
||||
parser = configparser.SafeConfigParser()
|
||||
|
@ -180,6 +186,7 @@ class TestKmipServerConfig(testtools.TestCase):
|
|||
'tls_cipher_suites',
|
||||
"\n TLS_RSA_WITH_AES_256_CBC_SHA256"
|
||||
)
|
||||
parser.set('server', 'logging_level', 'ERROR')
|
||||
|
||||
c._parse_settings(parser)
|
||||
|
||||
|
@ -196,6 +203,7 @@ class TestKmipServerConfig(testtools.TestCase):
|
|||
c._set_tls_cipher_suites.assert_called_once_with(
|
||||
"\n TLS_RSA_WITH_AES_256_CBC_SHA256"
|
||||
)
|
||||
c._set_logging_level.assert_called_once_with('ERROR')
|
||||
|
||||
# Test that a ConfigurationError is generated when the expected
|
||||
# section is missing.
|
||||
|
@ -686,3 +694,65 @@ class TestKmipServerConfig(testtools.TestCase):
|
|||
c._set_tls_cipher_suites,
|
||||
*args
|
||||
)
|
||||
|
||||
def test_set_logging_level(self):
|
||||
"""
|
||||
Test that the logging_level configuration property can be set
|
||||
correctly with a value expected from the config file.
|
||||
"""
|
||||
c = config.KmipServerConfig()
|
||||
c._logger = mock.MagicMock()
|
||||
|
||||
c._set_logging_level("DEBUG")
|
||||
self.assertEqual(logging.DEBUG, c.settings.get('logging_level'))
|
||||
|
||||
def test_set_logging_level_enum(self):
|
||||
"""
|
||||
Test that the logging_level configuration property can be set
|
||||
correctly with a value expected from the server constructor.
|
||||
"""
|
||||
c = config.KmipServerConfig()
|
||||
c._logger = mock.MagicMock()
|
||||
|
||||
c._set_logging_level(logging.DEBUG)
|
||||
self.assertEqual(logging.DEBUG, c.settings.get('logging_level'))
|
||||
|
||||
def test_set_logging_level_default(self):
|
||||
"""
|
||||
Test that the logging_level configuration property can be set
|
||||
correctly without specifying a value.
|
||||
"""
|
||||
c = config.KmipServerConfig()
|
||||
c._logger = mock.MagicMock()
|
||||
|
||||
c._set_logging_level(logging.DEBUG)
|
||||
self.assertEqual(logging.DEBUG, c.settings.get('logging_level'))
|
||||
|
||||
c._set_logging_level(None)
|
||||
self.assertEqual(logging.INFO, c.settings.get('logging_level'))
|
||||
|
||||
def test_set_logging_level_invalid_value(self):
|
||||
"""
|
||||
Test that the right error is raised when an invalid value is used to
|
||||
set the tls_cipher_suites configuration property.
|
||||
"""
|
||||
c = config.KmipServerConfig()
|
||||
c._logger = mock.MagicMock()
|
||||
|
||||
args = (0,)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.ConfigurationError,
|
||||
"The logging level must be a string representing a valid logging "
|
||||
"level.",
|
||||
c._set_logging_level,
|
||||
*args
|
||||
)
|
||||
|
||||
args = ('invalid',)
|
||||
self.assertRaisesRegexp(
|
||||
exceptions.ConfigurationError,
|
||||
"The logging level must be a string representing a valid logging "
|
||||
"level.",
|
||||
c._set_logging_level,
|
||||
*args
|
||||
)
|
||||
|
|
|
@ -95,7 +95,7 @@ class TestKmipServer(testtools.TestCase):
|
|||
open_mock.assert_called_once_with('/test/path/server.log', 'w')
|
||||
|
||||
self.assertTrue(s._logger.addHandler.called)
|
||||
s._logger.setLevel.assert_called_once_with(logging.DEBUG)
|
||||
s._logger.setLevel.assert_any_call(logging.DEBUG)
|
||||
|
||||
@mock.patch('kmip.services.server.engine.KmipEngine')
|
||||
@mock.patch('kmip.services.auth.TLS12AuthenticationSuite')
|
||||
|
@ -121,7 +121,8 @@ class TestKmipServer(testtools.TestCase):
|
|||
'Basic',
|
||||
'/etc/pykmip/policies',
|
||||
False,
|
||||
'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA'
|
||||
'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA',
|
||||
'DEBUG'
|
||||
)
|
||||
|
||||
s.config.load_settings.assert_called_with('/etc/pykmip/server.conf')
|
||||
|
@ -155,6 +156,7 @@ class TestKmipServer(testtools.TestCase):
|
|||
'TLS_RSA_WITH_AES_256_CBC_SHA'
|
||||
]
|
||||
)
|
||||
s.config.set_setting.assert_any_call('logging_level', 'DEBUG')
|
||||
|
||||
# Test that an attempt is made to instantiate the TLS 1.2 auth suite
|
||||
s = server.KmipServer(
|
||||
|
|
Loading…
Reference in New Issue