Add required pl argument to segments

Fixes #340
Ref #330
This commit is contained in:
ZyX 2013-03-23 17:51:02 +04:00
parent cc1c982696
commit ed435f8063
17 changed files with 510 additions and 298 deletions

View File

@ -155,6 +155,17 @@ Common configuration is a subdictionary that is a value of ``common`` key in
:ref:`module segment option <config-themes-seg-module>`. Paths defined here :ref:`module segment option <config-themes-seg-module>`. Paths defined here
have priority when searching for modules. have priority when searching for modules.
``log_file``
Defines path which will hold powerline logs. If not present, logging will be
done to stderr.
``log_level``
String, determines logging level. Defaults to ``WARNING``.
``log_format``
String, determines format of the log messages. Defaults to
``'%(asctime)s:%(level)s:%(message)s'``.
Extension-specific configuration Extension-specific configuration
-------------------------------- --------------------------------

View File

@ -4,6 +4,7 @@ from __future__ import absolute_import
import json import json
import os import os
import sys import sys
import logging
from powerline.colorscheme import Colorscheme from powerline.colorscheme import Colorscheme
@ -25,6 +26,42 @@ def load_json_config(search_paths, config_file, load=json.load, open_file=open_f
raise IOError('Config file not found in search path: {0}'.format(config_file)) raise IOError('Config file not found in search path: {0}'.format(config_file))
class PowerlineState(object):
def __init__(self, logger, environ, getcwd, home):
self.environ = environ
self.getcwd = getcwd
self.home = home or environ.get('HOME', None)
self.logger = logger
self.prefix = None
self.last_msgs = {}
def _log(self, attr, msg, *args, **kwargs):
prefix = kwargs.get('prefix') or self.prefix
msg = ((prefix + ':') if prefix else '') + msg.format(*args, **kwargs)
key = attr+':'+prefix
if msg != self.last_msgs.get(key):
getattr(self.logger, attr)(msg)
self.last_msgs[key] = msg
def critical(self, msg, *args, **kwargs):
self._log('critical', msg, *args, **kwargs)
def exception(self, msg, *args, **kwargs):
self._log('exception', msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
self._log('info', msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
self._log('error', msg, *args, **kwargs)
def warn(self, msg, *args, **kwargs):
self._log('warning', msg, *args, **kwargs)
def debug(self, msg, *args, **kwargs):
self._log('debug', msg, *args, **kwargs)
class Powerline(object): class Powerline(object):
'''Main powerline class, entrance point for all powerline uses. Sets '''Main powerline class, entrance point for all powerline uses. Sets
powerline up and loads the configuration. powerline up and loads the configuration.
@ -37,9 +74,29 @@ class Powerline(object):
:param str renderer_module: :param str renderer_module:
Overrides renderer module (defaults to ``ext``). Should be the name of Overrides renderer module (defaults to ``ext``). Should be the name of
the package imported like this: ``powerline.renders.{render_module}``. the package imported like this: ``powerline.renders.{render_module}``.
:param bool run_once:
Determines whether .renderer.render() method will be run only once
during python session.
:param Logger logger:
If present, no new logger will be created and this logger will be used.
:param dict environ:
Object with ``.__getitem__`` and ``.get`` methods used to obtain
environment variables. Defaults to ``os.environ``.
:param func getcwd:
Function used to get current working directory. Defaults to
``os.getcwdu`` or ``os.getcwd``.
:param str home:
Home directory. Defaults to ``environ.get('HOME')``.
''' '''
def __init__(self, ext, renderer_module=None, run_once=False): def __init__(self,
ext,
renderer_module=None,
run_once=False,
logger=None,
environ=os.environ,
getcwd=getattr(os, 'getcwdu', os.getcwd),
home=None):
self.config_paths = self.get_config_paths() self.config_paths = self.get_config_paths()
# Load main config file # Load main config file
@ -78,7 +135,40 @@ class Powerline(object):
'tmux_escape': common_config.get('additional_escapes') == 'tmux', 'tmux_escape': common_config.get('additional_escapes') == 'tmux',
'screen_escape': common_config.get('additional_escapes') == 'screen', 'screen_escape': common_config.get('additional_escapes') == 'screen',
} }
self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, **options)
# Create logger
if not logger:
log_format = common_config.get('format', '%(asctime)s:%(level)s:%(message)s')
formatter = logging.Formatter(log_format)
level = getattr(logging, common_config.get('log_level', 'WARNING'))
handler = self.get_log_handler(common_config)
handler.setLevel(level)
logger = logging.getLogger('powerline')
logger.setLevel(level)
logger.addHandler(handler)
pl = PowerlineState(logger=logger, environ=environ, getcwd=getcwd, home=home)
self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, pl, **options)
def get_log_handler(self, common_config):
'''Get log handler.
:param dict common_config:
Common configuration.
:return: logging.Handler subclass.
'''
log_file = common_config.get('file', None)
if log_file:
log_dir = os.path.dirname(log_file)
if not os.path.isdir(log_dir):
os.mkdir(log_dir)
return logging.FileHandler(log_file)
else:
return logging.StreamHandler()
@staticmethod @staticmethod
def get_config_paths(): def get_config_paths():

View File

@ -45,6 +45,22 @@ class Args(object):
return None return None
class Environment(object):
@staticmethod
def __getitem__(key):
try:
return zsh.getvalue(key)
except IndexError as e:
raise KeyError(*e.args)
@staticmethod
def get(key, default=None):
try:
return zsh.getvalue(key)
except IndexError:
return default
class Prompt(object): class Prompt(object):
__slots__ = ('render', 'side', 'savedpsvar', 'savedps') __slots__ = ('render', 'side', 'savedpsvar', 'savedps')
@ -53,9 +69,10 @@ class Prompt(object):
self.side = side self.side = side
self.savedpsvar = savedpsvar self.savedpsvar = savedpsvar
self.savedps = savedps self.savedps = savedps
self.args = powerline.args
def __str__(self): def __str__(self):
return self.render(width=zsh.columns(), side=self.side).encode('utf-8') return self.render(width=zsh.columns(), side=self.side, segment_info=args).encode('utf-8')
def __del__(self): def __del__(self):
if self.savedps: if self.savedps:
@ -71,6 +88,6 @@ def set_prompt(powerline, psvar, side):
def setup(): def setup():
powerline = ShellPowerline(Args()) powerline = ShellPowerline(Args(), environ=Environment(), getcwd=lambda: zsh.getvalue('PWD'))
set_prompt(powerline, 'PS1', 'left') set_prompt(powerline, 'PS1', 'left')
set_prompt(powerline, 'RPS1', 'right') set_prompt(powerline, 'RPS1', 'right')

View File

@ -23,15 +23,24 @@ class ThreadedSegment(object):
self.did_set_interval = False self.did_set_interval = False
self.thread = None self.thread = None
def __call__(self, **kwargs): def __call__(self, update_first=True, **kwargs):
if self.run_once: if self.run_once:
self.pl = kwargs['pl']
self.set_state(**kwargs) self.set_state(**kwargs)
self.update() self.update()
elif not self.is_alive(): elif not self.is_alive():
self.startup(**kwargs) # Without this we will not have to wait long until receiving bug “I
# opened vim, but branch information is only shown after I move
# cursor”.
#
# If running once .update() is called in __call__.
if update_first and self.update_first:
self.update_first = False
self.update()
self.start()
with self.write_lock: with self.write_lock:
return self.render(**kwargs) return self.render(update_first=update_first, **kwargs)
def is_alive(self): def is_alive(self):
return self.thread and self.thread.is_alive() return self.thread and self.thread.is_alive()
@ -49,7 +58,10 @@ class ThreadedSegment(object):
start_time = monotonic() start_time = monotonic()
with self.update_lock: with self.update_lock:
try:
self.update() self.update()
except Exception as e:
self.error('Exception while updating: {0}', str(e))
self.sleep(monotonic() - start_time) self.sleep(monotonic() - start_time)
@ -67,28 +79,27 @@ class ThreadedSegment(object):
self.interval = interval self.interval = interval
self.has_set_interval = True self.has_set_interval = True
def set_state(self, update_first=True, interval=None, **kwargs): def set_state(self, interval=None, **kwargs):
if not self.did_set_interval or interval: if not self.did_set_interval or interval:
self.set_interval(interval) self.set_interval(interval)
# Without this we will not have to wait long until receiving bug “I
# opened vim, but branch information is only shown after I move cursor”.
#
# If running once .update() is called in __call__.
if update_first and self.update_first and not self.run_once:
self.update_first = False
self.update()
def startup(self, **kwargs): def startup(self, pl, **kwargs):
# Normally .update() succeeds to run before value is requested, meaning
# that user is getting values he needs directly at vim startup. Without
# .startup() we will not have to wait long until receiving bug “I opened
# vim, but branch information is only shown after I move cursor”.
self.run_once = False self.run_once = False
self.pl = pl
if not self.is_alive(): if not self.is_alive():
self.set_state(**kwargs) self.set_state(**kwargs)
self.start() self.start()
def error(self, *args, **kwargs):
self.pl.error(prefix=self.__class__.__name__, *args, **kwargs)
def warn(self, *args, **kwargs):
self.pl.warn(prefix=self.__class__.__name__, *args, **kwargs)
def debug(self, *args, **kwargs):
self.pl.debug(prefix=self.__class__.__name__, *args, **kwargs)
def printed(func): def printed(func):
def f(*args, **kwargs): def f(*args, **kwargs):
@ -109,12 +120,14 @@ class KwThreadedSegment(ThreadedSegment):
def key(**kwargs): def key(**kwargs):
return frozenset(kwargs.items()) return frozenset(kwargs.items())
def render(self, **kwargs): def render(self, update_first, **kwargs):
key = self.key(**kwargs) key = self.key(**kwargs)
try: try:
update_state = self.queries[key][1] update_state = self.queries[key][1]
except KeyError: except KeyError:
update_state = self.compute_state(key) if self.update_first or self.run_once else None # Allow only to forbid to compute missing values: in either user
# configuration or in subclasses.
update_state = self.compute_state(key) if update_first and self.update_first or self.run_once else None
# No locks: render method is already running with write_lock acquired. # No locks: render method is already running with write_lock acquired.
self.queries[key] = (monotonic(), update_state) self.queries[key] = (monotonic(), update_state)
return self.render_one(update_state, **kwargs) return self.render_one(update_state, **kwargs)
@ -124,7 +137,10 @@ class KwThreadedSegment(ThreadedSegment):
removes = [] removes = []
for key, (last_query_time, state) in list(self.queries.items()): for key, (last_query_time, state) in list(self.queries.items()):
if last_query_time < monotonic() < last_query_time + self.drop_interval: if last_query_time < monotonic() < last_query_time + self.drop_interval:
try:
updates[key] = (last_query_time, self.compute_state(key)) updates[key] = (last_query_time, self.compute_state(key))
except Exception as e:
self.error('Exception while computing state for {0}: {1}', repr(key), str(e))
else: else:
removes.append(key) removes.append(key)
with self.write_lock: with self.write_lock:
@ -132,19 +148,13 @@ class KwThreadedSegment(ThreadedSegment):
for key in removes: for key in removes:
self.queries.pop(key) self.queries.pop(key)
def set_state(self, update_first=True, interval=None, **kwargs): def set_state(self, interval=None, **kwargs):
if not self.did_set_interval or (interval < self.interval): if not self.did_set_interval or (interval < self.interval):
self.set_interval(interval) self.set_interval(interval)
# Allow only to forbid to compute missing values: in either user
# configuration or in subclasses.
if self.update_first: if self.update_first:
self.update_first = update_first self.update_first = update_first
key = self.key(**kwargs)
if not self.run_once and key not in self.queries:
self.queries[key] = (monotonic(), self.compute_state(key) if self.update_first else None)
@staticmethod @staticmethod
def render_one(update_state, **kwargs): def render_one(update_state, **kwargs):
return update_state return update_state

View File

@ -8,6 +8,7 @@ import os
import re import re
from collections import defaultdict from collections import defaultdict
from copy import copy from copy import copy
import logging
try: try:
@ -408,6 +409,11 @@ main_spec = (Spec(
# only for existence of the path, not for it being a directory # only for existence of the path, not for it being a directory
paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(value.value))), paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(value.value))),
lambda value: 'path does not exist: {0}'.format(value)).optional(), lambda value: 'path does not exist: {0}'.format(value)).optional(),
log_file=Spec().type(str).func(lambda value, *args: (True, True, not os.path.isdir(os.path.dirname(value))),
lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value))).optional(),
log_level=Spec().re('^[A-Z]+$').func(lambda value, *args: (True, True, not hasattr(logging, value)),
lambda value: 'unknown debugging level {0}'.format(value)).optional(),
log_format=Spec().type(str).optional(),
).context_message('Error while loading common configuration (key {key})'), ).context_message('Error while loading common configuration (key {key})'),
ext=Spec( ext=Spec(
vim=Spec( vim=Spec(
@ -786,6 +792,11 @@ def check_segment_data_key(key, data, context, echoerr):
return True, False, False return True, False, False
# FIXME More checks, limit existing to ThreadedSegment instances only
args_spec = Spec(
interval=Spec().either(Spec().type(float), Spec().type(int)).optional(),
update_first=Spec().type(bool).optional(),
).unknown_spec(Spec(), Spec()).optional().copy
highlight_group_spec = Spec().type(unicode).copy highlight_group_spec = Spec().type(unicode).copy
segment_module_spec = Spec().type(unicode).func(check_segment_module).optional().copy segment_module_spec = Spec().type(unicode).func(check_segment_module).optional().copy
segments_spec = Spec().optional().list( segments_spec = Spec().optional().list(
@ -801,8 +812,7 @@ segments_spec = Spec().optional().list(
before=Spec().type(unicode).optional(), before=Spec().type(unicode).optional(),
width=Spec().either(Spec().unsigned(), Spec().cmp('eq', 'auto')).optional(), width=Spec().either(Spec().unsigned(), Spec().cmp('eq', 'auto')).optional(),
align=Spec().oneof(set('lr')).optional(), align=Spec().oneof(set('lr')).optional(),
# FIXME Check args args=args_spec(),
args=Spec().type(dict).optional(),
contents=Spec().type(unicode).optional(), contents=Spec().type(unicode).optional(),
highlight_group=Spec().list( highlight_group=Spec().list(
highlight_group_spec().re('^(?:(?!:divider$).)+$', highlight_group_spec().re('^(?:(?!:divider$).)+$',
@ -819,8 +829,7 @@ theme_spec = (Spec(
Spec( Spec(
after=Spec().type(unicode).optional(), after=Spec().type(unicode).optional(),
before=Spec().type(unicode).optional(), before=Spec().type(unicode).optional(),
# FIXME Check args args=args_spec(),
args=Spec().type(dict).optional(),
contents=Spec().type(unicode).optional(), contents=Spec().type(unicode).optional(),
), ),
).optional().context_message('Error while loading segment data (key {key})'), ).optional().context_message('Error while loading segment data (key {key})'),

View File

@ -18,9 +18,11 @@ def construct_returned_value(rendered_highlighted, segments, output_raw):
class Renderer(object): class Renderer(object):
def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, **options): def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, pl, **options):
self.__dict__.update(options) self.__dict__.update(options)
self.theme_config = theme_config self.theme_config = theme_config
theme_kwargs['pl'] = pl
self.pl = pl
self.theme = Theme(theme_config=theme_config, **theme_kwargs) self.theme = Theme(theme_config=theme_config, **theme_kwargs)
self.local_themes = local_themes self.local_themes = local_themes
self.theme_kwargs = theme_kwargs self.theme_kwargs = theme_kwargs

View File

@ -66,12 +66,7 @@ class VimRenderer(Renderer):
return vim.strwidth(string) return vim.strwidth(string)
def render(self, window_id, winidx, current): def render(self, window_id, winidx, current):
'''Render all segments. '''Render all segments.'''
This method handles replacing of the percent placeholder for vim
statuslines, and it caches segment contents which are retrieved and
used in non-current windows.
'''
if current: if current:
mode = vim_mode(1) mode = vim_mode(1)
mode = mode_translations.get(mode, mode) mode = mode_translations.get(mode, mode)

View File

@ -64,6 +64,7 @@ def gen_segment_getter(ext, path, theme_configs, default_module=None):
contents, contents_func, module = get_segment_info(data, segment) contents, contents_func, module = get_segment_info(data, segment)
highlight_group = segment_type != 'function' and segment.get('highlight_group') or segment.get('name') highlight_group = segment_type != 'function' and segment.get('highlight_group') or segment.get('name')
return { return {
'name': segment.get('name'),
'type': segment_type, 'type': segment_type,
'highlight_group': highlight_group, 'highlight_group': highlight_group,
'divider_highlight_group': None, 'divider_highlight_group': None,

View File

@ -18,13 +18,13 @@ from powerline.lib.humanize_bytes import humanize_bytes
from collections import namedtuple from collections import namedtuple
def hostname(only_if_ssh=False): def hostname(pl, only_if_ssh=False):
'''Return the current hostname. '''Return the current hostname.
:param bool only_if_ssh: :param bool only_if_ssh:
only return the hostname if currently in an SSH session only return the hostname if currently in an SSH session
''' '''
if only_if_ssh and not os.environ.get('SSH_CLIENT'): if only_if_ssh and not pl.environ.get('SSH_CLIENT'):
return None return None
return socket.gethostname() return socket.gethostname()
@ -35,8 +35,8 @@ class RepositorySegment(KwThreadedSegment):
self.directories = {} self.directories = {}
@staticmethod @staticmethod
def key(**kwargs): def key(pl, **kwargs):
return os.path.abspath(os.getcwd()) return os.path.abspath(pl.getcwd())
def update(self): def update(self):
# .compute_state() is running only in this method, and only in one # .compute_state() is running only in this method, and only in one
@ -82,16 +82,16 @@ class BranchSegment(RepositorySegment):
if branch and status_colors: if branch and status_colors:
return [{ return [{
'contents': branch, 'contents': branch,
'highlight_group': ['branch_dirty' if repository_status() else 'branch_clean', 'branch'], 'highlight_group': ['branch_dirty' if repository_status(**kwargs) else 'branch_clean', 'branch'],
}] }]
else: else:
return branch return branch
def startup(self, status_colors=False, **kwargs): def startup(self, status_colors=False, **kwargs):
super(BranchSegment, self).startup() super(BranchSegment, self).startup(**kwargs)
if status_colors: if status_colors:
self.started_repository_status = True self.started_repository_status = True
repository_status.startup() repository_status.startup(**kwargs)
def shutdown(self): def shutdown(self):
if self.started_repository_status: if self.started_repository_status:
@ -109,7 +109,7 @@ Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
''') ''')
def cwd(dir_shorten_len=None, dir_limit_depth=None): def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
'''Return the current working directory. '''Return the current working directory.
Returns a segment list to create a breadcrumb-like effect. Returns a segment list to create a breadcrumb-like effect.
@ -126,18 +126,16 @@ def cwd(dir_shorten_len=None, dir_limit_depth=None):
''' '''
import re import re
try: try:
try: cwd = pl.getcwd()
cwd = os.getcwdu()
except AttributeError:
cwd = os.getcwd()
except OSError as e: except OSError as e:
if e.errno == 2: if e.errno == 2:
# user most probably deleted the directory # user most probably deleted the directory
# this happens when removing files from Mercurial repos for example # this happens when removing files from Mercurial repos for example
pl.warn('Current directory not found')
cwd = "[not found]" cwd = "[not found]"
else: else:
raise raise
home = os.environ.get('HOME') home = pl.home
if home: if home:
cwd = re.sub('^' + re.escape(home), '~', cwd, 1) cwd = re.sub('^' + re.escape(home), '~', cwd, 1)
cwd_split = cwd.split(os.sep) cwd_split = cwd.split(os.sep)
@ -160,7 +158,7 @@ def cwd(dir_shorten_len=None, dir_limit_depth=None):
return ret return ret
def date(format='%Y-%m-%d', istime=False): def date(pl, format='%Y-%m-%d', istime=False):
'''Return the current date. '''Return the current date.
:param str format: :param str format:
@ -177,7 +175,7 @@ def date(format='%Y-%m-%d', istime=False):
}] }]
def fuzzy_time(): def fuzzy_time(pl):
'''Display the current time as fuzzy time, e.g. "quarter past six".''' '''Display the current time as fuzzy time, e.g. "quarter past six".'''
hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven'] hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven']
minute_str = { minute_str = {
@ -241,7 +239,7 @@ class ExternalIpSegment(ThreadedSegment):
with self.write_lock: with self.write_lock:
self.ip = ip self.ip = ip
def render(self): def render(self, **kwargs):
if not hasattr(self, 'ip'): if not hasattr(self, 'ip'):
return None return None
return [{'contents': self.ip, 'divider_highlight_group': 'background:divider'}] return [{'contents': self.ip, 'divider_highlight_group': 'background:divider'}]
@ -373,7 +371,8 @@ class WeatherSegment(ThreadedSegment):
self.location = ','.join([location_data['city'], self.location = ','.join([location_data['city'],
location_data['region_name'], location_data['region_name'],
location_data['country_name']]) location_data['country_name']])
except (TypeError, ValueError): except (TypeError, ValueError) as e:
self.error('Failed to get location: {0}', str(e))
return return
query_data = { query_data = {
'q': 'q':
@ -389,13 +388,19 @@ class WeatherSegment(ThreadedSegment):
condition = response['query']['results']['weather']['rss']['channel']['item']['condition'] condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code']) condition_code = int(condition['code'])
temp = float(condition['temp']) temp = float(condition['temp'])
except (KeyError, TypeError, ValueError): except (KeyError, TypeError, ValueError) as e:
self.error('Failed to get weather conditions: {0}', str(e))
return return
try: try:
icon_names = weather_conditions_codes[condition_code] icon_names = weather_conditions_codes[condition_code]
except IndexError: except IndexError as e:
icon_names = (('not_available' if condition_code == 3200 else 'unknown'),) if condition_code == 3200:
icon_names = ('not_available',)
self.warn('Weather is not available for location {0}', self.location)
else:
icon_names = ('unknown',)
self.error('Unknown condition code: {0}', condition_code)
with self.write_lock: with self.write_lock:
self.temp = temp self.temp = temp
@ -472,7 +477,7 @@ Also uses ``weather_conditions_{condition}`` for all weather conditions supporte
''') ''')
def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2): def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2):
'''Return system load average. '''Return system load average.
Highlights using ``system_load_good``, ``system_load_bad`` and Highlights using ``system_load_good``, ``system_load_bad`` and
@ -500,6 +505,7 @@ def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2):
try: try:
cpu_num = cpu_count() cpu_num = cpu_count()
except NotImplementedError: except NotImplementedError:
pl.warn('Unable to get CPU count: method is not implemented')
return None return None
ret = [] ret = []
for avg in os.getloadavg(): for avg in os.getloadavg():
@ -539,10 +545,10 @@ try:
if data: if data:
yield interface, data.bytes_recv, data.bytes_sent yield interface, data.bytes_recv, data.bytes_sent
def _get_user(): def _get_user(pl):
return psutil.Process(os.getpid()).username return psutil.Process(os.getpid()).username
def cpu_load_percent(measure_interval=.5): def cpu_load_percent(pl, measure_interval=.5):
'''Return the average CPU load as a percentage. '''Return the average CPU load as a percentage.
Requires the ``psutil`` module. Requires the ``psutil`` module.
@ -569,10 +575,10 @@ except ImportError:
if x is not None: if x is not None:
yield interface, x[0], x[1] yield interface, x[0], x[1]
def _get_user(): # NOQA def _get_user(pl): # NOQA
return os.environ.get('USER', None) return pl.environ.get('USER', None)
def cpu_load_percent(measure_interval=.5): # NOQA def cpu_load_percent(pl, measure_interval=.5): # NOQA
'''Return the average CPU load as a percentage. '''Return the average CPU load as a percentage.
Requires the ``psutil`` module. Requires the ``psutil`` module.
@ -580,13 +586,14 @@ except ImportError:
:param float measure_interval: :param float measure_interval:
interval used to measure CPU load (in seconds) interval used to measure CPU load (in seconds)
''' '''
pl.warn('psutil package is not installed, thus CPU load is not available')
return None return None
username = False username = False
def user(): def user(pl):
'''Return the current user. '''Return the current user.
Highlights the user with the ``superuser`` if the effective user ID is 0. Highlights the user with the ``superuser`` if the effective user ID is 0.
@ -595,8 +602,9 @@ def user():
''' '''
global username global username
if username is False: if username is False:
username = _get_user() username = _get_user(pl)
if username is None: if username is None:
pl.warn('Failed to get username')
return None return None
try: try:
euid = os.geteuid() euid = os.geteuid()
@ -625,7 +633,7 @@ else:
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def uptime(format='{days}d {hours:02d}h {minutes:02d}m'): def uptime(pl, format='{days}d {hours:02d}h {minutes:02d}m'):
'''Return system uptime. '''Return system uptime.
:param str format: :param str format:
@ -636,7 +644,11 @@ def uptime(format='{days}d {hours:02d}h {minutes:02d}m'):
''' '''
try: try:
seconds = _get_uptime() seconds = _get_uptime()
except (IOError, NotImplementedError): except IOError as e:
pl.error('Failed to get uptime: {0}', e)
return None
except NotImplementedError:
pl.warn('Unable to get uptime. You should install psutil package')
return None return None
minutes, seconds = divmod(seconds, 60) minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60) hours, minutes = divmod(minutes, 60)
@ -705,7 +717,10 @@ class NetworkLoadSegment(KwThreadedSegment):
t2, b2 = idata['last'] t2, b2 = idata['last']
measure_interval = t2 - t1 measure_interval = t2 - t1
if None in (b1, b2) or measure_interval == 0: if None in (b1, b2):
return None
if measure_interval == 0:
self.error('Measure interval is zero. This should not happen')
return None return None
r = [] r = []
@ -762,9 +777,9 @@ Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_lo
''') ''')
def virtualenv(): def virtualenv(pl):
'''Return the name of the current Python virtualenv.''' '''Return the name of the current Python virtualenv.'''
return os.path.basename(os.environ.get('VIRTUAL_ENV', '')) or None return os.path.basename(pl.environ.get('VIRTUAL_ENV', '')) or None
_IMAPKey = namedtuple('Key', 'username password server port folder') _IMAPKey = namedtuple('Key', 'username password server port folder')
@ -774,12 +789,12 @@ class EmailIMAPSegment(KwThreadedSegment):
interval = 60 interval = 60
@staticmethod @staticmethod
def key(username, password, server='imap.gmail.com', port=993, folder='INBOX'): def key(username, password, server='imap.gmail.com', port=993, folder='INBOX', **kwargs):
return _IMAPKey(username, password, server, port, folder) return _IMAPKey(username, password, server, port, folder)
@staticmethod def compute_state(self, key):
def compute_state(key):
if not key.username or not key.password: if not key.username or not key.password:
self.warn('Username and password are not configured')
return None return None
try: try:
import imaplib import imaplib
@ -789,8 +804,6 @@ class EmailIMAPSegment(KwThreadedSegment):
rc, message = mail.status(key.folder, '(UNSEEN)') rc, message = mail.status(key.folder, '(UNSEEN)')
unread_str = message[0].decode('utf-8') unread_str = message[0].decode('utf-8')
unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1)) unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1))
except socket.gaierror:
return None
except imaplib.IMAP4.error as e: except imaplib.IMAP4.error as e:
unread_count = str(e) unread_count = str(e)
return unread_count return unread_count
@ -842,7 +855,7 @@ class NowPlayingSegment(object):
'stop': '', 'stop': '',
} }
def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', *args, **kwargs): def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', **kwargs):
player_func = getattr(self, 'player_{0}'.format(player)) player_func = getattr(self, 'player_{0}'.format(player))
stats = { stats = {
'state': None, 'state': None,
@ -853,7 +866,7 @@ class NowPlayingSegment(object):
'elapsed': None, 'elapsed': None,
'total': None, 'total': None,
} }
func_stats = player_func(*args, **kwargs) func_stats = player_func(**kwargs)
if not func_stats: if not func_stats:
return None return None
stats.update(func_stats) stats.update(func_stats)
@ -884,7 +897,7 @@ class NowPlayingSegment(object):
def _convert_seconds(seconds): def _convert_seconds(seconds):
return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60)) return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60))
def player_cmus(self): def player_cmus(self, pl):
'''Return cmus player information. '''Return cmus player information.
cmus-remote -Q returns data with multi-level information i.e. cmus-remote -Q returns data with multi-level information i.e.
@ -922,7 +935,7 @@ class NowPlayingSegment(object):
'total': self._convert_seconds(now_playing.get('duration', 0)), 'total': self._convert_seconds(now_playing.get('duration', 0)),
} }
def player_mpd(self, host='localhost', port=6600): def player_mpd(self, pl, host='localhost', port=6600):
try: try:
import mpd import mpd
client = mpd.MPDClient() client = mpd.MPDClient()
@ -954,7 +967,7 @@ class NowPlayingSegment(object):
'total': now_playing[3], 'total': now_playing[3],
} }
def player_spotify(self): def player_spotify(self, pl):
try: try:
import dbus import dbus
except ImportError: except ImportError:
@ -982,7 +995,7 @@ class NowPlayingSegment(object):
'total': self._convert_seconds(info.get('mpris:length') / 1e6), 'total': self._convert_seconds(info.get('mpris:length') / 1e6),
} }
def player_rhythmbox(self): def player_rhythmbox(self, pl):
now_playing = self._run_cmd(['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td']) now_playing = self._run_cmd(['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td'])
if not now_playing: if not now_playing:
return return

View File

@ -3,5 +3,5 @@
from powerline.theme import requires_segment_info from powerline.theme import requires_segment_info
@requires_segment_info @requires_segment_info
def prompt_count(segment_info): def prompt_count(pl, segment_info):
return str(segment_info.prompt_count) return str(segment_info.prompt_count)

View File

@ -4,7 +4,7 @@ from powerline.theme import requires_segment_info
@requires_segment_info @requires_segment_info
def last_status(segment_info): def last_status(pl, segment_info):
'''Return last exit code. '''Return last exit code.
Highlight groups used: ``exit_fail`` Highlight groups used: ``exit_fail``
@ -15,7 +15,7 @@ def last_status(segment_info):
@requires_segment_info @requires_segment_info
def last_pipe_status(segment_info): def last_pipe_status(pl, segment_info):
'''Return last pipe status. '''Return last pipe status.
Highlight groups used: ``exit_fail``, ``exit_success`` Highlight groups used: ``exit_fail``, ``exit_success``

View File

@ -94,7 +94,7 @@ def window_cached(func):
@requires_segment_info @requires_segment_info
def mode(segment_info, override=None): def mode(pl, segment_info, override=None):
'''Return the current vim mode. '''Return the current vim mode.
:param dict override: :param dict override:
@ -112,7 +112,7 @@ def mode(segment_info, override=None):
@requires_segment_info @requires_segment_info
def modified_indicator(segment_info, text='+'): def modified_indicator(pl, segment_info, text='+'):
'''Return a file modified indicator. '''Return a file modified indicator.
:param string text: :param string text:
@ -122,7 +122,7 @@ def modified_indicator(segment_info, text='+'):
@requires_segment_info @requires_segment_info
def paste_indicator(segment_info, text='PASTE'): def paste_indicator(pl, segment_info, text='PASTE'):
'''Return a paste mode indicator. '''Return a paste mode indicator.
:param string text: :param string text:
@ -132,7 +132,7 @@ def paste_indicator(segment_info, text='PASTE'):
@requires_segment_info @requires_segment_info
def readonly_indicator(segment_info, text=''): def readonly_indicator(pl, segment_info, text=''):
'''Return a read-only indicator. '''Return a read-only indicator.
:param string text: :param string text:
@ -142,7 +142,7 @@ def readonly_indicator(segment_info, text=''):
@requires_segment_info @requires_segment_info
def file_directory(segment_info, shorten_user=True, shorten_cwd=True, shorten_home=False): def file_directory(pl, segment_info, shorten_user=True, shorten_cwd=True, shorten_home=False):
'''Return file directory (head component of the file path). '''Return file directory (head component of the file path).
:param bool shorten_user: :param bool shorten_user:
@ -165,13 +165,15 @@ def file_directory(segment_info, shorten_user=True, shorten_cwd=True, shorten_ho
@requires_segment_info @requires_segment_info
def file_name(segment_info, display_no_file=False, no_file_text='[No file]'): def file_name(pl, segment_info, display_no_file=False, no_file_text='[No file]'):
'''Return file name (tail component of the file path). '''Return file name (tail component of the file path).
:param bool display_no_file: :param bool display_no_file:
display a string if the buffer is missing a file name display a string if the buffer is missing a file name
:param str no_file_text: :param str no_file_text:
the string to display if the buffer is missing a file name the string to display if the buffer is missing a file name
Highlight groups used: ``file_name_no_file`` or ``file_name``, ``file_name``.
''' '''
name = segment_info['buffer'].name name = segment_info['buffer'].name
if not name: if not name:
@ -187,7 +189,7 @@ def file_name(segment_info, display_no_file=False, no_file_text='[No file]'):
@window_cached @window_cached
def file_size(suffix='B', si_prefix=False): def file_size(pl, suffix='B', si_prefix=False):
'''Return file size in &encoding. '''Return file size in &encoding.
:param str suffix: :param str suffix:
@ -204,7 +206,7 @@ def file_size(suffix='B', si_prefix=False):
@requires_segment_info @requires_segment_info
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def file_format(segment_info): def file_format(pl, segment_info):
'''Return file format (i.e. line ending type). '''Return file format (i.e. line ending type).
:return: file format or None if unknown or missing file format :return: file format or None if unknown or missing file format
@ -216,7 +218,7 @@ def file_format(segment_info):
@requires_segment_info @requires_segment_info
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def file_encoding(segment_info): def file_encoding(pl, segment_info):
'''Return file encoding/character set. '''Return file encoding/character set.
:return: file encoding/character set or None if unknown or missing file encoding :return: file encoding/character set or None if unknown or missing file encoding
@ -228,7 +230,7 @@ def file_encoding(segment_info):
@requires_segment_info @requires_segment_info
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def file_type(segment_info): def file_type(pl, segment_info):
'''Return file type. '''Return file type.
:return: file type or None if unknown file type :return: file type or None if unknown file type
@ -239,7 +241,7 @@ def file_type(segment_info):
@requires_segment_info @requires_segment_info
def line_percent(segment_info, gradient=False): def line_percent(pl, segment_info, gradient=False):
'''Return the cursor position in the file as a percentage. '''Return the cursor position in the file as a percentage.
:param bool gradient: :param bool gradient:
@ -260,20 +262,20 @@ def line_percent(segment_info, gradient=False):
@requires_segment_info @requires_segment_info
def line_current(segment_info): def line_current(pl, segment_info):
'''Return the current cursor line.''' '''Return the current cursor line.'''
return str(segment_info['window'].cursor[0]) return str(segment_info['window'].cursor[0])
@requires_segment_info @requires_segment_info
def col_current(segment_info): def col_current(pl, segment_info):
'''Return the current cursor column. '''Return the current cursor column.
''' '''
return str(segment_info['window'].cursor[1] + 1) return str(segment_info['window'].cursor[1] + 1)
@window_cached @window_cached
def virtcol_current(): def virtcol_current(pl):
'''Return current visual column with concealed characters ingored '''Return current visual column with concealed characters ingored
Highlight groups used: ``virtcol_current`` or ``col_current``. Highlight groups used: ``virtcol_current`` or ``col_current``.
@ -282,7 +284,7 @@ def virtcol_current():
'highlight_group': ['virtcol_current', 'col_current']}] 'highlight_group': ['virtcol_current', 'col_current']}]
def modified_buffers(text='+ ', join_str=','): def modified_buffers(pl, text='+ ', join_str=','):
'''Return a comma-separated list of modified buffers. '''Return a comma-separated list of modified buffers.
:param str text: :param str text:
@ -366,16 +368,16 @@ class BranchSegment(RepositorySegment):
return [{ return [{
'contents': update_state, 'contents': update_state,
'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info) else 'branch_clean'] 'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info, **kwargs) else 'branch_clean']
if status_colors else []) + ['branch'], if status_colors else []) + ['branch'],
'divider_highlight_group': 'branch:divider', 'divider_highlight_group': 'branch:divider',
}] }]
def startup(self, status_colors=False, **kwargs): def startup(self, status_colors=False, **kwargs):
super(BranchSegment, self).startup() super(BranchSegment, self).startup(**kwargs)
if status_colors: if status_colors:
self.started_repository_status = True self.started_repository_status = True
repository_status.startup() repository_status.startup(**kwargs)
def shutdown(self): def shutdown(self):
if self.started_repository_status: if self.started_repository_status:

View File

@ -19,9 +19,6 @@ class ShellPowerline(Powerline):
self.theme_option = mergeargs(args.theme_option) or {} self.theme_option = mergeargs(args.theme_option) or {}
super(ShellPowerline, self).__init__(args.ext[0], args.renderer_module, run_once=run_once) super(ShellPowerline, self).__init__(args.ext[0], args.renderer_module, run_once=run_once)
def get_segment_info(self):
return self.args
def load_main_config(self): def load_main_config(self):
r = super(ShellPowerline, self).load_main_config() r = super(ShellPowerline, self).load_main_config()
if self.args.config: if self.args.config:

View File

@ -24,7 +24,7 @@ def requires_segment_info(func):
class Theme(object): class Theme(object):
def __init__(self, ext, theme_config, common_config, top_theme_config=None, run_once=False): def __init__(self, ext, theme_config, common_config, pl, top_theme_config=None, run_once=False):
self.dividers = theme_config.get('dividers', common_config['dividers']) self.dividers = theme_config.get('dividers', common_config['dividers'])
self.spaces = theme_config.get('spaces', common_config['spaces']) self.spaces = theme_config.get('spaces', common_config['spaces'])
self.segments = { self.segments = {
@ -35,16 +35,22 @@ class Theme(object):
'contents': None, 'contents': None,
'highlight': {'fg': False, 'bg': False, 'attr': 0} 'highlight': {'fg': False, 'bg': False, 'attr': 0}
} }
self.pl = pl
theme_configs = [theme_config] theme_configs = [theme_config]
if top_theme_config: if top_theme_config:
theme_configs.append(top_theme_config) theme_configs.append(top_theme_config)
get_segment = gen_segment_getter(ext, common_config['paths'], theme_configs, theme_config.get('default_module')) get_segment = gen_segment_getter(ext, common_config['paths'], theme_configs, theme_config.get('default_module'))
for side in ['left', 'right']: for side in ['left', 'right']:
self.segments[side].extend((get_segment(segment, side) for segment in theme_config['segments'].get(side, []))) for segment in theme_config['segments'].get(side, []):
segment = get_segment(segment, side)
if not run_once: if not run_once:
for segment in self.segments[side]:
if segment['startup']: if segment['startup']:
segment['startup'](**segment['args']) try:
segment['startup'](pl=pl, **segment['args'])
except Exception as e:
pl.error('Exception during {0} startup: {1}', segment['name'], str(e))
continue
self.segments[side].append(segment)
def shutdown(self): def shutdown(self):
for segments in self.segments.values(): for segments in self.segments.values():
@ -71,11 +77,17 @@ class Theme(object):
parsed_segments = [] parsed_segments = []
for segment in self.segments[side]: for segment in self.segments[side]:
if segment['type'] == 'function': if segment['type'] == 'function':
self.pl.prefix = segment['name']
try:
if (hasattr(segment['contents_func'], 'powerline_requires_segment_info') if (hasattr(segment['contents_func'], 'powerline_requires_segment_info')
and segment['contents_func'].powerline_requires_segment_info): and segment['contents_func'].powerline_requires_segment_info):
contents = segment['contents_func'](segment_info=segment_info, **segment['args']) contents = segment['contents_func'](pl=self.pl, segment_info=segment_info, **segment['args'])
else: else:
contents = segment['contents_func'](**segment['args']) contents = segment['contents_func'](pl=self.pl, **segment['args'])
except Exception as e:
self.pl.exception('Exception while computing segment: {0}', str(e))
continue
if contents is None: if contents is None:
continue continue
if isinstance(contents, list): if isinstance(contents, list):

View File

@ -4,6 +4,28 @@ import sys
import os import os
class Pl(object):
def __init__(self):
self.errors = []
self.warns = []
self.debugs = []
self._cwd = None
self.prefix = None
self.environ = {}
self.home = None
def getcwd(self):
if isinstance(self._cwd, Exception):
raise self._cwd
else:
return self._cwd
for meth in ('error', 'warn', 'debug'):
exec ('''def {0}(self, msg, *args, **kwargs):
self.{0}s.append((kwargs.get('prefix') or self.prefix, msg, args, kwargs))
''').format(meth)
class Args(object): class Args(object):
theme_option = None theme_option = None
config = None config = None
@ -63,50 +85,58 @@ def new_module(name, **kwargs):
return module return module
class ModuleAttrReplace(object): class AttrReplace(object):
def __init__(self, module, attr, new): def __init__(self, obj, attr, new):
self.module = module self.obj = obj
self.attr = attr self.attr = attr
self.new = new self.new = new
def __enter__(self): def __enter__(self):
try: try:
self.old = getattr(self.module, self.attr) self.old = getattr(self.obj, self.attr)
except AttributeError: except AttributeError:
pass pass
setattr(self.module, self.attr, self.new) setattr(self.obj, self.attr, self.new)
def __exit__(self, *args): def __exit__(self, *args):
try: try:
setattr(self.module, self.attr, self.old) setattr(self.obj, self.attr, self.old)
except AttributeError: except AttributeError:
delattr(self.module, self.attr) delattr(self.obj, self.attr)
replace_module_attr = ModuleAttrReplace replace_attr = AttrReplace
def replace_module_module(module, name, **kwargs): def replace_module_module(module, name, **kwargs):
return replace_module_attr(module, name, new_module(name, **kwargs)) return replace_attr(module, name, new_module(name, **kwargs))
class EnvReplace(object): class ItemReplace(object):
def __init__(self, name, new): def __init__(self, d, key, new, r=None):
self.name = name self.key = key
self.new = new self.new = new
self.d = d
self.r = r
def __enter__(self): def __enter__(self):
self.old = os.environ.get(self.name) self.old = self.d.get(self.key)
os.environ[self.name] = self.new self.d[self.key] = self.new
return self.r
def __exit__(self, *args): def __exit__(self, *args):
if self.old is None: if self.old is None:
try: try:
os.environ.pop(self.name) self.d.pop(self.key)
except KeyError: except KeyError:
pass pass
else: else:
os.environ[self.name] = self.old self.d[self.key] = self.old
replace_env = EnvReplace def replace_env(key, new, d=None):
r = None
if not d:
r = Pl()
d = r.environ
return ItemReplace(d, key, new, r)

View File

@ -7,7 +7,7 @@ import tests.vim as vim_module
import sys import sys
import os import os
import json import json
from tests.lib import Args, urllib_read, replace_module_attr from tests.lib import Args, urllib_read, replace_attr
from tests import TestCase from tests import TestCase
@ -67,7 +67,7 @@ class TestConfig(TestCase):
from imp import reload from imp import reload
reload(common) reload(common)
from powerline.shell import ShellPowerline from powerline.shell import ShellPowerline
with replace_module_attr(common, 'urllib_read', urllib_read): with replace_attr(common, 'urllib_read', urllib_read):
powerline = ShellPowerline(Args(ext=['tmux']), run_once=False) powerline = ShellPowerline(Args(ext=['tmux']), run_once=False)
powerline.renderer.render() powerline.renderer.render()
powerline = ShellPowerline(Args(ext=['tmux']), run_once=False) powerline = ShellPowerline(Args(ext=['tmux']), run_once=False)
@ -112,7 +112,7 @@ class TestConfig(TestCase):
from imp import reload from imp import reload
reload(common) reload(common)
from powerline import Powerline from powerline import Powerline
with replace_module_attr(common, 'urllib_read', urllib_read): with replace_attr(common, 'urllib_read', urllib_read):
Powerline(ext='wm', renderer_module='pango_markup', run_once=True).renderer.render() Powerline(ext='wm', renderer_module='pango_markup', run_once=True).renderer.render()
reload(common) reload(common)

View File

@ -4,7 +4,7 @@ from powerline.segments import shell, common
import tests.vim as vim_module import tests.vim as vim_module
import sys import sys
import os import os
from tests.lib import Args, urllib_read, replace_module_attr, new_module, replace_module_module, replace_env from tests.lib import Args, urllib_read, replace_attr, new_module, replace_module_module, replace_env, Pl
from tests import TestCase from tests import TestCase
@ -13,14 +13,16 @@ vim = None
class TestShell(TestCase): class TestShell(TestCase):
def test_last_status(self): def test_last_status(self):
self.assertEqual(shell.last_status(Args(last_exit_code=10)), pl = Pl()
self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=10)),
[{'contents': '10', 'highlight_group': 'exit_fail'}]) [{'contents': '10', 'highlight_group': 'exit_fail'}])
self.assertEqual(shell.last_status(Args(last_exit_code=None)), None) self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=None)), None)
def test_last_pipe_status(self): def test_last_pipe_status(self):
self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[])), None) pl = Pl()
self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[0, 0, 0])), None) self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[])), None)
self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[0, 2, 0])), self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 0, 0])), None)
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 2, 0])),
[{'contents': '0', 'highlight_group': 'exit_success'}, [{'contents': '0', 'highlight_group': 'exit_success'},
{'contents': '2', 'highlight_group': 'exit_fail'}, {'contents': '2', 'highlight_group': 'exit_fail'},
{'contents': '0', 'highlight_group': 'exit_success'}]) {'contents': '0', 'highlight_group': 'exit_success'}])
@ -28,220 +30,227 @@ class TestShell(TestCase):
class TestCommon(TestCase): class TestCommon(TestCase):
def test_hostname(self): def test_hostname(self):
with replace_env('SSH_CLIENT', '192.168.0.12 40921 22'): with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as pl:
with replace_module_module(common, 'socket', gethostname=lambda: 'abc'): with replace_module_module(common, 'socket', gethostname=lambda: 'abc'):
self.assertEqual(common.hostname(), 'abc') self.assertEqual(common.hostname(pl=pl), 'abc')
self.assertEqual(common.hostname(only_if_ssh=True), 'abc') self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), 'abc')
os.environ.pop('SSH_CLIENT') pl.environ.pop('SSH_CLIENT')
self.assertEqual(common.hostname(), 'abc') self.assertEqual(common.hostname(pl=pl), 'abc')
self.assertEqual(common.hostname(only_if_ssh=True), None) self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), None)
def test_user(self): def test_user(self):
new_os = new_module('os', environ={'USER': 'def'}, getpid=lambda: 1) new_os = new_module('os', getpid=lambda: 1)
new_psutil = new_module('psutil', Process=lambda pid: Args(username='def')) new_psutil = new_module('psutil', Process=lambda pid: Args(username='def'))
with replace_module_attr(common, 'os', new_os): with replace_env('USER', 'def') as pl:
with replace_module_attr(common, 'psutil', new_psutil): with replace_attr(common, 'os', new_os):
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) with replace_attr(common, 'psutil', new_psutil):
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 1 new_os.geteuid = lambda: 1
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 0 new_os.geteuid = lambda: 0
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}]) self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
def test_branch(self): def test_branch(self):
with replace_module_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')): pl = Pl()
self.assertEqual(common.branch(status_colors=False), 'tests') pl._cwd = os.getcwd()
self.assertEqual(common.branch(status_colors=True), with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')):
self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
self.assertEqual(common.branch(pl=pl, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}]) [{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}])
with replace_module_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')): with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')):
self.assertEqual(common.branch(status_colors=False), 'tests') self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
self.assertEqual(common.branch(status_colors=True), self.assertEqual(common.branch(pl=pl, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}]) [{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}])
self.assertEqual(common.branch(), 'tests') self.assertEqual(common.branch(pl=pl), 'tests')
with replace_module_attr(common, 'guess', lambda path: None): with replace_attr(common, 'guess', lambda path: None):
self.assertEqual(common.branch(), None) self.assertEqual(common.branch(pl=pl), None)
def test_cwd(self): def test_cwd(self):
new_os = new_module('os', path=os.path, environ={}, sep='/') new_os = new_module('os', path=os.path, sep='/')
new_os.getcwd = lambda: '/abc/def/ghi/foo/bar' pl = Pl()
with replace_module_attr(common, 'os', new_os): with replace_attr(common, 'os', new_os):
self.assertEqual(common.cwd(), pl._cwd = '/abc/def/ghi/foo/bar'
self.assertEqual(common.cwd(pl=pl),
[{'contents': '/', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '/', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'abc', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'abc', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'def', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'def', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
new_os.getcwdu = lambda: '/abc/def/ghi/foo/bar' pl.home = '/abc/def/ghi'
self.assertEqual(common.cwd(), self.assertEqual(common.cwd(pl=pl),
[{'contents': '/', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'abc', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'def', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
new_os.environ['HOME'] = '/abc/def/ghi'
self.assertEqual(common.cwd(),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(dir_limit_depth=3), self.assertEqual(common.cwd(pl=pl, dir_limit_depth=3),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(dir_limit_depth=1), self.assertEqual(common.cwd(pl=pl, dir_limit_depth=1),
[{'contents': '', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(dir_limit_depth=2, dir_shorten_len=2), self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'fo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'fo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
ose = OSError() ose = OSError()
ose.errno = 2 ose.errno = 2
pl._cwd = ose
def raises(exc): self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
raise exc
new_os.getcwdu = lambda: raises(ose)
self.assertEqual(common.cwd(dir_limit_depth=2, dir_shorten_len=2),
[{'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) [{'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
new_os.getcwdu = lambda: raises(OSError()) pl._cwd = OSError()
self.assertRaises(OSError, common.cwd, tuple(), {'dir_limit_depth': 2, 'dir_shorten_len': 2}) self.assertRaises(OSError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
new_os.getcwdu = lambda: raises(ValueError()) pl._cwd = ValueError()
self.assertRaises(ValueError, common.cwd, tuple(), {'dir_limit_depth': 2, 'dir_shorten_len': 2}) self.assertRaises(ValueError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
def test_date(self): def test_date(self):
with replace_module_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): pl = Pl()
self.assertEqual(common.date(), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}]) with replace_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))):
self.assertEqual(common.date(format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) self.assertEqual(common.date(pl=pl), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}])
self.assertEqual(common.date(pl=pl, format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}])
def test_fuzzy_time(self): def test_fuzzy_time(self):
time = Args(hour=0, minute=45) time = Args(hour=0, minute=45)
with replace_module_attr(common, 'datetime', Args(now=lambda: time)): pl = Pl()
self.assertEqual(common.fuzzy_time(), 'quarter to one') with replace_attr(common, 'datetime', Args(now=lambda: time)):
self.assertEqual(common.fuzzy_time(pl=pl), 'quarter to one')
time.hour = 23 time.hour = 23
time.minute = 59 time.minute = 59
self.assertEqual(common.fuzzy_time(), 'round about midnight') self.assertEqual(common.fuzzy_time(pl=pl), 'round about midnight')
time.minute = 33 time.minute = 33
self.assertEqual(common.fuzzy_time(), 'twenty-five to twelve') self.assertEqual(common.fuzzy_time(pl=pl), 'twenty-five to twelve')
time.minute = 60 time.minute = 60
self.assertEqual(common.fuzzy_time(), 'twelve o\'clock') self.assertEqual(common.fuzzy_time(pl=pl), 'twelve o\'clock')
def test_external_ip(self): def test_external_ip(self):
with replace_module_attr(common, 'urllib_read', urllib_read): pl = Pl()
self.assertEqual(common.external_ip(), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) with replace_attr(common, 'urllib_read', urllib_read):
self.assertEqual(common.external_ip(pl=pl), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}])
def test_uptime(self): def test_uptime(self):
with replace_module_attr(common, '_get_uptime', lambda: 65536): pl = Pl()
self.assertEqual(common.uptime(), [{'contents': '0d 18h 12m', 'divider_highlight_group': 'background:divider'}]) with replace_attr(common, '_get_uptime', lambda: 65536):
self.assertEqual(common.uptime(pl=pl), [{'contents': '0d 18h 12m', 'divider_highlight_group': 'background:divider'}])
def _get_uptime(): def _get_uptime():
raise NotImplementedError raise NotImplementedError
with replace_module_attr(common, '_get_uptime', _get_uptime): with replace_attr(common, '_get_uptime', _get_uptime):
self.assertEqual(common.uptime(), None) self.assertEqual(common.uptime(pl=pl), None)
def test_weather(self): def test_weather(self):
with replace_module_attr(common, 'urllib_read', urllib_read): pl = Pl()
self.assertEqual(common.weather(), [ with replace_attr(common, 'urllib_read', urllib_read):
self.assertEqual(common.weather(pl=pl), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(temp_coldest=0, temp_hottest=100), [ self.assertEqual(common.weather(pl=pl, temp_coldest=0, temp_hottest=100), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 0}
]) ])
self.assertEqual(common.weather(temp_coldest=-100, temp_hottest=-50), [ self.assertEqual(common.weather(pl=pl, temp_coldest=-100, temp_hottest=-50), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 100} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 100}
]) ])
self.assertEqual(common.weather(icons={'cloudy': 'o'}), [ self.assertEqual(common.weather(pl=pl, icons={'cloudy': 'o'}), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'o '}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'o '},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(icons={'partly_cloudy_day': 'x'}), [ self.assertEqual(common.weather(pl=pl, icons={'partly_cloudy_day': 'x'}), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'x '}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'x '},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(unit='F'), [ self.assertEqual(common.weather(pl=pl, unit='F'), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '16°F', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '16°F', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(unit='K'), [ self.assertEqual(common.weather(pl=pl, unit='K'), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '264K', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '264K', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(temp_format='{temp:.1e}C'), [ self.assertEqual(common.weather(pl=pl, temp_format='{temp:.1e}C'), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9.0e+00C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9.0e+00C', 'gradient_level': 30.0}
]) ])
def test_system_load(self): def test_system_load(self):
pl = Pl()
with replace_module_module(common, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)): with replace_module_module(common, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)):
with replace_module_attr(common, 'cpu_count', lambda: 2): with replace_attr(common, 'cpu_count', lambda: 2):
self.assertEqual(common.system_load(), self.assertEqual(common.system_load(pl=pl),
[{'contents': '7.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, [{'contents': '7.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100},
{'contents': '3.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}, {'contents': '3.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0},
{'contents': '1.5', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 0}]) {'contents': '1.5', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 0}])
self.assertEqual(common.system_load(format='{avg:.0f}', threshold_good=0, threshold_bad=1), self.assertEqual(common.system_load(pl=pl, format='{avg:.0f}', threshold_good=0, threshold_bad=1),
[{'contents': '8 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, [{'contents': '8 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100},
{'contents': '4 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, {'contents': '4 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 100},
{'contents': '2', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}]) {'contents': '2', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}])
def test_cpu_load_percent(self): def test_cpu_load_percent(self):
pl = Pl()
with replace_module_module(common, 'psutil', cpu_percent=lambda **kwargs: 52.3): with replace_module_module(common, 'psutil', cpu_percent=lambda **kwargs: 52.3):
self.assertEqual(common.cpu_load_percent(), '52%') self.assertEqual(common.cpu_load_percent(pl=pl), '52%')
def test_network_load(self): def test_network_load(self):
def _get_bytes(interface): def gb(interface):
return None return None
with replace_module_attr(common, '_get_bytes', _get_bytes):
self.assertEqual(common.network_load(), None) f = [gb]
def _get_bytes(interface):
return f[0](interface)
pl = Pl()
with replace_attr(common, '_get_bytes', _get_bytes):
common.network_load.startup(pl=pl)
self.assertEqual(common.network_load(pl=pl), None)
common.network_load.sleep(0)
self.assertEqual(common.network_load(pl=pl), None)
l = [0, 0] l = [0, 0]
def _get_bytes2(interface): def gb2(interface):
l[0] += 1200 l[0] += 1200
l[1] += 2400 l[1] += 2400
return tuple(l) return tuple(l)
f[0] = gb2
from imp import reload
reload(common)
with replace_module_attr(common, '_get_bytes', _get_bytes2):
common.network_load.startup()
common.network_load.sleep(0) common.network_load.sleep(0)
common.network_load.sleep(0) common.network_load.sleep(0)
self.assertEqual(common.network_load(), [ self.assertEqual(common.network_load(pl=pl), [
{'divider_highlight_group': 'background:divider', 'contents': '⬇ 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': '⬇ 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': '⬆ 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': '⬆ 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}'), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}'), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', suffix='bps'), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps'), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', si_prefix=True), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', recv_max=0), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
class ApproxEqual(object): class ApproxEqual(object):
def __eq__(self, i): def __eq__(self, i):
return abs(i - 50.0) < 1 return abs(i - 50.0) < 1
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', sent_max=4800), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()},
]) ])
def test_virtualenv(self): def test_virtualenv(self):
with replace_env('VIRTUAL_ENV', '/abc/def/ghi'): with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as pl:
self.assertEqual(common.virtualenv(), 'ghi') self.assertEqual(common.virtualenv(pl=pl), 'ghi')
os.environ.pop('VIRTUAL_ENV') pl.environ.pop('VIRTUAL_ENV')
self.assertEqual(common.virtualenv(), None) self.assertEqual(common.virtualenv(pl=pl), None)
def test_email_imap_alert(self): def test_email_imap_alert(self):
# TODO # TODO
@ -254,131 +263,145 @@ class TestCommon(TestCase):
class TestVim(TestCase): class TestVim(TestCase):
def test_mode(self): def test_mode(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.mode(segment_info=segment_info), 'NORMAL') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'NORMAL')
self.assertEqual(vim.mode(segment_info=segment_info, override={'i': 'INS'}), 'NORMAL') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'i': 'INS'}), 'NORMAL')
self.assertEqual(vim.mode(segment_info=segment_info, override={'n': 'NORM'}), 'NORM') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'n': 'NORM'}), 'NORM')
with vim_module._with('mode', 'i') as segment_info: with vim_module._with('mode', 'i') as segment_info:
self.assertEqual(vim.mode(segment_info=segment_info), 'INSERT') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'INSERT')
with vim_module._with('mode', chr(ord('V') - 0x40)) as segment_info: with vim_module._with('mode', chr(ord('V') - 0x40)) as segment_info:
self.assertEqual(vim.mode(segment_info=segment_info), 'V·BLCK') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'V·BLCK')
self.assertEqual(vim.mode(segment_info=segment_info, override={'^V': 'VBLK'}), 'VBLK') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'^V': 'VBLK'}), 'VBLK')
def test_modified_indicator(self): def test_modified_indicator(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.modified_indicator(segment_info=segment_info), None) self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info), None)
segment_info['buffer'][0] = 'abc' segment_info['buffer'][0] = 'abc'
try: try:
self.assertEqual(vim.modified_indicator(segment_info=segment_info), '+') self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info), '+')
self.assertEqual(vim.modified_indicator(segment_info=segment_info, text='-'), '-') self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info, text='-'), '-')
finally: finally:
vim_module._bw(segment_info['bufnr']) vim_module._bw(segment_info['bufnr'])
def test_paste_indicator(self): def test_paste_indicator(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.paste_indicator(segment_info=segment_info), None) self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info), None)
with vim_module._with('options', paste=1): with vim_module._with('options', paste=1):
self.assertEqual(vim.paste_indicator(segment_info=segment_info), 'PASTE') self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info), 'PASTE')
self.assertEqual(vim.paste_indicator(segment_info=segment_info, text='P'), 'P') self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info, text='P'), 'P')
def test_readonly_indicator(self): def test_readonly_indicator(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.readonly_indicator(segment_info=segment_info), None) self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info), None)
with vim_module._with('bufoptions', readonly=1): with vim_module._with('bufoptions', readonly=1):
self.assertEqual(vim.readonly_indicator(segment_info=segment_info), '') self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info), '')
self.assertEqual(vim.readonly_indicator(segment_info=segment_info, text='L'), 'L') self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info, text='L'), 'L')
def test_file_directory(self): def test_file_directory(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_directory(segment_info=segment_info), None) self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), None)
with replace_env('HOME', '/home/foo'): with replace_env('HOME', '/home/foo', os.environ):
with vim_module._with('buffer', '/tmp/abc') as segment_info: with vim_module._with('buffer', '/tmp/abc') as segment_info:
self.assertEqual(vim.file_directory(segment_info=segment_info), '/tmp/') self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/')
os.environ['HOME'] = '/tmp' os.environ['HOME'] = '/tmp'
self.assertEqual(vim.file_directory(segment_info=segment_info), '~/') self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), '~/')
def test_file_name(self): def test_file_name(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_name(segment_info=segment_info), None) self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), None)
self.assertEqual(vim.file_name(segment_info=segment_info, display_no_file=True), self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True),
[{'contents': '[No file]', 'highlight_group': ['file_name_no_file', 'file_name']}]) [{'contents': '[No file]', 'highlight_group': ['file_name_no_file', 'file_name']}])
self.assertEqual(vim.file_name(segment_info=segment_info, display_no_file=True, no_file_text='X'), self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True, no_file_text='X'),
[{'contents': 'X', 'highlight_group': ['file_name_no_file', 'file_name']}]) [{'contents': 'X', 'highlight_group': ['file_name_no_file', 'file_name']}])
with vim_module._with('buffer', '/tmp/abc') as segment_info: with vim_module._with('buffer', '/tmp/abc') as segment_info:
self.assertEqual(vim.file_name(segment_info=segment_info), 'abc') self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), 'abc')
with vim_module._with('buffer', '/tmp/') as segment_info: with vim_module._with('buffer', '/tmp/') as segment_info:
self.assertEqual(vim.file_name(segment_info=segment_info), '') self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), '')
def test_file_size(self): def test_file_size(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_size(segment_info=segment_info), '0 B') self.assertEqual(vim.file_size(pl=pl, segment_info=segment_info), '0 B')
with vim_module._with('buffer', os.path.join(os.path.dirname(__file__), 'empty')) as segment_info: with vim_module._with('buffer', os.path.join(os.path.dirname(__file__), 'empty')) as segment_info:
self.assertEqual(vim.file_size(segment_info=segment_info), '0 B') self.assertEqual(vim.file_size(pl=pl, segment_info=segment_info), '0 B')
def test_file_opts(self): def test_file_opts(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_format(segment_info=segment_info), self.assertEqual(vim.file_format(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'background:divider', 'contents': 'unix'}]) [{'divider_highlight_group': 'background:divider', 'contents': 'unix'}])
self.assertEqual(vim.file_encoding(segment_info=segment_info), self.assertEqual(vim.file_encoding(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'background:divider', 'contents': 'utf-8'}]) [{'divider_highlight_group': 'background:divider', 'contents': 'utf-8'}])
self.assertEqual(vim.file_type(segment_info=segment_info), None) self.assertEqual(vim.file_type(pl=pl, segment_info=segment_info), None)
with vim_module._with('bufoptions', filetype='python'): with vim_module._with('bufoptions', filetype='python'):
self.assertEqual(vim.file_type(segment_info=segment_info), self.assertEqual(vim.file_type(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'background:divider', 'contents': 'python'}]) [{'divider_highlight_group': 'background:divider', 'contents': 'python'}])
def test_line_percent(self): def test_line_percent(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
segment_info['buffer'][0:-1] = [str(i) for i in range(100)] segment_info['buffer'][0:-1] = [str(i) for i in range(100)]
try: try:
self.assertEqual(vim.line_percent(segment_info=segment_info), '1') self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info), '1')
vim_module._set_cursor(50, 0) vim_module._set_cursor(50, 0)
self.assertEqual(vim.line_percent(segment_info=segment_info), '50') self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info), '50')
self.assertEqual(vim.line_percent(segment_info=segment_info, gradient=True), self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info, gradient=True),
[{'contents': '50', 'highlight_group': ['line_percent_gradient', 'line_percent'], 'gradient_level': 50 * 100.0 / 101}]) [{'contents': '50', 'highlight_group': ['line_percent_gradient', 'line_percent'], 'gradient_level': 50 * 100.0 / 101}])
finally: finally:
vim_module._bw(segment_info['bufnr']) vim_module._bw(segment_info['bufnr'])
def test_cursor_current(self): def test_cursor_current(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.line_current(segment_info=segment_info), '1') self.assertEqual(vim.line_current(pl=pl, segment_info=segment_info), '1')
self.assertEqual(vim.col_current(segment_info=segment_info), '1') self.assertEqual(vim.col_current(pl=pl, segment_info=segment_info), '1')
self.assertEqual(vim.virtcol_current(segment_info=segment_info), self.assertEqual(vim.virtcol_current(pl=pl, segment_info=segment_info),
[{'highlight_group': ['virtcol_current', 'col_current'], 'contents': '1'}]) [{'highlight_group': ['virtcol_current', 'col_current'], 'contents': '1'}])
def test_modified_buffers(self): def test_modified_buffers(self):
self.assertEqual(vim.modified_buffers(), None) pl = Pl()
self.assertEqual(vim.modified_buffers(pl=pl), None)
def test_branch(self): def test_branch(self):
pl = Pl()
with vim_module._with('buffer', '/foo') as segment_info: with vim_module._with('buffer', '/foo') as segment_info:
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)):
self.assertEqual(vim.branch(segment_info=segment_info), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
self.assertEqual(vim.branch(segment_info=segment_info, status_colors=True), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}])
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)):
self.assertEqual(vim.branch(segment_info=segment_info), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
self.assertEqual(vim.branch(segment_info=segment_info, status_colors=True), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}])
def test_file_vcs_status(self): def test_file_vcs_status(self):
pl = Pl()
with vim_module._with('buffer', '/foo') as segment_info: with vim_module._with('buffer', '/foo') as segment_info:
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)):
self.assertEqual(vim.file_vcs_status(segment_info=segment_info), self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info),
[{'highlight_group': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'}]) [{'highlight_group': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'}])
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: None, directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: None, directory=path)):
self.assertEqual(vim.file_vcs_status(segment_info=segment_info), None) self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None)
with vim_module._with('buffer', '/bar') as segment_info: with vim_module._with('buffer', '/bar') as segment_info:
with vim_module._with('bufoptions', buftype='nofile'): with vim_module._with('bufoptions', buftype='nofile'):
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)):
self.assertEqual(vim.file_vcs_status(segment_info=segment_info), None) self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None)
def test_repository_status(self): def test_repository_status(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)):
self.assertEqual(vim.repository_status(segment_info=segment_info), None) self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), None)
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)):
self.assertEqual(vim.repository_status(segment_info=segment_info), 'DU') self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), 'DU')
old_cwd = None old_cwd = None