powerline/tests/lib/config_mock.py

176 lines
3.5 KiB
Python

# vim:fileencoding=utf-8:noet
from threading import Lock
from powerline.renderer import Renderer
from powerline.lib.config import ConfigLoader
from powerline import Powerline
from copy import deepcopy
from time import sleep
from functools import wraps
access_log = []
access_lock = Lock()
def load_json_config(config_file_path, *args, **kwargs):
global access_log
with access_lock:
access_log.append(config_file_path)
try:
return deepcopy(config_container['config'][config_file_path])
except KeyError:
raise IOError(config_file_path)
def _find_config_file(config, search_paths, config_file):
if config_file.endswith('raise') and config_file not in config:
raise IOError('fcf:' + config_file)
return config_file
def pop_events():
global access_log
with access_lock:
r = access_log[:]
access_log = []
return r
def log_call(func):
@wraps(func)
def ret(self, *args, **kwargs):
self._calls.append((func.__name__, args, kwargs))
return func(self, *args, **kwargs)
return ret
class Watcher(object):
events = set()
lock = Lock()
def __init__(self):
self._calls = []
@log_call
def watch(self, file):
pass
@log_call
def __call__(self, file):
with self.lock:
if file in self.events:
self.events.remove(file)
return True
return False
def _reset(self, files):
with self.lock:
self.events.clear()
self.events.update(files)
@log_call
def unsubscribe(self):
pass
class Logger(object):
def __init__(self):
self.messages = []
self.lock = Lock()
def _add_msg(self, attr, msg):
with self.lock:
self.messages.append(attr + ':' + msg)
def _pop_msgs(self):
with self.lock:
r = self.messages
self.messages = []
return r
def __getattr__(self, attr):
return lambda *args, **kwargs: self._add_msg(attr, *args, **kwargs)
class SimpleRenderer(Renderer):
def hlstyle(self, fg=None, bg=None, attr=None):
return '<{fg} {bg} {attr}>'.format(fg=fg and fg[0], bg=bg and bg[0], attr=attr)
class EvenSimplerRenderer(Renderer):
def hlstyle(self, fg=None, bg=None, attr=None):
return '{{{fg}{bg}{attr}}}'.format(
fg=fg and fg[0] or '-',
bg=bg and bg[0] or '-',
attr=attr if attr else '',
)
class TestPowerline(Powerline):
_created = False
@staticmethod
def get_local_themes(local_themes):
return local_themes
def _will_create_renderer(self):
return self.create_renderer_kwargs
renderer = SimpleRenderer
def get_powerline(**kwargs):
return get_powerline_raw(
TestPowerline,
ext='test',
renderer_module='tests.lib.config_mock',
logger=Logger(),
**kwargs
)
def get_powerline_raw(PowerlineClass, **kwargs):
global renderer
watcher = Watcher()
if kwargs.pop('simpler_renderer', False):
renderer = EvenSimplerRenderer
else:
renderer = SimpleRenderer
pl = PowerlineClass(
config_loader=ConfigLoader(
load=load_json_config,
watcher=watcher,
watcher_type='test',
run_once=kwargs.get('run_once')
),
**kwargs
)
pl._watcher = watcher
return pl
config_container = None
def swap_attributes(cfg_container, powerline_module, replaces):
global config_container
config_container = cfg_container
if not replaces:
replaces = {
'_find_config_file': lambda *args: _find_config_file(config_container['config'], *args),
}
for attr, val in replaces.items():
old_val = getattr(powerline_module, attr)
setattr(powerline_module, attr, val)
replaces[attr] = old_val
return replaces
def add_watcher_events(p, *args, **kwargs):
p._watcher._reset(args)
while not p._will_create_renderer():
sleep(kwargs.get('interval', 0.1))
if not kwargs.get('wait', True):
return