176 lines
3.5 KiB
Python
176 lines
3.5 KiB
Python
# vim:fileencoding=utf-8:noet
|
|
from threading import Lock
|
|
from powerline.renderer import Renderer
|
|
from powerline.lib.config import ConfigLoader
|
|
from powerline import Powerline
|
|
from copy import deepcopy
|
|
from time import sleep
|
|
from functools import wraps
|
|
|
|
|
|
access_log = []
|
|
access_lock = Lock()
|
|
|
|
|
|
def load_json_config(config_file_path, *args, **kwargs):
|
|
global access_log
|
|
with access_lock:
|
|
access_log.append(config_file_path)
|
|
try:
|
|
return deepcopy(config_container['config'][config_file_path])
|
|
except KeyError:
|
|
raise IOError(config_file_path)
|
|
|
|
|
|
def _find_config_file(config, search_paths, config_file):
|
|
if config_file.endswith('raise') and config_file not in config:
|
|
raise IOError('fcf:' + config_file)
|
|
return config_file
|
|
|
|
|
|
def pop_events():
|
|
global access_log
|
|
with access_lock:
|
|
r = access_log[:]
|
|
access_log = []
|
|
return r
|
|
|
|
|
|
def log_call(func):
|
|
@wraps(func)
|
|
def ret(self, *args, **kwargs):
|
|
self._calls.append((func.__name__, args, kwargs))
|
|
return func(self, *args, **kwargs)
|
|
return ret
|
|
|
|
|
|
class Watcher(object):
|
|
events = set()
|
|
lock = Lock()
|
|
|
|
def __init__(self):
|
|
self._calls = []
|
|
|
|
@log_call
|
|
def watch(self, file):
|
|
pass
|
|
|
|
@log_call
|
|
def __call__(self, file):
|
|
with self.lock:
|
|
if file in self.events:
|
|
self.events.remove(file)
|
|
return True
|
|
return False
|
|
|
|
def _reset(self, files):
|
|
with self.lock:
|
|
self.events.clear()
|
|
self.events.update(files)
|
|
|
|
@log_call
|
|
def unsubscribe(self):
|
|
pass
|
|
|
|
|
|
class Logger(object):
|
|
def __init__(self):
|
|
self.messages = []
|
|
self.lock = Lock()
|
|
|
|
def _add_msg(self, attr, msg):
|
|
with self.lock:
|
|
self.messages.append(attr + ':' + msg)
|
|
|
|
def _pop_msgs(self):
|
|
with self.lock:
|
|
r = self.messages
|
|
self.messages = []
|
|
return r
|
|
|
|
def __getattr__(self, attr):
|
|
return lambda *args, **kwargs: self._add_msg(attr, *args, **kwargs)
|
|
|
|
|
|
class SimpleRenderer(Renderer):
|
|
def hlstyle(self, fg=None, bg=None, attr=None):
|
|
return '<{fg} {bg} {attr}>'.format(fg=fg and fg[0], bg=bg and bg[0], attr=attr)
|
|
|
|
|
|
class EvenSimplerRenderer(Renderer):
|
|
def hlstyle(self, fg=None, bg=None, attr=None):
|
|
return '{{{fg}{bg}{attr}}}'.format(
|
|
fg=fg and fg[0] or '-',
|
|
bg=bg and bg[0] or '-',
|
|
attr=attr if attr else '',
|
|
)
|
|
|
|
|
|
class TestPowerline(Powerline):
|
|
_created = False
|
|
|
|
@staticmethod
|
|
def get_local_themes(local_themes):
|
|
return local_themes
|
|
|
|
def _will_create_renderer(self):
|
|
return self.cr_kwargs
|
|
|
|
|
|
renderer = SimpleRenderer
|
|
|
|
|
|
def get_powerline(**kwargs):
|
|
return get_powerline_raw(
|
|
TestPowerline,
|
|
ext='test',
|
|
renderer_module='tests.lib.config_mock',
|
|
logger=Logger(),
|
|
**kwargs
|
|
)
|
|
|
|
|
|
def get_powerline_raw(PowerlineClass, **kwargs):
|
|
global renderer
|
|
watcher = Watcher()
|
|
if kwargs.pop('simpler_renderer', False):
|
|
renderer = EvenSimplerRenderer
|
|
else:
|
|
renderer = SimpleRenderer
|
|
pl = PowerlineClass(
|
|
config_loader=ConfigLoader(
|
|
load=load_json_config,
|
|
watcher=watcher,
|
|
watcher_type='test',
|
|
run_once=kwargs.get('run_once')
|
|
),
|
|
**kwargs
|
|
)
|
|
pl._watcher = watcher
|
|
return pl
|
|
|
|
|
|
config_container = None
|
|
|
|
|
|
def swap_attributes(cfg_container, powerline_module, replaces):
|
|
global config_container
|
|
config_container = cfg_container
|
|
if not replaces:
|
|
replaces = {
|
|
'_find_config_file': lambda *args: _find_config_file(config_container['config'], *args),
|
|
}
|
|
for attr, val in replaces.items():
|
|
old_val = getattr(powerline_module, attr)
|
|
setattr(powerline_module, attr, val)
|
|
replaces[attr] = old_val
|
|
return replaces
|
|
|
|
|
|
def add_watcher_events(p, *args, **kwargs):
|
|
p._watcher._reset(args)
|
|
while not p._will_create_renderer():
|
|
sleep(kwargs.get('interval', 0.1))
|
|
if not kwargs.get('wait', True):
|
|
return
|