Add more logging options

This commit is contained in:
Foo 2015-11-22 04:31:12 +03:00
parent 48469c02ef
commit 3f59edc060
11 changed files with 808 additions and 119 deletions

View File

@ -12,6 +12,8 @@ Vim overrides
Vim configuration can be overridden using the following options: Vim configuration can be overridden using the following options:
.. _local-configuration-overrides-vim-config:
``g:powerline_config_overrides`` ``g:powerline_config_overrides``
Dictionary, recursively merged with contents of Dictionary, recursively merged with contents of
:file:`powerline/config.json`. :file:`powerline/config.json`.
@ -37,6 +39,17 @@ Vim configuration can be overridden using the following options:
was configured in :ref:`log_* options <config-common-log>`. Level is always was configured in :ref:`log_* options <config-common-log>`. Level is always
:ref:`log_level <config-common-log_level>`, same for format. :ref:`log_level <config-common-log_level>`, same for format.
.. warning::
This variable is deprecated. Use :ref:`log_file option
<config-common-log>` in conjunction with
:py:class:`powerline.vim.VimVarHandler` class and :ref:`Vim config
overrides variable <local-configuration-overrides-vim-config>`. Using
this is also the only variant to make saving into the environment
variable the *only* place where log is saved or save into different
variable.
.. autoclass:: powerline.vim.VimVarHandler
.. _local-configuration-overrides-script: .. _local-configuration-overrides-script:
Powerline script overrides Powerline script overrides

View File

@ -94,14 +94,38 @@ Common configuration is a subdictionary that is a value of ``common`` key in
.. _config-common-log: .. _config-common-log:
``log_file`` ``log_file``
Defines path which will hold powerline logs. If not present, logging will be Defines how logs will be handled. There are three variants here:
done to stderr.
#. Absent. In this case logging will be done to stderr: equivalent to
``[["logging.StreamHandler", []]]`` or ``[null]``.
#. Plain string. In this case logging will be done to the given file:
``"/file/name"`` is equivalent to ``[["logging.FileHandler",
[["/file/name"]]]]`` or ``["/file/name"]``. Leading ``~/`` is expanded in
the file name, so using ``"~/.log/foo"`` is permitted. If directory
pointed by the option is absent, it will be created, but not its parent.
#. List of handler definitions. Handler definition may either be ``null``,
a string or a list with two or three elements:
#. Logging class name and module. If module name is absent, it is
equivalent to ``logging.handlers``.
#. Class constructor arguments in a form ``[[args[, kwargs]]]``: accepted
variants are ``[]`` (no arguments), ``[args]`` (e.g.
``[["/file/name"]]``: only positional arguments) or ``[args, kwargs]``
(e.g. ``[[], {"host": "localhost", "port": 6666}]``: positional and
keyword arguments, but no positional arguments in the example).
#. Optional logging level. Overrides :ref:`log_level key
<config-common-log_level>` and has the same format.
#. Optional format string. Partially overrides :ref:`log_format key
<config-common-log_format>` and has the same format. “Partially” here
means that it may only specify more critical level.
.. _config-common-log_level: .. _config-common-log_level:
``log_level`` ``log_level``
String, determines logging level. Defaults to ``WARNING``. String, determines logging level. Defaults to ``WARNING``.
.. _config-common-log_format:
``log_format`` ``log_format``
String, determines format of the log messages. Defaults to String, determines format of the log messages. Defaults to
``'%(asctime)s:%(level)s:%(message)s'``. ``'%(asctime)s:%(level)s:%(message)s'``.

View File

@ -9,7 +9,7 @@ from threading import Lock, Event
from powerline.colorscheme import Colorscheme from powerline.colorscheme import Colorscheme
from powerline.lib.config import ConfigLoader from powerline.lib.config import ConfigLoader
from powerline.lib.unicode import safe_unicode, FailedUnicode from powerline.lib.unicode import unicode, safe_unicode, FailedUnicode
from powerline.config import DEFAULT_SYSTEM_CONFIG_DIR from powerline.config import DEFAULT_SYSTEM_CONFIG_DIR
from powerline.lib.dict import mergedicts from powerline.lib.dict import mergedicts
from powerline.lib.encoding import get_preferred_output_encoding from powerline.lib.encoding import get_preferred_output_encoding
@ -121,7 +121,7 @@ def get_fallback_logger(stream=None):
handler.setLevel(level) handler.setLevel(level)
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger = logging.getLogger('powerline') logger = logging.Logger('powerline')
logger.setLevel(level) logger.setLevel(level)
logger.addHandler(handler) logger.addHandler(handler)
_fallback_logger = PowerlineLogger(None, logger, '_fallback_') _fallback_logger = PowerlineLogger(None, logger, '_fallback_')
@ -200,40 +200,102 @@ def load_config(cfg_path, find_config_files, config_loader, loader_callback=None
return ret return ret
def _get_log_handler(common_config, stream=None): def _set_log_handlers(common_config, logger, get_module_attr, stream=None):
'''Get log handler. '''Set log handlers
:param dict common_config: :param dict common_config:
Configuration dictionary used to create handler. Configuration dictionary used to create handler.
:param logging.Logger logger:
:return: logging.Handler subclass. Logger to which handlers will be attached.
:param func get_module_attr:
:py:func:`gen_module_attr_getter` output.
:param file stream:
Stream to use by default for :py:class:`logging.StreamHandler` in place
of :py:attr:`sys.stderr`. May be ``None``.
''' '''
log_file = common_config['log_file'] log_targets = common_config['log_file']
if log_file: num_handlers = 0
log_file = os.path.expanduser(log_file) for log_target in log_targets:
log_dir = os.path.dirname(log_file) if log_target is None:
if not os.path.isdir(log_dir): log_target = ['logging.StreamHandler', []]
os.mkdir(log_dir) elif isinstance(log_target, unicode):
return logging.FileHandler(log_file) log_target = os.path.expanduser(log_target)
else: log_dir = os.path.dirname(log_target)
return logging.StreamHandler(stream) if log_dir and not os.path.isdir(log_dir):
os.mkdir(log_dir)
log_target = ['logging.FileHandler', [[log_target]]]
module, handler_class_name = log_target[0].rpartition('.')[::2]
module = module or 'logging.handlers'
try:
handler_class_args = log_target[1][0]
except IndexError:
if module == 'logging' and handler_class_name == 'StreamHandler':
handler_class_args = [stream]
else:
handler_class_args = ()
try:
handler_class_kwargs = log_target[1][1]
except IndexError:
handler_class_kwargs = {}
module = str(module)
handler_class_name = str(handler_class_name)
handler_class = get_module_attr(module, handler_class_name)
if not handler_class:
continue
handler = handler_class(*handler_class_args, **handler_class_kwargs)
try:
handler_level_name = log_target[2]
except IndexError:
handler_level_name = common_config['log_level']
try:
handler_format = log_target[3]
except IndexError:
handler_format = common_config['log_format']
handler.setLevel(getattr(logging, handler_level_name))
handler.setFormatter(logging.Formatter(handler_format))
logger.addHandler(handler)
num_handlers += 1
if num_handlers == 0 and log_targets:
raise ValueError('Failed to set up any handlers')
def create_logger(common_config, stream=None): def create_logger(common_config, use_daemon_threads=True, ext='__unknown__',
import_paths=None, imported_modules=None, stream=None):
'''Create logger according to provided configuration '''Create logger according to provided configuration
:param dict common_config:
Common configuration, from :py:func:`finish_common_config`.
:param bool use_daemon_threads:
Whether daemon threads should be used. Argument to
:py:class:`PowerlineLogger` constructor.
:param str ext:
Used extension. Argument to :py:class:`PowerlineLogger` constructor.
:param set imported_modules:
Set where imported modules are saved. Argument to
:py:func:`gen_module_attr_getter`. May be ``None``, in this case new
empty set is used.
:param file stream:
Stream to use by default for :py:class:`logging.StreamHandler` in place
of :py:attr:`sys.stderr`. May be ``None``.
:return: Three objects:
#. :py:class:`logging.Logger` instance.
#. :py:class:`PowerlineLogger` instance.
#. Function, output of :py:func:`gen_module_attr_getter`.
''' '''
log_format = common_config['log_format'] logger = logging.Logger('powerline')
formatter = logging.Formatter(log_format)
level = getattr(logging, common_config['log_level']) level = getattr(logging, common_config['log_level'])
handler = _get_log_handler(common_config, stream)
handler.setLevel(level)
handler.setFormatter(formatter)
logger = logging.getLogger('powerline')
logger.setLevel(level) logger.setLevel(level)
logger.addHandler(handler)
return logger pl = PowerlineLogger(use_daemon_threads, logger, ext)
get_module_attr = gen_module_attr_getter(
pl, common_config['paths'],
set() if imported_modules is None else imported_modules)
_set_log_handlers(common_config, logger, get_module_attr, stream)
return logger, pl, get_module_attr
def finish_common_config(encoding, common_config): def finish_common_config(encoding, common_config):
@ -264,7 +326,10 @@ def finish_common_config(encoding, common_config):
common_config.setdefault('additional_escapes', None) common_config.setdefault('additional_escapes', None)
common_config.setdefault('reload_config', True) common_config.setdefault('reload_config', True)
common_config.setdefault('interval', None) common_config.setdefault('interval', None)
common_config.setdefault('log_file', None) common_config.setdefault('log_file', [None])
if not isinstance(common_config['log_file'], list):
common_config['log_file'] = [common_config['log_file']]
common_config['paths'] = [ common_config['paths'] = [
os.path.expanduser(path) for path in common_config['paths'] os.path.expanduser(path) for path in common_config['paths']
@ -324,6 +389,26 @@ def gen_module_attr_getter(pl, import_paths, imported_modules):
return get_module_attr return get_module_attr
LOG_KEYS = set(('log_format', 'log_level', 'log_file', 'paths'))
'''List of keys related to logging
'''
def _get_log_keys(common_config):
'''Return a common configuration copy with only log-related config left
:param dict common_config:
Common configuration.
:return:
:py:class:`dict` instance which has only keys from
:py:attr:`powerline.LOG_KEYS` left.
'''
return dict((
(k, v) for k, v in common_config.items() if k in LOG_KEYS
))
class Powerline(object): class Powerline(object):
'''Main powerline class, entrance point for all powerline uses. Sets '''Main powerline class, entrance point for all powerline uses. Sets
powerline up and loads the configuration. powerline up and loads the configuration.
@ -380,6 +465,7 @@ class Powerline(object):
self.ext = ext self.ext = ext
self.run_once = run_once self.run_once = run_once
self.logger = logger self.logger = logger
self.had_logger = bool(self.logger)
self.use_daemon_threads = use_daemon_threads self.use_daemon_threads = use_daemon_threads
if not renderer_module: if not renderer_module:
@ -430,8 +516,20 @@ class Powerline(object):
This function is used to create logger unless it was already specified This function is used to create logger unless it was already specified
at initialization. at initialization.
:return: Three objects:
#. :py:class:`logging.Logger` instance.
#. :py:class:`PowerlineLogger` instance.
#. Function, output of :py:func:`gen_module_attr_getter`.
''' '''
return create_logger(self.common_config, self.default_log_stream) return create_logger(
common_config=self.common_config,
use_daemon_threads=self.use_daemon_threads,
ext=self.ext,
imported_modules=self.imported_modules,
stream=self.default_log_stream,
)
def create_renderer(self, load_main=False, load_colors=False, load_colorscheme=False, load_theme=False): def create_renderer(self, load_main=False, load_colors=False, load_colorscheme=False, load_theme=False):
'''(Re)create renderer object. Can be used after Powerline object was '''(Re)create renderer object. Can be used after Powerline object was
@ -465,22 +563,24 @@ class Powerline(object):
or not self.prev_common_config or not self.prev_common_config
or self.prev_common_config['default_top_theme'] != self.common_config['default_top_theme']) or self.prev_common_config['default_top_theme'] != self.common_config['default_top_theme'])
log_keys_differ = (not self.prev_common_config or (
_get_log_keys(self.prev_common_config) != _get_log_keys(self.common_config)
))
self.prev_common_config = self.common_config self.prev_common_config = self.common_config
self.import_paths = self.common_config['paths'] if log_keys_differ:
if self.had_logger:
if not self.logger: self.pl = PowerlineLogger(self.use_daemon_threads, self.logger, self.ext)
self.logger = self.create_logger() self.get_module_attr = gen_module_attr_getter(
self.pl, self.common_config['paths'], self.imported_modules)
if not self.pl: else:
self.pl = PowerlineLogger(self.use_daemon_threads, self.logger, self.ext) self.logger, self.pl, self.get_module_attr = self.create_logger()
self.config_loader.pl = self.pl self.config_loader.pl = self.pl
if not self.run_once: if not self.run_once:
self.config_loader.set_watcher(self.common_config['watcher']) self.config_loader.set_watcher(self.common_config['watcher'])
self.get_module_attr = gen_module_attr_getter(self.pl, self.import_paths, self.imported_modules)
mergedicts(self.renderer_options, dict( mergedicts(self.renderer_options, dict(
pl=self.pl, pl=self.pl,
term_truecolor=self.common_config['term_truecolor'], term_truecolor=self.common_config['term_truecolor'],

View File

@ -9,7 +9,7 @@ import shlex
from powerline.config import POWERLINE_ROOT, TMUX_CONFIG_DIRECTORY from powerline.config import POWERLINE_ROOT, TMUX_CONFIG_DIRECTORY
from powerline.lib.config import ConfigLoader from powerline.lib.config import ConfigLoader
from powerline import generate_config_finder, load_config, create_logger, PowerlineLogger, finish_common_config from powerline import generate_config_finder, load_config, create_logger, finish_common_config
from powerline.shell import ShellPowerline from powerline.shell import ShellPowerline
from powerline.lib.shell import which from powerline.lib.shell import which
from powerline.bindings.tmux import (TmuxVersionInfo, run_tmux_command, set_tmux_environment, get_tmux_version, from powerline.bindings.tmux import (TmuxVersionInfo, run_tmux_command, set_tmux_environment, get_tmux_version,
@ -221,8 +221,8 @@ def get_main_config(args):
def create_powerline_logger(args): def create_powerline_logger(args):
config = get_main_config(args) config = get_main_config(args)
common_config = finish_common_config(get_preferred_output_encoding(), config['common']) common_config = finish_common_config(get_preferred_output_encoding(), config['common'])
logger = create_logger(common_config) logger, pl, get_module_attr = create_logger(common_config)
return PowerlineLogger(use_daemon_threads=True, logger=logger, ext='config') return pl
def check_command(cmd): def check_command(cmd):

View File

@ -22,7 +22,7 @@ from powerline.lint.checks import (check_matcher_func, check_ext, check_config,
check_segment_function, check_args, get_one_segment_function, check_segment_function, check_args, get_one_segment_function,
check_highlight_groups, check_highlight_group, check_full_segment_data, check_highlight_groups, check_highlight_group, check_full_segment_data,
get_all_possible_functions, check_segment_data_key, register_common_name, get_all_possible_functions, check_segment_data_key, register_common_name,
highlight_group_spec) highlight_group_spec, check_log_file_level, check_logging_handler)
from powerline.lint.spec import Spec from powerline.lint.spec import Spec
from powerline.lint.context import Context from powerline.lint.context import Context
@ -56,6 +56,11 @@ ext_spec = Spec(
top_theme=top_theme_spec().optional(), top_theme=top_theme_spec().optional(),
).copy ).copy
gen_components_spec = (lambda *components: Spec().list(Spec().type(unicode).oneof(set(components)))) gen_components_spec = (lambda *components: Spec().list(Spec().type(unicode).oneof(set(components))))
log_level_spec = Spec().re('^[A-Z]+$').func(
(lambda value, *args: (True, True, not hasattr(logging, value))),
(lambda value: 'unknown debugging level {0}'.format(value))
).copy
log_format_spec = Spec().type(unicode).copy
main_spec = (Spec( main_spec = (Spec(
common=Spec( common=Spec(
default_top_theme=top_theme_spec().optional(), default_top_theme=top_theme_spec().optional(),
@ -67,21 +72,32 @@ main_spec = (Spec(
(lambda value, *args: (True, True, not os.path.exists(os.path.expanduser(value.value)))), (lambda value, *args: (True, True, not os.path.exists(os.path.expanduser(value.value)))),
(lambda value: 'path does not exist: {0}'.format(value)) (lambda value: 'path does not exist: {0}'.format(value))
).optional(), ).optional(),
log_file=Spec().type(unicode).func( log_file=Spec().either(
( Spec().type(unicode).func(
lambda value, *args: ( (
True, lambda value, *args: (
True, True,
not os.path.isdir(os.path.dirname(os.path.expanduser(value))) True,
) not os.path.isdir(os.path.dirname(os.path.expanduser(value)))
)
),
(lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value)))
), ),
(lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value))) Spec().list(Spec().either(
Spec().type(unicode, type(None)),
Spec().tuple(
Spec().re(function_name_re).func(check_logging_handler),
Spec().tuple(
Spec().type(list).optional(),
Spec().type(dict).optional(),
),
log_level_spec().func(check_log_file_level).optional(),
log_format_spec().optional(),
),
))
).optional(), ).optional(),
log_level=Spec().re('^[A-Z]+$').func( log_level=log_level_spec().optional(),
(lambda value, *args: (True, True, not hasattr(logging, value))), log_format=log_format_spec().optional(),
(lambda value: 'unknown debugging level {0}'.format(value))
).optional(),
log_format=Spec().type(unicode).optional(),
interval=Spec().either(Spec().cmp('gt', 0.0), Spec().type(type(None))).optional(), interval=Spec().either(Spec().cmp('gt', 0.0), Spec().type(type(None))).optional(),
reload_config=Spec().type(bool).optional(), reload_config=Spec().type(bool).optional(),
watcher=Spec().type(unicode).oneof(set(('auto', 'inotify', 'stat'))).optional(), watcher=Spec().type(unicode).oneof(set(('auto', 'inotify', 'stat'))).optional(),

View File

@ -800,3 +800,67 @@ def check_exinclude_function(name, data, context, echoerr):
if not func: if not func:
return True, False, True return True, False, True
return True, False, False return True, False, False
def check_log_file_level(this_level, data, context, echoerr):
'''Check handler level specified in :ref:`log_file key <config-common-log>`
This level must be greater or equal to the level in :ref:`log_level key
<config-common-log_level>`.
'''
havemarks(this_level)
hadproblem = False
top_level = context[0][1].get('common', {}).get('log_level', 'WARNING')
top_level_str = top_level
top_level_mark = getattr(top_level, 'mark', None)
if (
not isinstance(top_level, unicode) or not hasattr(logging, top_level)
or not isinstance(this_level, unicode) or not hasattr(logging, this_level)
):
return True, False, hadproblem
top_level = getattr(logging, top_level)
this_level_str = this_level
this_level_mark = this_level.mark
this_level = getattr(logging, this_level)
if this_level < top_level:
echoerr(
context='Error while checking log level index (key {key})'.format(
key=context.key),
context_mark=this_level_mark,
problem='found level that is less critical then top level ({0} < {0})'.format(
this_level_str, top_level_str),
problem_mark=top_level_mark,
)
hadproblem = True
return True, False, hadproblem
def check_logging_handler(handler_name, data, context, echoerr):
havemarks(handler_name)
import_paths = [os.path.expanduser(path) for path in context[0][1].get('common', {}).get('paths', [])]
handler_module, separator, handler_class = handler_name.rpartition('.')
if not separator:
handler_module = 'logging.handlers'
handler_class = handler_name
with WithPath(import_paths):
try:
handler = getattr(__import__(str(handler_module), fromlist=[str(handler_class)]), str(handler_class))
except ImportError:
echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
problem='failed to load module {0}'.format(handler_module),
problem_mark=handler_name.mark)
return True, False, True
except AttributeError:
echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
problem='failed to load handler class {0}'.format(handler_class),
problem_mark=handler_name.mark)
return True, False, True
if not issubclass(handler, logging.Handler):
echoerr(context='Error while loading logger class (key {key})'.format(key=context.key),
problem='loaded class {0} is not a logging.Handler subclass'.format(handler_class),
problem_mark=handler_name.mark)
return True, False, True
return True, False, False

View File

@ -531,7 +531,8 @@ class Spec(object):
if max_len == min_len: if max_len == min_len:
self.len('eq', len(specs)) self.len('eq', len(specs))
else: else:
self.len('ge', min_len) if min_len > 0:
self.len('ge', min_len)
self.len('le', max_len) self.len('le', max_len)
start = len(self.specs) start = len(self.specs)

View File

@ -7,7 +7,7 @@ from powerline.bindings.vim import (current_tabpage, list_tabpages, vim_getbufop
try: try:
import vim import vim
except ImportError: except ImportError:
vim = {} vim = object()
def tabpage_updated_segment_info(segment_info, tabpage): def tabpage_updated_segment_info(segment_info, tabpage):

View File

@ -11,7 +11,7 @@ from collections import defaultdict
try: try:
import vim import vim
except ImportError: except ImportError:
vim = {} vim = object()
from powerline.bindings.vim import (vim_get_func, getbufvar, vim_getbufoption, from powerline.bindings.vim import (vim_get_func, getbufvar, vim_getbufoption,
buffer_name, vim_getwinvar, buffer_name, vim_getwinvar,

View File

@ -7,10 +7,13 @@ import logging
from itertools import count from itertools import count
import vim try:
import vim
except ImportError:
vim = object()
from powerline.bindings.vim import vim_get_func, vim_getvar, get_vim_encoding, python_to_vim from powerline.bindings.vim import vim_get_func, vim_getvar, get_vim_encoding, python_to_vim
from powerline import Powerline, FailedUnicode from powerline import Powerline, FailedUnicode, finish_common_config
from powerline.lib.dict import mergedicts from powerline.lib.dict import mergedicts
from powerline.lib.unicode import u from powerline.lib.unicode import u
@ -32,19 +35,21 @@ def _override_from(config, override_varname, key=None):
class VimVarHandler(logging.Handler, object): class VimVarHandler(logging.Handler, object):
'''Vim-specific handler which emits messages to Vim global variables '''Vim-specific handler which emits messages to Vim global variables
Used variable: ``g:powerline_log_messages``. :param str varname:
Variable where
''' '''
def __init__(self, *args, **kwargs): def __init__(self, varname):
super(VimVarHandler, self).__init__(*args, **kwargs) super(VimVarHandler, self).__init__()
vim.command('unlet! g:powerline_log_messages') utf_varname = u(varname)
vim.command('let g:powerline_log_messages = []') self.vim_varname = utf_varname.encode('ascii')
vim.command('unlet! g:' + utf_varname)
vim.command('let g:' + utf_varname + ' = []')
@staticmethod def emit(self, record):
def emit(record):
message = u(record.message) message = u(record.message)
if record.exc_text: if record.exc_text:
message += '\n' + u(record.exc_text) message += '\n' + u(record.exc_text)
vim.eval(b'add(g:powerline_log_messages, ' + python_to_vim(message) + b')') vim.eval(b'add(g:' + self.vim_varname + b', ' + python_to_vim(message) + b')')
class VimPowerline(Powerline): class VimPowerline(Powerline):
@ -53,6 +58,12 @@ class VimPowerline(Powerline):
self.last_window_id = 1 self.last_window_id = 1
self.pyeval = pyeval self.pyeval = pyeval
self.construct_window_statusline = self.create_window_statusline_constructor() self.construct_window_statusline = self.create_window_statusline_constructor()
if all((hasattr(vim.current.window, attr) for attr in ('options', 'vars', 'number'))):
self.win_idx = self.new_win_idx
else:
self.win_idx = self.old_win_idx
self._vim_getwinvar = vim_get_func('getwinvar', 'bytes')
self._vim_setwinvar = vim_get_func('setwinvar')
if sys.version_info < (3,): if sys.version_info < (3,):
def create_window_statusline_constructor(self): def create_window_statusline_constructor(self):
@ -80,18 +91,6 @@ class VimPowerline(Powerline):
default_log_stream = sys.stdout default_log_stream = sys.stdout
def create_logger(self):
logger = super(VimPowerline, self).create_logger()
try:
if int(vim_getvar('powerline_use_var_handler')):
formatter = logging.Formatter(self.common_config['log_format'])
handler = VimVarHandler(getattr(logging, self.common_config['log_level']))
handler.setFormatter(formatter)
logger.addHandler(handler)
except KeyError:
pass
return logger
def add_local_theme(self, key, config): def add_local_theme(self, key, config):
'''Add local themes at runtime (during vim session). '''Add local themes at runtime (during vim session).
@ -137,7 +136,16 @@ class VimPowerline(Powerline):
get_encoding = staticmethod(get_vim_encoding) get_encoding = staticmethod(get_vim_encoding)
def load_main_config(self): def load_main_config(self):
return _override_from(super(VimPowerline, self).load_main_config(), 'powerline_config_overrides') main_config = _override_from(super(VimPowerline, self).load_main_config(), 'powerline_config_overrides')
try:
use_var_handler = bool(int(vim_getvar('powerline_use_var_handler')))
except KeyError:
use_var_handler = False
if use_var_handler:
main_config.setdefault('common', {})
main_config['common'] = finish_common_config(self.get_encoding(), main_config['common'])
main_config['common']['log_file'].append(['powerline.vim.VimVarHandler', [['powerline_log_messages']]])
return main_config
def load_theme_config(self, name): def load_theme_config(self, name):
return _override_from( return _override_from(
@ -252,44 +260,40 @@ class VimPowerline(Powerline):
# do anything. # do anything.
pass pass
if all((hasattr(vim.current.window, attr) for attr in ('options', 'vars', 'number'))): def new_win_idx(self, window_id):
def win_idx(self, window_id): r = None
r = None for window in vim.windows:
for window in vim.windows: try:
try: curwindow_id = window.vars['powerline_window_id']
curwindow_id = window.vars['powerline_window_id'] if r is not None and curwindow_id == window_id:
if r is not None and curwindow_id == window_id: raise KeyError
raise KeyError except KeyError:
except KeyError: curwindow_id = self.last_window_id
curwindow_id = self.last_window_id self.last_window_id += 1
self.last_window_id += 1 window.vars['powerline_window_id'] = curwindow_id
window.vars['powerline_window_id'] = curwindow_id statusline = self.construct_window_statusline(curwindow_id)
statusline = self.construct_window_statusline(curwindow_id) if window.options['statusline'] != statusline:
if window.options['statusline'] != statusline: window.options['statusline'] = statusline
window.options['statusline'] = statusline if curwindow_id == window_id if window_id else window is vim.current.window:
if curwindow_id == window_id if window_id else window is vim.current.window: r = (window, curwindow_id, window.number)
r = (window, curwindow_id, window.number) return r
return r
else:
_vim_getwinvar = staticmethod(vim_get_func('getwinvar', 'bytes'))
_vim_setwinvar = staticmethod(vim_get_func('setwinvar'))
def win_idx(self, window_id): def old_win_idx(self, window_id):
r = None r = None
for winnr, window in zip(count(1), vim.windows): for winnr, window in zip(count(1), vim.windows):
curwindow_id = self._vim_getwinvar(winnr, 'powerline_window_id') curwindow_id = self._vim_getwinvar(winnr, 'powerline_window_id')
if curwindow_id and not (r is not None and curwindow_id == window_id): if curwindow_id and not (r is not None and curwindow_id == window_id):
curwindow_id = int(curwindow_id) curwindow_id = int(curwindow_id)
else: else:
curwindow_id = self.last_window_id curwindow_id = self.last_window_id
self.last_window_id += 1 self.last_window_id += 1
self._vim_setwinvar(winnr, 'powerline_window_id', curwindow_id) self._vim_setwinvar(winnr, 'powerline_window_id', curwindow_id)
statusline = self.construct_window_statusline(curwindow_id) statusline = self.construct_window_statusline(curwindow_id)
if self._vim_getwinvar(winnr, '&statusline') != statusline: if self._vim_getwinvar(winnr, '&statusline') != statusline:
self._vim_setwinvar(winnr, '&statusline', statusline) self._vim_setwinvar(winnr, '&statusline', statusline)
if curwindow_id == window_id if window_id else window is vim.current.window: if curwindow_id == window_id if window_id else window is vim.current.window:
r = (window, curwindow_id, winnr) r = (window, curwindow_id, winnr)
return r return r
def statusline(self, window_id): def statusline(self, window_id):
window, window_id, winnr = self.win_idx(window_id) or (None, None, None) window, window_id, winnr = self.win_idx(window_id) or (None, None, None)

467
tests/test_logging.py Normal file
View File

@ -0,0 +1,467 @@
# vim:fileencoding=utf-8:noet
'''Tests for various logging features'''
from __future__ import (unicode_literals, division, absolute_import, print_function)
import sys
import re
import codecs
import os
from io import StringIO
from shutil import rmtree
from powerline import finish_common_config, create_logger
from tests import TestCase
from tests.lib import replace_attr
TIMESTAMP_RE = r'\d{4}-\d\d-\d\d \d\d:\d\d:\d\d,\d{3}'
class TestRE(TestCase):
def assertMatches(self, text, regexp):
self.assertTrue(
re.match(regexp, text),
'{0!r} did not match {1!r}'.format(text, regexp),
)
def close_handlers(logger):
for handler in logger.handlers:
handler.close()
class TestHandlers(TestRE):
def test_stderr_handler_is_default(self):
out = StringIO()
err = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(err.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(out.getvalue(), '')
def test_stream_override(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_explicit_none(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [None]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_explicit_stream_handler(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [['logging.StreamHandler', [[]]]]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertEqual(stream.getvalue(), '')
self.assertMatches(err.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(out.getvalue(), '')
def test_explicit_stream_handler_implicit_stream(self):
out = StringIO()
err = StringIO()
stream = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [['logging.StreamHandler', []]]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_file_handler(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_file_handler'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': file_name})
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_file_handler_create_dir(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_file_handler_create_dir/file'
self.assertFalse(os.path.isdir(os.path.dirname(file_name)))
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': file_name})
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
self.assertTrue(os.path.isdir(os.path.dirname(file_name)))
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
rmtree(os.path.dirname(file_name))
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_multiple_files(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name_1 = 'test_logging-test_multiple_files-1'
file_name_2 = file_name_1[:-1] + '2'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [file_name_1, file_name_2]})
try:
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
for file_name in (file_name_1, file_name_2):
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name_1)
finally:
os.unlink(file_name_2)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_multiple_files_and_stream(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name_1 = 'test_logging-test_multiple_files_and_stream-1'
file_name_2 = file_name_1[:-1] + '2'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [file_name_1, file_name_2, None]})
try:
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
for file_name in (file_name_1, file_name_2):
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name_1)
finally:
os.unlink(file_name_2)
self.assertMatches(stream.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_handler_args(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_handler_args'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['RotatingFileHandler', [[file_name]]]
]})
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
close_handlers(logger)
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_handler_args_kwargs(self):
out = StringIO()
err = StringIO()
stream = StringIO()
file_name = 'test_logging-test_handler_args_kwargs'
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['RotatingFileHandler', [[file_name], {'maxBytes': 1, 'backupCount': 1}]]
]})
try:
try:
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.error('Foo')
pl.error('Bar')
close_handlers(logger)
with codecs.open(file_name, encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$')
with codecs.open(file_name + '.1', encoding='utf-8') as fp:
self.assertMatches(fp.read(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Foo\n$')
finally:
os.unlink(file_name + '.1')
finally:
os.unlink(file_name)
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_logger_level(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
stream2 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'WARNING'],
['logging.StreamHandler', [[stream2]], 'ERROR'],
]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertMatches(stream1.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:Foo\n'
+ TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$'
))
self.assertMatches(stream2.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_logger_level_not_overriding_default(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'DEBUG'],
]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.debug('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertMatches(stream1.getvalue(), '^' + TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_top_log_level(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'DEBUG'],
], 'log_level': 'DEBUG'})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.debug('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertMatches(stream1.getvalue(), (
'^' + TIMESTAMP_RE + ':DEBUG:__unknown__:Foo\n'
+ TIMESTAMP_RE + ':ERROR:__unknown__:Bar\n$'
))
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_logger_format(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'WARNING', 'FOO'],
]})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertEqual(stream1.getvalue(), 'FOO\nFOO\n')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
def test_top_log_format(self):
out = StringIO()
err = StringIO()
stream = StringIO()
stream1 = StringIO()
stream2 = StringIO()
with replace_attr(sys, 'stdout', out, 'stderr', err):
common_config = finish_common_config('utf-8', {'log_file': [
['logging.StreamHandler', [[stream1]], 'WARNING', 'FOO'],
['logging.StreamHandler', [[stream2]], 'WARNING'],
], 'log_format': 'BAR'})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('Foo')
pl.error('Bar')
close_handlers(logger)
self.assertEqual(stream2.getvalue(), 'BAR\nBAR\n')
self.assertEqual(stream1.getvalue(), 'FOO\nFOO\n')
self.assertEqual(stream.getvalue(), '')
self.assertEqual(err.getvalue(), '')
self.assertEqual(out.getvalue(), '')
class TestPowerlineLogger(TestRE):
def test_args_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('foo {0}', 'Test')
pl.warn('bar {0!r}', 'Test')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo Test\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:bar u?\'Test\'\n$'
))
def test_prefix_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.prefix = '1'
pl.warn('foo')
pl.prefix = '2'
pl.warn('bar')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:1:foo\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:2:bar\n$'
))
def test_kwargs_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('foo {arg}', arg='Test')
pl.warn('bar {arg!r}', arg='Test')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo Test\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:bar u?\'Test\'\n$'
))
def test_args_kwargs_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.warn('foo {0!r} {arg}', 'Test0', arg='Test')
pl.warn('bar {0} {arg!r}', 'Test0', arg='Test')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':WARNING:__unknown__:foo u?\'Test0\' Test\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:bar Test0 u?\'Test\'\n$'
))
def test_exception_formatting(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
try:
raise ValueError('foo')
except ValueError:
pl.exception('Message')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':ERROR:__unknown__:Message\n'
+ 'Traceback \\(most recent call last\\):\n'
+ '(?: File ".*?", line \\d+, in \\w+\n [^\n]*\n)+'
+ 'ValueError: foo\n$'
))
def test_levels(self):
stream = StringIO()
common_config = finish_common_config('utf-8', {'log_level': 'DEBUG'})
logger, pl, get_module_attr = create_logger(common_config, stream=stream)
pl.debug('1')
pl.info('2')
pl.warn('3')
pl.error('4')
pl.critical('5')
close_handlers(logger)
self.assertMatches(stream.getvalue(), (
'^' + TIMESTAMP_RE + ':DEBUG:__unknown__:1\n'
+ TIMESTAMP_RE + ':INFO:__unknown__:2\n'
+ TIMESTAMP_RE + ':WARNING:__unknown__:3\n'
+ TIMESTAMP_RE + ':ERROR:__unknown__:4\n'
+ TIMESTAMP_RE + ':CRITICAL:__unknown__:5\n$'
))
old_cwd = None
def setUpModule():
global old_cwd
global __file__
old_cwd = os.getcwd()
__file__ = os.path.abspath(__file__)
os.chdir(os.path.dirname(__file__))
def tearDownModule():
global old_cwd
os.chdir(old_cwd)
if __name__ == '__main__':
from tests import main
main()