Merge remote-tracking branch 'zyx-i/pl-argument' into develop

This commit is contained in:
Kim Silkebækken 2013-03-24 16:52:44 +01:00
commit 5cebad9428
22 changed files with 705 additions and 387 deletions

View File

@ -155,6 +155,17 @@ Common configuration is a subdictionary that is a value of ``common`` key in
:ref:`module segment option <config-themes-seg-module>`. Paths defined here :ref:`module segment option <config-themes-seg-module>`. Paths defined here
have priority when searching for modules. have priority when searching for modules.
``log_file``
Defines path which will hold powerline logs. If not present, logging will be
done to stderr.
``log_level``
String, determines logging level. Defaults to ``WARNING``.
``log_format``
String, determines format of the log messages. Defaults to
``'%(asctime)s:%(level)s:%(message)s'``.
Extension-specific configuration Extension-specific configuration
-------------------------------- --------------------------------

View File

@ -28,6 +28,9 @@ class ThreadedDocumenter(autodoc.FunctionDocumenter):
if isinstance(self.object, ThreadedSegment): if isinstance(self.object, ThreadedSegment):
args = ['interval'] args = ['interval']
defaults = [getattr(self.object, 'interval', 1)] defaults = [getattr(self.object, 'interval', 1)]
if self.object.update_first:
args.append('update_first')
defaults.append(True)
methods = ['render', 'set_state'] methods = ['render', 'set_state']
if isinstance(self.object, KwThreadedSegment): if isinstance(self.object, KwThreadedSegment):
methods += ['key', 'render_one'] methods += ['key', 'render_one']
@ -41,7 +44,8 @@ class ThreadedDocumenter(autodoc.FunctionDocumenter):
if (arg == 'self' or if (arg == 'self' or
(arg == 'segment_info' and (arg == 'segment_info' and
getattr(self.object, 'powerline_requires_segment_info', None)) or getattr(self.object, 'powerline_requires_segment_info', None)) or
(method == 'render_one' and -i == len(argspec.args))): (method == 'render_one' and -i == len(argspec.args)) or
arg in args):
continue continue
if argspec.defaults and len(argspec.defaults) >= -i: if argspec.defaults and len(argspec.defaults) >= -i:
default = argspec.defaults[i] default = argspec.defaults[i]

View File

@ -1 +1 @@
EBUILD powerline-9999.ebuild 3916 SHA256 754f8750aa5c6a455871d44c9478b81278b71318e591a13ef30f763ddb1fbda5 SHA512 48dd9d3ac737417b6a072397a4a50fefc04c0cb4a3c6249b5bf11ee201986f7b25a94b83e6901e8099ae3ea733dc0e650f5ce2234b5143bdbafd906af56916e7 WHIRLPOOL 803298b97adaeb2e3a6cd31c4bd12897373d7ce0722df22a991c684ecd5146fcd87addfc6e428374d924d6cb950dee6bbcf2ea0a262904cc4d04e3eac0b2dcb8 EBUILD powerline-9999.ebuild 4091 SHA256 d1d13b09e3ebefdaa1b90c50ccceb563bb444ca0f484e6448a993d2612dfbb78 SHA512 47f211249bb85608cb9c5b8e72daba5c36971e294e27843396cbf517bc792db64a4f80e843f5c883f8411ed4c9cef5618f9d617b305303a31a4590636879dcd3 WHIRLPOOL 2fc50e1da46d56d160140d3a87a4d9421bf63d67c792bfd88486e0dc72d379045ed79d152aa060a014e7f6bf4eb232642463014de9523909946bd8b2cbf83371

View File

@ -130,6 +130,10 @@ src_install() {
fi fi
rm powerline/bindings/bash/powerline.sh rm powerline/bindings/bash/powerline.sh
elog "" elog ""
insinto /etc/xdg/powerline
doins -r powerline/config_files/*
rm -r powerline/config_files
sed -i -e "/DEFAULT_SYSTEM_CONFIG_DIR/ s@None@'/etc/xdg'@" powerline/__init__.py
distutils-r1_src_install distutils-r1_src_install
use doc && dohtml -r docs_output/* use doc && dohtml -r docs_output/*
} }

View File

@ -4,10 +4,14 @@ from __future__ import absolute_import
import json import json
import os import os
import sys import sys
import logging
from powerline.colorscheme import Colorscheme from powerline.colorscheme import Colorscheme
DEFAULT_SYSTEM_CONFIG_DIR = None
def open_file(path): def open_file(path):
return open(path, 'r') return open(path, 'r')
@ -22,6 +26,42 @@ def load_json_config(search_paths, config_file, load=json.load, open_file=open_f
raise IOError('Config file not found in search path: {0}'.format(config_file)) raise IOError('Config file not found in search path: {0}'.format(config_file))
class PowerlineState(object):
def __init__(self, logger, environ, getcwd, home):
self.environ = environ
self.getcwd = getcwd
self.home = home or environ.get('HOME', None)
self.logger = logger
self.prefix = None
self.last_msgs = {}
def _log(self, attr, msg, *args, **kwargs):
prefix = kwargs.get('prefix') or self.prefix
msg = ((prefix + ':') if prefix else '') + msg.format(*args, **kwargs)
key = attr+':'+prefix
if msg != self.last_msgs.get(key):
getattr(self.logger, attr)(msg)
self.last_msgs[key] = msg
def critical(self, msg, *args, **kwargs):
self._log('critical', msg, *args, **kwargs)
def exception(self, msg, *args, **kwargs):
self._log('exception', msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
self._log('info', msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
self._log('error', msg, *args, **kwargs)
def warn(self, msg, *args, **kwargs):
self._log('warning', msg, *args, **kwargs)
def debug(self, msg, *args, **kwargs):
self._log('debug', msg, *args, **kwargs)
class Powerline(object): class Powerline(object):
'''Main powerline class, entrance point for all powerline uses. Sets '''Main powerline class, entrance point for all powerline uses. Sets
powerline up and loads the configuration. powerline up and loads the configuration.
@ -34,9 +74,29 @@ class Powerline(object):
:param str renderer_module: :param str renderer_module:
Overrides renderer module (defaults to ``ext``). Should be the name of Overrides renderer module (defaults to ``ext``). Should be the name of
the package imported like this: ``powerline.renders.{render_module}``. the package imported like this: ``powerline.renders.{render_module}``.
:param bool run_once:
Determines whether .renderer.render() method will be run only once
during python session.
:param Logger logger:
If present, no new logger will be created and this logger will be used.
:param dict environ:
Object with ``.__getitem__`` and ``.get`` methods used to obtain
environment variables. Defaults to ``os.environ``.
:param func getcwd:
Function used to get current working directory. Defaults to
``os.getcwdu`` or ``os.getcwd``.
:param str home:
Home directory. Defaults to ``environ.get('HOME')``.
''' '''
def __init__(self, ext, renderer_module=None, run_once=False): def __init__(self,
ext,
renderer_module=None,
run_once=False,
logger=None,
environ=os.environ,
getcwd=getattr(os, 'getcwdu', os.getcwd),
home=None):
self.config_paths = self.get_config_paths() self.config_paths = self.get_config_paths()
# Load main config file # Load main config file
@ -75,7 +135,40 @@ class Powerline(object):
'tmux_escape': common_config.get('additional_escapes') == 'tmux', 'tmux_escape': common_config.get('additional_escapes') == 'tmux',
'screen_escape': common_config.get('additional_escapes') == 'screen', 'screen_escape': common_config.get('additional_escapes') == 'screen',
} }
self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, **options)
# Create logger
if not logger:
log_format = common_config.get('format', '%(asctime)s:%(level)s:%(message)s')
formatter = logging.Formatter(log_format)
level = getattr(logging, common_config.get('log_level', 'WARNING'))
handler = self.get_log_handler(common_config)
handler.setLevel(level)
logger = logging.getLogger('powerline')
logger.setLevel(level)
logger.addHandler(handler)
pl = PowerlineState(logger=logger, environ=environ, getcwd=getcwd, home=home)
self.renderer = Renderer(theme_config, local_themes, theme_kwargs, colorscheme, pl, **options)
def get_log_handler(self, common_config):
'''Get log handler.
:param dict common_config:
Common configuration.
:return: logging.Handler subclass.
'''
log_file = common_config.get('file', None)
if log_file:
log_dir = os.path.dirname(log_file)
if not os.path.isdir(log_dir):
os.mkdir(log_dir)
return logging.FileHandler(log_file)
else:
return logging.StreamHandler()
@staticmethod @staticmethod
def get_config_paths(): def get_config_paths():
@ -86,7 +179,7 @@ class Powerline(object):
config_home = os.environ.get('XDG_CONFIG_HOME', os.path.join(os.path.expanduser('~'), '.config')) config_home = os.environ.get('XDG_CONFIG_HOME', os.path.join(os.path.expanduser('~'), '.config'))
config_path = os.path.join(config_home, 'powerline') config_path = os.path.join(config_home, 'powerline')
config_paths = [config_path] config_paths = [config_path]
config_dirs = os.environ.get('XDG_CONFIG_DIRS', None) config_dirs = os.environ.get('XDG_CONFIG_DIRS', DEFAULT_SYSTEM_CONFIG_DIR)
if config_dirs is not None: if config_dirs is not None:
config_paths.extend([os.path.join(d, 'powerline') for d in config_dirs.split(':')]) config_paths.extend([os.path.join(d, 'powerline') for d in config_dirs.split(':')])
plugin_path = os.path.join(os.path.realpath(os.path.dirname(__file__)), 'config_files') plugin_path = os.path.join(os.path.realpath(os.path.dirname(__file__)), 'config_files')

View File

@ -21,27 +21,24 @@ let s:powerline_pyeval = get(g:, 'powerline_pyeval', s:powerline_pycmd.'eval')
let s:import_cmd = 'from powerline.vim import VimPowerline' let s:import_cmd = 'from powerline.vim import VimPowerline'
try try
exec s:powerline_pycmd s:import_cmd exec s:powerline_pycmd "try:\n"
catch \ ." ".s:import_cmd."\n"
" An error occurred while importing the module, it could be installed \ ."except ImportError:\n"
" outside of Python's module search paths. Update sys.path and try again. \ ." import sys, vim\n"
exec s:powerline_pycmd 'import sys, vim' \ ." sys.path.append(vim.eval('expand(\"<sfile>:h:h:h:h:h\")'))\n"
exec s:powerline_pycmd 'sys.path.append(vim.eval(''expand("<sfile>:h:h:h:h:h")''))' \ ." ".s:import_cmd
try let s:launched = 1
exec s:powerline_pycmd s:import_cmd finally
let s:launched = 1 if !exists('s:launched')
finally call s:CriticalError('An error occurred while importing the Powerline package.
if !exists('s:launched') \ This could be caused by an invalid sys.path setting, or by an incompatible
call s:CriticalError('An error occurred while importing the Powerline package. \ Python version (Powerline requires Python 2.6+ or 3.2+ to work). Please consult
\ This could be caused by an invalid sys.path setting, or by an incompatible \ the troubleshooting section in the documentation for possible solutions.')
\ Python version (Powerline requires Python 2.6+ or 3.2+ to work). Please consult finish
\ the troubleshooting section in the documentation for possible solutions.') else
finish unlet s:launched
endif endif
endtry
endtry endtry
exec s:powerline_pycmd 'powerline = VimPowerline()'
exec s:powerline_pycmd 'del VimPowerline'
if !get(g:, 'powerline_debugging_pyeval') && exists('*'. s:powerline_pyeval) if !get(g:, 'powerline_debugging_pyeval') && exists('*'. s:powerline_pyeval)
let s:pyeval = function(s:powerline_pyeval) let s:pyeval = function(s:powerline_pyeval)
@ -80,18 +77,19 @@ endfunction
function! PowerlineRegisterCachePurgerEvent(event) function! PowerlineRegisterCachePurgerEvent(event)
exec s:powerline_pycmd 'from powerline.segments.vim import launchevent as powerline_launchevent' exec s:powerline_pycmd 'from powerline.segments.vim import launchevent as powerline_launchevent'
augroup Powerline augroup Powerline
exec 'autocmd!' a:event '*' s:powerline_pycmd.' powerline_launchevent("'.a:event.'")' exec 'autocmd' a:event '*' s:powerline_pycmd.' powerline_launchevent("'.a:event.'")'
augroup END augroup END
endfunction endfunction
augroup Powerline
autocmd! ColorScheme * :exec s:powerline_pycmd 'powerline.renderer.reset_highlight()'
autocmd! VimEnter * :redrawstatus!
autocmd! VimLeave * :exec s:powerline_pycmd 'powerline.renderer.shutdown()'
augroup END
exec s:powerline_pycmd 'powerline = VimPowerline()'
exec s:powerline_pycmd 'del VimPowerline'
" Is immediately changed when PowerlineNew() function is run. Good for global " Is immediately changed when PowerlineNew() function is run. Good for global
" value. " value.
set statusline=%!PowerlineNew() set statusline=%!PowerlineNew()
call PowerlineNew() call PowerlineNew()
augroup Powerline
autocmd!
autocmd ColorScheme * :exec s:powerline_pycmd 'powerline.renderer.reset_highlight()'
autocmd VimEnter * :redrawstatus!
autocmd VimLeave * :exec s:powerline_pycmd 'powerline.renderer.shutdown()'
augroup END

View File

@ -45,17 +45,47 @@ class Args(object):
return None return None
def string(s):
if type(s) is bytes:
return s.decode('utf-8', errors='replace')
else:
return str(s)
class Environment(object):
@staticmethod
def __getitem__(key):
try:
return string(zsh.getvalue(key))
except IndexError as e:
raise KeyError(*e.args)
@staticmethod
def get(key, default=None):
try:
return string(zsh.getvalue(key))
except IndexError:
return default
class Prompt(object): class Prompt(object):
__slots__ = ('render', 'side', 'savedpsvar', 'savedps') __slots__ = ('render', 'side', 'savedpsvar', 'savedps', 'args')
def __init__(self, powerline, side, savedpsvar=None, savedps=None): def __init__(self, powerline, side, savedpsvar=None, savedps=None):
self.render = powerline.renderer.render self.render = powerline.renderer.render
self.side = side self.side = side
self.savedpsvar = savedpsvar self.savedpsvar = savedpsvar
self.savedps = savedps self.savedps = savedps
self.args = powerline.args
def __str__(self): def __str__(self):
return self.render(width=zsh.columns(), side=self.side).encode('utf-8') r = self.render(width=zsh.columns(), side=self.side, segment_info=self.args)
if type(r) is not str:
if type(r) is bytes:
return r.decode('utf-8')
else:
return r.encode('utf-8')
return r
def __del__(self): def __del__(self):
if self.savedps: if self.savedps:
@ -71,6 +101,7 @@ def set_prompt(powerline, psvar, side):
def setup(): def setup():
powerline = ShellPowerline(Args()) environ = Environment()
powerline = ShellPowerline(Args(), environ=environ, getcwd=lambda: environ['PWD'])
set_prompt(powerline, 'PS1', 'left') set_prompt(powerline, 'PS1', 'left')
set_prompt(powerline, 'RPS1', 'right') set_prompt(powerline, 'RPS1', 'right')

View File

@ -8,7 +8,7 @@ from time import sleep
from threading import Thread, Lock from threading import Thread, Lock
class ThreadedSegment(Thread): class ThreadedSegment(object):
daemon = True daemon = True
min_sleep_time = 0.1 min_sleep_time = 0.1
update_first = True update_first = True
@ -21,16 +21,37 @@ class ThreadedSegment(Thread):
self.keep_going = True self.keep_going = True
self.run_once = True self.run_once = True
self.did_set_interval = False self.did_set_interval = False
self.thread = None
self.skip = False
self.crashed_value = None
def __call__(self, **kwargs): def __call__(self, pl, update_first=True, **kwargs):
if self.run_once: if self.run_once:
self.pl = pl
self.set_state(**kwargs) self.set_state(**kwargs)
self.update() self.update()
elif not self.is_alive(): elif not self.is_alive():
self.startup(**kwargs) # Without this we will not have to wait long until receiving bug “I
# opened vim, but branch information is only shown after I move
# cursor”.
#
# If running once .update() is called in __call__.
if update_first and self.update_first:
self.update()
self.start()
if self.skip:
return self.crashed_value
with self.write_lock: with self.write_lock:
return self.render(**kwargs) return self.render(update_first=update_first, pl=pl, **kwargs)
def is_alive(self):
return self.thread and self.thread.is_alive()
def start(self):
self.thread = Thread(target=self.run)
self.thread.daemon = self.daemon
self.thread.start()
def sleep(self, adjust_time): def sleep(self, adjust_time):
sleep(max(self.interval - adjust_time, self.min_sleep_time)) sleep(max(self.interval - adjust_time, self.min_sleep_time))
@ -39,14 +60,29 @@ class ThreadedSegment(Thread):
while self.keep_going: while self.keep_going:
start_time = monotonic() start_time = monotonic()
with self.update_lock: try:
self.update() if self.update_lock.acquire(False):
try:
self.update()
except Exception as e:
self.error('Exception while updating: {0}', str(e))
self.skip = True
else:
self.skip = False
else:
return
finally:
# Release lock in any case. If it is not locked in this thread,
# it was done in main thread in .shutdown method, and the lock
# will never be released.
self.update_lock.release()
self.sleep(monotonic() - start_time) self.sleep(monotonic() - start_time)
def shutdown(self): def shutdown(self):
self.keep_going = False if self.keep_going:
self.update_lock.acquire() self.keep_going = False
self.update_lock.acquire()
def set_interval(self, interval=None): def set_interval(self, interval=None):
# Allowing “interval” keyword in configuration. # Allowing “interval” keyword in configuration.
@ -60,23 +96,24 @@ class ThreadedSegment(Thread):
def set_state(self, interval=None, **kwargs): def set_state(self, interval=None, **kwargs):
if not self.did_set_interval or interval: if not self.did_set_interval or interval:
self.set_interval(interval) self.set_interval(interval)
# Without this we will not have to wait long until receiving bug “I
# opened vim, but branch information is only shown after I move cursor”.
if self.update_first:
self.update_first = False
self.update()
def startup(self, **kwargs): def startup(self, pl, **kwargs):
# Normally .update() succeeds to run before value is requested, meaning
# that user is getting values he needs directly at vim startup. Without
# .startup() we will not have to wait long until receiving bug “I opened
# vim, but branch information is only shown after I move cursor”.
self.run_once = False self.run_once = False
self.pl = pl
if not self.is_alive(): if not self.is_alive():
self.set_state(**kwargs) self.set_state(**kwargs)
self.start() self.start()
def error(self, *args, **kwargs):
self.pl.error(prefix=self.__class__.__name__, *args, **kwargs)
def warn(self, *args, **kwargs):
self.pl.warn(prefix=self.__class__.__name__, *args, **kwargs)
def debug(self, *args, **kwargs):
self.pl.debug(prefix=self.__class__.__name__, *args, **kwargs)
def printed(func): def printed(func):
def f(*args, **kwargs): def f(*args, **kwargs):
@ -87,25 +124,29 @@ def printed(func):
class KwThreadedSegment(ThreadedSegment): class KwThreadedSegment(ThreadedSegment):
drop_interval = 10 * 60 drop_interval = 10 * 60
update_missing = True
update_first = False update_first = False
def __init__(self): def __init__(self):
super(KwThreadedSegment, self).__init__() super(KwThreadedSegment, self).__init__()
self.queries = {} self.queries = {}
self.crashed = set()
@staticmethod @staticmethod
def key(**kwargs): def key(**kwargs):
return frozenset(kwargs.items()) return frozenset(kwargs.items())
def render(self, **kwargs): def render(self, update_first, **kwargs):
key = self.key(**kwargs) key = self.key(**kwargs)
if key in self.crashed:
return self.crashed_value
try: try:
update_state = self.queries[key][1] update_state = self.queries[key][1]
except KeyError: except KeyError:
# self.update_missing has the same reasoning as self.update_first in # Allow only to forbid to compute missing values: in either user
# parent class # configuration or in subclasses.
update_state = self.compute_state(key) if self.update_missing else None update_state = self.compute_state(key) if update_first and self.update_first or self.run_once else None
# No locks: render method is already running with write_lock acquired. # No locks: render method is already running with write_lock acquired.
self.queries[key] = (monotonic(), update_state) self.queries[key] = (monotonic(), update_state)
return self.render_one(update_state, **kwargs) return self.render_one(update_state, **kwargs)
@ -115,11 +156,17 @@ class KwThreadedSegment(ThreadedSegment):
removes = [] removes = []
for key, (last_query_time, state) in list(self.queries.items()): for key, (last_query_time, state) in list(self.queries.items()):
if last_query_time < monotonic() < last_query_time + self.drop_interval: if last_query_time < monotonic() < last_query_time + self.drop_interval:
updates[key] = (last_query_time, self.compute_state(key)) try:
updates[key] = (last_query_time, self.compute_state(key))
except Exception as e:
self.exception('Exception while computing state for {0}: {1}', repr(key), str(e))
with self.write_lock:
self.crashed.add(key)
else: else:
removes.append(key) removes.append(key)
with self.write_lock: with self.write_lock:
self.queries.update(updates) self.queries.update(updates)
self.crashed -= set(updates)
for key in removes: for key in removes:
self.queries.pop(key) self.queries.pop(key)
@ -127,9 +174,8 @@ class KwThreadedSegment(ThreadedSegment):
if not self.did_set_interval or (interval < self.interval): if not self.did_set_interval or (interval < self.interval):
self.set_interval(interval) self.set_interval(interval)
key = self.key(**kwargs) if self.update_first:
if key not in self.queries: self.update_first = update_first
self.queries[key] = (monotonic(), self.compute_state(key) if self.update_missing else None)
@staticmethod @staticmethod
def render_one(update_state, **kwargs): def render_one(update_state, **kwargs):

View File

@ -8,6 +8,7 @@ import os
import re import re
from collections import defaultdict from collections import defaultdict
from copy import copy from copy import copy
import logging
try: try:
@ -408,6 +409,11 @@ main_spec = (Spec(
# only for existence of the path, not for it being a directory # only for existence of the path, not for it being a directory
paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(value.value))), paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(value.value))),
lambda value: 'path does not exist: {0}'.format(value)).optional(), lambda value: 'path does not exist: {0}'.format(value)).optional(),
log_file=Spec().type(str).func(lambda value, *args: (True, True, not os.path.isdir(os.path.dirname(value))),
lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value))).optional(),
log_level=Spec().re('^[A-Z]+$').func(lambda value, *args: (True, True, not hasattr(logging, value)),
lambda value: 'unknown debugging level {0}'.format(value)).optional(),
log_format=Spec().type(str).optional(),
).context_message('Error while loading common configuration (key {key})'), ).context_message('Error while loading common configuration (key {key})'),
ext=Spec( ext=Spec(
vim=Spec( vim=Spec(
@ -786,6 +792,11 @@ def check_segment_data_key(key, data, context, echoerr):
return True, False, False return True, False, False
# FIXME More checks, limit existing to ThreadedSegment instances only
args_spec = Spec(
interval=Spec().either(Spec().type(float), Spec().type(int)).optional(),
update_first=Spec().type(bool).optional(),
).unknown_spec(Spec(), Spec()).optional().copy
highlight_group_spec = Spec().type(unicode).copy highlight_group_spec = Spec().type(unicode).copy
segment_module_spec = Spec().type(unicode).func(check_segment_module).optional().copy segment_module_spec = Spec().type(unicode).func(check_segment_module).optional().copy
segments_spec = Spec().optional().list( segments_spec = Spec().optional().list(
@ -801,8 +812,7 @@ segments_spec = Spec().optional().list(
before=Spec().type(unicode).optional(), before=Spec().type(unicode).optional(),
width=Spec().either(Spec().unsigned(), Spec().cmp('eq', 'auto')).optional(), width=Spec().either(Spec().unsigned(), Spec().cmp('eq', 'auto')).optional(),
align=Spec().oneof(set('lr')).optional(), align=Spec().oneof(set('lr')).optional(),
# FIXME Check args args=args_spec(),
args=Spec().type(dict).optional(),
contents=Spec().type(unicode).optional(), contents=Spec().type(unicode).optional(),
highlight_group=Spec().list( highlight_group=Spec().list(
highlight_group_spec().re('^(?:(?!:divider$).)+$', highlight_group_spec().re('^(?:(?!:divider$).)+$',
@ -819,8 +829,7 @@ theme_spec = (Spec(
Spec( Spec(
after=Spec().type(unicode).optional(), after=Spec().type(unicode).optional(),
before=Spec().type(unicode).optional(), before=Spec().type(unicode).optional(),
# FIXME Check args args=args_spec(),
args=Spec().type(dict).optional(),
contents=Spec().type(unicode).optional(), contents=Spec().type(unicode).optional(),
), ),
).optional().context_message('Error while loading segment data (key {key})'), ).optional().context_message('Error while loading segment data (key {key})'),

View File

@ -18,9 +18,11 @@ def construct_returned_value(rendered_highlighted, segments, output_raw):
class Renderer(object): class Renderer(object):
def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, **options): def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, pl, **options):
self.__dict__.update(options) self.__dict__.update(options)
self.theme_config = theme_config self.theme_config = theme_config
theme_kwargs['pl'] = pl
self.pl = pl
self.theme = Theme(theme_config=theme_config, **theme_kwargs) self.theme = Theme(theme_config=theme_config, **theme_kwargs)
self.local_themes = local_themes self.local_themes = local_themes
self.theme_kwargs = theme_kwargs self.theme_kwargs = theme_kwargs

View File

@ -66,12 +66,7 @@ class VimRenderer(Renderer):
return vim.strwidth(string) return vim.strwidth(string)
def render(self, window_id, winidx, current): def render(self, window_id, winidx, current):
'''Render all segments. '''Render all segments.'''
This method handles replacing of the percent placeholder for vim
statuslines, and it caches segment contents which are retrieved and
used in non-current windows.
'''
if current: if current:
mode = vim_mode(1) mode = vim_mode(1)
mode = mode_translations.get(mode, mode) mode = mode_translations.get(mode, mode)

View File

@ -64,6 +64,7 @@ def gen_segment_getter(ext, path, theme_configs, default_module=None):
contents, contents_func, module = get_segment_info(data, segment) contents, contents_func, module = get_segment_info(data, segment)
highlight_group = segment_type != 'function' and segment.get('highlight_group') or segment.get('name') highlight_group = segment_type != 'function' and segment.get('highlight_group') or segment.get('name')
return { return {
'name': segment.get('name'),
'type': segment_type, 'type': segment_type,
'highlight_group': highlight_group, 'highlight_group': highlight_group,
'divider_highlight_group': None, 'divider_highlight_group': None,

View File

@ -18,39 +18,98 @@ from powerline.lib.humanize_bytes import humanize_bytes
from collections import namedtuple from collections import namedtuple
def hostname(only_if_ssh=False): def hostname(pl, only_if_ssh=False):
'''Return the current hostname. '''Return the current hostname.
:param bool only_if_ssh: :param bool only_if_ssh:
only return the hostname if currently in an SSH session only return the hostname if currently in an SSH session
''' '''
if only_if_ssh and not os.environ.get('SSH_CLIENT'): if only_if_ssh and not pl.environ.get('SSH_CLIENT'):
return None return None
return socket.gethostname() return socket.gethostname()
def branch(status_colors=True): class RepositorySegment(KwThreadedSegment):
'''Return the current VCS branch. def __init__(self):
super(RepositorySegment, self).__init__()
self.directories = {}
:param bool status_colors: @staticmethod
determines whether repository status will be used to determine highlighting. Default: True. def key(pl, **kwargs):
return os.path.abspath(pl.getcwd())
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``. def update(self):
''' # .compute_state() is running only in this method, and only in one
repo = guess(path=os.path.abspath(os.getcwd())) # thread, thus operations with .directories do not need write locks
if repo: # (.render() method is not using .directories). If this is changed
branch = repo.branch() # .directories needs redesigning
if status_colors: self.directories.clear()
super(RepositorySegment, self).update()
def compute_state(self, path):
repo = guess(path=path)
if repo:
if repo.directory in self.directories:
return self.directories[repo.directory]
else:
r = self.process_repo(repo)
self.directories[repo.directory] = r
return r
class RepositoryStatusSegment(RepositorySegment):
interval = 2
@staticmethod
def process_repo(repo):
return repo.status()
repository_status = with_docstring(RepositoryStatusSegment(),
'''Return the status for the current VCS repository.''')
class BranchSegment(RepositorySegment):
interval = 0.2
started_repository_status = False
@staticmethod
def process_repo(repo):
return repo.branch()
@staticmethod
def render_one(branch, status_colors=False, **kwargs):
if branch and status_colors:
return [{ return [{
'contents': branch, 'contents': branch,
'highlight_group': ['branch_dirty' if repo.status() else 'branch_clean', 'branch'], 'highlight_group': ['branch_dirty' if repository_status(**kwargs) else 'branch_clean', 'branch'],
}] }]
else: else:
return branch return branch
return None
def startup(self, status_colors=False, **kwargs):
super(BranchSegment, self).startup(**kwargs)
if status_colors:
self.started_repository_status = True
repository_status.startup(**kwargs)
def shutdown(self):
if self.started_repository_status:
repository_status.shutdown()
super(BranchSegment, self).shutdown()
def cwd(dir_shorten_len=None, dir_limit_depth=None): branch = with_docstring(BranchSegment(),
'''Return the current VCS branch.
:param bool status_colors:
determines whether repository status will be used to determine highlighting. Default: True.
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
''')
def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
'''Return the current working directory. '''Return the current working directory.
Returns a segment list to create a breadcrumb-like effect. Returns a segment list to create a breadcrumb-like effect.
@ -67,18 +126,16 @@ def cwd(dir_shorten_len=None, dir_limit_depth=None):
''' '''
import re import re
try: try:
try: cwd = pl.getcwd()
cwd = os.getcwdu()
except AttributeError:
cwd = os.getcwd()
except OSError as e: except OSError as e:
if e.errno == 2: if e.errno == 2:
# user most probably deleted the directory # user most probably deleted the directory
# this happens when removing files from Mercurial repos for example # this happens when removing files from Mercurial repos for example
pl.warn('Current directory not found')
cwd = "[not found]" cwd = "[not found]"
else: else:
raise raise
home = os.environ.get('HOME') home = pl.home
if home: if home:
cwd = re.sub('^' + re.escape(home), '~', cwd, 1) cwd = re.sub('^' + re.escape(home), '~', cwd, 1)
cwd_split = cwd.split(os.sep) cwd_split = cwd.split(os.sep)
@ -101,7 +158,7 @@ def cwd(dir_shorten_len=None, dir_limit_depth=None):
return ret return ret
def date(format='%Y-%m-%d', istime=False): def date(pl, format='%Y-%m-%d', istime=False):
'''Return the current date. '''Return the current date.
:param str format: :param str format:
@ -118,7 +175,7 @@ def date(format='%Y-%m-%d', istime=False):
}] }]
def fuzzy_time(): def fuzzy_time(pl):
'''Display the current time as fuzzy time, e.g. "quarter past six".''' '''Display the current time as fuzzy time, e.g. "quarter past six".'''
hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven'] hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven']
minute_str = { minute_str = {
@ -182,7 +239,7 @@ class ExternalIpSegment(ThreadedSegment):
with self.write_lock: with self.write_lock:
self.ip = ip self.ip = ip
def render(self): def render(self, **kwargs):
if not hasattr(self, 'ip'): if not hasattr(self, 'ip'):
return None return None
return [{'contents': self.ip, 'divider_highlight_group': 'background:divider'}] return [{'contents': self.ip, 'divider_highlight_group': 'background:divider'}]
@ -309,13 +366,10 @@ class WeatherSegment(ThreadedSegment):
# Do not lock attribute assignments in this branch: they are used # Do not lock attribute assignments in this branch: they are used
# only in .update() # only in .update()
if not self.location: if not self.location:
try: location_data = json.loads(urllib_read('http://freegeoip.net/json/' + _external_ip()))
location_data = json.loads(urllib_read('http://freegeoip.net/json/' + _external_ip())) self.location = ','.join([location_data['city'],
self.location = ','.join([location_data['city'], location_data['region_name'],
location_data['region_name'], location_data['country_name']])
location_data['country_name']])
except (TypeError, ValueError):
return
query_data = { query_data = {
'q': 'q':
'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;' 'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;'
@ -324,19 +378,24 @@ class WeatherSegment(ThreadedSegment):
} }
self.url = 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data) self.url = 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)
try: raw_response = urllib_read(self.url)
raw_response = urllib_read(self.url) if not raw_response:
response = json.loads(raw_response) self.error('Failed to get response')
condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code'])
temp = float(condition['temp'])
except (KeyError, TypeError, ValueError):
return return
response = json.loads(raw_response)
condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code'])
temp = float(condition['temp'])
try: try:
icon_names = weather_conditions_codes[condition_code] icon_names = weather_conditions_codes[condition_code]
except IndexError: except IndexError as e:
icon_names = (('not_available' if condition_code == 3200 else 'unknown'),) if condition_code == 3200:
icon_names = ('not_available',)
self.warn('Weather is not available for location {0}', self.location)
else:
icon_names = ('unknown',)
self.error('Unknown condition code: {0}', condition_code)
with self.write_lock: with self.write_lock:
self.temp = temp self.temp = temp
@ -413,7 +472,7 @@ Also uses ``weather_conditions_{condition}`` for all weather conditions supporte
''') ''')
def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2): def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2):
'''Return system load average. '''Return system load average.
Highlights using ``system_load_good``, ``system_load_bad`` and Highlights using ``system_load_good``, ``system_load_bad`` and
@ -441,6 +500,7 @@ def system_load(format='{avg:.1f}', threshold_good=1, threshold_bad=2):
try: try:
cpu_num = cpu_count() cpu_num = cpu_count()
except NotImplementedError: except NotImplementedError:
pl.warn('Unable to get CPU count: method is not implemented')
return None return None
ret = [] ret = []
for avg in os.getloadavg(): for avg in os.getloadavg():
@ -480,10 +540,10 @@ try:
if data: if data:
yield interface, data.bytes_recv, data.bytes_sent yield interface, data.bytes_recv, data.bytes_sent
def _get_user(): def _get_user(pl):
return psutil.Process(os.getpid()).username return psutil.Process(os.getpid()).username
def cpu_load_percent(measure_interval=.5): def cpu_load_percent(pl, measure_interval=.5):
'''Return the average CPU load as a percentage. '''Return the average CPU load as a percentage.
Requires the ``psutil`` module. Requires the ``psutil`` module.
@ -495,14 +555,11 @@ try:
return '{0}%'.format(cpu_percent) return '{0}%'.format(cpu_percent)
except ImportError: except ImportError:
def _get_bytes(interface): # NOQA def _get_bytes(interface): # NOQA
try: with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj: rx = int(file_obj.read())
rx = int(file_obj.read()) with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj: tx = int(file_obj.read())
tx = int(file_obj.read()) return (rx, tx)
return (rx, tx)
except IOError:
return None
def _get_interfaces(): def _get_interfaces():
for interface in os.listdir('/sys/class/net'): for interface in os.listdir('/sys/class/net'):
@ -510,10 +567,10 @@ except ImportError:
if x is not None: if x is not None:
yield interface, x[0], x[1] yield interface, x[0], x[1]
def _get_user(): # NOQA def _get_user(pl): # NOQA
return os.environ.get('USER', None) return pl.environ.get('USER', None)
def cpu_load_percent(measure_interval=.5): # NOQA def cpu_load_percent(pl, measure_interval=.5): # NOQA
'''Return the average CPU load as a percentage. '''Return the average CPU load as a percentage.
Requires the ``psutil`` module. Requires the ``psutil`` module.
@ -521,13 +578,16 @@ except ImportError:
:param float measure_interval: :param float measure_interval:
interval used to measure CPU load (in seconds) interval used to measure CPU load (in seconds)
''' '''
pl.warn('psutil package is not installed, thus CPU load is not available')
return None return None
username = False username = False
# os.geteuid is not available on windows
_geteuid = getattr(os, 'geteuid', lambda: 1)
def user(): def user(pl):
'''Return the current user. '''Return the current user.
Highlights the user with the ``superuser`` if the effective user ID is 0. Highlights the user with the ``superuser`` if the effective user ID is 0.
@ -536,14 +596,11 @@ def user():
''' '''
global username global username
if username is False: if username is False:
username = _get_user() username = _get_user(pl)
if username is None: if username is None:
pl.warn('Failed to get username')
return None return None
try: euid = _geteuid()
euid = os.geteuid()
except AttributeError:
# os.geteuid is not available on windows
euid = 1
return [{ return [{
'contents': username, 'contents': username,
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'], 'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
@ -566,7 +623,7 @@ else:
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def uptime(format='{days}d {hours:02d}h {minutes:02d}m'): def uptime(pl, format='{days}d {hours:02d}h {minutes:02d}m'):
'''Return system uptime. '''Return system uptime.
:param str format: :param str format:
@ -577,7 +634,8 @@ def uptime(format='{days}d {hours:02d}h {minutes:02d}m'):
''' '''
try: try:
seconds = _get_uptime() seconds = _get_uptime()
except (IOError, NotImplementedError): except NotImplementedError:
pl.warn('Unable to get uptime. You should install psutil package')
return None return None
minutes, seconds = divmod(seconds, 60) minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60) hours, minutes = divmod(minutes, 60)
@ -646,7 +704,10 @@ class NetworkLoadSegment(KwThreadedSegment):
t2, b2 = idata['last'] t2, b2 = idata['last']
measure_interval = t2 - t1 measure_interval = t2 - t1
if None in (b1, b2) or measure_interval == 0: if None in (b1, b2):
return None
if measure_interval == 0:
self.error('Measure interval is zero. This should not happen')
return None return None
r = [] r = []
@ -703,9 +764,9 @@ Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_lo
''') ''')
def virtualenv(): def virtualenv(pl):
'''Return the name of the current Python virtualenv.''' '''Return the name of the current Python virtualenv.'''
return os.path.basename(os.environ.get('VIRTUAL_ENV', '')) or None return os.path.basename(pl.environ.get('VIRTUAL_ENV', '')) or None
_IMAPKey = namedtuple('Key', 'username password server port folder') _IMAPKey = namedtuple('Key', 'username password server port folder')
@ -715,12 +776,12 @@ class EmailIMAPSegment(KwThreadedSegment):
interval = 60 interval = 60
@staticmethod @staticmethod
def key(username, password, server='imap.gmail.com', port=993, folder='INBOX'): def key(username, password, server='imap.gmail.com', port=993, folder='INBOX', **kwargs):
return _IMAPKey(username, password, server, port, folder) return _IMAPKey(username, password, server, port, folder)
@staticmethod def compute_state(self, key):
def compute_state(key):
if not key.username or not key.password: if not key.username or not key.password:
self.warn('Username and password are not configured')
return None return None
try: try:
import imaplib import imaplib
@ -730,8 +791,6 @@ class EmailIMAPSegment(KwThreadedSegment):
rc, message = mail.status(key.folder, '(UNSEEN)') rc, message = mail.status(key.folder, '(UNSEEN)')
unread_str = message[0].decode('utf-8') unread_str = message[0].decode('utf-8')
unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1)) unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1))
except socket.gaierror:
return None
except imaplib.IMAP4.error as e: except imaplib.IMAP4.error as e:
unread_count = str(e) unread_count = str(e)
return unread_count return unread_count
@ -783,7 +842,7 @@ class NowPlayingSegment(object):
'stop': '', 'stop': '',
} }
def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', *args, **kwargs): def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', **kwargs):
player_func = getattr(self, 'player_{0}'.format(player)) player_func = getattr(self, 'player_{0}'.format(player))
stats = { stats = {
'state': None, 'state': None,
@ -794,7 +853,7 @@ class NowPlayingSegment(object):
'elapsed': None, 'elapsed': None,
'total': None, 'total': None,
} }
func_stats = player_func(*args, **kwargs) func_stats = player_func(**kwargs)
if not func_stats: if not func_stats:
return None return None
stats.update(func_stats) stats.update(func_stats)
@ -825,7 +884,7 @@ class NowPlayingSegment(object):
def _convert_seconds(seconds): def _convert_seconds(seconds):
return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60)) return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60))
def player_cmus(self): def player_cmus(self, pl):
'''Return cmus player information. '''Return cmus player information.
cmus-remote -Q returns data with multi-level information i.e. cmus-remote -Q returns data with multi-level information i.e.
@ -863,7 +922,7 @@ class NowPlayingSegment(object):
'total': self._convert_seconds(now_playing.get('duration', 0)), 'total': self._convert_seconds(now_playing.get('duration', 0)),
} }
def player_mpd(self, host='localhost', port=6600): def player_mpd(self, pl, host='localhost', port=6600):
try: try:
import mpd import mpd
client = mpd.MPDClient() client = mpd.MPDClient()
@ -895,7 +954,7 @@ class NowPlayingSegment(object):
'total': now_playing[3], 'total': now_playing[3],
} }
def player_spotify(self): def player_spotify(self, pl):
try: try:
import dbus import dbus
except ImportError: except ImportError:
@ -923,7 +982,7 @@ class NowPlayingSegment(object):
'total': self._convert_seconds(info.get('mpris:length') / 1e6), 'total': self._convert_seconds(info.get('mpris:length') / 1e6),
} }
def player_rhythmbox(self): def player_rhythmbox(self, pl):
now_playing = self._run_cmd(['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td']) now_playing = self._run_cmd(['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td'])
if not now_playing: if not now_playing:
return return

View File

@ -3,5 +3,5 @@
from powerline.theme import requires_segment_info from powerline.theme import requires_segment_info
@requires_segment_info @requires_segment_info
def prompt_count(segment_info): def prompt_count(pl, segment_info):
return str(segment_info.prompt_count) return str(segment_info.prompt_count)

View File

@ -4,7 +4,7 @@ from powerline.theme import requires_segment_info
@requires_segment_info @requires_segment_info
def last_status(segment_info): def last_status(pl, segment_info):
'''Return last exit code. '''Return last exit code.
Highlight groups used: ``exit_fail`` Highlight groups used: ``exit_fail``
@ -15,7 +15,7 @@ def last_status(segment_info):
@requires_segment_info @requires_segment_info
def last_pipe_status(segment_info): def last_pipe_status(pl, segment_info):
'''Return last pipe status. '''Return last pipe status.
Highlight groups used: ``exit_fail``, ``exit_success`` Highlight groups used: ``exit_fail``, ``exit_success``

View File

@ -47,34 +47,32 @@ vim_modes = {
} }
eventcaches = defaultdict(lambda: []) eventfuncs = defaultdict(lambda: [])
bufeventcaches = defaultdict(lambda: []) bufeventfuncs = defaultdict(lambda: [])
defined_events = set()
def purgeonevents_reg(events, eventcaches=bufeventcaches): def purgeonevents_reg(func, events, is_buffer_event=False):
def cache_reg_func(cache): if is_buffer_event:
for event in events: cureventfuncs = bufeventfuncs
if event not in eventcaches: else:
vim.eval('PowerlineRegisterCachePurgerEvent("' + event + '")') cureventfuncs = eventfuncs
eventcaches[event].append(cache) for event in events:
return cache_reg_func if event not in defined_events:
vim.eval('PowerlineRegisterCachePurgerEvent("' + event + '")')
purgeall_on_shell = purgeonevents_reg(('ShellCmdPost', 'ShellFilterPost', 'FocusGained'), eventcaches=eventcaches) defined_events.add(event)
purgebuf_on_shell_and_write = purgeonevents_reg(('BufWritePost', 'ShellCmdPost', 'ShellFilterPost', 'FocusGained')) cureventfuncs[event].append(func)
def launchevent(event): def launchevent(event):
global eventcaches global eventfuncs
global bufeventcaches global bufeventfuncs
for cache in eventcaches[event]: for func in eventfuncs[event]:
cache.clear() func()
if bufeventcaches[event]: if bufeventfuncs[event]:
buf = int(vim_funcs['expand']('<abuf>')) buffer = vim.buffers[int(vim_funcs['expand']('<abuf>')) - 1]
for cache in bufeventcaches[event]: for func in bufeventfuncs[event]:
try: func(buffer)
cache.pop(buf)
except KeyError:
pass
# TODO Remove cache when needed # TODO Remove cache when needed
@ -96,7 +94,7 @@ def window_cached(func):
@requires_segment_info @requires_segment_info
def mode(segment_info, override=None): def mode(pl, segment_info, override=None):
'''Return the current vim mode. '''Return the current vim mode.
:param dict override: :param dict override:
@ -114,7 +112,7 @@ def mode(segment_info, override=None):
@requires_segment_info @requires_segment_info
def modified_indicator(segment_info, text='+'): def modified_indicator(pl, segment_info, text='+'):
'''Return a file modified indicator. '''Return a file modified indicator.
:param string text: :param string text:
@ -124,7 +122,7 @@ def modified_indicator(segment_info, text='+'):
@requires_segment_info @requires_segment_info
def paste_indicator(segment_info, text='PASTE'): def paste_indicator(pl, segment_info, text='PASTE'):
'''Return a paste mode indicator. '''Return a paste mode indicator.
:param string text: :param string text:
@ -134,7 +132,7 @@ def paste_indicator(segment_info, text='PASTE'):
@requires_segment_info @requires_segment_info
def readonly_indicator(segment_info, text=''): def readonly_indicator(pl, segment_info, text=''):
'''Return a read-only indicator. '''Return a read-only indicator.
:param string text: :param string text:
@ -144,7 +142,7 @@ def readonly_indicator(segment_info, text=''):
@requires_segment_info @requires_segment_info
def file_directory(segment_info, shorten_user=True, shorten_cwd=True, shorten_home=False): def file_directory(pl, segment_info, shorten_user=True, shorten_cwd=True, shorten_home=False):
'''Return file directory (head component of the file path). '''Return file directory (head component of the file path).
:param bool shorten_user: :param bool shorten_user:
@ -167,13 +165,15 @@ def file_directory(segment_info, shorten_user=True, shorten_cwd=True, shorten_ho
@requires_segment_info @requires_segment_info
def file_name(segment_info, display_no_file=False, no_file_text='[No file]'): def file_name(pl, segment_info, display_no_file=False, no_file_text='[No file]'):
'''Return file name (tail component of the file path). '''Return file name (tail component of the file path).
:param bool display_no_file: :param bool display_no_file:
display a string if the buffer is missing a file name display a string if the buffer is missing a file name
:param str no_file_text: :param str no_file_text:
the string to display if the buffer is missing a file name the string to display if the buffer is missing a file name
Highlight groups used: ``file_name_no_file`` or ``file_name``, ``file_name``.
''' '''
name = segment_info['buffer'].name name = segment_info['buffer'].name
if not name: if not name:
@ -189,7 +189,7 @@ def file_name(segment_info, display_no_file=False, no_file_text='[No file]'):
@window_cached @window_cached
def file_size(suffix='B', si_prefix=False): def file_size(pl, suffix='B', si_prefix=False):
'''Return file size in &encoding. '''Return file size in &encoding.
:param str suffix: :param str suffix:
@ -206,7 +206,7 @@ def file_size(suffix='B', si_prefix=False):
@requires_segment_info @requires_segment_info
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def file_format(segment_info): def file_format(pl, segment_info):
'''Return file format (i.e. line ending type). '''Return file format (i.e. line ending type).
:return: file format or None if unknown or missing file format :return: file format or None if unknown or missing file format
@ -218,7 +218,7 @@ def file_format(segment_info):
@requires_segment_info @requires_segment_info
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def file_encoding(segment_info): def file_encoding(pl, segment_info):
'''Return file encoding/character set. '''Return file encoding/character set.
:return: file encoding/character set or None if unknown or missing file encoding :return: file encoding/character set or None if unknown or missing file encoding
@ -230,7 +230,7 @@ def file_encoding(segment_info):
@requires_segment_info @requires_segment_info
@add_divider_highlight_group('background:divider') @add_divider_highlight_group('background:divider')
def file_type(segment_info): def file_type(pl, segment_info):
'''Return file type. '''Return file type.
:return: file type or None if unknown file type :return: file type or None if unknown file type
@ -241,7 +241,7 @@ def file_type(segment_info):
@requires_segment_info @requires_segment_info
def line_percent(segment_info, gradient=False): def line_percent(pl, segment_info, gradient=False):
'''Return the cursor position in the file as a percentage. '''Return the cursor position in the file as a percentage.
:param bool gradient: :param bool gradient:
@ -262,20 +262,20 @@ def line_percent(segment_info, gradient=False):
@requires_segment_info @requires_segment_info
def line_current(segment_info): def line_current(pl, segment_info):
'''Return the current cursor line.''' '''Return the current cursor line.'''
return str(segment_info['window'].cursor[0]) return str(segment_info['window'].cursor[0])
@requires_segment_info @requires_segment_info
def col_current(segment_info): def col_current(pl, segment_info):
'''Return the current cursor column. '''Return the current cursor column.
''' '''
return str(segment_info['window'].cursor[1] + 1) return str(segment_info['window'].cursor[1] + 1)
@window_cached @window_cached
def virtcol_current(): def virtcol_current(pl):
'''Return current visual column with concealed characters ingored '''Return current visual column with concealed characters ingored
Highlight groups used: ``virtcol_current`` or ``col_current``. Highlight groups used: ``virtcol_current`` or ``col_current``.
@ -284,7 +284,7 @@ def virtcol_current():
'highlight_group': ['virtcol_current', 'col_current']}] 'highlight_group': ['virtcol_current', 'col_current']}]
def modified_buffers(text='+ ', join_str=','): def modified_buffers(pl, text='+ ', join_str=','):
'''Return a comma-separated list of modified buffers. '''Return a comma-separated list of modified buffers.
:param str text: :param str text:
@ -368,16 +368,16 @@ class BranchSegment(RepositorySegment):
return [{ return [{
'contents': update_state, 'contents': update_state,
'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info) else 'branch_clean'] 'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info, **kwargs) else 'branch_clean']
if status_colors else []) + ['branch'], if status_colors else []) + ['branch'],
'divider_highlight_group': 'branch:divider', 'divider_highlight_group': 'branch:divider',
}] }]
def startup(self, status_colors=False, **kwargs): def startup(self, status_colors=False, **kwargs):
super(BranchSegment, self).startup() super(BranchSegment, self).startup(**kwargs)
if status_colors: if status_colors:
self.started_repository_status = True self.started_repository_status = True
repository_status.startup() repository_status.startup(**kwargs)
def shutdown(self): def shutdown(self):
if self.started_repository_status: if self.started_repository_status:

View File

@ -14,13 +14,10 @@ def mergeargs(argvalue):
class ShellPowerline(Powerline): class ShellPowerline(Powerline):
def __init__(self, args, run_once=False): def __init__(self, args, **kwargs):
self.args = args self.args = args
self.theme_option = mergeargs(args.theme_option) or {} self.theme_option = mergeargs(args.theme_option) or {}
super(ShellPowerline, self).__init__(args.ext[0], args.renderer_module, run_once=run_once) super(ShellPowerline, self).__init__(args.ext[0], args.renderer_module, **kwargs)
def get_segment_info(self):
return self.args
def load_main_config(self): def load_main_config(self):
r = super(ShellPowerline, self).load_main_config() r = super(ShellPowerline, self).load_main_config()

View File

@ -24,7 +24,7 @@ def requires_segment_info(func):
class Theme(object): class Theme(object):
def __init__(self, ext, theme_config, common_config, top_theme_config=None, run_once=False): def __init__(self, ext, theme_config, common_config, pl, top_theme_config=None, run_once=False):
self.dividers = theme_config.get('dividers', common_config['dividers']) self.dividers = theme_config.get('dividers', common_config['dividers'])
self.spaces = theme_config.get('spaces', common_config['spaces']) self.spaces = theme_config.get('spaces', common_config['spaces'])
self.segments = { self.segments = {
@ -35,16 +35,22 @@ class Theme(object):
'contents': None, 'contents': None,
'highlight': {'fg': False, 'bg': False, 'attr': 0} 'highlight': {'fg': False, 'bg': False, 'attr': 0}
} }
self.pl = pl
theme_configs = [theme_config] theme_configs = [theme_config]
if top_theme_config: if top_theme_config:
theme_configs.append(top_theme_config) theme_configs.append(top_theme_config)
get_segment = gen_segment_getter(ext, common_config['paths'], theme_configs, theme_config.get('default_module')) get_segment = gen_segment_getter(ext, common_config['paths'], theme_configs, theme_config.get('default_module'))
for side in ['left', 'right']: for side in ['left', 'right']:
self.segments[side].extend((get_segment(segment, side) for segment in theme_config['segments'].get(side, []))) for segment in theme_config['segments'].get(side, []):
if not run_once: segment = get_segment(segment, side)
for segment in self.segments[side]: if not run_once:
if segment['startup']: if segment['startup']:
segment['startup'](**segment['args']) try:
segment['startup'](pl=pl, **segment['args'])
except Exception as e:
pl.error('Exception during {0} startup: {1}', segment['name'], str(e))
continue
self.segments[side].append(segment)
def shutdown(self): def shutdown(self):
for segments in self.segments.values(): for segments in self.segments.values():
@ -71,11 +77,17 @@ class Theme(object):
parsed_segments = [] parsed_segments = []
for segment in self.segments[side]: for segment in self.segments[side]:
if segment['type'] == 'function': if segment['type'] == 'function':
if (hasattr(segment['contents_func'], 'powerline_requires_segment_info') self.pl.prefix = segment['name']
and segment['contents_func'].powerline_requires_segment_info): try:
contents = segment['contents_func'](segment_info=segment_info, **segment['args']) if (hasattr(segment['contents_func'], 'powerline_requires_segment_info')
else: and segment['contents_func'].powerline_requires_segment_info):
contents = segment['contents_func'](**segment['args']) contents = segment['contents_func'](pl=self.pl, segment_info=segment_info, **segment['args'])
else:
contents = segment['contents_func'](pl=self.pl, **segment['args'])
except Exception as e:
self.pl.exception('Exception while computing segment: {0}', str(e))
continue
if contents is None: if contents is None:
continue continue
if isinstance(contents, list): if isinstance(contents, list):

View File

@ -13,7 +13,10 @@ except ImportError:
if __name__ == '__main__': if __name__ == '__main__':
args = get_argparser(description=__doc__).parse_args() args = get_argparser(description=__doc__).parse_args()
powerline = ShellPowerline(args, run_once=True) kwargs = {}
if 'PWD' in os.environ:
kwargs['getcwd'] = lambda: os.environ['PWD']
powerline = ShellPowerline(args, run_once=True, **kwargs)
rendered = powerline.renderer.render(width=args.width, side=args.side, segment_info=args) rendered = powerline.renderer.render(width=args.width, side=args.side, segment_info=args)
try: try:
sys.stdout.write(rendered) sys.stdout.write(rendered)

View File

@ -4,6 +4,28 @@ import sys
import os import os
class Pl(object):
def __init__(self):
self.errors = []
self.warns = []
self.debugs = []
self._cwd = None
self.prefix = None
self.environ = {}
self.home = None
def getcwd(self):
if isinstance(self._cwd, Exception):
raise self._cwd
else:
return self._cwd
for meth in ('error', 'warn', 'debug'):
exec ('''def {0}(self, msg, *args, **kwargs):
self.{0}s.append((kwargs.get('prefix') or self.prefix, msg, args, kwargs))
''').format(meth)
class Args(object): class Args(object):
theme_option = None theme_option = None
config = None config = None
@ -63,50 +85,58 @@ def new_module(name, **kwargs):
return module return module
class ModuleAttrReplace(object): class AttrReplace(object):
def __init__(self, module, attr, new): def __init__(self, obj, attr, new):
self.module = module self.obj = obj
self.attr = attr self.attr = attr
self.new = new self.new = new
def __enter__(self): def __enter__(self):
try: try:
self.old = getattr(self.module, self.attr) self.old = getattr(self.obj, self.attr)
except AttributeError: except AttributeError:
pass pass
setattr(self.module, self.attr, self.new) setattr(self.obj, self.attr, self.new)
def __exit__(self, *args): def __exit__(self, *args):
try: try:
setattr(self.module, self.attr, self.old) setattr(self.obj, self.attr, self.old)
except AttributeError: except AttributeError:
delattr(self.module, self.attr) delattr(self.obj, self.attr)
replace_module_attr = ModuleAttrReplace replace_attr = AttrReplace
def replace_module_module(module, name, **kwargs): def replace_module_module(module, name, **kwargs):
return replace_module_attr(module, name, new_module(name, **kwargs)) return replace_attr(module, name, new_module(name, **kwargs))
class EnvReplace(object): class ItemReplace(object):
def __init__(self, name, new): def __init__(self, d, key, new, r=None):
self.name = name self.key = key
self.new = new self.new = new
self.d = d
self.r = r
def __enter__(self): def __enter__(self):
self.old = os.environ.get(self.name) self.old = self.d.get(self.key)
os.environ[self.name] = self.new self.d[self.key] = self.new
return self.r
def __exit__(self, *args): def __exit__(self, *args):
if self.old is None: if self.old is None:
try: try:
os.environ.pop(self.name) self.d.pop(self.key)
except KeyError: except KeyError:
pass pass
else: else:
os.environ[self.name] = self.old self.d[self.key] = self.old
replace_env = EnvReplace def replace_env(key, new, d=None):
r = None
if not d:
r = Pl()
d = r.environ
return ItemReplace(d, key, new, r)

View File

@ -7,7 +7,7 @@ import tests.vim as vim_module
import sys import sys
import os import os
import json import json
from tests.lib import Args, urllib_read, replace_module_attr from tests.lib import Args, urllib_read, replace_attr
from tests import TestCase from tests import TestCase
@ -67,7 +67,7 @@ class TestConfig(TestCase):
from imp import reload from imp import reload
reload(common) reload(common)
from powerline.shell import ShellPowerline from powerline.shell import ShellPowerline
with replace_module_attr(common, 'urllib_read', urllib_read): with replace_attr(common, 'urllib_read', urllib_read):
powerline = ShellPowerline(Args(ext=['tmux']), run_once=False) powerline = ShellPowerline(Args(ext=['tmux']), run_once=False)
powerline.renderer.render() powerline.renderer.render()
powerline = ShellPowerline(Args(ext=['tmux']), run_once=False) powerline = ShellPowerline(Args(ext=['tmux']), run_once=False)
@ -112,7 +112,7 @@ class TestConfig(TestCase):
from imp import reload from imp import reload
reload(common) reload(common)
from powerline import Powerline from powerline import Powerline
with replace_module_attr(common, 'urllib_read', urllib_read): with replace_attr(common, 'urllib_read', urllib_read):
Powerline(ext='wm', renderer_module='pango_markup', run_once=True).renderer.render() Powerline(ext='wm', renderer_module='pango_markup', run_once=True).renderer.render()
reload(common) reload(common)

View File

@ -4,7 +4,7 @@ from powerline.segments import shell, common
import tests.vim as vim_module import tests.vim as vim_module
import sys import sys
import os import os
from tests.lib import Args, urllib_read, replace_module_attr, new_module, replace_module_module, replace_env from tests.lib import Args, urllib_read, replace_attr, new_module, replace_module_module, replace_env, Pl
from tests import TestCase from tests import TestCase
@ -13,14 +13,16 @@ vim = None
class TestShell(TestCase): class TestShell(TestCase):
def test_last_status(self): def test_last_status(self):
self.assertEqual(shell.last_status(Args(last_exit_code=10)), pl = Pl()
self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=10)),
[{'contents': '10', 'highlight_group': 'exit_fail'}]) [{'contents': '10', 'highlight_group': 'exit_fail'}])
self.assertEqual(shell.last_status(Args(last_exit_code=None)), None) self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=None)), None)
def test_last_pipe_status(self): def test_last_pipe_status(self):
self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[])), None) pl = Pl()
self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[0, 0, 0])), None) self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[])), None)
self.assertEqual(shell.last_pipe_status(Args(last_pipe_status=[0, 2, 0])), self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 0, 0])), None)
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 2, 0])),
[{'contents': '0', 'highlight_group': 'exit_success'}, [{'contents': '0', 'highlight_group': 'exit_success'},
{'contents': '2', 'highlight_group': 'exit_fail'}, {'contents': '2', 'highlight_group': 'exit_fail'},
{'contents': '0', 'highlight_group': 'exit_success'}]) {'contents': '0', 'highlight_group': 'exit_success'}])
@ -28,219 +30,226 @@ class TestShell(TestCase):
class TestCommon(TestCase): class TestCommon(TestCase):
def test_hostname(self): def test_hostname(self):
with replace_env('SSH_CLIENT', '192.168.0.12 40921 22'): with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as pl:
with replace_module_module(common, 'socket', gethostname=lambda: 'abc'): with replace_module_module(common, 'socket', gethostname=lambda: 'abc'):
self.assertEqual(common.hostname(), 'abc') self.assertEqual(common.hostname(pl=pl), 'abc')
self.assertEqual(common.hostname(only_if_ssh=True), 'abc') self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), 'abc')
os.environ.pop('SSH_CLIENT') pl.environ.pop('SSH_CLIENT')
self.assertEqual(common.hostname(), 'abc') self.assertEqual(common.hostname(pl=pl), 'abc')
self.assertEqual(common.hostname(only_if_ssh=True), None) self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), None)
def test_user(self): def test_user(self):
new_os = new_module('os', environ={'USER': 'def'}, getpid=lambda: 1) new_os = new_module('os', getpid=lambda: 1)
new_psutil = new_module('psutil', Process=lambda pid: Args(username='def')) new_psutil = new_module('psutil', Process=lambda pid: Args(username='def'))
with replace_module_attr(common, 'os', new_os): with replace_env('USER', 'def') as pl:
with replace_module_attr(common, 'psutil', new_psutil): with replace_attr(common, 'os', new_os):
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) with replace_attr(common, 'psutil', new_psutil):
new_os.geteuid = lambda: 1 with replace_attr(common, '_geteuid', lambda: 5):
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': 'user'}]) self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
new_os.geteuid = lambda: 0 with replace_attr(common, '_geteuid', lambda: 0):
self.assertEqual(common.user(), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}]) self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
def test_branch(self): def test_branch(self):
with replace_module_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None)): pl = Pl()
self.assertEqual(common.branch(status_colors=False), 'tests') pl._cwd = os.getcwd()
self.assertEqual(common.branch(status_colors=True), with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')):
self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
self.assertEqual(common.branch(pl=pl, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}]) [{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}])
with replace_module_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ')): with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')):
self.assertEqual(common.branch(status_colors=False), 'tests') self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
self.assertEqual(common.branch(), self.assertEqual(common.branch(pl=pl, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}]) [{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}])
with replace_module_attr(common, 'guess', lambda path: None): self.assertEqual(common.branch(pl=pl), 'tests')
self.assertEqual(common.branch(), None) with replace_attr(common, 'guess', lambda path: None):
self.assertEqual(common.branch(pl=pl), None)
def test_cwd(self): def test_cwd(self):
new_os = new_module('os', path=os.path, environ={}, sep='/') new_os = new_module('os', path=os.path, sep='/')
new_os.getcwd = lambda: '/abc/def/ghi/foo/bar' pl = Pl()
with replace_module_attr(common, 'os', new_os): with replace_attr(common, 'os', new_os):
self.assertEqual(common.cwd(), pl._cwd = '/abc/def/ghi/foo/bar'
self.assertEqual(common.cwd(pl=pl),
[{'contents': '/', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '/', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'abc', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'abc', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'def', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'def', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
new_os.getcwdu = lambda: '/abc/def/ghi/foo/bar' pl.home = '/abc/def/ghi'
self.assertEqual(common.cwd(), self.assertEqual(common.cwd(pl=pl),
[{'contents': '/', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'abc', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'def', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
new_os.environ['HOME'] = '/abc/def/ghi'
self.assertEqual(common.cwd(),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(dir_limit_depth=3), self.assertEqual(common.cwd(pl=pl, dir_limit_depth=3),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(dir_limit_depth=1), self.assertEqual(common.cwd(pl=pl, dir_limit_depth=1),
[{'contents': '', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(dir_limit_depth=2, dir_shorten_len=2), self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'}, [{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'fo', 'divider_highlight_group': 'cwd:divider'}, {'contents': 'fo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) {'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
ose = OSError() ose = OSError()
ose.errno = 2 ose.errno = 2
pl._cwd = ose
def raises(exc): self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
raise exc
new_os.getcwdu = lambda: raises(ose)
self.assertEqual(common.cwd(dir_limit_depth=2, dir_shorten_len=2),
[{'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}]) [{'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
new_os.getcwdu = lambda: raises(OSError()) pl._cwd = OSError()
self.assertRaises(OSError, common.cwd, tuple(), {'dir_limit_depth': 2, 'dir_shorten_len': 2}) self.assertRaises(OSError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
new_os.getcwdu = lambda: raises(ValueError()) pl._cwd = ValueError()
self.assertRaises(ValueError, common.cwd, tuple(), {'dir_limit_depth': 2, 'dir_shorten_len': 2}) self.assertRaises(ValueError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
def test_date(self): def test_date(self):
with replace_module_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))): pl = Pl()
self.assertEqual(common.date(), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}]) with replace_attr(common, 'datetime', Args(now=lambda: Args(strftime=lambda fmt: fmt))):
self.assertEqual(common.date(format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}]) self.assertEqual(common.date(pl=pl), [{'contents': '%Y-%m-%d', 'highlight_group': ['date'], 'divider_highlight_group': None}])
self.assertEqual(common.date(pl=pl, format='%H:%M', istime=True), [{'contents': '%H:%M', 'highlight_group': ['time', 'date'], 'divider_highlight_group': 'time:divider'}])
def test_fuzzy_time(self): def test_fuzzy_time(self):
time = Args(hour=0, minute=45) time = Args(hour=0, minute=45)
with replace_module_attr(common, 'datetime', Args(now=lambda: time)): pl = Pl()
self.assertEqual(common.fuzzy_time(), 'quarter to one') with replace_attr(common, 'datetime', Args(now=lambda: time)):
self.assertEqual(common.fuzzy_time(pl=pl), 'quarter to one')
time.hour = 23 time.hour = 23
time.minute = 59 time.minute = 59
self.assertEqual(common.fuzzy_time(), 'round about midnight') self.assertEqual(common.fuzzy_time(pl=pl), 'round about midnight')
time.minute = 33 time.minute = 33
self.assertEqual(common.fuzzy_time(), 'twenty-five to twelve') self.assertEqual(common.fuzzy_time(pl=pl), 'twenty-five to twelve')
time.minute = 60 time.minute = 60
self.assertEqual(common.fuzzy_time(), 'twelve o\'clock') self.assertEqual(common.fuzzy_time(pl=pl), 'twelve o\'clock')
def test_external_ip(self): def test_external_ip(self):
with replace_module_attr(common, 'urllib_read', urllib_read): pl = Pl()
self.assertEqual(common.external_ip(), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}]) with replace_attr(common, 'urllib_read', urllib_read):
self.assertEqual(common.external_ip(pl=pl), [{'contents': '127.0.0.1', 'divider_highlight_group': 'background:divider'}])
def test_uptime(self): def test_uptime(self):
with replace_module_attr(common, '_get_uptime', lambda: 65536): pl = Pl()
self.assertEqual(common.uptime(), [{'contents': '0d 18h 12m', 'divider_highlight_group': 'background:divider'}]) with replace_attr(common, '_get_uptime', lambda: 65536):
self.assertEqual(common.uptime(pl=pl), [{'contents': '0d 18h 12m', 'divider_highlight_group': 'background:divider'}])
def _get_uptime(): def _get_uptime():
raise NotImplementedError raise NotImplementedError
with replace_module_attr(common, '_get_uptime', _get_uptime): with replace_attr(common, '_get_uptime', _get_uptime):
self.assertEqual(common.uptime(), None) self.assertEqual(common.uptime(pl=pl), None)
def test_weather(self): def test_weather(self):
with replace_module_attr(common, 'urllib_read', urllib_read): pl = Pl()
self.assertEqual(common.weather(), [ with replace_attr(common, 'urllib_read', urllib_read):
self.assertEqual(common.weather(pl=pl), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(temp_coldest=0, temp_hottest=100), [ self.assertEqual(common.weather(pl=pl, temp_coldest=0, temp_hottest=100), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 0}
]) ])
self.assertEqual(common.weather(temp_coldest=-100, temp_hottest=-50), [ self.assertEqual(common.weather(pl=pl, temp_coldest=-100, temp_hottest=-50), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 100} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 100}
]) ])
self.assertEqual(common.weather(icons={'cloudy': 'o'}), [ self.assertEqual(common.weather(pl=pl, icons={'cloudy': 'o'}), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'o '}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'o '},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(icons={'partly_cloudy_day': 'x'}), [ self.assertEqual(common.weather(pl=pl, icons={'partly_cloudy_day': 'x'}), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'x '}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': 'x '},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9°C', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(unit='F'), [ self.assertEqual(common.weather(pl=pl, unit='F'), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '16°F', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '16°F', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(unit='K'), [ self.assertEqual(common.weather(pl=pl, unit='K'), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '264K', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '264K', 'gradient_level': 30.0}
]) ])
self.assertEqual(common.weather(temp_format='{temp:.1e}C'), [ self.assertEqual(common.weather(pl=pl, temp_format='{temp:.1e}C'), [
{'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''}, {'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_condition_partly_cloudy_day', 'weather_condition_cloudy', 'weather_conditions', 'weather'], 'contents': ''},
{'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9.0e+00C', 'gradient_level': 30.0} {'draw_divider': False, 'divider_highlight_group': 'background:divider', 'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'], 'contents': '-9.0e+00C', 'gradient_level': 30.0}
]) ])
def test_system_load(self): def test_system_load(self):
pl = Pl()
with replace_module_module(common, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)): with replace_module_module(common, 'os', getloadavg=lambda: (7.5, 3.5, 1.5)):
with replace_module_attr(common, 'cpu_count', lambda: 2): with replace_attr(common, 'cpu_count', lambda: 2):
self.assertEqual(common.system_load(), self.assertEqual(common.system_load(pl=pl),
[{'contents': '7.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, [{'contents': '7.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100},
{'contents': '3.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}, {'contents': '3.5 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0},
{'contents': '1.5', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 0}]) {'contents': '1.5', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 0}])
self.assertEqual(common.system_load(format='{avg:.0f}', threshold_good=0, threshold_bad=1), self.assertEqual(common.system_load(pl=pl, format='{avg:.0f}', threshold_good=0, threshold_bad=1),
[{'contents': '8 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, [{'contents': '8 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': True, 'divider_highlight_group': 'background:divider', 'gradient_level': 100},
{'contents': '4 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 100}, {'contents': '4 ', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 100},
{'contents': '2', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}]) {'contents': '2', 'highlight_group': ['system_load_gradient', 'system_load'], 'draw_divider': False, 'divider_highlight_group': 'background:divider', 'gradient_level': 75.0}])
def test_cpu_load_percent(self): def test_cpu_load_percent(self):
pl = Pl()
with replace_module_module(common, 'psutil', cpu_percent=lambda **kwargs: 52.3): with replace_module_module(common, 'psutil', cpu_percent=lambda **kwargs: 52.3):
self.assertEqual(common.cpu_load_percent(), '52%') self.assertEqual(common.cpu_load_percent(pl=pl), '52%')
def test_network_load(self): def test_network_load(self):
def _get_bytes(interface): def gb(interface):
return None return None
with replace_module_attr(common, '_get_bytes', _get_bytes):
self.assertEqual(common.network_load(), None)
l = [0, 0]
def _get_bytes2(interface): f = [gb]
l[0] += 1200 def _get_bytes(interface):
l[1] += 2400 return f[0](interface)
return tuple(l)
pl = Pl()
with replace_attr(common, '_get_bytes', _get_bytes):
common.network_load.startup(pl=pl)
self.assertEqual(common.network_load(pl=pl), None)
common.network_load.sleep(0)
self.assertEqual(common.network_load(pl=pl), None)
l = [0, 0]
def gb2(interface):
l[0] += 1200
l[1] += 2400
return tuple(l)
f[0] = gb2
from imp import reload
reload(common)
with replace_module_attr(common, '_get_bytes', _get_bytes2):
common.network_load.startup()
common.network_load.sleep(0) common.network_load.sleep(0)
common.network_load.sleep(0) common.network_load.sleep(0)
self.assertEqual(common.network_load(), [ self.assertEqual(common.network_load(pl=pl), [
{'divider_highlight_group': 'background:divider', 'contents': '⬇ 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': '⬇ 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': '⬆ 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': '⬆ 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}'), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}'), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', suffix='bps'), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', suffix='bps'), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 Kibps', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 Kibps', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', si_prefix=True), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', si_prefix=True), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 kB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 kB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', recv_max=0), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', recv_max=0), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv_gradient', 'network_load_gradient', 'network_load_recv', 'network_load'], 'gradient_level': 100},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent', 'network_load']},
]) ])
class ApproxEqual(object): class ApproxEqual(object):
def __eq__(self, i): def __eq__(self, i):
return abs(i - 50.0) < 1 return abs(i - 50.0) < 1
self.assertEqual(common.network_load(recv_format='r {value}', sent_format='s {value}', sent_max=4800), [ self.assertEqual(common.network_load(pl=pl, recv_format='r {value}', sent_format='s {value}', sent_max=4800), [
{'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']}, {'divider_highlight_group': 'background:divider', 'contents': 'r 1 KiB/s', 'highlight_group': ['network_load_recv', 'network_load']},
{'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()}, {'divider_highlight_group': 'background:divider', 'contents': 's 2 KiB/s', 'highlight_group': ['network_load_sent_gradient', 'network_load_gradient', 'network_load_sent', 'network_load'], 'gradient_level': ApproxEqual()},
]) ])
def test_virtualenv(self): def test_virtualenv(self):
with replace_env('VIRTUAL_ENV', '/abc/def/ghi'): with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as pl:
self.assertEqual(common.virtualenv(), 'ghi') self.assertEqual(common.virtualenv(pl=pl), 'ghi')
os.environ.pop('VIRTUAL_ENV') pl.environ.pop('VIRTUAL_ENV')
self.assertEqual(common.virtualenv(), None) self.assertEqual(common.virtualenv(pl=pl), None)
def test_email_imap_alert(self): def test_email_imap_alert(self):
# TODO # TODO
@ -253,131 +262,145 @@ class TestCommon(TestCase):
class TestVim(TestCase): class TestVim(TestCase):
def test_mode(self): def test_mode(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.mode(segment_info=segment_info), 'NORMAL') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'NORMAL')
self.assertEqual(vim.mode(segment_info=segment_info, override={'i': 'INS'}), 'NORMAL') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'i': 'INS'}), 'NORMAL')
self.assertEqual(vim.mode(segment_info=segment_info, override={'n': 'NORM'}), 'NORM') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'n': 'NORM'}), 'NORM')
with vim_module._with('mode', 'i') as segment_info: with vim_module._with('mode', 'i') as segment_info:
self.assertEqual(vim.mode(segment_info=segment_info), 'INSERT') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'INSERT')
with vim_module._with('mode', chr(ord('V') - 0x40)) as segment_info: with vim_module._with('mode', chr(ord('V') - 0x40)) as segment_info:
self.assertEqual(vim.mode(segment_info=segment_info), 'V·BLCK') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info), 'V·BLCK')
self.assertEqual(vim.mode(segment_info=segment_info, override={'^V': 'VBLK'}), 'VBLK') self.assertEqual(vim.mode(pl=pl, segment_info=segment_info, override={'^V': 'VBLK'}), 'VBLK')
def test_modified_indicator(self): def test_modified_indicator(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.modified_indicator(segment_info=segment_info), None) self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info), None)
segment_info['buffer'][0] = 'abc' segment_info['buffer'][0] = 'abc'
try: try:
self.assertEqual(vim.modified_indicator(segment_info=segment_info), '+') self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info), '+')
self.assertEqual(vim.modified_indicator(segment_info=segment_info, text='-'), '-') self.assertEqual(vim.modified_indicator(pl=pl, segment_info=segment_info, text='-'), '-')
finally: finally:
vim_module._bw(segment_info['bufnr']) vim_module._bw(segment_info['bufnr'])
def test_paste_indicator(self): def test_paste_indicator(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.paste_indicator(segment_info=segment_info), None) self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info), None)
with vim_module._with('options', paste=1): with vim_module._with('options', paste=1):
self.assertEqual(vim.paste_indicator(segment_info=segment_info), 'PASTE') self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info), 'PASTE')
self.assertEqual(vim.paste_indicator(segment_info=segment_info, text='P'), 'P') self.assertEqual(vim.paste_indicator(pl=pl, segment_info=segment_info, text='P'), 'P')
def test_readonly_indicator(self): def test_readonly_indicator(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.readonly_indicator(segment_info=segment_info), None) self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info), None)
with vim_module._with('bufoptions', readonly=1): with vim_module._with('bufoptions', readonly=1):
self.assertEqual(vim.readonly_indicator(segment_info=segment_info), '') self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info), '')
self.assertEqual(vim.readonly_indicator(segment_info=segment_info, text='L'), 'L') self.assertEqual(vim.readonly_indicator(pl=pl, segment_info=segment_info, text='L'), 'L')
def test_file_directory(self): def test_file_directory(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_directory(segment_info=segment_info), None) self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), None)
with replace_env('HOME', '/home/foo'): with replace_env('HOME', '/home/foo', os.environ):
with vim_module._with('buffer', '/tmp/abc') as segment_info: with vim_module._with('buffer', '/tmp/abc') as segment_info:
self.assertEqual(vim.file_directory(segment_info=segment_info), '/tmp/') self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), '/tmp/')
os.environ['HOME'] = '/tmp' os.environ['HOME'] = '/tmp'
self.assertEqual(vim.file_directory(segment_info=segment_info), '~/') self.assertEqual(vim.file_directory(pl=pl, segment_info=segment_info), '~/')
def test_file_name(self): def test_file_name(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_name(segment_info=segment_info), None) self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), None)
self.assertEqual(vim.file_name(segment_info=segment_info, display_no_file=True), self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True),
[{'contents': '[No file]', 'highlight_group': ['file_name_no_file', 'file_name']}]) [{'contents': '[No file]', 'highlight_group': ['file_name_no_file', 'file_name']}])
self.assertEqual(vim.file_name(segment_info=segment_info, display_no_file=True, no_file_text='X'), self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info, display_no_file=True, no_file_text='X'),
[{'contents': 'X', 'highlight_group': ['file_name_no_file', 'file_name']}]) [{'contents': 'X', 'highlight_group': ['file_name_no_file', 'file_name']}])
with vim_module._with('buffer', '/tmp/abc') as segment_info: with vim_module._with('buffer', '/tmp/abc') as segment_info:
self.assertEqual(vim.file_name(segment_info=segment_info), 'abc') self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), 'abc')
with vim_module._with('buffer', '/tmp/') as segment_info: with vim_module._with('buffer', '/tmp/') as segment_info:
self.assertEqual(vim.file_name(segment_info=segment_info), '') self.assertEqual(vim.file_name(pl=pl, segment_info=segment_info), '')
def test_file_size(self): def test_file_size(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_size(segment_info=segment_info), '0 B') self.assertEqual(vim.file_size(pl=pl, segment_info=segment_info), '0 B')
with vim_module._with('buffer', os.path.join(os.path.dirname(__file__), 'empty')) as segment_info: with vim_module._with('buffer', os.path.join(os.path.dirname(__file__), 'empty')) as segment_info:
self.assertEqual(vim.file_size(segment_info=segment_info), '0 B') self.assertEqual(vim.file_size(pl=pl, segment_info=segment_info), '0 B')
def test_file_opts(self): def test_file_opts(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.file_format(segment_info=segment_info), self.assertEqual(vim.file_format(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'background:divider', 'contents': 'unix'}]) [{'divider_highlight_group': 'background:divider', 'contents': 'unix'}])
self.assertEqual(vim.file_encoding(segment_info=segment_info), self.assertEqual(vim.file_encoding(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'background:divider', 'contents': 'utf-8'}]) [{'divider_highlight_group': 'background:divider', 'contents': 'utf-8'}])
self.assertEqual(vim.file_type(segment_info=segment_info), None) self.assertEqual(vim.file_type(pl=pl, segment_info=segment_info), None)
with vim_module._with('bufoptions', filetype='python'): with vim_module._with('bufoptions', filetype='python'):
self.assertEqual(vim.file_type(segment_info=segment_info), self.assertEqual(vim.file_type(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'background:divider', 'contents': 'python'}]) [{'divider_highlight_group': 'background:divider', 'contents': 'python'}])
def test_line_percent(self): def test_line_percent(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
segment_info['buffer'][0:-1] = [str(i) for i in range(100)] segment_info['buffer'][0:-1] = [str(i) for i in range(100)]
try: try:
self.assertEqual(vim.line_percent(segment_info=segment_info), '1') self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info), '1')
vim_module._set_cursor(50, 0) vim_module._set_cursor(50, 0)
self.assertEqual(vim.line_percent(segment_info=segment_info), '50') self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info), '50')
self.assertEqual(vim.line_percent(segment_info=segment_info, gradient=True), self.assertEqual(vim.line_percent(pl=pl, segment_info=segment_info, gradient=True),
[{'contents': '50', 'highlight_group': ['line_percent_gradient', 'line_percent'], 'gradient_level': 50 * 100.0 / 101}]) [{'contents': '50', 'highlight_group': ['line_percent_gradient', 'line_percent'], 'gradient_level': 50 * 100.0 / 101}])
finally: finally:
vim_module._bw(segment_info['bufnr']) vim_module._bw(segment_info['bufnr'])
def test_cursor_current(self): def test_cursor_current(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
self.assertEqual(vim.line_current(segment_info=segment_info), '1') self.assertEqual(vim.line_current(pl=pl, segment_info=segment_info), '1')
self.assertEqual(vim.col_current(segment_info=segment_info), '1') self.assertEqual(vim.col_current(pl=pl, segment_info=segment_info), '1')
self.assertEqual(vim.virtcol_current(segment_info=segment_info), self.assertEqual(vim.virtcol_current(pl=pl, segment_info=segment_info),
[{'highlight_group': ['virtcol_current', 'col_current'], 'contents': '1'}]) [{'highlight_group': ['virtcol_current', 'col_current'], 'contents': '1'}])
def test_modified_buffers(self): def test_modified_buffers(self):
self.assertEqual(vim.modified_buffers(), None) pl = Pl()
self.assertEqual(vim.modified_buffers(pl=pl), None)
def test_branch(self): def test_branch(self):
pl = Pl()
with vim_module._with('buffer', '/foo') as segment_info: with vim_module._with('buffer', '/foo') as segment_info:
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)):
self.assertEqual(vim.branch(segment_info=segment_info), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
self.assertEqual(vim.branch(segment_info=segment_info, status_colors=True), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}])
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)):
self.assertEqual(vim.branch(segment_info=segment_info), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
self.assertEqual(vim.branch(segment_info=segment_info, status_colors=True), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}])
def test_file_vcs_status(self): def test_file_vcs_status(self):
pl = Pl()
with vim_module._with('buffer', '/foo') as segment_info: with vim_module._with('buffer', '/foo') as segment_info:
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)):
self.assertEqual(vim.file_vcs_status(segment_info=segment_info), self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info),
[{'highlight_group': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'}]) [{'highlight_group': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'}])
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: None, directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: None, directory=path)):
self.assertEqual(vim.file_vcs_status(segment_info=segment_info), None) self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None)
with vim_module._with('buffer', '/bar') as segment_info: with vim_module._with('buffer', '/bar') as segment_info:
with vim_module._with('bufoptions', buftype='nofile'): with vim_module._with('bufoptions', buftype='nofile'):
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)):
self.assertEqual(vim.file_vcs_status(segment_info=segment_info), None) self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None)
def test_repository_status(self): def test_repository_status(self):
pl = Pl()
segment_info = vim_module._get_segment_info() segment_info = vim_module._get_segment_info()
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)):
self.assertEqual(vim.repository_status(segment_info=segment_info), None) self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), None)
with replace_module_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)):
self.assertEqual(vim.repository_status(segment_info=segment_info), 'DU') self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), 'DU')
old_cwd = None old_cwd = None