Merge pull request #1495 from ZyX-I/extended-logging

Add more logging options
This commit is contained in:
Nikolai Aleksandrovich Pavlov 2015-11-22 06:35:56 +03:00
commit eaa772f195
12 changed files with 809 additions and 119 deletions

View File

@ -24,6 +24,7 @@ matrix:
- python: "3.2"
- python: "3.3"
- python: "3.4"
- python: "3.5"
- python: "pypy"
- python: "pypy3"
- python: "2.6"

View File

@ -12,6 +12,8 @@ Vim overrides
Vim configuration can be overridden using the following options:
.. _local-configuration-overrides-vim-config:
``g:powerline_config_overrides``
Dictionary, recursively merged with contents of
:file:`powerline/config.json`.
@ -37,6 +39,17 @@ Vim configuration can be overridden using the following options:
was configured in :ref:`log_* options <config-common-log>`. Level is always
:ref:`log_level <config-common-log_level>`, same for format.
.. warning::
This variable is deprecated. Use :ref:`log_file option
<config-common-log>` in conjunction with
:py:class:`powerline.vim.VimVarHandler` class and :ref:`Vim config
overrides variable <local-configuration-overrides-vim-config>`. Using
this is also the only variant to make saving into the environment
variable the *only* place where log is saved or save into different
variable.
.. autoclass:: powerline.vim.VimVarHandler
.. _local-configuration-overrides-script:
Powerline script overrides

View File

@ -94,14 +94,38 @@ Common configuration is a subdictionary that is a value of ``common`` key in
.. _config-common-log:
``log_file``
Defines path which will hold powerline logs. If not present, logging will be
done to stderr.
Defines how logs will be handled. There are three variants here:
#. Absent. In this case logging will be done to stderr: equivalent to
``[["logging.StreamHandler", []]]`` or ``[null]``.
#. Plain string. In this case logging will be done to the given file:
``"/file/name"`` is equivalent to ``[["logging.FileHandler",
[["/file/name"]]]]`` or ``["/file/name"]``. Leading ``~/`` is expanded in
the file name, so using ``"~/.log/foo"`` is permitted. If directory
pointed by the option is absent, it will be created, but not its parent.
#. List of handler definitions. Handler definition may either be ``null``,
a string or a list with two or three elements:
#. Logging class name and module. If module name is absent, it is
equivalent to ``logging.handlers``.
#. Class constructor arguments in a form ``[[args[, kwargs]]]``: accepted
variants are ``[]`` (no arguments), ``[args]`` (e.g.
``[["/file/name"]]``: only positional arguments) or ``[args, kwargs]``
(e.g. ``[[], {"host": "localhost", "port": 6666}]``: positional and
keyword arguments, but no positional arguments in the example).
#. Optional logging level. Overrides :ref:`log_level key
<config-common-log_level>` and has the same format.
#. Optional format string. Partially overrides :ref:`log_format key
<config-common-log_format>` and has the same format. “Partially” here
means that it may only specify more critical level.
.. _config-common-log_level:
``log_level``
String, determines logging level. Defaults to ``WARNING``.
.. _config-common-log_format:
``log_format``
String, determines format of the log messages. Defaults to
``'%(asctime)s:%(level)s:%(message)s'``.

View File

@ -9,7 +9,7 @@ from threading import Lock, Event
from powerline.colorscheme import Colorscheme
from powerline.lib.config import ConfigLoader
from powerline.lib.unicode import safe_unicode, FailedUnicode
from powerline.lib.unicode import unicode, safe_unicode, FailedUnicode
from powerline.config import DEFAULT_SYSTEM_CONFIG_DIR
from powerline.lib.dict import mergedicts
from powerline.lib.encoding import get_preferred_output_encoding
@ -121,7 +121,7 @@ def get_fallback_logger(stream=None):
handler.setLevel(level)
handler.setFormatter(formatter)
logger = logging.getLogger('powerline')
logger = logging.Logger('powerline')
logger.setLevel(level)
logger.addHandler(handler)
_fallback_logger = PowerlineLogger(None, logger, '_fallback_')
@ -200,40 +200,102 @@ def load_config(cfg_path, find_config_files, config_loader, loader_callback=None
return ret
def _get_log_handler(common_config, stream=None):
'''Get log handler.
def _set_log_handlers(common_config, logger, get_module_attr, stream=None):
'''Set log handlers
:param dict common_config:
Configuration dictionary used to create handler.
:return: logging.Handler subclass.
:param logging.Logger logger:
Logger to which handlers will be attached.
:param func get_module_attr:
:py:func:`gen_module_attr_getter` output.
:param file stream:
Stream to use by default for :py:class:`logging.StreamHandler` in place
of :py:attr:`sys.stderr`. May be ``None``.
'''
log_file = common_config['log_file']
if log_file:
log_file = os.path.expanduser(log_file)
log_dir = os.path.dirname(log_file)
if not os.path.isdir(log_dir):
os.mkdir(log_dir)
return logging.FileHandler(log_file)
else:
return logging.StreamHandler(stream)
log_targets = common_config['log_file']
num_handlers = 0
for log_target in log_targets:
if log_target is None:
log_target = ['logging.StreamHandler', []]
elif isinstance(log_target, unicode):
log_target = os.path.expanduser(log_target)
log_dir = os.path.dirname(log_target)
if log_dir and not os.path.isdir(log_dir):
os.mkdir(log_dir)
log_target = ['logging.FileHandler', [[log_target]]]
module, handler_class_name = log_target[0].rpartition('.')[::2]
module = module or 'logging.handlers'
try:
handler_class_args = log_target[1][0]
except IndexError:
if module == 'logging' and handler_class_name == 'StreamHandler':
handler_class_args = [stream]
else:
handler_class_args = ()
try:
handler_class_kwargs = log_target[1][1]
except IndexError:
handler_class_kwargs = {}
module = str(module)
handler_class_name = str(handler_class_name)
handler_class = get_module_attr(module, handler_class_name)
if not handler_class:
continue
handler = handler_class(*handler_class_args, **handler_class_kwargs)
try:
handler_level_name = log_target[2]
except IndexError:
handler_level_name = common_config['log_level']
try:
handler_format = log_target[3]
except IndexError:
handler_format = common_config['log_format']
handler.setLevel(getattr(logging, handler_level_name))
handler.setFormatter(logging.Formatter(handler_format))
logger.addHandler(handler)
num_handlers += 1
if num_handlers == 0 and log_targets:
raise ValueError('Failed to set up any handlers')
def create_logger(common_config, stream=None):
def create_logger(common_config, use_daemon_threads=True, ext='__unknown__',
import_paths=None, imported_modules=None, stream=None):
'''Create logger according to provided configuration
:param dict common_config:
Common configuration, from :py:func:`finish_common_config`.
:param bool use_daemon_threads:
Whether daemon threads should be used. Argument to
:py:class:`PowerlineLogger` constructor.
:param str ext:
Used extension. Argument to :py:class:`PowerlineLogger` constructor.
:param set imported_modules:
Set where imported modules are saved. Argument to
:py:func:`gen_module_attr_getter`. May be ``None``, in this case new
empty set is used.
:param file stream:
Stream to use by default for :py:class:`logging.StreamHandler` in place
of :py:attr:`sys.stderr`. May be ``None``.
:return: Three objects:
#. :py:class:`logging.Logger` instance.
#. :py:class:`PowerlineLogger` instance.
#. Function, output of :py:func:`gen_module_attr_getter`.
'''
log_format = common_config['log_format']
formatter = logging.Formatter(log_format)
logger = logging.Logger('powerline')
level = getattr(logging, common_config['log_level'])
handler = _get_log_handler(common_config, stream)
handler.setLevel(level)
handler.setFormatter(formatter)
logger = logging.getLogger('powerline')
logger.setLevel(level)
logger.addHandler(handler)
return logger
pl = PowerlineLogger(use_daemon_threads, logger, ext)
get_module_attr = gen_module_attr_getter(
pl, common_config['paths'],
set() if imported_modules is None else imported_modules)
_set_log_handlers(common_config, logger, get_module_attr, stream)
return logger, pl, get_module_attr
def finish_common_config(encoding, common_config):
@ -264,7 +326,10 @@ def finish_common_config(encoding, common_config):
common_config.setdefault('additional_escapes', None)
common_config.setdefault('reload_config', True)
common_config.setdefault('interval', None)
common_config.setdefault('log_file', None)
common_config.setdefault('log_file', [None])
if not isinstance(common_config['log_file'], list):
common_config['log_file'] = [common_config['log_file']]
common_config['paths'] = [
os.path.expanduser(path) for path in common_config['paths']
@ -324,6 +389,26 @@ def gen_module_attr_getter(pl, import_paths, imported_modules):
return get_module_attr
LOG_KEYS = set(('log_format', 'log_level', 'log_file', 'paths'))
'''List of keys related to logging
'''
def _get_log_keys(common_config):
'''Return a common configuration copy with only log-related config left
:param dict common_config:
Common configuration.
:return:
:py:class:`dict` instance which has only keys from
:py:attr:`powerline.LOG_KEYS` left.
'''
return dict((
(k, v) for k, v in common_config.items() if k in LOG_KEYS
))
class Powerline(object):
'''Main powerline class, entrance point for all powerline uses. Sets
powerline up and loads the configuration.
@ -380,6 +465,7 @@ class Powerline(object):
self.ext = ext
self.run_once = run_once
self.logger = logger
self.had_logger = bool(self.logger)
self.use_daemon_threads = use_daemon_threads
if not renderer_module:
@ -430,8 +516,20 @@ class Powerline(object):
This function is used to create logger unless it was already specified
at initialization.
:return: Three objects:
#. :py:class:`logging.Logger` instance.
#. :py:class:`PowerlineLogger` instance.
#. Function, output of :py:func:`gen_module_attr_getter`.
'''
return create_logger(self.common_config, self.default_log_stream)
return create_logger(
common_config=self.common_config,
use_daemon_threads=self.use_daemon_threads,
ext=self.ext,
imported_modules=self.imported_modules,
stream=self.default_log_stream,
)
def create_renderer(self, load_main=False, load_colors=False, load_colorscheme=False, load_theme=False):
'''(Re)create renderer object. Can be used after Powerline object was
@ -465,22 +563,24 @@ class Powerline(object):
or not self.prev_common_config
or self.prev_common_config['default_top_theme'] != self.common_config['default_top_theme'])
log_keys_differ = (not self.prev_common_config or (
_get_log_keys(self.prev_common_config) != _get_log_keys(self.common_config)
))
self.prev_common_config = self.common_config
self.import_paths = self.common_config['paths']
if not self.logger:
self.logger = self.create_logger()
if not self.pl:
self.pl = PowerlineLogger(self.use_daemon_threads, self.logger, self.ext)
if log_keys_differ:
if self.had_logger:
self.pl = PowerlineLogger(self.use_daemon_threads, self.logger, self.ext)
self.get_module_attr = gen_module_attr_getter(
self.pl, self.common_config['paths'], self.imported_modules)
else:
self.logger, self.pl, self.get_module_attr = self.create_logger()
self.config_loader.pl = self.pl
if not self.run_once:
self.config_loader.set_watcher(self.common_config['watcher'])
self.get_module_attr = gen_module_attr_getter(self.pl, self.import_paths, self.imported_modules)
mergedicts(self.renderer_options, dict(
pl=self.pl,
term_truecolor=self.common_config['term_truecolor'],

View File

@ -9,7 +9,7 @@ import shlex
from powerline.config import POWERLINE_ROOT, TMUX_CONFIG_DIRECTORY
from powerline.lib.config import ConfigLoader
from powerline import generate_config_finder, load_config, create_logger, PowerlineLogger, finish_common_config
from powerline import generate_config_finder, load_config, create_logger, finish_common_config
from powerline.shell import ShellPowerline
from powerline.lib.shell import which
from powerline.bindings.tmux import (TmuxVersionInfo, run_tmux_command, set_tmux_environment, get_tmux_version,
@ -221,8 +221,8 @@ def get_main_config(args):
def create_powerline_logger(args):
config = get_main_config(args)
common_config = finish_common_config(get_preferred_output_encoding(), config['common'])
logger = create_logger(common_config)
return PowerlineLogger(use_daemon_threads=True, logger=logger, ext='config')
logger, pl, get_module_attr = create_logger(common_config)
return pl
def check_command(cmd):

View File

@ -22,7 +22,7 @@ from powerline.lint.checks import (check_matcher_func, check_ext, check_config,
check_segment_function, check_args, get_one_segment_function,
check_highlight_groups, check_highlight_group, check_full_segment_data,
get_all_possible_functions, check_segment_data_key, register_common_name,
highlight_group_spec)
highlight_group_spec, check_log_file_level, check_logging_handler)
from powerline.lint.spec import Spec
from powerline.lint.context import Context
@ -56,6 +56,11 @@ ext_spec = Spec(
top_theme=top_theme_spec().optional(),
).copy
gen_components_spec = (lambda *components: Spec().list(Spec().type(unicode).oneof(set(components))))
log_level_spec = Spec().re('^[A-Z]+$').func(
(lambda value, *args: (True, True, not hasattr(logging, value))),
(lambda value: 'unknown debugging level {0}'.format(value))
).copy
log_format_spec = Spec().type(unicode).copy
main_spec = (Spec(
common=Spec(
default_top_theme=top_theme_spec().optional(),
@ -67,21 +72,32 @@ main_spec = (Spec(
(lambda value, *args: (True, True, not os.path.exists(os.path.expanduser(value.value)))),
(lambda value: 'path does not exist: {0}'.format(value))
).optional(),
log_file=Spec().type(unicode).func(
(
lambda value, *args: (
True,
True,
not os.path.isdir(os.path.dirname(os.path.expanduser(value)))
)
log_file=Spec().either(
Spec().type(unicode).func(
(
lambda value, *args: (
True,
True,
not os.path.isdir(os.path.dirname(os.path.expanduser(value)))
)
),
(lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value)))
),
(lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value)))
Spec().list(Spec().either(
Spec().type(unicode, type(None)),
Spec().tuple(
Spec().re(function_name_re).func(check_logging_handler),
Spec().tuple(
Spec().type(list).optional(),
Spec().type(dict).optional(),
),
log_level_spec().func(check_log_file_level).optional(),
log_format_spec().optional(),
),
))
).optional(),
log_level=Spec().re('^[A-Z]+$').func(
(lambda value, *args: (True, True, not hasattr(logging, value))),
(lambda value: 'unknown debugging level {0}'.format(value))
).optional(),
log_format=Spec().type(unicode).optional(),
log_level=log_level_spec().optional(),
log_format=log_format_spec().optional(),
interval=Spec().either(Spec().cmp('gt', 0.0), Spec().type(type(None))).optional(),
reload_config=Spec().type(bool).optional(),
watcher=Spec().type(unicode).oneof(set(('auto', 'inotify', 'stat'))).optional(),

View File

@ -800,3 +800,67 @@ def check_exinclude_function(name, data, context, echoerr):
if not func:
return True, False, True
return True, False, False
def check_log_file_level(this_level, data, context, echoerr):
'''Check handler level specified in :ref:`log_file key <config-common-log>`
This level must be greater or equal to the level in :ref:`log_level key
<config-common-log_level>`.
'''
havemarks(this_level)
hadproblem = False
top_level = context[0][1].get('common', {}).get('log_level', 'WARNING')
top_level_str = top_level
top_level_mark = getattr(top_level, 'mark', None)
if (
not isinstance(top_level, unicode) or not hasattr(logging, top_level)
or not isinstance(this_level, unicode) or not hasattr(logging, this_level)
):
return True, False, hadproblem
top_level = getattr(logging, top_level)
this_level_str = this_level
this_level_mark = this_level.mark
this_level = getattr(logging, this_level)
if this_level < top_level:
echoerr(
context='Error while checking log level index (key {key})'.format(
key=context.key),
context_mark=this_level_mark,
problem='found level that is less critical then top level ({0} < {0})'.format(
this_level_str, top_level_str),
problem_mark=top_level_mark,
)
hadproblem = True
return True, False, hadproblem
def check_logging_handler(handler_name, data, context, echoerr):
havemarks(handler_name)
import_paths = [os.path.expanduser(path) for path in context[0][1].get('common', {}).get('paths', [])]
handler_module, separator, handler_class = handler_name.rpartition('.')
if not separator:
handler_module = 'logging.handlers'
handler_class = handler_name
with WithPath(import_paths):
try:
handler = getattr(__import__(str(handler_module), fromlist=[str(handler_class)]), str(handler_class))
except ImportError:
echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
problem='failed to load module {0}'.format(handler_module),
problem_mark=handler_name.mark)
return True, False, True
except AttributeError:
echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
problem='failed to load handler class {0}'.format(handler_class),
problem_mark=handler_name.mark)
return True, False, True
if not issubclass(handler, logging.Handler):
echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
problem='loaded class {0} is not a logging.Handler subclass'.format(handler_class),
problem_mark=handler_name.mark)
return True, False, True
return True, False, False

View File

@ -531,7 +531,8 @@ class Spec(object):
if max_len == min_len:
self.len('eq', len(specs))
else:
self.len('ge', min_len)
if min_len > 0:
self.len('ge', min_len)
self.len('le', max_len)
start = len(self.specs)

View File

@ -7,7 +7,7 @@ from powerline.bindings.vim import (current_tabpage, list_tabpages)
try:
import vim
except ImportError:
vim = {}
vim = object()
def tabpage_updated_segment_info(segment_info, tabpage):

View File

@ -11,7 +11,7 @@ from collections import defaultdict
try:
import vim
except ImportError:
vim = {}
vim = object()
from powerline.bindings.vim import (vim_get_func, getbufvar, vim_getbufoption,
buffer_name, vim_getwinvar,

View File

@ -7,10 +7,13 @@ import logging
from itertools import count
import vim
try:
import vim
except ImportError:
vim = object()
from powerline.bindings.vim import vim_get_func, vim_getvar, get_vim_encoding, python_to_vim
from powerline import Powerline, FailedUnicode
from powerline import Powerline, FailedUnicode, finish_common_config
from powerline.lib.dict import mergedicts
from powerline.lib.unicode import u
@ -32,19 +35,21 @@ def _override_from(config, override_varname, key=None):
class VimVarHandler(logging.Handler, object):
'''Vim-specific handler which emits messages to Vim global variables
Used variable: ``g:powerline_log_messages``.
:param str varname:
Variable where
'''
def __init__(self, *args, **kwargs):
super(VimVarHandler, self).__init__(*args, **kwargs)
vim.command('unlet! g:powerline_log_messages')
vim.command('let g:powerline_log_messages = []')
def __init__(self, varname):
super(VimVarHandler, self).__init__()
utf_varname = u(varname)
self.vim_varname = utf_varname.encode('ascii')
vim.command('unlet! g:' + utf_varname)
vim.command('let g:' + utf_varname + ' = []')
@staticmethod
def emit(record):
def emit(self, record):
message = u(record.message)
if record.exc_text:
message += '\n' + u(record.exc_text)
vim.eval(b'add(g:powerline_log_messages, ' + python_to_vim(message) + b')')
vim.eval(b'add(g:' + self.vim_varname + b', ' + python_to_vim(message) + b')')
class VimPowerline(Powerline):
@ -53,6 +58,12 @@ class VimPowerline(Powerline):
self.last_window_id = 1
self.pyeval = pyeval
self.construct_window_statusline = self.create_window_statusline_constructor()
if all((hasattr(vim.current.window, attr) for attr in ('options', 'vars', 'number'))):
self.win_idx = self.new_win_idx
else:
self.win_idx = self.old_win_idx
self._vim_getwinvar = vim_get_func('getwinvar', 'bytes')
self._vim_setwinvar = vim_get_func('setwinvar')
if sys.version_info < (3,):
def create_window_statusline_constructor(self):
@ -80,18 +91,6 @@ class VimPowerline(Powerline):
default_log_stream = sys.stdout
def create_logger(self):
logger = super(VimPowerline, self).create_logger()
try:
if int(vim_getvar('powerline_use_var_handler')):
formatter = logging.Formatter(self.common_config['log_format'])
handler = VimVarHandler(getattr(logging, self.common_config['log_level']))
handler.setFormatter(formatter)
logger.addHandler(handler)
except KeyError:
pass
return logger
def add_local_theme(self, key, config):
'''Add local themes at runtime (during vim session).
@ -137,7 +136,16 @@ class VimPowerline(Powerline):
get_encoding = staticmethod(get_vim_encoding)
def load_main_config(self):
return _override_from(super(VimPowerline, self).load_main_config(), 'powerline_config_overrides')
main_config = _override_from(super(VimPowerline, self).load_main_config(), 'powerline_config_overrides')
try:
use_var_handler = bool(int(vim_getvar('powerline_use_var_handler')))
except KeyError:
use_var_handler = False
if use_var_handler:
main_config.setdefault('common', {})
main_config['common'] = finish_common_config(self.get_encoding(), main_config['common'])
main_config['common']['log_file'].append(['powerline.vim.VimVarHandler', [['powerline_log_messages']]])
return main_config
def load_theme_config(self, name):
return _override_from(
@ -252,44 +260,40 @@ class VimPowerline(Powerline):
# do anything.
pass
if all((hasattr(vim.current.window, attr) for attr in ('options', 'vars', 'number'))):
def win_idx(self, window_id):
r = None
for window in vim.windows:
try:
curwindow_id = window.vars['powerline_window_id']
if r is not None and curwindow_id == window_id:
raise KeyError
except KeyError:
curwindow_id = self.last_window_id
self.last_window_id += 1
window.vars['powerline_window_id'] = curwindow_id
statusline = self.construct_window_statusline(curwindow_id)
if window.options['statusline'] != statusline:
window.options['statusline'] = statusline
if curwindow_id == window_id if window_id else window is vim.current.window:
r = (window, curwindow_id, window.number)
return r
else:
_vim_getwinvar = staticmethod(vim_get_func('getwinvar', 'bytes'))
_vim_setwinvar = staticmethod(vim_get_func('setwinvar'))
def new_win_idx(self, window_id):
r = None
for window in vim.windows:
try:
curwindow_id = window.vars['powerline_window_id']
if r is not None and curwindow_id == window_id:
raise KeyError
except KeyError:
curwindow_id = self.last_window_id
self.last_window_id += 1
window.vars['powerline_window_id'] = curwindow_id
statusline = self.construct_window_statusline(curwindow_id)
if window.options['statusline'] != statusline:
window.options['statusline'] = statusline
if curwindow_id == window_id if window_id else window is vim.current.window:
r = (window, curwindow_id, window.number)
return r
def win_idx(self, window_id):
r = None
for winnr, window in zip(count(1), vim.windows):
curwindow_id = self._vim_getwinvar(winnr, 'powerline_window_id')
if curwindow_id and not (r is not None and curwindow_id == window_id):
curwindow_id = int(curwindow_id)
else:
curwindow_id = self.last_window_id
self.last_window_id += 1
self._vim_setwinvar(winnr, 'powerline_window_id', curwindow_id)
statusline = self.construct_window_statusline(curwindow_id)
if self._vim_getwinvar(winnr, '&statusline') != statusline:
self._vim_setwinvar(winnr, '&statusline', statusline)
if curwindow_id == window_id if window_id else window is vim.current.window:
r = (window, curwindow_id, winnr)
return r
def old_win_idx(self, window_id):
r = None
for winnr, window in zip(count(1), vim.windows):
curwindow_id = self._vim_getwinvar(winnr, 'powerline_window_id')
if curwindow_id and not (r is not None and curwindow_id == window_id):
curwindow_id = int(curwindow_id)
else:
curwindow_id = self.last_window_id
self.last_window_id += 1
self._vim_setwinvar(winnr, 'powerline_window_id', curwindow_id)
statusline = self.construct_window_statusline(curwindow_id)
if self._vim_getwinvar(winnr, '&statusline') != statusline:
self._vim_setwinvar(winnr, '&statusline', statusline)
if curwindow_id == window_id if window_id else window is vim.current.window:
r = (window, curwindow_id, winnr)
return r
def statusline(self, window_id):
window, window_id, winnr = self.win_idx(window_id) or (None, None, None)

467
tests/test_logging.py Normal file
View File

@ -0,0 +1,467 @@
# vim:fileencoding=utf-8:noet
'''Tests for various logging features'''
from __future__ import (unicode_literals, division, absolute_import, print_function)
import sys
import re
import codecs
import os
from io import StringIO
from shutil import rmtree
from powerline import finish_common_config, create_logger
from tests import TestCase
from tests.lib import replace_attr
TIMESTAMP_RE = r'\d{4}-\d\d-\d\d \d\d:\d\d:\d\d,\d{3}'
class TestRE(TestCase):
def assertMatches(self, text, regexp):
self.assertTrue(
re.match(regexp, text),
'{0!r} did not match {1!r}'.format(text, regexp),
)
def close_handlers(logger):
for handler in logger.handlers:
handler.close()
class TestHandlers(TestRE):
def test_stderr_handler_is_default(self):
out = StringIO()
err = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(err.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(out.getvalue(), '')
def test_stream_override(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_explicit_none(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [None]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_explicit_stream_handler(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [['logging.StreamHandler', [[]]]]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertEqual(stream.getvalue(), '')
self.assertMatches(err.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(out.getvalue(), '')
def test_explicit_stream_handler_implicit_stream(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [['logging.StreamHandler', []]]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_file_handler(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_file_handler'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': file_name})
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_file_handler_create_dir(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_file_handler_create_dir/file'
self.assertFalse(os.path.isdir(os.path.dirname(file_name)))
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': file_name})
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertTrue(os.path.isdir(os.path.dirname(file_name)))
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
rmtree(os.path.dirname(file_name))
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_multiple_files(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name_1 = 'test_logging-test_multiple_files-1'
file_name_2 = file_name_1[:-1] + '2'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [file_name_1, file_name_2]})
try:
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
for file_name in (file_name_1, file_name_2):
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name_1)
finally:
os.unlink(file_name_2)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_multiple_files_and_stream(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name_1 = 'test_logging-test_multiple_files_and_stream-1'
file_name_2 = file_name_1[:-1] + '2'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [file_name_1, file_name_2, None]})
try:
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
for file_name in (file_name_1, file_name_2):
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name_1)
finally:
os.unlink(file_name_2)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_handler_args(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_handler_args'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['RotatingFileHandler', [[file_name]]]
]})
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_handler_args_kwargs(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_handler_args_kwargs'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['RotatingFileHandler', [[file_name], {'maxBytes': 1, 'backupCount': 1}]]
]})
try:
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
pl.error('Bar')
close_handlers(logger)
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$')
with codecs.open(file_name + '.1', encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name + '.1')
finally:
os.unlink(file_name)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_logger_level(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
stream2 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'WARNING'],
['logging.StreamHandler', [[stream2]], 'ERROR'],
]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertMatches(stream1.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:Foo\n'
+ TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$'
))
self.assertMatches(stream2.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_logger_level_not_overriding_default(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'DEBUG'],
]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.debug('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertMatches(stream1.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_top_log_level(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'DEBUG'],
], 'log_level': 'DEBUG'})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.debug('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertMatches(stream1.getvalue(), (
'^' + TIMESTAMP_RE + ':DEBUG:__unknown__:Foo\n'
+ TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$'
))
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_logger_format(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'WARNING', 'FOO'],
]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertEqual(stream1.getvalue(), 'FOO\nFOO\n')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_top_log_format(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
stream2 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'WARNING', 'FOO'],
['logging.StreamHandler', [[stream2]], 'WARNING'],
], 'log_format': 'BAR'})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertEqual(stream2.getvalue(), 'BAR\nBAR\n')
self.assertEqual(stream1.getvalue(), 'FOO\nFOO\n')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
class TestPowerlineLogger(TestRE):
def test_args_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('foo {0}', 'Test')
pl.warn('bar {0!r}', 'Test')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo Test\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:bar u?\'Test\'\n$'
))
def test_prefix_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.prefix = '1'
pl.warn('foo')
pl.prefix = '2'
pl.warn('bar')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:1:foo\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:2:bar\n$'
))
def test_kwargs_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('foo {arg}', arg='Test')
pl.warn('bar {arg!r}', arg='Test')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo Test\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:bar u?\'Test\'\n$'
))
def test_args_kwargs_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('foo {0!r} {arg}', 'Test0', arg='Test')
pl.warn('bar {0} {arg!r}', 'Test0', arg='Test')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo u?\'Test0\' Test\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:bar Test0 u?\'Test\'\n$'
))
def test_exception_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
try:
raise ValueError('foo')
except ValueError:
pl.exception('Message')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':ERROR:__unknown__:Message\n'
+ 'Traceback \\(most recent call last\\):\n'
+ '(?: File ".*?", line \\d+, in \\w+\n [^\n]*\n)+'
+ 'ValueError: foo\n$'
))
def test_levels(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {'log_level': 'DEBUG'})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.debug('1')
pl.info('2')
pl.warn('3')
pl.error('4')
pl.critical('5')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':DEBUG:__unknown__:1\n'
+ TIMESTAMP_RE + ':INFO:__unknown__:2\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:3\n'
+ TIMESTAMP_RE + ':ERROR:__unknown__:4\n'
+ TIMESTAMP_RE + ':CRITICAL:__unknown__:5\n$'
))
old_cwd = None
def setUpModule():
global old_cwd
global __file__
old_cwd = os.getcwd()
__file__ = os.path.abspath(__file__)
os.chdir(os.path.dirname(__file__))
def tearDownModule():
global old_cwd
os.chdir(old_cwd)
if __name__ == '__main__':
from tests import main
main()