powerline/tests/lib/config_mock.py

215 lines
4.8 KiB
Python

# vim:fileencoding=utf-8:noet
from threading import Lock
from powerline.renderer import Renderer
from powerline.lib.config import ConfigLoader
from powerline import Powerline
from tests.lib import Args, replace_attr
from copy import deepcopy
from time import sleep
from functools import wraps
import os
class TestHelpers(object):
def __init__(self, config):
self.config = config
self.access_log = []
self.access_lock = Lock()
def loader_condition(self, path):
return (path in self.config) and path
def find_config_files(self, cfg_path, config_loader, loader_callback):
if cfg_path.endswith('.json'):
cfg_path = cfg_path[:-5]
if cfg_path.startswith('/'):
cfg_path = cfg_path.lstrip('/')
with self.access_lock:
self.access_log.append('check:' + cfg_path)
if cfg_path in self.config:
yield cfg_path
else:
if config_loader:
config_loader.register_missing(self.loader_condition, loader_callback, cfg_path)
raise IOError(('fcf:' if cfg_path.endswith('raise') else '') + cfg_path)
def load_json_config(self, config_file_path, *args, **kwargs):
if config_file_path.endswith('.json'):
config_file_path = config_file_path[:-5]
if config_file_path.startswith('/'):
config_file_path = config_file_path.lstrip('/')
with self.access_lock:
self.access_log.append('load:' + config_file_path)
try:
return deepcopy(self.config[config_file_path])
except KeyError:
raise IOError(config_file_path)
def pop_events(self):
with self.access_lock:
r = self.access_log[:]
self.access_log = []
return r
def log_call(func):
@wraps(func)
def ret(self, *args, **kwargs):
self._calls.append((func.__name__, args, kwargs))
return func(self, *args, **kwargs)
return ret
class TestWatcher(object):
events = set()
lock = Lock()
def __init__(self):
self._calls = []
@log_call
def watch(self, file):
pass
@log_call
def __call__(self, file):
with self.lock:
if file in self.events:
self.events.remove(file)
return True
return False
def _reset(self, files):
with self.lock:
self.events.clear()
self.events.update(files)
@log_call
def unsubscribe(self):
pass
class Logger(object):
def __init__(self):
self.messages = []
self.lock = Lock()
def _add_msg(self, attr, msg):
with self.lock:
self.messages.append(attr + ':' + msg)
def _pop_msgs(self):
with self.lock:
r = self.messages
self.messages = []
return r
def __getattr__(self, attr):
return lambda *args, **kwargs: self._add_msg(attr, *args, **kwargs)
class SimpleRenderer(Renderer):
def hlstyle(self, fg=None, bg=None, attr=None):
return '<{fg} {bg} {attr}>'.format(fg=fg and fg[0], bg=bg and bg[0], attr=attr)
class EvenSimplerRenderer(Renderer):
def hlstyle(self, fg=None, bg=None, attr=None):
return '{{{fg}{bg}{attr}}}'.format(
fg=fg and fg[0] or '-',
bg=bg and bg[0] or '-',
attr=attr if attr else '',
)
class TestPowerline(Powerline):
_created = False
def __init__(self, _helpers, **kwargs):
super(TestPowerline, self).__init__(**kwargs)
self._helpers = _helpers
self.find_config_files = _helpers.find_config_files
@staticmethod
def get_local_themes(local_themes):
return local_themes
@staticmethod
def get_config_paths():
return ['']
def _will_create_renderer(self):
return self.cr_kwargs
def _pop_events(self):
return self._helpers.pop_events()
renderer = EvenSimplerRenderer
class TestConfigLoader(ConfigLoader):
def __init__(self, _helpers, **kwargs):
watcher = TestWatcher()
super(TestConfigLoader, self).__init__(
load=_helpers.load_json_config,
watcher=watcher,
watcher_type='test',
**kwargs
)
def get_powerline(config, **kwargs):
helpers = TestHelpers(config)
return get_powerline_raw(
helpers,
TestPowerline,
_helpers=helpers,
ext='test',
renderer_module='tests.lib.config_mock',
logger=Logger(),
**kwargs
)
def select_renderer(simpler_renderer=False):
global renderer
renderer = EvenSimplerRenderer if simpler_renderer else SimpleRenderer
def get_powerline_raw(helpers, PowerlineClass, **kwargs):
if not isinstance(helpers, TestHelpers):
helpers = TestHelpers(helpers)
select_renderer(kwargs.pop('simpler_renderer', False))
pl = PowerlineClass(
config_loader=TestConfigLoader(
_helpers=helpers,
run_once=kwargs.get('run_once')
),
**kwargs
)
pl._watcher = pl.config_loader.watcher
return pl
def swap_attributes(config, powerline_module):
return replace_attr(powerline_module, 'os', Args(
path=Args(
isfile=lambda path: path.lstrip('/').replace('.json', '') in config,
join=os.path.join,
expanduser=lambda path: path,
realpath=lambda path: path,
dirname=os.path.dirname,
),
environ={},
))
def add_watcher_events(p, *args, **kwargs):
if isinstance(p._watcher, TestWatcher):
p._watcher._reset(args)
while not p._will_create_renderer():
sleep(kwargs.get('interval', 0.1))
if not kwargs.get('wait', True):
return