Replace MultiClientWatcher and Powerline threads with ConfigLoader

Also
- move file opening and parsing to ConfigLoader
- add interval configuration
This commit is contained in:
ZyX 2013-04-04 22:57:14 +04:00
parent f0e5f43d48
commit af2f8f588b
8 changed files with 239 additions and 139 deletions

View File

@ -166,6 +166,10 @@ Common configuration is a subdictionary that is a value of ``common`` key in
String, determines format of the log messages. Defaults to
``'%(asctime)s:%(level)s:%(message)s'``.
``interval``
Number, determines time (in seconds) between checks for changed
configuration. Use ``null`` to disable. Defaults to 10.
Extension-specific configuration
--------------------------------

View File

@ -7,58 +7,13 @@ import sys
import logging
from powerline.colorscheme import Colorscheme
from powerline.lib.file_watcher import create_file_watcher
from powerline.lib.config import ConfigLoader
from threading import Lock, Thread, Event
from collections import defaultdict
from threading import Lock, Event
DEFAULT_SYSTEM_CONFIG_DIR = None
watcher = None
class MultiClientWatcher(object):
subscribers = set()
received_events = {}
def __init__(self):
global watcher
self.subscribers.add(self)
if not watcher:
watcher = create_file_watcher()
def watch(self, file):
watcher.watch(file)
def __call__(self, file):
if self not in self.subscribers:
return False
if file in self.received_events and self not in self.received_events[file]:
self.received_events[file].add(self)
if self.received_events[file] >= self.subscribers:
self.received_events.pop(file)
return True
if watcher(file):
self.received_events[file] = set([self])
return True
return False
def unsubscribe(self):
try:
self.subscribers.remove(self)
except KeyError:
pass
__del__ = unsubscribe
def open_file(path):
return open(path, 'r')
def find_config_file(search_paths, config_file):
config_file += '.json'
@ -69,11 +24,6 @@ def find_config_file(search_paths, config_file):
raise IOError('Config file not found in search path: {0}'.format(config_file))
def load_json_config(config_file_path, load=json.load, open_file=open_file):
with open_file(config_file_path) as config_file_fp:
return load(config_file_fp)
class PowerlineState(object):
def __init__(self, use_daemon_threads, logger, ext):
self.logger = logger
@ -132,9 +82,12 @@ class Powerline(object):
during python session.
:param Logger logger:
If present, no new logger will be created and this logger will be used.
:param float interval:
When reloading configuration wait for this amount of seconds. Set it to
None if you dont want to reload configuration automatically.
:param bool use_daemon_threads:
Use daemon threads for.
:param Event shutdown_event:
Use this Event as shutdown_event.
:param ConfigLoader config_loader:
Class that manages (re)loading of configuration.
'''
def __init__(self,
@ -143,35 +96,29 @@ class Powerline(object):
run_once=False,
logger=None,
use_daemon_threads=True,
interval=10,
watcher=None):
shutdown_event=None,
config_loader=None):
self.ext = ext
self.renderer_module = renderer_module or ext
self.run_once = run_once
self.logger = logger
self.use_daemon_threads = use_daemon_threads
self.interval = interval
if '.' not in self.renderer_module:
self.renderer_module = 'powerline.renderers.' + self.renderer_module
elif self.renderer_module[-1] == '.':
self.renderer_module = self.renderer_module[:-1]
self.config_paths = self.get_config_paths()
config_paths = self.get_config_paths()
self.find_config_file = lambda cfg_path: find_config_file(config_paths, cfg_path)
self.configs_lock = Lock()
self.cr_kwargs_lock = Lock()
self.create_renderer_kwargs = {}
self.shutdown_event = Event()
self.configs = defaultdict(set)
self.missing = defaultdict(set)
self.thread = None
self.shutdown_event = shutdown_event or Event()
self.config_loader = config_loader or ConfigLoader(shutdown_event=self.shutdown_event)
self.renderer_options = {}
self.watcher = watcher or MultiClientWatcher()
self.prev_common_config = None
self.prev_ext_config = None
self.pl = None
@ -224,6 +171,8 @@ class Powerline(object):
if not self.pl:
self.pl = PowerlineState(self.use_daemon_threads, self.logger, self.ext)
if not self.config_loader.pl:
self.config_loader.pl = self.pl
self.renderer_options.update(
pl=self.pl,
@ -239,6 +188,12 @@ class Powerline(object):
},
)
if not self.run_once:
interval = self.common_config.get('interval', 10)
self.config_loader.set_interval(interval)
if interval is not None and not self.config_loader.is_alive():
self.config_loader.start()
self.ext_config = config['ext'][self.ext]
if self.ext_config != self.prev_ext_config:
ext_config_differs = True
@ -287,9 +242,6 @@ class Powerline(object):
else:
self.renderer = renderer
if not self.run_once and not self.is_alive() and self.interval is not None:
self.start()
def get_log_handler(self):
'''Get log handler.
@ -325,24 +277,20 @@ class Powerline(object):
return config_paths
def _load_config(self, cfg_path, type):
'''Load configuration and setup watcher.'''
'''Load configuration and setup watches.'''
function = getattr(self, 'on_' + type + '_change')
try:
path = find_config_file(self.config_paths, cfg_path)
path = self.find_config_file(cfg_path)
except IOError:
with self.configs_lock:
self.missing[type].add(cfg_path)
self.config_loader.register_missing(self.find_config_file, function, cfg_path)
raise
with self.configs_lock:
self.configs[type].add(path)
self.watcher.watch(path)
return load_json_config(path)
self.config_loader.register(function, path)
return self.config_loader.load(path)
def _purge_configs(self, type):
try:
with self.configs_lock:
self.configs.pop(type)
except KeyError:
pass
function = getattr(self, 'on_' + type + '_change')
self.config_loader.unregister_functions(set((function,)))
self.config_loader.unregister_missing(set(((self.find_config_file, function),)))
def load_theme_config(self, name):
'''Get theme configuration.
@ -409,48 +357,35 @@ class Powerline(object):
return self.renderer.render(*args, **kwargs)
def shutdown(self):
'''Lock renderer from modifications and run its ``.shutdown()`` method.
'''Shut down all background threads. Must be run only prior to exiting
current application.
'''
self.shutdown_event.set()
if self.use_daemon_threads and self.is_alive():
# Give the worker thread a chance to shutdown, but don't block for too long
self.thread.join(.01)
self.renderer.shutdown()
self.watcher.unsubscribe()
functions = (
self.on_main_change,
self.on_colors_change,
self.on_colorscheme_change,
self.on_theme_change,
)
self.config_loader.unregister_functions(set(functions))
self.config_loader.unregister_missing(set(((find_config_file, function) for function in functions)))
def is_alive(self):
return self.thread and self.thread.is_alive()
def on_main_change(self, path):
with self.cr_kwargs_lock:
self.create_renderer_kwargs['load_main'] = True
def start(self):
self.thread = Thread(target=self.run)
if self.use_daemon_threads:
self.thread.daemon = True
self.thread.start()
def on_colors_change(self, path):
with self.cr_kwargs_lock:
self.create_renderer_kwargs['load_colors'] = True
def run(self):
while not self.shutdown_event.is_set():
kwargs = {}
removes = []
with self.configs_lock:
for type, paths in self.configs.items():
for path in paths:
if self.watcher(path):
kwargs['load_' + type] = True
for type, cfg_paths in self.missing.items():
for cfg_path in cfg_paths:
try:
find_config_file(self.config_paths, cfg_path)
except IOError:
pass
else:
kwargs['load_' + type] = True
removes.append((type, cfg_path))
for type, cfg_path in removes:
self.missing[type].remove(cfg_path)
if kwargs:
with self.cr_kwargs_lock:
self.create_renderer_kwargs.update(kwargs)
self.shutdown_event.wait(self.interval)
def on_colorscheme_change(self, path):
with self.cr_kwargs_lock:
self.create_renderer_kwargs['load_colorscheme'] = True
def on_theme_change(self, path):
with self.cr_kwargs_lock:
self.create_renderer_kwargs['load_theme'] = True
def __enter__(self):
return self

149
powerline/lib/config.py Normal file
View File

@ -0,0 +1,149 @@
# vim:fileencoding=utf-8:noet
from powerline.lib.threaded import MultiRunnedThread
from powerline.lib.file_watcher import create_file_watcher
from threading import Event, Lock
from collections import defaultdict
import json
def open_file(path):
return open(path, 'r')
def load_json_config(config_file_path, load=json.load, open_file=open_file):
with open_file(config_file_path) as config_file_fp:
return load(config_file_fp)
class ConfigLoader(MultiRunnedThread):
def __init__(self, shutdown_event=None, watcher=None, load=load_json_config):
super(ConfigLoader, self).__init__()
self.shutdown_event = shutdown_event or Event()
self.watcher = watcher or create_file_watcher()
self._load = load
self.pl = None
self.interval = None
self.lock = Lock()
self.watched = defaultdict(set)
self.missing = defaultdict(set)
self.loaded = {}
def set_pl(self, pl):
self.pl = pl
def set_interval(self, interval):
self.interval = interval
def register(self, function, path):
'''Register function that will be run when file changes.
:param function function:
Function that will be called when file at the given path changes.
:param str path:
Path that will be watched for.
'''
with self.lock:
self.watched[path].add(function)
self.watcher.watch(path)
def register_missing(self, condition_function, function, key):
'''Register any function that will be called with given key each
interval seconds (interval is defined at __init__). Its result is then
passed to ``function``, but only if the result is true.
:param function condition_function:
Function which will be called each ``interval`` seconds. All
exceptions from it will be ignored.
:param function function:
Function which will be called if condition_function returns
something that is true. Accepts result of condition_function as an
argument.
:param str key:
Any value, it will be passed to condition_function on each call.
Note: registered functions will be automatically removed if
condition_function results in something true.
'''
with self.lock:
self.missing[key].add((condition_function, function))
def unregister_functions(self, removed_functions):
'''Unregister files handled by these functions.
:param set removed_functions:
Set of functions previously passed to ``.register()`` method.
'''
removes = []
with self.lock:
for path, functions in list(self.watched.items()):
functions -= removed_functions
if not functions:
self.watched.pop(path)
self.loaded.pop(path, None)
def unregister_missing(self, removed_functions):
'''Unregister files handled by these functions.
:param set removed_functions:
Set of pairs (2-tuples) representing ``(condition_function,
function)`` function pairs previously passed as an arguments to
``.register_missing()`` method.
'''
with self.lock:
for key, functions in list(self.missing.items()):
functions -= removed_functions
if not functions:
self.missing.pop(key)
def load(self, path):
try:
# No locks: GIL does what we need
return self.loaded[path]
except KeyError:
r = self._load(path)
if self.interval is not None:
self.loaded[path] = r
return r
def run(self):
while self.interval is not None and not self.shutdown_event.is_set():
toload = []
with self.lock:
for path, functions in self.watched.items():
for function in functions:
if self.watcher(path):
function(path)
toload.append(path)
with self.lock:
for key, functions in list(self.missing.items()):
remove = False
for condition_function, function in list(functions):
try:
path = condition_function(key)
except Exception as e:
self.exception('Error while running condition function for key {0}: {1}', key, str(e))
else:
if path:
toload.append(path)
function(path)
functions.remove((condition_function, function))
if not functions:
self.missing.pop(key)
for path in toload:
try:
self.loaded[path] = self._load(path)
except Exception as e:
self.exception('Error while loading {0}: {1}', path, str(e))
self.shutdown_event.wait(self.interval)
def exception(self, msg, *args, **kwargs):
if self.pl:
self.pl.exception(msg, prefix='config_loader', *args, **kwargs)
else:
raise

View File

@ -8,6 +8,8 @@ from threading import Thread, Lock, Event
class MultiRunnedThread(object):
daemon = True
def __init__(self):
self.thread = None

View File

@ -1,5 +1,6 @@
from powerline.lint.markedjson import load
from powerline import load_json_config, find_config_file, Powerline
from powerline import find_config_file, Powerline
from powerline.lib.config import load_json_config
from powerline.lint.markedjson.error import echoerr, MarkedError
from powerline.segments.vim import vim_modes
import itertools
@ -73,11 +74,15 @@ class Spec(object):
spec.context_message(msg)
return self
def check_type(self, value, context_mark, data, context, echoerr, t):
if type(value.value) is not t:
def check_type(self, value, context_mark, data, context, echoerr, types):
if type(value.value) not in types:
echoerr(context=self.cmsg.format(key=context_key(context)),
context_mark=context_mark,
problem='{0!r} must be a {1} instance, not {2}'.format(value, t.__name__, type(value.value).__name__),
problem='{0!r} must be a {1} instance, not {2}'.format(
value,
', '.join((t.__name__ for t in types)),
type(value.value).__name__
),
problem_mark=value.mark)
return False, True
return True, False
@ -141,8 +146,8 @@ class Spec(object):
return False, hadproblem
return True, hadproblem
def type(self, t):
self.checks.append(('check_type', t))
def type(self, *args):
self.checks.append(('check_type', args))
return self
cmp_funcs = {
@ -411,6 +416,7 @@ main_spec = (Spec(
log_level=Spec().re('^[A-Z]+$').func(lambda value, *args: (True, True, not hasattr(logging, value)),
lambda value: 'unknown debugging level {0}'.format(value)).optional(),
log_format=Spec().type(str).optional(),
interval=Spec().type(int, float, type(None)).optional(),
).context_message('Error while loading common configuration (key {key})'),
ext=Spec(
vim=Spec(
@ -791,7 +797,7 @@ def check_segment_data_key(key, data, context, echoerr):
# FIXME More checks, limit existing to ThreadedSegment instances only
args_spec = Spec(
interval=Spec().either(Spec().type(float), Spec().type(int)).optional(),
interval=Spec().type(int, float).optional(),
update_first=Spec().type(bool).optional(),
shutdown_event=Spec().error('Shutdown event must be set by powerline').optional(),
pl=Spec().error('pl object must be set by powerline').optional(),

View File

@ -1,6 +1,7 @@
# vim:fileencoding=utf-8:noet
from threading import Lock
from powerline.renderer import Renderer
from powerline.lib.config import ConfigLoader
from powerline import Powerline
from copy import deepcopy
@ -9,12 +10,12 @@ access_log = []
access_lock = Lock()
def load_json_config(config, config_file_path, *args, **kwargs):
def load_json_config(config_file_path, *args, **kwargs):
global access_log
with access_lock:
access_log.append(config_file_path)
try:
return deepcopy(config[config_file_path])
return deepcopy(config_container['config'][config_file_path])
except KeyError:
raise IOError(config_file_path)
@ -41,10 +42,10 @@ class Watcher(object):
pass
def __call__(self, file):
if file in self.events:
with self.lock:
with self.lock:
if file in self.events:
self.events.remove(file)
return True
return True
return False
def _reset(self, files):
@ -98,18 +99,20 @@ def get_powerline(**kwargs):
return TestPowerline(
ext='test',
renderer_module='tests.lib.config_mock',
interval=0,
logger=Logger(),
watcher=Watcher(),
config_loader=ConfigLoader(load=load_json_config, watcher=Watcher()),
**kwargs
)
def swap_attributes(config_container, powerline_module, replaces):
config_container = None
def swap_attributes(cfg_container, powerline_module, replaces):
global config_container
config_container = cfg_container
if not replaces:
replaces = {
'watcher': Watcher(),
'load_json_config': lambda *args: load_json_config(config_container['config'], *args),
'find_config_file': lambda *args: find_config_file(config_container['config'], *args),
}
for attr, val in replaces.items():

View File

@ -23,6 +23,7 @@ config = {
},
},
'spaces': 0,
'interval': 0,
},
'ext': {
'test': {
@ -97,7 +98,7 @@ def sleep(interval):
def add_watcher_events(p, *args, **kwargs):
p.watcher._reset(args)
p.config_loader.watcher._reset(args)
while not p._will_create_renderer():
sleep(kwargs.get('interval', 0.000001))
if not kwargs.get('wait', True):
@ -187,7 +188,7 @@ class TestConfigReload(TestCase):
add_watcher_events(p, 'config')
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
self.assertAccessEvents('config')
self.assertEqual(p.logger._pop_msgs(), ['exception:test:Failed to create renderer: fcf:colorschemes/test/nonexistentraise'])
self.assertIn('exception:test:Failed to create renderer: fcf:colorschemes/test/nonexistentraise', p.logger._pop_msgs())
config['colorschemes/test/nonexistentraise'] = {
'groups': {

View File

@ -492,4 +492,4 @@ def tearDownModule():
if __name__ == '__main__':
from tests import main
main(verbosity=10)
main()