Merge remote-tracking branch 'zyx-i/autoreload-config' into develop
This commit is contained in:
commit
3d72a227dd
|
@ -7,37 +7,85 @@ import sys
|
|||
import logging
|
||||
|
||||
from powerline.colorscheme import Colorscheme
|
||||
from powerline.lib.file_watcher import create_file_watcher
|
||||
|
||||
from threading import Lock, Thread, Event
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
DEFAULT_SYSTEM_CONFIG_DIR = None
|
||||
|
||||
watcher = None
|
||||
|
||||
|
||||
class MultiClientWatcher(object):
|
||||
subscribers = set()
|
||||
received_events = {}
|
||||
|
||||
def __init__(self):
|
||||
global watcher
|
||||
self.subscribers.add(self)
|
||||
if not watcher:
|
||||
watcher = create_file_watcher()
|
||||
|
||||
def watch(self, file):
|
||||
watcher.watch(file)
|
||||
|
||||
def __call__(self, file):
|
||||
if self not in self.subscribers:
|
||||
return False
|
||||
|
||||
if file in self.received_events and self not in self.received_events[file]:
|
||||
self.received_events[file].add(self)
|
||||
if self.received_events[file] >= self.subscribers:
|
||||
self.received_events.pop(file)
|
||||
return True
|
||||
|
||||
if watcher(file):
|
||||
self.received_events[file] = set([self])
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def unsubscribe(self):
|
||||
try:
|
||||
self.subscribers.remove(self)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
__del__ = unsubscribe
|
||||
|
||||
|
||||
def open_file(path):
|
||||
return open(path, 'r')
|
||||
|
||||
|
||||
def load_json_config(search_paths, config_file, load=json.load, open_file=open_file):
|
||||
def find_config_file(search_paths, config_file):
|
||||
config_file += '.json'
|
||||
for path in search_paths:
|
||||
config_file_path = os.path.join(path, config_file)
|
||||
if os.path.isfile(config_file_path):
|
||||
with open_file(config_file_path) as config_file_fp:
|
||||
return load(config_file_fp)
|
||||
return config_file_path
|
||||
raise IOError('Config file not found in search path: {0}'.format(config_file))
|
||||
|
||||
|
||||
def load_json_config(config_file_path, load=json.load, open_file=open_file):
|
||||
with open_file(config_file_path) as config_file_fp:
|
||||
return load(config_file_fp)
|
||||
|
||||
|
||||
class PowerlineState(object):
|
||||
def __init__(self, logger, environ, getcwd, home):
|
||||
self.environ = environ
|
||||
self.getcwd = getcwd
|
||||
self.home = home or environ.get('HOME', None)
|
||||
def __init__(self, use_daemon_threads, logger, ext):
|
||||
self.logger = logger
|
||||
self.prefix = None
|
||||
self.ext = ext
|
||||
self.use_daemon_threads = use_daemon_threads
|
||||
self.prefix = ''
|
||||
self.last_msgs = {}
|
||||
|
||||
def _log(self, attr, msg, *args, **kwargs):
|
||||
prefix = kwargs.get('prefix') or self.prefix
|
||||
msg = ((prefix + ':') if prefix else '') + msg.format(*args, **kwargs)
|
||||
prefix = self.ext + ((':' + prefix) if prefix else '')
|
||||
msg = prefix + ':' + msg.format(*args, **kwargs)
|
||||
key = attr + ':' + prefix
|
||||
if msg != self.last_msgs.get(key):
|
||||
getattr(self.logger, attr)(msg)
|
||||
|
@ -73,20 +121,20 @@ class Powerline(object):
|
|||
colorschemes, render module (``powerline.renders.{ext}``).
|
||||
:param str renderer_module:
|
||||
Overrides renderer module (defaults to ``ext``). Should be the name of
|
||||
the package imported like this: ``powerline.renders.{render_module}``.
|
||||
the package imported like this: ``powerline.renders.{render_module}``.
|
||||
If this parameter contains a dot, ``powerline.renderers.`` is not
|
||||
prepended. There is also a special case for renderers defined in
|
||||
toplevel modules: ``foo.`` (note: dot at the end) tries to get renderer
|
||||
from module ``foo`` (because ``foo`` (without dot) tries to get renderer
|
||||
from module ``powerline.renderers.foo``).
|
||||
:param bool run_once:
|
||||
Determines whether .renderer.render() method will be run only once
|
||||
during python session.
|
||||
:param Logger logger:
|
||||
If present, no new logger will be created and this logger will be used.
|
||||
:param dict environ:
|
||||
Object with ``.__getitem__`` and ``.get`` methods used to obtain
|
||||
environment variables. Defaults to ``os.environ``.
|
||||
:param func getcwd:
|
||||
Function used to get current working directory. Defaults to
|
||||
``os.getcwdu`` or ``os.getcwd``.
|
||||
:param str home:
|
||||
Home directory. Defaults to ``environ.get('HOME')``.
|
||||
:param float interval:
|
||||
When reloading configuration wait for this amount of seconds. Set it to
|
||||
None if you don’t want to reload configuration automatically.
|
||||
'''
|
||||
|
||||
def __init__(self,
|
||||
|
@ -94,67 +142,154 @@ class Powerline(object):
|
|||
renderer_module=None,
|
||||
run_once=False,
|
||||
logger=None,
|
||||
environ=os.environ,
|
||||
getcwd=getattr(os, 'getcwdu', os.getcwd),
|
||||
home=None):
|
||||
use_daemon_threads=True,
|
||||
interval=10,
|
||||
watcher=None):
|
||||
self.ext = ext
|
||||
self.renderer_module = renderer_module or ext
|
||||
self.run_once = run_once
|
||||
self.logger = logger
|
||||
self.use_daemon_threads = use_daemon_threads
|
||||
self.interval = interval
|
||||
|
||||
if '.' not in self.renderer_module:
|
||||
self.renderer_module = 'powerline.renderers.' + self.renderer_module
|
||||
elif self.renderer_module[-1] == '.':
|
||||
self.renderer_module = self.renderer_module[:-1]
|
||||
|
||||
self.config_paths = self.get_config_paths()
|
||||
|
||||
# Load main config file
|
||||
config = self.load_main_config()
|
||||
common_config = config['common']
|
||||
ext_config = config['ext'][ext]
|
||||
self.ext = ext
|
||||
self.renderer_lock = Lock()
|
||||
self.configs_lock = Lock()
|
||||
self.shutdown_event = Event()
|
||||
self.configs = defaultdict(set)
|
||||
self.missing = defaultdict(set)
|
||||
|
||||
# Load and initialize colorscheme
|
||||
colorscheme_config = self.load_colorscheme_config(ext_config['colorscheme'])
|
||||
colors_config = self.load_colors_config()
|
||||
colorscheme = Colorscheme(colorscheme_config, colors_config)
|
||||
self.thread = None
|
||||
|
||||
# Load and initialize extension theme
|
||||
theme_config = self.load_theme_config(ext_config.get('theme', 'default'))
|
||||
common_config['paths'] = [os.path.expanduser(path) for path in common_config.get('paths', [])]
|
||||
self.import_paths = common_config['paths']
|
||||
theme_kwargs = {
|
||||
'ext': ext,
|
||||
'common_config': common_config,
|
||||
'run_once': run_once,
|
||||
}
|
||||
local_themes = self.get_local_themes(ext_config.get('local_themes'))
|
||||
self.renderer_options = {}
|
||||
|
||||
# Load and initialize extension renderer
|
||||
renderer_module_name = renderer_module or ext
|
||||
renderer_module_import = 'powerline.renderers.{0}'.format(renderer_module_name)
|
||||
try:
|
||||
Renderer = __import__(renderer_module_import, fromlist=['renderer']).renderer
|
||||
except ImportError as e:
|
||||
sys.stderr.write('Error while importing renderer module: {0}\n'.format(e))
|
||||
sys.exit(1)
|
||||
options = {
|
||||
'term_truecolor': common_config.get('term_truecolor', False),
|
||||
'ambiwidth': common_config.get('ambiwidth', 1),
|
||||
'tmux_escape': common_config.get('additional_escapes') == 'tmux',
|
||||
'screen_escape': common_config.get('additional_escapes') == 'screen',
|
||||
}
|
||||
self.watcher = watcher or MultiClientWatcher()
|
||||
|
||||
# Create logger
|
||||
if not logger:
|
||||
log_format = common_config.get('log_format', '%(asctime)s:%(levelname)s:%(message)s')
|
||||
formatter = logging.Formatter(log_format)
|
||||
self.prev_common_config = None
|
||||
self.prev_ext_config = None
|
||||
self.pl = None
|
||||
|
||||
level = getattr(logging, common_config.get('log_level', 'WARNING'))
|
||||
handler = self.get_log_handler(common_config)
|
||||
handler.setLevel(level)
|
||||
handler.setFormatter(formatter)
|
||||
self.create_renderer(load_main=True, load_colors=True, load_colorscheme=True, load_theme=True)
|
||||
|
||||
logger = logging.getLogger('powerline')
|
||||
logger.setLevel(level)
|
||||
logger.addHandler(handler)
|
||||
def create_renderer(self, load_main=False, load_colors=False, load_colorscheme=False, load_theme=False):
|
||||
'''(Re)create renderer object. Can be used after Powerline object was
|
||||
successfully initialized. If any of the below parameters except
|
||||
``load_main`` is True renderer object will be recreated.
|
||||
|
||||
pl = PowerlineState(logger=logger, environ=environ, getcwd=getcwd, home=home)
|
||||
:param bool load_main:
|
||||
Determines whether main configuration file (:file:`config.json`)
|
||||
should be loaded. If appropriate configuration changes implies
|
||||
``load_colorscheme`` and ``load_theme`` and recreation of renderer
|
||||
object. Won’t trigger recreation if only unrelated configuration
|
||||
changed.
|
||||
:param bool load_colors:
|
||||
Determines whether colors configuration from :file:`colors.json`
|
||||
should be (re)loaded.
|
||||
:param bool load_colorscheme:
|
||||
Determines whether colorscheme configuration should be (re)loaded.
|
||||
:param bool load_theme:
|
||||
Determines whether theme configuration should be reloaded.
|
||||
'''
|
||||
common_config_differs = False
|
||||
ext_config_differs = False
|
||||
if load_main:
|
||||
self._purge_configs('main')
|
||||
config = self.load_main_config()
|
||||
self.common_config = config['common']
|
||||
if self.common_config != self.prev_common_config:
|
||||
common_config_differs = True
|
||||
self.prev_common_config = self.common_config
|
||||
self.common_config['paths'] = [os.path.expanduser(path) for path in self.common_config.get('paths', [])]
|
||||
self.import_paths = self.common_config['paths']
|
||||
|
||||
self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, pl, **options)
|
||||
if not self.logger:
|
||||
log_format = self.common_config.get('log_format', '%(asctime)s:%(levelname)s:%(message)s')
|
||||
formatter = logging.Formatter(log_format)
|
||||
|
||||
def get_log_handler(self, common_config):
|
||||
level = getattr(logging, self.common_config.get('log_level', 'WARNING'))
|
||||
handler = self.get_log_handler()
|
||||
handler.setLevel(level)
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
self.logger = logging.getLogger('powerline')
|
||||
self.logger.setLevel(level)
|
||||
self.logger.addHandler(handler)
|
||||
|
||||
if not self.pl:
|
||||
self.pl = PowerlineState(self.use_daemon_threads, self.logger, self.ext)
|
||||
|
||||
self.renderer_options.update(
|
||||
pl=self.pl,
|
||||
term_truecolor=self.common_config.get('term_truecolor', False),
|
||||
ambiwidth=self.common_config.get('ambiwidth', 1),
|
||||
tmux_escape=self.common_config.get('additional_escapes') == 'tmux',
|
||||
screen_escape=self.common_config.get('additional_escapes') == 'screen',
|
||||
theme_kwargs={
|
||||
'ext': self.ext,
|
||||
'common_config': self.common_config,
|
||||
'run_once': self.run_once,
|
||||
},
|
||||
)
|
||||
|
||||
self.ext_config = config['ext'][self.ext]
|
||||
if self.ext_config != self.prev_ext_config:
|
||||
ext_config_differs = True
|
||||
if not self.prev_ext_config or self.ext_config.get('local_themes') != self.prev_ext_config.get('local_themes'):
|
||||
self.renderer_options['local_themes'] = self.get_local_themes(self.ext_config.get('local_themes'))
|
||||
load_colorscheme = (load_colorscheme
|
||||
or not self.prev_ext_config
|
||||
or self.prev_ext_config['colorscheme'] != self.ext_config['colorscheme'])
|
||||
load_theme = (load_theme
|
||||
or not self.prev_ext_config
|
||||
or self.prev_ext_config['theme'] != self.ext_config['theme'])
|
||||
self.prev_ext_config = self.ext_config
|
||||
|
||||
create_renderer = load_colors or load_colorscheme or load_theme or common_config_differs or ext_config_differs
|
||||
|
||||
if load_colors:
|
||||
self._purge_configs('colors')
|
||||
self.colors_config = self.load_colors_config()
|
||||
|
||||
if load_colorscheme or load_colors:
|
||||
self._purge_configs('colorscheme')
|
||||
if load_colorscheme:
|
||||
self.colorscheme_config = self.load_colorscheme_config(self.ext_config['colorscheme'])
|
||||
self.renderer_options['colorscheme'] = Colorscheme(self.colorscheme_config, self.colors_config)
|
||||
|
||||
if load_theme:
|
||||
self._purge_configs('theme')
|
||||
self.renderer_options['theme_config'] = self.load_theme_config(self.ext_config.get('theme', 'default'))
|
||||
|
||||
if create_renderer:
|
||||
try:
|
||||
Renderer = __import__(self.renderer_module, fromlist=['renderer']).renderer
|
||||
except Exception as e:
|
||||
self.pl.exception('Failed to import renderer module: {0}', str(e))
|
||||
sys.exit(1)
|
||||
|
||||
# Renderer updates configuration file via segments’ .startup thus it
|
||||
# should be locked to prevent state when configuration was updated,
|
||||
# but .render still uses old renderer.
|
||||
with self.renderer_lock:
|
||||
try:
|
||||
renderer = Renderer(**self.renderer_options)
|
||||
except Exception as e:
|
||||
self.pl.exception('Failed to construct renderer object: {0}', str(e))
|
||||
if not hasattr(self, 'renderer'):
|
||||
raise
|
||||
else:
|
||||
self.renderer = renderer
|
||||
|
||||
if not self.run_once and not self.is_alive() and self.interval is not None:
|
||||
self.start()
|
||||
|
||||
def get_log_handler(self):
|
||||
'''Get log handler.
|
||||
|
||||
:param dict common_config:
|
||||
|
@ -162,7 +297,7 @@ class Powerline(object):
|
|||
|
||||
:return: logging.Handler subclass.
|
||||
'''
|
||||
log_file = common_config.get('log_file', None)
|
||||
log_file = self.common_config.get('log_file', None)
|
||||
if log_file:
|
||||
log_file = os.path.expanduser(log_file)
|
||||
log_dir = os.path.dirname(log_file)
|
||||
|
@ -188,6 +323,26 @@ class Powerline(object):
|
|||
config_paths.append(plugin_path)
|
||||
return config_paths
|
||||
|
||||
def _load_config(self, cfg_path, type):
|
||||
'''Load configuration and setup watcher.'''
|
||||
try:
|
||||
path = find_config_file(self.config_paths, cfg_path)
|
||||
except IOError:
|
||||
with self.configs_lock:
|
||||
self.missing[type].add(cfg_path)
|
||||
raise
|
||||
with self.configs_lock:
|
||||
self.configs[type].add(path)
|
||||
self.watcher.watch(path)
|
||||
return load_json_config(path)
|
||||
|
||||
def _purge_configs(self, type):
|
||||
try:
|
||||
with self.configs_lock:
|
||||
self.configs.pop(type)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def load_theme_config(self, name):
|
||||
'''Get theme configuration.
|
||||
|
||||
|
@ -196,14 +351,14 @@ class Powerline(object):
|
|||
|
||||
:return: dictionary with :ref:`theme configuration <config-themes>`
|
||||
'''
|
||||
return load_json_config(self.config_paths, os.path.join('themes', self.ext, name))
|
||||
return self._load_config(os.path.join('themes', self.ext, name), 'theme')
|
||||
|
||||
def load_main_config(self):
|
||||
'''Get top-level configuration.
|
||||
|
||||
:return: dictionary with :ref:`top-level configuration <config-main>`.
|
||||
'''
|
||||
return load_json_config(self.config_paths, 'config')
|
||||
return self._load_config('config', 'main')
|
||||
|
||||
def load_colorscheme_config(self, name):
|
||||
'''Get colorscheme.
|
||||
|
@ -213,14 +368,14 @@ class Powerline(object):
|
|||
|
||||
:return: dictionary with :ref:`colorscheme configuration <config-colorschemes>`.
|
||||
'''
|
||||
return load_json_config(self.config_paths, os.path.join('colorschemes', self.ext, name))
|
||||
return self._load_config(os.path.join('colorschemes', self.ext, name), 'colorscheme')
|
||||
|
||||
def load_colors_config(self):
|
||||
'''Get colorscheme.
|
||||
|
||||
:return: dictionary with :ref:`colors configuration <config-colors>`.
|
||||
'''
|
||||
return load_json_config(self.config_paths, 'colors')
|
||||
return self._load_config('colors', 'colors')
|
||||
|
||||
@staticmethod
|
||||
def get_local_themes(local_themes):
|
||||
|
@ -237,3 +392,58 @@ class Powerline(object):
|
|||
``__init__`` arguments, refer to its documentation.
|
||||
'''
|
||||
return None
|
||||
|
||||
def render(self, *args, **kwargs):
|
||||
'''Lock renderer from modifications and pass all arguments further to
|
||||
``self.renderer.render()``.
|
||||
'''
|
||||
with self.renderer_lock:
|
||||
return self.renderer.render(*args, **kwargs)
|
||||
|
||||
def shutdown(self):
|
||||
'''Lock renderer from modifications and run its ``.shutdown()`` method.
|
||||
'''
|
||||
self.shutdown_event.set()
|
||||
if self.use_daemon_threads and self.is_alive():
|
||||
self.thread.join()
|
||||
with self.renderer_lock:
|
||||
self.renderer.shutdown()
|
||||
self.watcher.unsubscribe()
|
||||
|
||||
def is_alive(self):
|
||||
return self.thread and self.thread.is_alive()
|
||||
|
||||
def start(self):
|
||||
self.thread = Thread(target=self.run)
|
||||
if self.use_daemon_threads:
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
def run(self):
|
||||
while not self.shutdown_event.is_set():
|
||||
kwargs = {}
|
||||
with self.configs_lock:
|
||||
for type, paths in self.configs.items():
|
||||
for path in paths:
|
||||
if self.watcher(path):
|
||||
kwargs['load_' + type] = True
|
||||
for type, cfg_paths in self.missing.items():
|
||||
for cfg_path in cfg_paths:
|
||||
try:
|
||||
find_config_file(self.config_paths, cfg_path)
|
||||
except IOError:
|
||||
pass
|
||||
else:
|
||||
kwargs['load_' + type] = True
|
||||
if kwargs:
|
||||
try:
|
||||
self.create_renderer(**kwargs)
|
||||
except Exception as e:
|
||||
self.pl.exception('Failed to create renderer: {0}', str(e))
|
||||
self.shutdown_event.wait(self.interval)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.shutdown()
|
||||
|
|
|
@ -24,7 +24,7 @@ class PowerlinePromptManager(PromptManager):
|
|||
|
||||
def render(self, name, color=True, *args, **kwargs):
|
||||
width = None if name == 'in' else self.width
|
||||
res, res_nocolor = self.powerline.renderer.render(output_raw=True, width=width, matcher_info=name, segment_info=self.powerline_segment_info)
|
||||
res, res_nocolor = self.powerline.render(output_raw=True, width=width, matcher_info=name, segment_info=self.powerline_segment_info)
|
||||
self.txtwidth = len(res_nocolor)
|
||||
self.width = self.txtwidth
|
||||
return res if color else res_nocolor
|
||||
|
@ -51,7 +51,7 @@ def load_ipython_extension(ip):
|
|||
ip.prompt_manager = PowerlinePromptManager(powerline=powerline, shell=ip.prompt_manager.shell)
|
||||
|
||||
def shutdown_hook():
|
||||
powerline.renderer.shutdown()
|
||||
powerline.shutdown()
|
||||
raise TryNext()
|
||||
|
||||
ip.hooks.shutdown_hook.add(shutdown_hook)
|
||||
|
|
|
@ -56,7 +56,7 @@ class PowerlinePrompt(BasePrompt):
|
|||
|
||||
def set_p_str(self, width=None):
|
||||
self.p_str, self.p_str_nocolor = (
|
||||
self.powerline.renderer.render(output_raw=True,
|
||||
self.powerline.render(output_raw=True,
|
||||
segment_info=self.powerline_segment_info,
|
||||
matcher_info=self.powerline_prompt_type,
|
||||
width=width)
|
||||
|
@ -85,7 +85,7 @@ class PowerlinePrompt1(PowerlinePrompt):
|
|||
self.powerline_last_in['prompt_text_len'] = self.prompt_text_len
|
||||
|
||||
def auto_rewrite(self):
|
||||
return RewriteResult(self.powerline.renderer.render(matcher_info='rewrite', width=self.prompt_text_len, segment_info=self.powerline_segment_info)
|
||||
return RewriteResult(self.powerline.render(matcher_info='rewrite', width=self.prompt_text_len, segment_info=self.powerline_segment_info)
|
||||
+ (' ' * self.nrspaces))
|
||||
|
||||
|
||||
|
@ -128,7 +128,7 @@ def setup(**kwargs):
|
|||
raise TryNext()
|
||||
|
||||
def shutdown_hook():
|
||||
powerline.renderer.shutdown()
|
||||
powerline.shutdown()
|
||||
raise TryNext()
|
||||
|
||||
ip.IP.hooks.late_startup_hook.add(late_startup_hook)
|
||||
|
|
|
@ -15,7 +15,7 @@ class Powerline(base._TextBox):
|
|||
def update(self):
|
||||
if not self.configured:
|
||||
return True
|
||||
self.text = self.powerline.renderer.render(side='right')
|
||||
self.text = self.powerline.render(side='right')
|
||||
self.bar.draw()
|
||||
return True
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ endfunction
|
|||
function! Powerline(window_id)
|
||||
let winidx = index(map(range(1, winnr('$')), 's:GetWinID(v:val)'), a:window_id)
|
||||
let current = w:window_id is# a:window_id
|
||||
return s:pyeval('powerline.renderer.render('. a:window_id .', '. winidx .', '. current .')')
|
||||
return s:pyeval('powerline.render('. a:window_id .', '. winidx .', '. current .')')
|
||||
endfunction
|
||||
|
||||
function! PowerlineNew()
|
||||
|
@ -84,7 +84,7 @@ endfunction
|
|||
augroup Powerline
|
||||
autocmd! ColorScheme * :exec s:powerline_pycmd 'powerline.renderer.reset_highlight()'
|
||||
autocmd! VimEnter * :redrawstatus!
|
||||
autocmd! VimLeave * :exec s:powerline_pycmd 'powerline.renderer.shutdown()'
|
||||
autocmd! VimLeavePre * :exec s:powerline_pycmd 'powerline.shutdown()'
|
||||
augroup END
|
||||
|
||||
exec s:powerline_pycmd 'powerline = VimPowerline()'
|
||||
|
|
|
@ -10,7 +10,7 @@ used_powerlines = []
|
|||
|
||||
def shutdown():
|
||||
for powerline in used_powerlines:
|
||||
powerline.renderer.shutdown()
|
||||
powerline.shutdown()
|
||||
|
||||
|
||||
def get_var_config(var):
|
||||
|
@ -77,6 +77,9 @@ class Environment(object):
|
|||
return default
|
||||
|
||||
|
||||
environ = Environment()
|
||||
|
||||
|
||||
class Prompt(object):
|
||||
__slots__ = ('powerline', 'side', 'savedpsvar', 'savedps', 'args')
|
||||
|
||||
|
@ -88,7 +91,11 @@ class Prompt(object):
|
|||
self.args = powerline.args
|
||||
|
||||
def __str__(self):
|
||||
r = self.powerline.renderer.render(width=zsh.columns(), side=self.side, segment_info=self.args)
|
||||
r = self.powerline.render(
|
||||
width=zsh.columns(),
|
||||
side=self.side,
|
||||
segment_info={'args': self.args, 'environ': environ}
|
||||
)
|
||||
if type(r) is not str:
|
||||
if type(r) is bytes:
|
||||
return r.decode('utf-8')
|
||||
|
@ -101,7 +108,7 @@ class Prompt(object):
|
|||
zsh.setvalue(self.savedpsvar, self.savedps)
|
||||
used_powerlines.remove(self.powerline)
|
||||
if self.powerline not in used_powerlines:
|
||||
self.powerline.renderer.shutdown()
|
||||
self.powerline.shutdown()
|
||||
|
||||
|
||||
def set_prompt(powerline, psvar, side):
|
||||
|
@ -113,8 +120,7 @@ def set_prompt(powerline, psvar, side):
|
|||
|
||||
|
||||
def setup():
|
||||
environ = Environment()
|
||||
powerline = ShellPowerline(Args(), environ=environ, getcwd=lambda: environ['PWD'])
|
||||
powerline = ShellPowerline(Args())
|
||||
used_powerlines.append(powerline)
|
||||
used_powerlines.append(powerline)
|
||||
set_prompt(powerline, 'PS1', 'left')
|
||||
|
|
|
@ -109,7 +109,7 @@ class Colorscheme(object):
|
|||
'fg': pick_color(group_props['fg']),
|
||||
'bg': pick_color(group_props['bg']),
|
||||
'attr': get_attr_flag(group_props.get('attr', [])),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# 0 1 2 3 4 5 6 7 8 9
|
||||
|
|
|
@ -6,7 +6,7 @@ from powerline.lib import mergedicts
|
|||
|
||||
class IpythonPowerline(Powerline):
|
||||
def __init__(self):
|
||||
super(IpythonPowerline, self).__init__('ipython')
|
||||
super(IpythonPowerline, self).__init__('ipython', use_daemon_threads=True)
|
||||
|
||||
def get_config_paths(self):
|
||||
if self.path:
|
||||
|
|
|
@ -0,0 +1,309 @@
|
|||
# vim:fileencoding=UTF-8:noet
|
||||
from __future__ import unicode_literals, absolute_import
|
||||
|
||||
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
import os
|
||||
import sys
|
||||
import errno
|
||||
from powerline.lib.time import monotonic
|
||||
from time import sleep
|
||||
from threading import RLock
|
||||
|
||||
|
||||
class INotifyError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class INotifyWatch(object):
|
||||
|
||||
is_stat_based = False
|
||||
|
||||
# See <sys/inotify.h> for the flags defined below
|
||||
|
||||
# Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH.
|
||||
ACCESS = 0x00000001 # File was accessed.
|
||||
MODIFY = 0x00000002 # File was modified.
|
||||
ATTRIB = 0x00000004 # Metadata changed.
|
||||
CLOSE_WRITE = 0x00000008 # Writtable file was closed.
|
||||
CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed.
|
||||
OPEN = 0x00000020 # File was opened.
|
||||
MOVED_FROM = 0x00000040 # File was moved from X.
|
||||
MOVED_TO = 0x00000080 # File was moved to Y.
|
||||
CREATE = 0x00000100 # Subfile was created.
|
||||
DELETE = 0x00000200 # Subfile was deleted.
|
||||
DELETE_SELF = 0x00000400 # Self was deleted.
|
||||
MOVE_SELF = 0x00000800 # Self was moved.
|
||||
|
||||
# Events sent by the kernel.
|
||||
UNMOUNT = 0x00002000 # Backing fs was unmounted.
|
||||
Q_OVERFLOW = 0x00004000 # Event queued overflowed.
|
||||
IGNORED = 0x00008000 # File was ignored.
|
||||
|
||||
# Helper events.
|
||||
CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close.
|
||||
MOVE = (MOVED_FROM | MOVED_TO) # Moves.
|
||||
|
||||
# Special flags.
|
||||
ONLYDIR = 0x01000000 # Only watch the path if it is a directory.
|
||||
DONT_FOLLOW = 0x02000000 # Do not follow a sym link.
|
||||
EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects.
|
||||
MASK_ADD = 0x20000000 # Add to the mask of an already existing watch.
|
||||
ISDIR = 0x40000000 # Event occurred against dir.
|
||||
ONESHOT = 0x80000000 # Only send event once.
|
||||
|
||||
# All events which a program can wait on.
|
||||
ALL_EVENTS = (ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE |
|
||||
OPEN | MOVED_FROM | MOVED_TO | CREATE | DELETE |
|
||||
DELETE_SELF | MOVE_SELF)
|
||||
|
||||
# See <bits/inotify.h>
|
||||
CLOEXEC = 0x80000
|
||||
NONBLOCK = 0x800
|
||||
|
||||
def __init__(self, inotify_fd, add_watch, rm_watch, read, expire_time=10):
|
||||
import ctypes
|
||||
import struct
|
||||
self._add_watch, self._rm_watch = add_watch, rm_watch
|
||||
self._read = read
|
||||
# We keep a reference to os to prevent it from being deleted
|
||||
# during interpreter shutdown, which would lead to errors in the
|
||||
# __del__ method
|
||||
self.os = os
|
||||
self.watches = {}
|
||||
self.modified = {}
|
||||
self.last_query = {}
|
||||
self._buf = ctypes.create_string_buffer(5000)
|
||||
self.fenc = sys.getfilesystemencoding() or 'utf-8'
|
||||
self.hdr = struct.Struct(b'iIII')
|
||||
if self.fenc == 'ascii':
|
||||
self.fenc = 'utf-8'
|
||||
self.lock = RLock()
|
||||
self.expire_time = expire_time * 60
|
||||
self._inotify_fd = inotify_fd
|
||||
|
||||
def handle_error(self):
|
||||
import ctypes
|
||||
eno = ctypes.get_errno()
|
||||
raise OSError(eno, self.os.strerror(eno))
|
||||
|
||||
def __del__(self):
|
||||
# This method can be called during interpreter shutdown, which means we
|
||||
# must do the absolute minimum here. Note that there could be running
|
||||
# daemon threads that are trying to call other methods on this object.
|
||||
try:
|
||||
self.os.close(self._inotify_fd)
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
|
||||
def read(self):
|
||||
import ctypes
|
||||
buf = []
|
||||
while True:
|
||||
num = self._read(self._inotify_fd, self._buf, len(self._buf))
|
||||
if num == 0:
|
||||
break
|
||||
if num < 0:
|
||||
en = ctypes.get_errno()
|
||||
if en == errno.EAGAIN:
|
||||
break # No more data
|
||||
if en == errno.EINTR:
|
||||
continue # Interrupted, try again
|
||||
raise OSError(en, self.os.strerror(en))
|
||||
buf.append(self._buf.raw[:num])
|
||||
raw = b''.join(buf)
|
||||
pos = 0
|
||||
lraw = len(raw)
|
||||
while lraw - pos >= self.hdr.size:
|
||||
wd, mask, cookie, name_len = self.hdr.unpack_from(raw, pos)
|
||||
# We dont care about names as we only watch files
|
||||
pos += self.hdr.size + name_len
|
||||
self.process_event(wd, mask, cookie)
|
||||
|
||||
def expire_watches(self):
|
||||
now = monotonic()
|
||||
for path, last_query in tuple(self.last_query.items()):
|
||||
if last_query - now > self.expire_time:
|
||||
self.unwatch(path)
|
||||
|
||||
def process_event(self, wd, mask, cookie):
|
||||
for path, num in tuple(self.watches.items()):
|
||||
if num == wd:
|
||||
if mask & self.IGNORED:
|
||||
self.watches.pop(path, None)
|
||||
self.modified.pop(path, None)
|
||||
self.last_query.pop(path, None)
|
||||
else:
|
||||
self.modified[path] = True
|
||||
|
||||
def unwatch(self, path):
|
||||
''' Remove the watch for path. Raises an OSError if removing the watch
|
||||
fails for some reason. '''
|
||||
path = self.os.path.abspath(path)
|
||||
with self.lock:
|
||||
self.modified.pop(path, None)
|
||||
self.last_query.pop(path, None)
|
||||
wd = self.watches.pop(path, None)
|
||||
if wd is not None:
|
||||
if self._rm_watch(self._inotify_fd, wd) != 0:
|
||||
self.handle_error()
|
||||
|
||||
def watch(self, path):
|
||||
''' Register a watch for the file named path. Raises an OSError if path
|
||||
does not exist. '''
|
||||
import ctypes
|
||||
path = self.os.path.abspath(path)
|
||||
with self.lock:
|
||||
if path not in self.watches:
|
||||
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
|
||||
wd = self._add_watch(self._inotify_fd, ctypes.c_char_p(bpath),
|
||||
self.MODIFY | self.ATTRIB | self.MOVE_SELF | self.DELETE_SELF)
|
||||
if wd == -1:
|
||||
self.handle_error()
|
||||
self.watches[path] = wd
|
||||
self.modified[path] = False
|
||||
|
||||
def __call__(self, path):
|
||||
''' Return True if path has been modified since the last call. Can
|
||||
raise OSError if the path does not exist. '''
|
||||
path = self.os.path.abspath(path)
|
||||
with self.lock:
|
||||
self.last_query[path] = monotonic()
|
||||
self.expire_watches()
|
||||
if path not in self.watches:
|
||||
# Try to re-add the watch, it will fail if the file does not
|
||||
# exist/you dont have permission
|
||||
self.watch(path)
|
||||
return True
|
||||
self.read()
|
||||
if path not in self.modified:
|
||||
# An ignored event was received which means the path has been
|
||||
# automatically unwatched
|
||||
return True
|
||||
ans = self.modified[path]
|
||||
if ans:
|
||||
self.modified[path] = False
|
||||
return ans
|
||||
|
||||
def close(self):
|
||||
with self.lock:
|
||||
for path in tuple(self.watches):
|
||||
try:
|
||||
self.unwatch(path)
|
||||
except OSError:
|
||||
pass
|
||||
if hasattr(self, '_inotify_fd'):
|
||||
self.os.close(self._inotify_fd)
|
||||
del self.os
|
||||
del self._add_watch
|
||||
del self._rm_watch
|
||||
del self._inotify_fd
|
||||
|
||||
|
||||
def get_inotify(expire_time=10):
|
||||
''' Initialize the inotify based file watcher '''
|
||||
import ctypes
|
||||
if not hasattr(ctypes, 'c_ssize_t'):
|
||||
raise INotifyError('You need python >= 2.7 to use inotify')
|
||||
from ctypes.util import find_library
|
||||
name = find_library('c')
|
||||
if not name:
|
||||
raise INotifyError('Cannot find C library')
|
||||
libc = ctypes.CDLL(name, use_errno=True)
|
||||
for function in ("inotify_add_watch", "inotify_init1", "inotify_rm_watch"):
|
||||
if not hasattr(libc, function):
|
||||
raise INotifyError('libc is too old')
|
||||
# inotify_init1()
|
||||
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, use_errno=True)
|
||||
init1 = prototype(('inotify_init1', libc), ((1, "flags", 0),))
|
||||
|
||||
# inotify_add_watch()
|
||||
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32, use_errno=True)
|
||||
add_watch = prototype(('inotify_add_watch', libc), (
|
||||
(1, "fd"), (1, "pathname"), (1, "mask")), use_errno=True)
|
||||
|
||||
# inotify_rm_watch()
|
||||
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int, use_errno=True)
|
||||
rm_watch = prototype(('inotify_rm_watch', libc), (
|
||||
(1, "fd"), (1, "wd")), use_errno=True)
|
||||
|
||||
# read()
|
||||
prototype = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t, use_errno=True)
|
||||
read = prototype(('read', libc), (
|
||||
(1, "fd"), (1, "buf"), (1, "count")), use_errno=True)
|
||||
|
||||
inotify_fd = init1(INotifyWatch.CLOEXEC | INotifyWatch.NONBLOCK)
|
||||
if inotify_fd == -1:
|
||||
raise INotifyError(os.strerror(ctypes.get_errno()))
|
||||
return INotifyWatch(inotify_fd, add_watch, rm_watch, read, expire_time=expire_time)
|
||||
|
||||
|
||||
class StatWatch(object):
|
||||
is_stat_based = True
|
||||
|
||||
def __init__(self):
|
||||
self.watches = {}
|
||||
self.lock = RLock()
|
||||
|
||||
def watch(self, path):
|
||||
path = os.path.abspath(path)
|
||||
with self.lock:
|
||||
self.watches[path] = os.path.getmtime(path)
|
||||
|
||||
def unwatch(self, path):
|
||||
path = os.path.abspath(path)
|
||||
with self.lock:
|
||||
self.watches.pop(path, None)
|
||||
|
||||
def __call__(self, path):
|
||||
path = os.path.abspath(path)
|
||||
with self.lock:
|
||||
if path not in self.watches:
|
||||
self.watches[path] = os.path.getmtime(path)
|
||||
return True
|
||||
mtime = os.path.getmtime(path)
|
||||
if mtime != self.watches[path]:
|
||||
self.watches[path] = mtime
|
||||
return True
|
||||
return False
|
||||
|
||||
def close(self):
|
||||
with self.lock:
|
||||
self.watches.clear()
|
||||
|
||||
|
||||
def create_file_watcher(use_stat=False, expire_time=10):
|
||||
'''
|
||||
Create an object that can watch for changes to specified files. To use:
|
||||
|
||||
watcher = create_file_watcher()
|
||||
watcher(path1) # Will return True if path1 has changed since the last time this was called. Always returns True the first time.
|
||||
watcher.unwatch(path1)
|
||||
|
||||
Uses inotify if available, otherwise tracks mtimes. expire_time is the
|
||||
number of minutes after the last query for a given path for the inotify
|
||||
watch for that path to be automatically removed. This conserves kernel
|
||||
resources.
|
||||
'''
|
||||
if use_stat:
|
||||
return StatWatch()
|
||||
try:
|
||||
return get_inotify(expire_time=expire_time)
|
||||
except INotifyError:
|
||||
pass
|
||||
return StatWatch()
|
||||
|
||||
if __name__ == '__main__':
|
||||
watcher = create_file_watcher()
|
||||
print ('Using watcher: %s' % watcher.__class__.__name__)
|
||||
print ('Watching %s, press Ctrl-C to quit' % sys.argv[-1])
|
||||
watcher.watch(sys.argv[-1])
|
||||
try:
|
||||
while True:
|
||||
if watcher(sys.argv[-1]):
|
||||
print ('%s has changed' % sys.argv[-1])
|
||||
sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
watcher.close()
|
|
@ -11,66 +11,73 @@ class ThreadedSegment(object):
|
|||
min_sleep_time = 0.1
|
||||
update_first = True
|
||||
interval = 1
|
||||
daemon = False
|
||||
|
||||
def __init__(self):
|
||||
super(ThreadedSegment, self).__init__()
|
||||
self.shutdown_event = Event()
|
||||
self.write_lock = Lock()
|
||||
self.run_once = True
|
||||
self.did_set_interval = False
|
||||
self.thread = None
|
||||
self.skip = False
|
||||
self.crashed_value = None
|
||||
self.update_value = None
|
||||
|
||||
def __call__(self, pl, update_first=True, **kwargs):
|
||||
if self.run_once:
|
||||
self.pl = pl
|
||||
self.set_state(**kwargs)
|
||||
self.update()
|
||||
update_value = self.get_update_value(True)
|
||||
elif not self.is_alive():
|
||||
# Without this we will not have to wait long until receiving bug “I
|
||||
# opened vim, but branch information is only shown after I move
|
||||
# cursor”.
|
||||
#
|
||||
# If running once .update() is called in __call__.
|
||||
if update_first and self.update_first:
|
||||
self.update()
|
||||
update_value = self.get_update_value(update_first and self.update_first)
|
||||
self.start()
|
||||
elif not self.updated:
|
||||
self.update()
|
||||
update_value = self.get_update_value(True)
|
||||
else:
|
||||
update_value = self.update_value
|
||||
|
||||
if self.skip:
|
||||
return self.crashed_value
|
||||
with self.write_lock:
|
||||
return self.render(update_first=update_first, pl=pl, **kwargs)
|
||||
|
||||
return self.render(update_value, update_first=update_first, pl=pl, **kwargs)
|
||||
|
||||
def get_update_value(self, update=False):
|
||||
if update:
|
||||
self.update_value = self.update(self.update_value)
|
||||
return self.update_value
|
||||
|
||||
def is_alive(self):
|
||||
return self.thread and self.thread.is_alive()
|
||||
|
||||
def start(self):
|
||||
self.keep_going = True
|
||||
self.shutdown_event.clear()
|
||||
self.thread = Thread(target=self.run)
|
||||
self.thread.daemon = self.daemon
|
||||
self.thread.start()
|
||||
|
||||
def sleep(self, adjust_time):
|
||||
self.shutdown_event.wait(max(self.interval - adjust_time, self.min_sleep_time))
|
||||
if self.shutdown_event.is_set():
|
||||
self.keep_going = False
|
||||
|
||||
def run(self):
|
||||
while self.keep_going:
|
||||
while not self.shutdown_event.is_set():
|
||||
start_time = monotonic()
|
||||
try:
|
||||
self.update()
|
||||
self.update_value = self.update(self.update_value)
|
||||
except Exception as e:
|
||||
self.error('Exception while updating: {0}', str(e))
|
||||
self.exception('Exception while updating: {0}', str(e))
|
||||
self.skip = True
|
||||
except KeyboardInterrupt:
|
||||
self.warn('Caught keyboard interrupt while updating')
|
||||
self.skip = True
|
||||
else:
|
||||
self.skip = False
|
||||
self.sleep(monotonic() - start_time)
|
||||
self.shutdown_event.wait(max(self.interval - (monotonic() - start_time), self.min_sleep_time))
|
||||
|
||||
def shutdown(self):
|
||||
self.shutdown_event.set()
|
||||
if self.daemon and self.is_alive():
|
||||
self.thread.join()
|
||||
|
||||
def set_interval(self, interval=None):
|
||||
# Allowing “interval” keyword in configuration.
|
||||
|
@ -79,19 +86,19 @@ class ThreadedSegment(object):
|
|||
# .set_interval().
|
||||
interval = interval or getattr(self, 'interval')
|
||||
self.interval = interval
|
||||
self.has_set_interval = True
|
||||
|
||||
def set_state(self, interval=None, update_first=True, **kwargs):
|
||||
if not self.did_set_interval or interval:
|
||||
self.set_interval(interval)
|
||||
self.updated = not (update_first and self.update_first)
|
||||
self.set_interval(interval)
|
||||
self.updated = not (update_first and self.update_first)
|
||||
|
||||
def startup(self, pl, **kwargs):
|
||||
self.run_once = False
|
||||
self.pl = pl
|
||||
self.daemon = pl.use_daemon_threads
|
||||
|
||||
self.set_state(**kwargs)
|
||||
|
||||
if not self.is_alive():
|
||||
self.set_state(**kwargs)
|
||||
self.start()
|
||||
|
||||
def error(self, *args, **kwargs):
|
||||
|
@ -117,52 +124,57 @@ class KwThreadedSegment(ThreadedSegment):
|
|||
|
||||
def __init__(self):
|
||||
super(KwThreadedSegment, self).__init__()
|
||||
self.queries = {}
|
||||
self.crashed = set()
|
||||
self.updated = True
|
||||
self.update_value = ({}, set())
|
||||
self.write_lock = Lock()
|
||||
self.new_queries = {}
|
||||
|
||||
@staticmethod
|
||||
def key(**kwargs):
|
||||
return frozenset(kwargs.items())
|
||||
|
||||
def render(self, update_first, **kwargs):
|
||||
def render(self, update_value, update_first, **kwargs):
|
||||
queries, crashed = update_value
|
||||
key = self.key(**kwargs)
|
||||
if key in self.crashed:
|
||||
if key in crashed:
|
||||
return self.crashed_value
|
||||
|
||||
try:
|
||||
update_state = self.queries[key][1]
|
||||
update_state = queries[key][1]
|
||||
except KeyError:
|
||||
# Allow only to forbid to compute missing values: in either user
|
||||
# configuration or in subclasses.
|
||||
update_state = self.compute_state(key) if update_first and self.update_first or self.run_once else None
|
||||
|
||||
# No locks: render method is already running with write_lock acquired.
|
||||
self.queries[key] = (monotonic(), update_state)
|
||||
with self.write_lock:
|
||||
self.new_queries[key] = (monotonic(), update_state)
|
||||
return self.render_one(update_state, **kwargs)
|
||||
|
||||
def update(self):
|
||||
def update(self, old_update_value):
|
||||
updates = {}
|
||||
removes = []
|
||||
for key, (last_query_time, state) in list(self.queries.items()):
|
||||
crashed = set()
|
||||
update_value = (updates, crashed)
|
||||
queries = old_update_value[0]
|
||||
with self.write_lock:
|
||||
if self.new_queries:
|
||||
queries.update(self.new_queries)
|
||||
self.new_queries.clear()
|
||||
|
||||
for key, (last_query_time, state) in queries.items():
|
||||
if last_query_time < monotonic() < last_query_time + self.drop_interval:
|
||||
try:
|
||||
updates[key] = (last_query_time, self.compute_state(key))
|
||||
except Exception as e:
|
||||
self.exception('Exception while computing state for {0}: {1}', repr(key), str(e))
|
||||
with self.write_lock:
|
||||
self.crashed.add(key)
|
||||
else:
|
||||
removes.append(key)
|
||||
with self.write_lock:
|
||||
self.queries.update(updates)
|
||||
self.crashed -= set(updates)
|
||||
for key in removes:
|
||||
self.queries.pop(key)
|
||||
self.exception('Exception while computing state for {0!r}: {1}', key, str(e))
|
||||
crashed.add(key)
|
||||
except KeyboardInterrupt:
|
||||
self.warn('Interrupt while computing state for {0!r}', key)
|
||||
crashed.add(key)
|
||||
|
||||
return update_value
|
||||
|
||||
def set_state(self, interval=None, update_first=True, **kwargs):
|
||||
if not self.did_set_interval or (interval < self.interval):
|
||||
self.set_interval(interval)
|
||||
self.set_interval(interval)
|
||||
|
||||
if self.update_first:
|
||||
self.update_first = update_first
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from powerline.lint.markedjson import load
|
||||
from powerline import load_json_config, Powerline
|
||||
from powerline import load_json_config, find_config_file, Powerline
|
||||
from powerline.lint.markedjson.error import echoerr, MarkedError
|
||||
from powerline.segments.vim import vim_modes
|
||||
import itertools
|
||||
|
@ -21,14 +21,6 @@ def open_file(path):
|
|||
return open(path, 'rb')
|
||||
|
||||
|
||||
def find_config(search_paths, config_file):
|
||||
config_file += '.json'
|
||||
for path in search_paths:
|
||||
if os.path.isfile(os.path.join(path, config_file)):
|
||||
return path
|
||||
return None
|
||||
|
||||
|
||||
EMPTYTUPLE = tuple()
|
||||
|
||||
|
||||
|
@ -893,7 +885,7 @@ def check(path=None):
|
|||
|
||||
hadproblem = False
|
||||
try:
|
||||
main_config = load_json_config(search_paths, 'config', load=load_config, open_file=open_file)
|
||||
main_config = load_json_config(find_config_file(search_paths, 'config'), load=load_config, open_file=open_file)
|
||||
except IOError:
|
||||
main_config = {}
|
||||
sys.stderr.write('\nConfiguration file not found: config.json\n')
|
||||
|
@ -909,7 +901,7 @@ def check(path=None):
|
|||
import_paths = [os.path.expanduser(path) for path in main_config.get('common', {}).get('paths', [])]
|
||||
|
||||
try:
|
||||
colors_config = load_json_config(search_paths, 'colors', load=load_config, open_file=open_file)
|
||||
colors_config = load_json_config(find_config_file(search_paths, 'colors'), load=load_config, open_file=open_file)
|
||||
except IOError:
|
||||
colors_config = {}
|
||||
sys.stderr.write('\nConfiguration file not found: colors.json\n')
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
from powerline.theme import Theme
|
||||
from unicodedata import east_asian_width, combining
|
||||
import os
|
||||
|
||||
|
||||
try:
|
||||
|
@ -18,7 +19,19 @@ def construct_returned_value(rendered_highlighted, segments, output_raw):
|
|||
|
||||
|
||||
class Renderer(object):
|
||||
def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, pl, **options):
|
||||
segment_info = {
|
||||
'environ': os.environ,
|
||||
'getcwd': getattr(os, 'getcwdu', os.getcwd),
|
||||
'home': os.environ.get('HOME'),
|
||||
}
|
||||
|
||||
def __init__(self,
|
||||
theme_config,
|
||||
local_themes,
|
||||
theme_kwargs,
|
||||
colorscheme,
|
||||
pl,
|
||||
**options):
|
||||
self.__dict__.update(options)
|
||||
self.theme_config = theme_config
|
||||
theme_kwargs['pl'] = pl
|
||||
|
@ -53,6 +66,9 @@ class Renderer(object):
|
|||
segment['divider_highlight'] = None
|
||||
return segment
|
||||
|
||||
def get_segment_info(self, segment_info):
|
||||
return segment_info or self.segment_info
|
||||
|
||||
def render(self, mode=None, width=None, side=None, output_raw=False, segment_info=None, matcher_info=None):
|
||||
'''Render all segments.
|
||||
|
||||
|
@ -63,7 +79,7 @@ class Renderer(object):
|
|||
reached.
|
||||
'''
|
||||
theme = self.get_theme(matcher_info)
|
||||
segments = theme.get_segments(side, segment_info)
|
||||
segments = theme.get_segments(side, self.get_segment_info(segment_info))
|
||||
|
||||
# Handle excluded/included segments for the current mode
|
||||
segments = [self.get_highlighting(segment, mode) for segment in segments
|
||||
|
|
|
@ -9,6 +9,11 @@ class IpythonRenderer(ShellRenderer):
|
|||
escape_hl_start = '\x01'
|
||||
escape_hl_end = '\x02'
|
||||
|
||||
def get_segment_info(self, segment_info):
|
||||
r = self.segment_info.copy()
|
||||
r['ipython'] = segment_info
|
||||
return r
|
||||
|
||||
def get_theme(self, matcher_info):
|
||||
if matcher_info == 'in':
|
||||
return self.theme
|
||||
|
|
|
@ -19,6 +19,13 @@ class ShellRenderer(Renderer):
|
|||
tmux_escape = False
|
||||
screen_escape = False
|
||||
|
||||
def get_segment_info(self, segment_info):
|
||||
r = self.segment_info.copy()
|
||||
r.update(segment_info)
|
||||
if 'PWD' in r['environ']:
|
||||
r['getcwd'] = lambda: r['environ']['PWD']
|
||||
return r
|
||||
|
||||
def hlstyle(self, fg=None, bg=None, attr=None):
|
||||
'''Highlight a segment.
|
||||
|
||||
|
|
|
@ -76,11 +76,17 @@ class VimRenderer(Renderer):
|
|||
'window': vim.windows[winidx],
|
||||
'mode': mode,
|
||||
'window_id': window_id,
|
||||
}
|
||||
}
|
||||
segment_info['buffer'] = segment_info['window'].buffer
|
||||
segment_info['bufnr'] = segment_info['buffer'].number
|
||||
segment_info.update(self.segment_info)
|
||||
winwidth = segment_info['window'].width
|
||||
statusline = super(VimRenderer, self).render(mode, winwidth, segment_info=segment_info, matcher_info=segment_info)
|
||||
statusline = super(VimRenderer, self).render(
|
||||
mode=mode,
|
||||
width=winwidth,
|
||||
segment_info=segment_info,
|
||||
matcher_info=segment_info,
|
||||
)
|
||||
return statusline
|
||||
|
||||
def reset_highlight(self):
|
||||
|
@ -109,7 +115,7 @@ class VimRenderer(Renderer):
|
|||
'guibg': None,
|
||||
'attr': ['NONE'],
|
||||
'name': '',
|
||||
}
|
||||
}
|
||||
if fg is not None and fg is not False:
|
||||
hl_group['ctermfg'] = fg[0]
|
||||
hl_group['guifg'] = fg[1]
|
||||
|
@ -132,13 +138,13 @@ class VimRenderer(Renderer):
|
|||
''.join(hl_group['attr'])
|
||||
self.hl_groups[(fg, bg, attr)] = hl_group
|
||||
vim.command('hi {group} ctermfg={ctermfg} guifg={guifg} guibg={guibg} ctermbg={ctermbg} cterm={attr} gui={attr}'.format(
|
||||
group=hl_group['name'],
|
||||
ctermfg=hl_group['ctermfg'],
|
||||
guifg='#{0:06x}'.format(hl_group['guifg']) if hl_group['guifg'] is not None else 'NONE',
|
||||
ctermbg=hl_group['ctermbg'],
|
||||
guibg='#{0:06x}'.format(hl_group['guibg']) if hl_group['guibg'] is not None else 'NONE',
|
||||
attr=','.join(hl_group['attr']),
|
||||
))
|
||||
group=hl_group['name'],
|
||||
ctermfg=hl_group['ctermfg'],
|
||||
guifg='#{0:06x}'.format(hl_group['guifg']) if hl_group['guifg'] is not None else 'NONE',
|
||||
ctermbg=hl_group['ctermbg'],
|
||||
guibg='#{0:06x}'.format(hl_group['guibg']) if hl_group['guibg'] is not None else 'NONE',
|
||||
attr=','.join(hl_group['attr']),
|
||||
))
|
||||
return '%#' + self.hl_groups[(fg, bg, attr)]['name'] + '#'
|
||||
|
||||
|
||||
|
|
|
@ -15,36 +15,39 @@ from powerline.lib.vcs import guess
|
|||
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment, with_docstring
|
||||
from powerline.lib.time import monotonic
|
||||
from powerline.lib.humanize_bytes import humanize_bytes
|
||||
from powerline.theme import requires_segment_info
|
||||
from collections import namedtuple
|
||||
|
||||
|
||||
def hostname(pl, only_if_ssh=False):
|
||||
@requires_segment_info
|
||||
def hostname(pl, segment_info, only_if_ssh=False):
|
||||
'''Return the current hostname.
|
||||
|
||||
:param bool only_if_ssh:
|
||||
only return the hostname if currently in an SSH session
|
||||
'''
|
||||
if only_if_ssh and not pl.environ.get('SSH_CLIENT'):
|
||||
if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'):
|
||||
return None
|
||||
return socket.gethostname()
|
||||
|
||||
|
||||
@requires_segment_info
|
||||
class RepositorySegment(KwThreadedSegment):
|
||||
def __init__(self):
|
||||
super(RepositorySegment, self).__init__()
|
||||
self.directories = {}
|
||||
|
||||
@staticmethod
|
||||
def key(pl, **kwargs):
|
||||
return os.path.abspath(pl.getcwd())
|
||||
def key(segment_info, **kwargs):
|
||||
return os.path.abspath(segment_info['getcwd']())
|
||||
|
||||
def update(self):
|
||||
def update(self, *args):
|
||||
# .compute_state() is running only in this method, and only in one
|
||||
# thread, thus operations with .directories do not need write locks
|
||||
# (.render() method is not using .directories). If this is changed
|
||||
# .directories needs redesigning
|
||||
self.directories.clear()
|
||||
super(RepositorySegment, self).update()
|
||||
return super(RepositorySegment, self).update(*args)
|
||||
|
||||
def compute_state(self, path):
|
||||
repo = guess(path=path)
|
||||
|
@ -83,7 +86,7 @@ class BranchSegment(RepositorySegment):
|
|||
return [{
|
||||
'contents': branch,
|
||||
'highlight_group': ['branch_dirty' if repository_status(**kwargs) else 'branch_clean', 'branch'],
|
||||
}]
|
||||
}]
|
||||
else:
|
||||
return branch
|
||||
|
||||
|
@ -109,7 +112,8 @@ Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
|
|||
''')
|
||||
|
||||
|
||||
def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
|
||||
@requires_segment_info
|
||||
def cwd(pl, segment_info, dir_shorten_len=None, dir_limit_depth=None):
|
||||
'''Return the current working directory.
|
||||
|
||||
Returns a segment list to create a breadcrumb-like effect.
|
||||
|
@ -126,7 +130,7 @@ def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
|
|||
'''
|
||||
import re
|
||||
try:
|
||||
cwd = pl.getcwd()
|
||||
cwd = segment_info['getcwd']()
|
||||
except OSError as e:
|
||||
if e.errno == 2:
|
||||
# user most probably deleted the directory
|
||||
|
@ -135,7 +139,7 @@ def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
|
|||
cwd = "[not found]"
|
||||
else:
|
||||
raise
|
||||
home = pl.home
|
||||
home = segment_info['home']
|
||||
if home:
|
||||
cwd = re.sub('^' + re.escape(home), '~', cwd, 1)
|
||||
cwd_split = cwd.split(os.sep)
|
||||
|
@ -153,7 +157,7 @@ def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
|
|||
ret.append({
|
||||
'contents': part,
|
||||
'divider_highlight_group': 'cwd:divider',
|
||||
})
|
||||
})
|
||||
ret[-1]['highlight_group'] = ['cwd:current_folder', 'cwd']
|
||||
return ret
|
||||
|
||||
|
@ -190,7 +194,7 @@ def fuzzy_time(pl):
|
|||
45: 'quarter to',
|
||||
50: 'ten to',
|
||||
55: 'five to',
|
||||
}
|
||||
}
|
||||
special_case_str = {
|
||||
(23, 58): 'round about midnight',
|
||||
(23, 59): 'round about midnight',
|
||||
|
@ -198,7 +202,7 @@ def fuzzy_time(pl):
|
|||
(0, 1): 'round about midnight',
|
||||
(0, 2): 'round about midnight',
|
||||
(12, 0): 'noon',
|
||||
}
|
||||
}
|
||||
|
||||
now = datetime.now()
|
||||
|
||||
|
@ -230,19 +234,19 @@ def _external_ip(query_url='http://ipv4.icanhazip.com/'):
|
|||
|
||||
|
||||
class ExternalIpSegment(ThreadedSegment):
|
||||
interval = 10
|
||||
|
||||
def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs):
|
||||
self.query_url = query_url
|
||||
super(ExternalIpSegment, self).set_state(**kwargs)
|
||||
|
||||
def update(self):
|
||||
ip = _external_ip(query_url=self.query_url)
|
||||
with self.write_lock:
|
||||
self.ip = ip
|
||||
def update(self, old_ip):
|
||||
return _external_ip(query_url=self.query_url)
|
||||
|
||||
def render(self, **kwargs):
|
||||
if not hasattr(self, 'ip'):
|
||||
def render(self, ip, **kwargs):
|
||||
if not ip:
|
||||
return None
|
||||
return [{'contents': self.ip, 'divider_highlight_group': 'background:divider'}]
|
||||
return [{'contents': ip, 'divider_highlight_group': 'background:divider'}]
|
||||
|
||||
|
||||
external_ip = with_docstring(ExternalIpSegment(),
|
||||
|
@ -337,17 +341,17 @@ weather_conditions_icons = {
|
|||
}
|
||||
|
||||
temp_conversions = {
|
||||
'C': lambda temp: temp,
|
||||
'F': lambda temp: (temp * 9 / 5) + 32,
|
||||
'K': lambda temp: temp + 273.15,
|
||||
}
|
||||
'C': lambda temp: temp,
|
||||
'F': lambda temp: (temp * 9 / 5) + 32,
|
||||
'K': lambda temp: temp + 273.15,
|
||||
}
|
||||
|
||||
# Note: there are also unicode characters for units: ℃, ℉ and K
|
||||
temp_units = {
|
||||
'C': '°C',
|
||||
'F': '°F',
|
||||
'K': 'K',
|
||||
}
|
||||
'C': '°C',
|
||||
'F': '°F',
|
||||
'K': 'K',
|
||||
}
|
||||
|
||||
|
||||
class WeatherSegment(ThreadedSegment):
|
||||
|
@ -356,10 +360,9 @@ class WeatherSegment(ThreadedSegment):
|
|||
def set_state(self, location_query=None, **kwargs):
|
||||
self.location = location_query
|
||||
self.url = None
|
||||
self.condition = {}
|
||||
super(WeatherSegment, self).set_state(**kwargs)
|
||||
|
||||
def update(self):
|
||||
def update(self, old_weather):
|
||||
import json
|
||||
|
||||
if not self.url:
|
||||
|
@ -371,11 +374,11 @@ class WeatherSegment(ThreadedSegment):
|
|||
location_data['region_name'],
|
||||
location_data['country_name']])
|
||||
query_data = {
|
||||
'q':
|
||||
'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;'
|
||||
'select * from we where location="{0}" and unit="c"'.format(self.location).encode('utf-8'),
|
||||
'format': 'json',
|
||||
}
|
||||
'q':
|
||||
'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;'
|
||||
'select * from we where location="{0}" and unit="c"'.format(self.location).encode('utf-8'),
|
||||
'format': 'json',
|
||||
}
|
||||
self.url = 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)
|
||||
|
||||
raw_response = urllib_read(self.url)
|
||||
|
@ -397,45 +400,45 @@ class WeatherSegment(ThreadedSegment):
|
|||
icon_names = ('unknown',)
|
||||
self.error('Unknown condition code: {0}', condition_code)
|
||||
|
||||
with self.write_lock:
|
||||
self.temp = temp
|
||||
self.icon_names = icon_names
|
||||
return (temp, icon_names)
|
||||
|
||||
def render(self, icons=None, unit='C', temp_format=None, temp_coldest=-30, temp_hottest=40, **kwargs):
|
||||
if not hasattr(self, 'icon_names'):
|
||||
def render(self, weather, icons=None, unit='C', temp_format=None, temp_coldest=-30, temp_hottest=40, **kwargs):
|
||||
if not weather:
|
||||
return None
|
||||
|
||||
for icon_name in self.icon_names:
|
||||
temp, icon_names = weather
|
||||
|
||||
for icon_name in icon_names:
|
||||
if icons:
|
||||
if icon_name in icons:
|
||||
icon = icons[icon_name]
|
||||
break
|
||||
else:
|
||||
icon = weather_conditions_icons[self.icon_names[-1]]
|
||||
icon = weather_conditions_icons[icon_names[-1]]
|
||||
|
||||
temp_format = temp_format or ('{temp:.0f}' + temp_units[unit])
|
||||
temp = temp_conversions[unit](self.temp)
|
||||
if self.temp <= temp_coldest:
|
||||
converted_temp = temp_conversions[unit](temp)
|
||||
if temp <= temp_coldest:
|
||||
gradient_level = 0
|
||||
elif self.temp >= temp_hottest:
|
||||
elif temp >= temp_hottest:
|
||||
gradient_level = 100
|
||||
else:
|
||||
gradient_level = (self.temp - temp_coldest) * 100.0 / (temp_hottest - temp_coldest)
|
||||
groups = ['weather_condition_' + icon_name for icon_name in self.icon_names] + ['weather_conditions', 'weather']
|
||||
gradient_level = (temp - temp_coldest) * 100.0 / (temp_hottest - temp_coldest)
|
||||
groups = ['weather_condition_' + icon_name for icon_name in icon_names] + ['weather_conditions', 'weather']
|
||||
return [
|
||||
{
|
||||
{
|
||||
'contents': icon + ' ',
|
||||
'highlight_group': groups,
|
||||
'divider_highlight_group': 'background:divider',
|
||||
},
|
||||
{
|
||||
'contents': temp_format.format(temp=temp),
|
||||
},
|
||||
{
|
||||
'contents': temp_format.format(temp=converted_temp),
|
||||
'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'],
|
||||
'draw_divider': False,
|
||||
'divider_highlight_group': 'background:divider',
|
||||
'gradient_level': gradient_level,
|
||||
},
|
||||
]
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
weather = with_docstring(WeatherSegment(),
|
||||
|
@ -517,7 +520,7 @@ def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2):
|
|||
'draw_divider': False,
|
||||
'divider_highlight_group': 'background:divider',
|
||||
'gradient_level': gradient_level,
|
||||
})
|
||||
})
|
||||
ret[0]['draw_divider'] = True
|
||||
ret[0]['contents'] += ' '
|
||||
ret[1]['contents'] += ' '
|
||||
|
@ -540,7 +543,7 @@ try:
|
|||
if data:
|
||||
yield interface, data.bytes_recv, data.bytes_sent
|
||||
|
||||
def _get_user(pl):
|
||||
def _get_user(segment_info):
|
||||
return psutil.Process(os.getpid()).username
|
||||
|
||||
def cpu_load_percent(pl, measure_interval=.5):
|
||||
|
@ -567,8 +570,8 @@ except ImportError:
|
|||
if x is not None:
|
||||
yield interface, x[0], x[1]
|
||||
|
||||
def _get_user(pl): # NOQA
|
||||
return pl.environ.get('USER', None)
|
||||
def _get_user(segment_info): # NOQA
|
||||
return segment_info['environ'].get('USER', None)
|
||||
|
||||
def cpu_load_percent(pl, measure_interval=.5): # NOQA
|
||||
'''Return the average CPU load as a percentage.
|
||||
|
@ -587,7 +590,7 @@ username = False
|
|||
_geteuid = getattr(os, 'geteuid', lambda: 1)
|
||||
|
||||
|
||||
def user(pl):
|
||||
def user(pl, segment_info=None):
|
||||
'''Return the current user.
|
||||
|
||||
Highlights the user with the ``superuser`` if the effective user ID is 0.
|
||||
|
@ -602,9 +605,11 @@ def user(pl):
|
|||
return None
|
||||
euid = _geteuid()
|
||||
return [{
|
||||
'contents': username,
|
||||
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
|
||||
}]
|
||||
'contents': username,
|
||||
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
|
||||
}]
|
||||
if 'psutil' in globals():
|
||||
user = requires_segment_info(user)
|
||||
|
||||
|
||||
if os.path.exists('/proc/uptime'):
|
||||
|
@ -691,7 +696,7 @@ class NetworkLoadSegment(KwThreadedSegment):
|
|||
idata = {}
|
||||
if self.run_once:
|
||||
idata['prev'] = (monotonic(), _get_bytes(interface))
|
||||
self.sleep(0)
|
||||
self.shutdown_event.wait(self.interval)
|
||||
self.interfaces[interface] = idata
|
||||
|
||||
idata['last'] = (monotonic(), _get_bytes(interface))
|
||||
|
@ -724,7 +729,7 @@ class NetworkLoadSegment(KwThreadedSegment):
|
|||
'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)),
|
||||
'divider_highlight_group': 'background:divider',
|
||||
'highlight_group': hl_groups,
|
||||
})
|
||||
})
|
||||
if is_gradient:
|
||||
max = kwargs[max_key]
|
||||
if value >= max:
|
||||
|
@ -765,9 +770,10 @@ Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_lo
|
|||
''')
|
||||
|
||||
|
||||
def virtualenv(pl):
|
||||
@requires_segment_info
|
||||
def virtualenv(pl, segment_info):
|
||||
'''Return the name of the current Python virtualenv.'''
|
||||
return os.path.basename(pl.environ.get('VIRTUAL_ENV', '')) or None
|
||||
return os.path.basename(segment_info['environ'].get('VIRTUAL_ENV', '')) or None
|
||||
|
||||
|
||||
_IMAPKey = namedtuple('Key', 'username password server port folder')
|
||||
|
@ -804,13 +810,13 @@ class EmailIMAPSegment(KwThreadedSegment):
|
|||
return [{
|
||||
'contents': str(unread_count),
|
||||
'highlight_group': 'email_alert',
|
||||
}]
|
||||
}]
|
||||
else:
|
||||
return [{
|
||||
'contents': str(unread_count),
|
||||
'highlight_group': ['email_alert_gradient', 'email_alert'],
|
||||
'gradient_level': unread_count * 100.0 / max_msgs,
|
||||
}]
|
||||
}]
|
||||
|
||||
|
||||
email_imap_alert = with_docstring(EmailIMAPSegment(),
|
||||
|
@ -841,7 +847,7 @@ class NowPlayingSegment(object):
|
|||
'play': '▶',
|
||||
'pause': '▮▮',
|
||||
'stop': '■',
|
||||
}
|
||||
}
|
||||
|
||||
def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', **kwargs):
|
||||
player_func = getattr(self, 'player_{0}'.format(player))
|
||||
|
@ -853,7 +859,7 @@ class NowPlayingSegment(object):
|
|||
'title': None,
|
||||
'elapsed': None,
|
||||
'total': None,
|
||||
}
|
||||
}
|
||||
func_stats = player_func(**kwargs)
|
||||
if not func_stats:
|
||||
return None
|
||||
|
@ -921,7 +927,7 @@ class NowPlayingSegment(object):
|
|||
'title': now_playing.get('title'),
|
||||
'elapsed': self._convert_seconds(now_playing.get('position', 0)),
|
||||
'total': self._convert_seconds(now_playing.get('duration', 0)),
|
||||
}
|
||||
}
|
||||
|
||||
def player_mpd(self, pl, host='localhost', port=6600):
|
||||
try:
|
||||
|
@ -942,7 +948,7 @@ class NowPlayingSegment(object):
|
|||
'title': now_playing.get('title'),
|
||||
'elapsed': self._convert_seconds(now_playing.get('elapsed', 0)),
|
||||
'total': self._convert_seconds(now_playing.get('time', 0)),
|
||||
}
|
||||
}
|
||||
except ImportError:
|
||||
now_playing = self._run_cmd(['mpc', 'current', '-f', '%album%\n%artist%\n%title%\n%time%', '-h', str(host), '-p', str(port)])
|
||||
if not now_playing:
|
||||
|
@ -953,7 +959,7 @@ class NowPlayingSegment(object):
|
|||
'artist': now_playing[1],
|
||||
'title': now_playing[2],
|
||||
'total': now_playing[3],
|
||||
}
|
||||
}
|
||||
|
||||
def player_spotify(self, pl):
|
||||
try:
|
||||
|
@ -981,7 +987,7 @@ class NowPlayingSegment(object):
|
|||
'artist': info.get('xesam:artist')[0],
|
||||
'title': info.get('xesam:title'),
|
||||
'total': self._convert_seconds(info.get('mpris:length') / 1e6),
|
||||
}
|
||||
}
|
||||
|
||||
def player_rhythmbox(self, pl):
|
||||
now_playing = self._run_cmd(['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td'])
|
||||
|
@ -994,5 +1000,5 @@ class NowPlayingSegment(object):
|
|||
'title': now_playing[2],
|
||||
'elapsed': now_playing[3],
|
||||
'total': now_playing[4],
|
||||
}
|
||||
}
|
||||
now_playing = NowPlayingSegment()
|
||||
|
|
|
@ -5,4 +5,4 @@ from powerline.theme import requires_segment_info
|
|||
|
||||
@requires_segment_info
|
||||
def prompt_count(pl, segment_info):
|
||||
return str(segment_info.prompt_count)
|
||||
return str(segment_info['ipython'].prompt_count)
|
||||
|
|
|
@ -9,9 +9,9 @@ def last_status(pl, segment_info):
|
|||
|
||||
Highlight groups used: ``exit_fail``
|
||||
'''
|
||||
if not segment_info.last_exit_code:
|
||||
if not segment_info['args'].last_exit_code:
|
||||
return None
|
||||
return [{'contents': str(segment_info.last_exit_code), 'highlight_group': 'exit_fail'}]
|
||||
return [{'contents': str(segment_info['args'].last_exit_code), 'highlight_group': 'exit_fail'}]
|
||||
|
||||
|
||||
@requires_segment_info
|
||||
|
@ -20,8 +20,9 @@ def last_pipe_status(pl, segment_info):
|
|||
|
||||
Highlight groups used: ``exit_fail``, ``exit_success``
|
||||
'''
|
||||
if any(segment_info.last_pipe_status):
|
||||
last_pipe_status = segment_info['args'].last_pipe_status
|
||||
if any(last_pipe_status):
|
||||
return [{"contents": str(status), "highlight_group": "exit_fail" if status else "exit_success"}
|
||||
for status in segment_info.last_pipe_status]
|
||||
for status in last_pipe_status]
|
||||
else:
|
||||
return None
|
||||
|
|
|
@ -181,7 +181,7 @@ def file_name(pl, segment_info, display_no_file=False, no_file_text='[No file]')
|
|||
return [{
|
||||
'contents': no_file_text,
|
||||
'highlight_group': ['file_name_no_file', 'file_name'],
|
||||
}]
|
||||
}]
|
||||
else:
|
||||
return None
|
||||
file_name = vim_funcs['fnamemodify'](name, ':~:.:t')
|
||||
|
@ -258,7 +258,7 @@ def line_percent(pl, segment_info, gradient=False):
|
|||
'contents': str(int(round(percentage))),
|
||||
'highlight_group': ['line_percent_gradient', 'line_percent'],
|
||||
'gradient_level': percentage,
|
||||
}]
|
||||
}]
|
||||
|
||||
|
||||
@requires_segment_info
|
||||
|
@ -318,13 +318,13 @@ class RepositorySegment(KwWindowThreadedSegment):
|
|||
# FIXME os.getcwd() is not a proper variant for non-current buffers
|
||||
return segment_info['buffer'].name or os.getcwd()
|
||||
|
||||
def update(self):
|
||||
def update(self, *args):
|
||||
# .compute_state() is running only in this method, and only in one
|
||||
# thread, thus operations with .directories do not need write locks
|
||||
# (.render() method is not using .directories). If this is changed
|
||||
# .directories needs redesigning
|
||||
self.directories.clear()
|
||||
super(RepositorySegment, self).update()
|
||||
return super(RepositorySegment, self).update(*args)
|
||||
|
||||
def compute_state(self, path):
|
||||
repo = guess(path=path)
|
||||
|
@ -359,19 +359,19 @@ class BranchSegment(RepositorySegment):
|
|||
def process_repo(repo):
|
||||
return repo.branch()
|
||||
|
||||
def render_one(self, update_state, segment_info, status_colors=False, **kwargs):
|
||||
if not update_state:
|
||||
def render_one(self, branch, segment_info, status_colors=False, **kwargs):
|
||||
if not branch:
|
||||
return None
|
||||
|
||||
if status_colors:
|
||||
self.started_repository_status = True
|
||||
|
||||
return [{
|
||||
'contents': update_state,
|
||||
'contents': branch,
|
||||
'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info, **kwargs) else 'branch_clean']
|
||||
if status_colors else []) + ['branch'],
|
||||
'divider_highlight_group': 'branch:divider',
|
||||
}]
|
||||
}]
|
||||
|
||||
def startup(self, status_colors=False, **kwargs):
|
||||
super(BranchSegment, self).startup(**kwargs)
|
||||
|
@ -422,7 +422,7 @@ class FileVCSStatusSegment(KwWindowThreadedSegment):
|
|||
ret.append({
|
||||
'contents': status,
|
||||
'highlight_group': ['file_vcs_status_' + status, 'file_vcs_status'],
|
||||
})
|
||||
})
|
||||
return ret
|
||||
return None
|
||||
|
||||
|
|
|
@ -30,11 +30,11 @@ class Theme(object):
|
|||
self.segments = {
|
||||
'left': [],
|
||||
'right': [],
|
||||
}
|
||||
}
|
||||
self.EMPTY_SEGMENT = {
|
||||
'contents': None,
|
||||
'highlight': {'fg': False, 'bg': False, 'attr': 0}
|
||||
}
|
||||
}
|
||||
self.pl = pl
|
||||
theme_configs = [theme_config]
|
||||
if top_theme_config:
|
||||
|
|
|
@ -13,11 +13,12 @@ except ImportError:
|
|||
|
||||
if __name__ == '__main__':
|
||||
args = get_argparser(description=__doc__).parse_args()
|
||||
kwargs = {}
|
||||
if 'PWD' in os.environ:
|
||||
kwargs['getcwd'] = lambda: os.environ['PWD']
|
||||
powerline = ShellPowerline(args, run_once=True, **kwargs)
|
||||
rendered = powerline.renderer.render(width=args.width, side=args.side, segment_info=args)
|
||||
powerline = ShellPowerline(args, run_once=True)
|
||||
rendered = powerline.render(
|
||||
width=args.width,
|
||||
side=args.side,
|
||||
segment_info={'args': args, 'environ': os.environ},
|
||||
)
|
||||
try:
|
||||
sys.stdout.write(rendered)
|
||||
except UnicodeEncodeError:
|
||||
|
|
8
setup.py
8
setup.py
|
@ -26,7 +26,7 @@ setup(
|
|||
scripts=[
|
||||
'scripts/powerline',
|
||||
'scripts/powerline-lint',
|
||||
],
|
||||
],
|
||||
keywords='',
|
||||
packages=find_packages(exclude=('tests', 'tests.*')),
|
||||
include_package_data=True,
|
||||
|
@ -35,7 +35,7 @@ setup(
|
|||
extras_require={
|
||||
'docs': [
|
||||
'Sphinx',
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
test_suite='tests' if not old_python else None,
|
||||
)
|
||||
)
|
||||
|
|
|
@ -8,16 +8,8 @@ class Pl(object):
|
|||
self.errors = []
|
||||
self.warns = []
|
||||
self.debugs = []
|
||||
self._cwd = None
|
||||
self.prefix = None
|
||||
self.environ = {}
|
||||
self.home = None
|
||||
|
||||
def getcwd(self):
|
||||
if isinstance(self._cwd, Exception):
|
||||
raise self._cwd
|
||||
else:
|
||||
return self._cwd
|
||||
self.use_daemon_threads = True
|
||||
|
||||
for meth in ('error', 'warn', 'debug'):
|
||||
exec (('def {0}(self, msg, *args, **kwargs):\n'
|
||||
|
@ -132,9 +124,11 @@ class ItemReplace(object):
|
|||
self.d[self.key] = self.old
|
||||
|
||||
|
||||
def replace_env(key, new, d=None):
|
||||
r = None
|
||||
if not d:
|
||||
r = Pl()
|
||||
d = r.environ
|
||||
return ItemReplace(d, key, new, r)
|
||||
def replace_item(d, key, new):
|
||||
return ItemReplace(d, key, new, d)
|
||||
|
||||
|
||||
def replace_env(key, new, environ=None, **kwargs):
|
||||
r = kwargs.copy()
|
||||
r['environ'] = environ or {}
|
||||
return ItemReplace(r['environ'], key, new, r)
|
||||
|
|
|
@ -0,0 +1,366 @@
|
|||
# vim:fileencoding=utf-8:noet
|
||||
from __future__ import unicode_literals
|
||||
import powerline as powerline_module
|
||||
import time
|
||||
from powerline.renderer import Renderer
|
||||
from tests import TestCase
|
||||
from tests.lib import replace_item
|
||||
from copy import deepcopy
|
||||
from threading import Lock
|
||||
|
||||
|
||||
class Watcher(object):
|
||||
events = set()
|
||||
lock = Lock()
|
||||
|
||||
def watch(self, file):
|
||||
pass
|
||||
|
||||
def __call__(self, file):
|
||||
if file in self.events:
|
||||
with self.lock:
|
||||
self.events.remove(file)
|
||||
return True
|
||||
return False
|
||||
|
||||
def _reset(self, files):
|
||||
with self.lock:
|
||||
self.events.clear()
|
||||
self.events.update(files)
|
||||
|
||||
def unsubscribe(self):
|
||||
pass
|
||||
|
||||
|
||||
class Logger(object):
|
||||
def __init__(self):
|
||||
self.messages = []
|
||||
self.lock = Lock()
|
||||
|
||||
def _add_msg(self, msg):
|
||||
with self.lock:
|
||||
self.messages.append(msg)
|
||||
|
||||
def _pop_msgs(self):
|
||||
with self.lock:
|
||||
r = self.messages
|
||||
self.messages = []
|
||||
return r
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return self._add_msg
|
||||
|
||||
|
||||
config = {
|
||||
'config': {
|
||||
'common': {
|
||||
'dividers': {
|
||||
"left": {
|
||||
"hard": ">>",
|
||||
"soft": ">",
|
||||
},
|
||||
"right": {
|
||||
"hard": "<<",
|
||||
"soft": "<",
|
||||
},
|
||||
},
|
||||
'spaces': 0,
|
||||
},
|
||||
'ext': {
|
||||
'test': {
|
||||
'theme': 'default',
|
||||
'colorscheme': 'default',
|
||||
},
|
||||
},
|
||||
},
|
||||
'colors': {
|
||||
'colors': {
|
||||
"col1": 1,
|
||||
"col2": 2,
|
||||
"col3": 3,
|
||||
"col4": 4,
|
||||
},
|
||||
'gradients': {
|
||||
},
|
||||
},
|
||||
'colorschemes/test/default': {
|
||||
'groups': {
|
||||
"str1": {"fg": "col1", "bg": "col2", "attr": ["bold"]},
|
||||
"str2": {"fg": "col3", "bg": "col4", "attr": ["underline"]},
|
||||
},
|
||||
},
|
||||
'colorschemes/test/2': {
|
||||
'groups': {
|
||||
"str1": {"fg": "col2", "bg": "col3", "attr": ["bold"]},
|
||||
"str2": {"fg": "col1", "bg": "col4", "attr": ["underline"]},
|
||||
},
|
||||
},
|
||||
'themes/test/default': {
|
||||
'segments': {
|
||||
"left": [
|
||||
{
|
||||
"type": "string",
|
||||
"contents": "s",
|
||||
"highlight_group": ["str1"],
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"contents": "g",
|
||||
"highlight_group": ["str2"],
|
||||
},
|
||||
],
|
||||
"right": [
|
||||
],
|
||||
},
|
||||
},
|
||||
'themes/test/2': {
|
||||
'segments': {
|
||||
"left": [
|
||||
{
|
||||
"type": "string",
|
||||
"contents": "t",
|
||||
"highlight_group": ["str1"],
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"contents": "b",
|
||||
"highlight_group": ["str2"],
|
||||
},
|
||||
],
|
||||
"right": [
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
access_log = []
|
||||
access_lock = Lock()
|
||||
|
||||
|
||||
def load_json_config(config_file_path, *args, **kwargs):
|
||||
global access_log
|
||||
with access_lock:
|
||||
access_log.append(config_file_path)
|
||||
try:
|
||||
return deepcopy(config[config_file_path])
|
||||
except KeyError:
|
||||
raise IOError(config_file_path)
|
||||
|
||||
|
||||
def find_config_file(search_paths, config_file):
|
||||
if config_file.endswith('raise') and config_file not in config:
|
||||
raise IOError('fcf:' + config_file)
|
||||
return config_file
|
||||
|
||||
|
||||
class SimpleRenderer(Renderer):
|
||||
def hlstyle(self, fg=None, bg=None, attr=None):
|
||||
return '<{fg} {bg} {attr}>'.format(fg=fg and fg[0], bg=bg and bg[0], attr=attr)
|
||||
|
||||
|
||||
class TestPowerline(powerline_module.Powerline):
|
||||
_created = False
|
||||
|
||||
@staticmethod
|
||||
def get_local_themes(local_themes):
|
||||
return local_themes
|
||||
|
||||
def create_renderer(self, *args, **kwargs):
|
||||
try:
|
||||
r = super(TestPowerline, self).create_renderer(*args, **kwargs)
|
||||
finally:
|
||||
self._created = True
|
||||
return r
|
||||
|
||||
def _created_renderer(self):
|
||||
if self._created:
|
||||
self._created = False
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
renderer = SimpleRenderer
|
||||
|
||||
|
||||
def get_powerline(**kwargs):
|
||||
return TestPowerline(
|
||||
ext='test',
|
||||
renderer_module='tests.test_config_reload',
|
||||
interval=0,
|
||||
logger=Logger(),
|
||||
watcher=Watcher(),
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
||||
def sleep(interval):
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def add_watcher_events(p, *args, **kwargs):
|
||||
p._created_renderer()
|
||||
p.watcher._reset(args)
|
||||
while not p._created_renderer():
|
||||
sleep(kwargs.get('interval', 0.000001))
|
||||
if not kwargs.get('wait', True):
|
||||
return
|
||||
|
||||
|
||||
class TestConfigReload(TestCase):
|
||||
def assertAccessEvents(self, *args):
|
||||
global access_log
|
||||
with access_lock:
|
||||
self.assertEqual(set(access_log), set(args))
|
||||
access_log = []
|
||||
|
||||
def test_noreload(self):
|
||||
with get_powerline(run_once=True) as p:
|
||||
with replace_item(globals(), 'config', deepcopy(config)):
|
||||
self.assertAccessEvents('config', 'colors', 'colorschemes/test/default', 'themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
config['config']['common']['spaces'] = 1
|
||||
add_watcher_events(p, 'config', wait=False, interval=0.05)
|
||||
# When running once thread should not start
|
||||
self.assertAccessEvents()
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
# Without the following assertion test_reload_colors may fail
|
||||
# for unknown reason (with AssertionError telling about “config”
|
||||
# accessed one more time then needed)
|
||||
self.assertAccessEvents()
|
||||
|
||||
def test_reload_main(self):
|
||||
with get_powerline(run_once=False) as p:
|
||||
with replace_item(globals(), 'config', deepcopy(config)):
|
||||
self.assertAccessEvents('config', 'colors', 'colorschemes/test/default', 'themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
|
||||
config['config']['common']['spaces'] = 1
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config')
|
||||
self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
config['config']['ext']['test']['theme'] = 'nonexistent'
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config', 'themes/test/nonexistent')
|
||||
self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>')
|
||||
# It should normally handle file missing error
|
||||
self.assertEqual(p.logger._pop_msgs(), ['test:Failed to create renderer: themes/test/nonexistent'])
|
||||
|
||||
config['config']['ext']['test']['theme'] = 'default'
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config', 'themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
config['config']['ext']['test']['colorscheme'] = 'nonexistent'
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config', 'colorschemes/test/nonexistent')
|
||||
self.assertEqual(p.render(), '<1 2 1> s <2 4 False>>><3 4 4>g <4 False False>>><None None None>')
|
||||
# It should normally handle file missing error
|
||||
self.assertEqual(p.logger._pop_msgs(), ['test:Failed to create renderer: colorschemes/test/nonexistent'])
|
||||
|
||||
config['config']['ext']['test']['colorscheme'] = '2'
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config', 'colorschemes/test/2')
|
||||
self.assertEqual(p.render(), '<2 3 1> s <3 4 False>>><1 4 4>g <4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
config['config']['ext']['test']['theme'] = '2'
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config', 'themes/test/2')
|
||||
self.assertEqual(p.render(), '<2 3 1> t <3 4 False>>><1 4 4>b <4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
self.assertEqual(p.renderer.local_themes, None)
|
||||
config['config']['ext']['test']['local_themes'] = 'something'
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config')
|
||||
self.assertEqual(p.render(), '<2 3 1> t <3 4 False>>><1 4 4>b <4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
self.assertEqual(p.renderer.local_themes, 'something')
|
||||
|
||||
def test_reload_unexistent(self):
|
||||
with get_powerline(run_once=False) as p:
|
||||
with replace_item(globals(), 'config', deepcopy(config)):
|
||||
self.assertAccessEvents('config', 'colors', 'colorschemes/test/default', 'themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
|
||||
config['config']['ext']['test']['colorscheme'] = 'nonexistentraise'
|
||||
add_watcher_events(p, 'config')
|
||||
self.assertAccessEvents('config')
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), ['test:Failed to create renderer: fcf:colorschemes/test/nonexistentraise'])
|
||||
|
||||
config['colorschemes/test/nonexistentraise'] = {
|
||||
'groups': {
|
||||
"str1": {"fg": "col1", "bg": "col3", "attr": ["bold"]},
|
||||
"str2": {"fg": "col2", "bg": "col4", "attr": ["underline"]},
|
||||
},
|
||||
}
|
||||
while not p._created_renderer():
|
||||
sleep(0.000001)
|
||||
self.assertAccessEvents('colorschemes/test/nonexistentraise')
|
||||
self.assertEqual(p.render(), '<1 3 1> s<3 4 False>>><2 4 4>g<4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
def test_reload_colors(self):
|
||||
with get_powerline(run_once=False) as p:
|
||||
with replace_item(globals(), 'config', deepcopy(config)):
|
||||
self.assertAccessEvents('config', 'colors', 'colorschemes/test/default', 'themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
|
||||
config['colors']['colors']['col1'] = 5
|
||||
add_watcher_events(p, 'colors')
|
||||
self.assertAccessEvents('colors')
|
||||
self.assertEqual(p.render(), '<5 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
def test_reload_colorscheme(self):
|
||||
with get_powerline(run_once=False) as p:
|
||||
with replace_item(globals(), 'config', deepcopy(config)):
|
||||
self.assertAccessEvents('config', 'colors', 'colorschemes/test/default', 'themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
|
||||
config['colorschemes/test/default']['groups']['str1']['bg'] = 'col3'
|
||||
add_watcher_events(p, 'colorschemes/test/default')
|
||||
self.assertAccessEvents('colorschemes/test/default')
|
||||
self.assertEqual(p.render(), '<1 3 1> s<3 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
def test_reload_theme(self):
|
||||
with get_powerline(run_once=False) as p:
|
||||
with replace_item(globals(), 'config', deepcopy(config)):
|
||||
self.assertAccessEvents('config', 'colors', 'colorschemes/test/default', 'themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> s<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
|
||||
config['themes/test/default']['segments']['left'][0]['contents'] = 'col3'
|
||||
add_watcher_events(p, 'themes/test/default')
|
||||
self.assertAccessEvents('themes/test/default')
|
||||
self.assertEqual(p.render(), '<1 2 1> col3<2 4 False>>><3 4 4>g<4 False False>>><None None None>')
|
||||
self.assertEqual(p.logger._pop_msgs(), [])
|
||||
|
||||
|
||||
replaces = {
|
||||
'watcher': Watcher(),
|
||||
'load_json_config': load_json_config,
|
||||
'find_config_file': find_config_file,
|
||||
}
|
||||
|
||||
|
||||
def swap_attributes():
|
||||
global replaces
|
||||
for attr, val in replaces.items():
|
||||
old_val = getattr(powerline_module, attr)
|
||||
setattr(powerline_module, attr, val)
|
||||
replaces[attr] = old_val
|
||||
|
||||
|
||||
tearDownModule = setUpModule = swap_attributes
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from tests import main
|
||||
main()
|
|
@ -15,17 +15,6 @@ VBLOCK = chr(ord('V') - 0x40)
|
|||
SBLOCK = chr(ord('S') - 0x40)
|
||||
|
||||
|
||||
def shutdown(powerline):
|
||||
from powerline.segments import common, vim
|
||||
try:
|
||||
powerline.renderer.shutdown()
|
||||
finally:
|
||||
# After shutdown threads are useless, it is needed to recreate them.
|
||||
from imp import reload
|
||||
reload(common)
|
||||
reload(vim)
|
||||
|
||||
|
||||
class TestConfig(TestCase):
|
||||
def test_vim(self):
|
||||
from powerline.vim import VimPowerline
|
||||
|
@ -36,31 +25,30 @@ class TestConfig(TestCase):
|
|||
outputs = {}
|
||||
i = 0
|
||||
mode = None
|
||||
powerline = VimPowerline()
|
||||
|
||||
def check_output(*args):
|
||||
out = powerline.renderer.render(*args + (0 if mode == 'nc' else 1,))
|
||||
if out in outputs:
|
||||
self.fail('Duplicate in set #{0} for mode {1!r} (previously defined in set #{2} for mode {3!r})'.format(i, mode, *outputs[out]))
|
||||
outputs[out] = (i, mode)
|
||||
with VimPowerline() as powerline:
|
||||
def check_output(*args):
|
||||
out = powerline.render(*args + (0 if mode == 'nc' else 1,))
|
||||
if out in outputs:
|
||||
self.fail('Duplicate in set #{0} for mode {1!r} (previously defined in set #{2} for mode {3!r})'.format(i, mode, *outputs[out]))
|
||||
outputs[out] = (i, mode)
|
||||
|
||||
with vim_module._with('buffer', 'foo.txt'):
|
||||
with vim_module._with('globals', powerline_config_path=cfg_path):
|
||||
exclude = set(('no', 'v', 'V', VBLOCK, 's', 'S', SBLOCK, 'R', 'Rv', 'c', 'cv', 'ce', 'r', 'rm', 'r?', '!'))
|
||||
try:
|
||||
for mode in ['n', 'nc', 'no', 'v', 'V', VBLOCK, 's', 'S', SBLOCK, 'i', 'R', 'Rv', 'c', 'cv', 'ce', 'r', 'rm', 'r?', '!']:
|
||||
if mode != 'nc':
|
||||
vim_module._start_mode(mode)
|
||||
check_output(1, 0)
|
||||
for args, kwargs in buffers:
|
||||
i += 1
|
||||
if mode in exclude:
|
||||
continue
|
||||
with vim_module._with(*args, **kwargs):
|
||||
check_output(1, 0)
|
||||
finally:
|
||||
vim_module._start_mode('n')
|
||||
shutdown(powerline)
|
||||
with vim_module._with('buffer', 'foo.txt'):
|
||||
with vim_module._with('globals', powerline_config_path=cfg_path):
|
||||
exclude = set(('no', 'v', 'V', VBLOCK, 's', 'S', SBLOCK, 'R', 'Rv', 'c', 'cv', 'ce', 'r', 'rm', 'r?', '!'))
|
||||
try:
|
||||
for mode in ['n', 'nc', 'no', 'v', 'V', VBLOCK, 's', 'S', SBLOCK, 'i', 'R', 'Rv', 'c', 'cv', 'ce', 'r', 'rm', 'r?', '!']:
|
||||
if mode != 'nc':
|
||||
vim_module._start_mode(mode)
|
||||
check_output(1, 0)
|
||||
for args, kwargs in buffers:
|
||||
i += 1
|
||||
if mode in exclude:
|
||||
continue
|
||||
with vim_module._with(*args, **kwargs):
|
||||
check_output(1, 0)
|
||||
finally:
|
||||
vim_module._start_mode('n')
|
||||
|
||||
def test_tmux(self):
|
||||
from powerline.segments import common
|
||||
|
@ -68,29 +56,26 @@ class TestConfig(TestCase):
|
|||
reload(common)
|
||||
from powerline.shell import ShellPowerline
|
||||
with replace_attr(common, 'urllib_read', urllib_read):
|
||||
powerline = ShellPowerline(Args(ext=['tmux']), run_once=False)
|
||||
powerline.renderer.render()
|
||||
powerline = ShellPowerline(Args(ext=['tmux']), run_once=False)
|
||||
powerline.renderer.render()
|
||||
shutdown(powerline)
|
||||
with ShellPowerline(Args(ext=['tmux']), run_once=False) as powerline:
|
||||
powerline.render()
|
||||
with ShellPowerline(Args(ext=['tmux']), run_once=False) as powerline:
|
||||
powerline.render()
|
||||
|
||||
def test_zsh(self):
|
||||
from powerline.shell import ShellPowerline
|
||||
args = Args(last_pipe_status=[1, 0], ext=['shell'], renderer_module='zsh_prompt')
|
||||
powerline = ShellPowerline(args, run_once=False)
|
||||
powerline.renderer.render(segment_info=args)
|
||||
powerline = ShellPowerline(args, run_once=False)
|
||||
powerline.renderer.render(segment_info=args)
|
||||
shutdown(powerline)
|
||||
with ShellPowerline(args, run_once=False) as powerline:
|
||||
powerline.render(segment_info={'args': args})
|
||||
with ShellPowerline(args, run_once=False) as powerline:
|
||||
powerline.render(segment_info={'args': args})
|
||||
|
||||
def test_bash(self):
|
||||
from powerline.shell import ShellPowerline
|
||||
args = Args(last_exit_code=1, ext=['shell'], renderer_module='bash_prompt', config=[('ext', {'shell': {'theme': 'default_leftonly'}})])
|
||||
powerline = ShellPowerline(args, run_once=False)
|
||||
powerline.renderer.render(segment_info=args)
|
||||
powerline = ShellPowerline(args, run_once=False)
|
||||
powerline.renderer.render(segment_info=args)
|
||||
shutdown(powerline)
|
||||
with ShellPowerline(args, run_once=False) as powerline:
|
||||
powerline.render(segment_info={'args': args})
|
||||
with ShellPowerline(args, run_once=False) as powerline:
|
||||
powerline.render(segment_info={'args': args})
|
||||
|
||||
def test_ipython(self):
|
||||
from powerline.ipython import IpythonPowerline
|
||||
|
@ -100,12 +85,11 @@ class TestConfig(TestCase):
|
|||
config_overrides = None
|
||||
theme_overrides = {}
|
||||
|
||||
powerline = IpyPowerline()
|
||||
segment_info = Args(prompt_count=1)
|
||||
for prompt_type in ['in', 'in2', 'out', 'rewrite']:
|
||||
powerline.renderer.render(matcher_info=prompt_type, segment_info=segment_info)
|
||||
powerline.renderer.render(matcher_info=prompt_type, segment_info=segment_info)
|
||||
shutdown(powerline)
|
||||
with IpyPowerline() as powerline:
|
||||
segment_info = Args(prompt_count=1)
|
||||
for prompt_type in ['in', 'in2', 'out', 'rewrite']:
|
||||
powerline.render(matcher_info=prompt_type, segment_info=segment_info)
|
||||
powerline.render(matcher_info=prompt_type, segment_info=segment_info)
|
||||
|
||||
def test_wm(self):
|
||||
from powerline.segments import common
|
||||
|
@ -113,7 +97,7 @@ class TestConfig(TestCase):
|
|||
reload(common)
|
||||
from powerline import Powerline
|
||||
with replace_attr(common, 'urllib_read', urllib_read):
|
||||
Powerline(ext='wm', renderer_module='pango_markup', run_once=True).renderer.render()
|
||||
Powerline(ext='wm', renderer_module='pango_markup', run_once=True).render()
|
||||
reload(common)
|
||||
|
||||
|
||||
|
|
|
@ -14,15 +14,22 @@ vim = None
|
|||
class TestShell(TestCase):
|
||||
def test_last_status(self):
|
||||
pl = Pl()
|
||||
self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=10)),
|
||||
segment_info = {'args': Args(last_exit_code=10)}
|
||||
self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info),
|
||||
[{'contents': '10', 'highlight_group': 'exit_fail'}])
|
||||
self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=None)), None)
|
||||
segment_info['args'].last_exit_code = 0
|
||||
self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), None)
|
||||
segment_info['args'].last_exit_code = None
|
||||
self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), None)
|
||||
|
||||
def test_last_pipe_status(self):
|
||||
pl = Pl()
|
||||
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[])), None)
|
||||
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 0, 0])), None)
|
||||
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 2, 0])),
|
||||
segment_info = {'args': Args(last_pipe_status=[])}
|
||||
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None)
|
||||
segment_info['args'].last_pipe_status = [0, 0, 0]
|
||||
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None)
|
||||
segment_info['args'].last_pipe_status = [0, 2, 0]
|
||||
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info),
|
||||
[{'contents': '0', 'highlight_group': 'exit_success'},
|
||||
{'contents': '2', 'highlight_group': 'exit_fail'},
|
||||
{'contents': '0', 'highlight_group': 'exit_success'}])
|
||||
|
@ -30,77 +37,93 @@ class TestShell(TestCase):
|
|||
|
||||
class TestCommon(TestCase):
|
||||
def test_hostname(self):
|
||||
with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as pl:
|
||||
pl = Pl()
|
||||
with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as segment_info:
|
||||
with replace_module_module(common, 'socket', gethostname=lambda: 'abc'):
|
||||
self.assertEqual(common.hostname(pl=pl), 'abc')
|
||||
self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), 'abc')
|
||||
pl.environ.pop('SSH_CLIENT')
|
||||
self.assertEqual(common.hostname(pl=pl), 'abc')
|
||||
self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), None)
|
||||
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc')
|
||||
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), 'abc')
|
||||
segment_info['environ'].pop('SSH_CLIENT')
|
||||
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc')
|
||||
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), None)
|
||||
|
||||
def test_user(self):
|
||||
new_os = new_module('os', getpid=lambda: 1)
|
||||
new_psutil = new_module('psutil', Process=lambda pid: Args(username='def'))
|
||||
with replace_env('USER', 'def') as pl:
|
||||
pl = Pl()
|
||||
with replace_env('USER', 'def') as segment_info:
|
||||
with replace_attr(common, 'os', new_os):
|
||||
with replace_attr(common, 'psutil', new_psutil):
|
||||
with replace_attr(common, '_geteuid', lambda: 5):
|
||||
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
|
||||
self.assertEqual(common.user(pl=pl, segment_info=segment_info), [
|
||||
{'contents': 'def', 'highlight_group': 'user'}
|
||||
])
|
||||
with replace_attr(common, '_geteuid', lambda: 0):
|
||||
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
|
||||
self.assertEqual(common.user(pl=pl, segment_info=segment_info), [
|
||||
{'contents': 'def', 'highlight_group': ['superuser', 'user']}
|
||||
])
|
||||
|
||||
def test_branch(self):
|
||||
pl = Pl()
|
||||
pl._cwd = os.getcwd()
|
||||
segment_info = {'getcwd': os.getcwd}
|
||||
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')):
|
||||
self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
|
||||
self.assertEqual(common.branch(pl=pl, status_colors=True),
|
||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), 'tests')
|
||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
|
||||
[{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}])
|
||||
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')):
|
||||
self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
|
||||
self.assertEqual(common.branch(pl=pl, status_colors=True),
|
||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), 'tests')
|
||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
|
||||
[{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}])
|
||||
self.assertEqual(common.branch(pl=pl), 'tests')
|
||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info), 'tests')
|
||||
with replace_attr(common, 'guess', lambda path: None):
|
||||
self.assertEqual(common.branch(pl=pl), None)
|
||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info), None)
|
||||
|
||||
def test_cwd(self):
|
||||
new_os = new_module('os', path=os.path, sep='/')
|
||||
pl = Pl()
|
||||
cwd = [None]
|
||||
|
||||
def getcwd():
|
||||
wd = cwd[0]
|
||||
if isinstance(wd, Exception):
|
||||
raise wd
|
||||
else:
|
||||
return wd
|
||||
|
||||
segment_info = {'getcwd': getcwd, 'home': None}
|
||||
with replace_attr(common, 'os', new_os):
|
||||
pl._cwd = '/abc/def/ghi/foo/bar'
|
||||
self.assertEqual(common.cwd(pl=pl),
|
||||
cwd[0] = '/abc/def/ghi/foo/bar'
|
||||
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info),
|
||||
[{'contents': '/', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'abc', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'def', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
|
||||
pl.home = '/abc/def/ghi'
|
||||
self.assertEqual(common.cwd(pl=pl),
|
||||
segment_info['home'] = '/abc/def/ghi'
|
||||
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info),
|
||||
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
|
||||
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=3),
|
||||
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=3),
|
||||
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
|
||||
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=1),
|
||||
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1),
|
||||
[{'contents': '⋯', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
|
||||
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
|
||||
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2),
|
||||
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'fo', 'divider_highlight_group': 'cwd:divider'},
|
||||
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
|
||||
ose = OSError()
|
||||
ose.errno = 2
|
||||
pl._cwd = ose
|
||||
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
|
||||
cwd[0] = ose
|
||||
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2),
|
||||
[{'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
|
||||
pl._cwd = OSError()
|
||||
self.assertRaises(OSError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
|
||||
pl._cwd = ValueError()
|
||||
self.assertRaises(ValueError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
|
||||
cwd[0] = OSError()
|
||||
self.assertRaises(OSError, common.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2)
|
||||
cwd[0] = ValueError()
|
||||
self.assertRaises(ValueError, common.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2)
|
||||
|
||||
def test_date(self):
|
||||
pl = Pl()
|
||||
|
@ -143,35 +166,35 @@ class TestCommon(TestCase):
|
|||
self.assertEqual(common.weather(pl=pl), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
|
||||
])
|
||||
])
|
||||
self.assertEqual(common.weather(pl=pl, temp_coldest=0, temp_hottest=100), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 0}
|
||||
])
|
||||
])
|
||||
self.assertEqual(common.weather(pl=pl, temp_coldest=-100, temp_hottest=-50), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 100}
|
||||
])
|
||||
])
|
||||
self.assertEqual(common.weather(pl=pl, icons={'cloudy': 'o'}), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'o '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
|
||||
])
|
||||
])
|
||||
self.assertEqual(common.weather(pl=pl, icons={'partly_cloudy_day': 'x'}), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'x '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
|
||||
])
|
||||
])
|
||||
self.assertEqual(common.weather(pl=pl, unit='F'), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '16°F', 'gradient_level': 30.0}
|
||||
])
|
||||
])
|
||||
self.assertEqual(common.weather(pl=pl, unit='K'), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '264K', 'gradient_level': 30.0}
|
||||
])
|
||||
])
|
||||
self.assertEqual(common.weather(pl=pl, temp_format='{temp:.1e}C'), [
|
||||
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': '☁ '},
|
||||
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9.0e+00C', 'gradient_level': 30.0}
|
||||
])
|
||||
])
|
||||
|
||||
def test_system_load(self):
|
||||
pl = Pl()
|
||||
|
@ -206,59 +229,62 @@ class TestCommon(TestCase):
|
|||
|
||||
with replace_attr(common, '_get_bytes', _get_bytes):
|
||||
common.network_load.startup(pl=pl)
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), None)
|
||||
common.network_load.sleep(0)
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), None)
|
||||
while 'prev' not in common.network_load.interfaces.get('eth0', {}):
|
||||
sleep(0.1)
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), None)
|
||||
try:
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), None)
|
||||
sleep(common.network_load.interval)
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), None)
|
||||
while 'prev' not in common.network_load.interfaces.get('eth0', {}):
|
||||
sleep(0.1)
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), None)
|
||||
|
||||
l = [0, 0]
|
||||
l = [0, 0]
|
||||
|
||||
def gb2(interface):
|
||||
l[0] += 1200
|
||||
l[1] += 2400
|
||||
return tuple(l)
|
||||
f[0] = gb2
|
||||
def gb2(interface):
|
||||
l[0] += 1200
|
||||
l[1] += 2400
|
||||
return tuple(l)
|
||||
f[0] = gb2
|
||||
|
||||
while not common.network_load.interfaces.get('eth0', {}).get('prev', (None, None))[1]:
|
||||
sleep(0.1)
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': '⬇ 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': '⬆ 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
while not common.network_load.interfaces.get('eth0', {}).get('prev', (None, None))[1]:
|
||||
sleep(0.1)
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': '⬇ 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': '⬆ 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
])
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0', recv_format='r {value}', sent_format='s {value}'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
self.assertEqual(common.network_load(pl=pl, interface='eth0', recv_format='r {value}', sent_format='s {value}'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
])
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps', interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps', interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
])
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
])
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
|
||||
])
|
||||
|
||||
class ApproxEqual(object):
|
||||
def __eq__(self, i):
|
||||
return abs(i - 50.0) < 1
|
||||
class ApproxEqual(object):
|
||||
def __eq__(self, i):
|
||||
return abs(i - 50.0) < 1
|
||||
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()},
|
||||
self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800, interface='eth0'), [
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
|
||||
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()},
|
||||
])
|
||||
common.network_load.shutdown()
|
||||
finally:
|
||||
common.network_load.shutdown()
|
||||
|
||||
def test_virtualenv(self):
|
||||
with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as pl:
|
||||
self.assertEqual(common.virtualenv(pl=pl), 'ghi')
|
||||
pl.environ.pop('VIRTUAL_ENV')
|
||||
self.assertEqual(common.virtualenv(pl=pl), None)
|
||||
pl = Pl()
|
||||
with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as segment_info:
|
||||
self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), 'ghi')
|
||||
segment_info['environ'].pop('VIRTUAL_ENV')
|
||||
self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), None)
|
||||
|
||||
def test_email_imap_alert(self):
|
||||
# TODO
|
||||
|
|
Loading…
Reference in New Issue