Replace pl.environ/getcwd/home with segment_info

This commit is contained in:
ZyX 2013-03-30 21:55:00 +04:00
parent 854216810e
commit 3ebc16a48c
13 changed files with 151 additions and 105 deletions

View File

@ -64,10 +64,7 @@ def load_json_config(config_file_path, load=json.load, open_file=open_file):
class PowerlineState(object):
def __init__(self, use_daemon_threads, logger, ext, environ, getcwd, home):
self.environ = environ
self.getcwd = getcwd
self.home = home or environ.get('HOME', None)
def __init__(self, use_daemon_threads, logger, ext):
self.logger = logger
self.ext = ext
self.use_daemon_threads = use_daemon_threads
@ -124,14 +121,6 @@ class Powerline(object):
during python session.
:param Logger logger:
If present, no new logger will be created and this logger will be used.
:param dict environ:
Object with ``.__getitem__`` and ``.get`` methods used to obtain
environment variables. Defaults to ``os.environ``.
:param func getcwd:
Function used to get current working directory. Defaults to
``os.getcwdu`` or ``os.getcwd``.
:param str home:
Home directory. Defaults to ``environ.get('HOME')``.
'''
def __init__(self,
@ -139,18 +128,12 @@ class Powerline(object):
renderer_module=None,
run_once=False,
logger=None,
use_daemon_threads=True,
environ=os.environ,
getcwd=getattr(os, 'getcwdu', os.getcwd),
home=None):
use_daemon_threads=True):
global watcher
self.ext = ext
self.renderer_module = renderer_module or ext
self.run_once = run_once
self.logger = logger
self.environ = environ
self.getcwd = getcwd
self.home = home
self.use_daemon_threads = use_daemon_threads
if '.' not in self.renderer_module:
@ -221,7 +204,7 @@ class Powerline(object):
self.logger.setLevel(level)
self.logger.addHandler(handler)
self.pl = PowerlineState(self.use_daemon_threads, self.logger, self.ext, self.environ, self.getcwd, self.home)
self.pl = PowerlineState(self.use_daemon_threads, self.logger, self.ext)
self.renderer_options.update(
pl=self.pl,

View File

@ -77,6 +77,9 @@ class Environment(object):
return default
environ = Environment()
class Prompt(object):
__slots__ = ('powerline', 'side', 'savedpsvar', 'savedps', 'args')
@ -88,7 +91,11 @@ class Prompt(object):
self.args = powerline.args
def __str__(self):
r = self.powerline.render(width=zsh.columns(), side=self.side, segment_info=self.args)
r = self.powerline.render(
width=zsh.columns(),
side=self.side,
segment_info={'args': self.args, 'environ': environ}
)
if type(r) is not str:
if type(r) is bytes:
return r.decode('utf-8')
@ -113,8 +120,7 @@ def set_prompt(powerline, psvar, side):
def setup():
environ = Environment()
powerline = ShellPowerline(Args(), environ=environ, getcwd=lambda: environ['PWD'])
powerline = ShellPowerline(Args())
used_powerlines.append(powerline)
used_powerlines.append(powerline)
set_prompt(powerline, 'PS1', 'left')

View File

@ -2,6 +2,7 @@
from powerline.theme import Theme
from unicodedata import east_asian_width, combining
import os
try:
@ -18,7 +19,19 @@ def construct_returned_value(rendered_highlighted, segments, output_raw):
class Renderer(object):
def __init__(self, theme_config, local_themes, theme_kwargs, colorscheme, pl, **options):
segment_info = {
'environ': os.environ,
'getcwd': getattr(os, 'getcwdu', os.getcwd),
'home': os.environ.get('HOME'),
}
def __init__(self,
theme_config,
local_themes,
theme_kwargs,
colorscheme,
pl,
**options):
self.__dict__.update(options)
self.theme_config = theme_config
theme_kwargs['pl'] = pl
@ -53,6 +66,9 @@ class Renderer(object):
segment['divider_highlight'] = None
return segment
def get_segment_info(self, segment_info):
return segment_info or self.segment_info
def render(self, mode=None, width=None, side=None, output_raw=False, segment_info=None, matcher_info=None):
'''Render all segments.
@ -63,7 +79,7 @@ class Renderer(object):
reached.
'''
theme = self.get_theme(matcher_info)
segments = theme.get_segments(side, segment_info)
segments = theme.get_segments(side, self.get_segment_info(segment_info))
# Handle excluded/included segments for the current mode
segments = [self.get_highlighting(segment, mode) for segment in segments

View File

@ -9,6 +9,11 @@ class IpythonRenderer(ShellRenderer):
escape_hl_start = '\x01'
escape_hl_end = '\x02'
def get_segment_info(self, segment_info):
r = self.segment_info.copy()
r['ipython'] = segment_info
return r
def get_theme(self, matcher_info):
if matcher_info == 'in':
return self.theme

View File

@ -19,6 +19,13 @@ class ShellRenderer(Renderer):
tmux_escape = False
screen_escape = False
def get_segment_info(self, segment_info):
r = self.segment_info.copy()
r.update(segment_info)
if 'PWD' in r['environ']:
r['getcwd'] = lambda: r['environ']['PWD']
return r
def hlstyle(self, fg=None, bg=None, attr=None):
'''Highlight a segment.

View File

@ -79,8 +79,14 @@ class VimRenderer(Renderer):
}
segment_info['buffer'] = segment_info['window'].buffer
segment_info['bufnr'] = segment_info['buffer'].number
segment_info.update(self.segment_info)
winwidth = segment_info['window'].width
statusline = super(VimRenderer, self).render(mode, winwidth, segment_info=segment_info, matcher_info=segment_info)
statusline = super(VimRenderer, self).render(
mode=mode,
width=winwidth,
segment_info=segment_info,
matcher_info=segment_info,
)
return statusline
def reset_highlight(self):

View File

@ -15,29 +15,32 @@ from powerline.lib.vcs import guess
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment, with_docstring
from powerline.lib.time import monotonic
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.theme import requires_segment_info
from collections import namedtuple
from time import sleep
def hostname(pl, only_if_ssh=False):
@requires_segment_info
def hostname(pl, segment_info, only_if_ssh=False):
'''Return the current hostname.
:param bool only_if_ssh:
only return the hostname if currently in an SSH session
'''
if only_if_ssh and not pl.environ.get('SSH_CLIENT'):
if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'):
return None
return socket.gethostname()
@requires_segment_info
class RepositorySegment(KwThreadedSegment):
def __init__(self):
super(RepositorySegment, self).__init__()
self.directories = {}
@staticmethod
def key(pl, **kwargs):
return os.path.abspath(pl.getcwd())
def key(segment_info, **kwargs):
return os.path.abspath(segment_info['getcwd']())
def update(self, *args):
# .compute_state() is running only in this method, and only in one
@ -110,7 +113,8 @@ Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
''')
def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
@requires_segment_info
def cwd(pl, segment_info, dir_shorten_len=None, dir_limit_depth=None):
'''Return the current working directory.
Returns a segment list to create a breadcrumb-like effect.
@ -127,7 +131,7 @@ def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
'''
import re
try:
cwd = pl.getcwd()
cwd = segment_info['getcwd']()
except OSError as e:
if e.errno == 2:
# user most probably deleted the directory
@ -136,7 +140,7 @@ def cwd(pl, dir_shorten_len=None, dir_limit_depth=None):
cwd = "[not found]"
else:
raise
home = pl.home
home = segment_info['home']
if home:
cwd = re.sub('^' + re.escape(home), '~', cwd, 1)
cwd_split = cwd.split(os.sep)
@ -540,7 +544,7 @@ try:
if data:
yield interface, data.bytes_recv, data.bytes_sent
def _get_user(pl):
def _get_user(segment_info):
return psutil.Process(os.getpid()).username
def cpu_load_percent(pl, measure_interval=.5):
@ -567,8 +571,8 @@ except ImportError:
if x is not None:
yield interface, x[0], x[1]
def _get_user(pl): # NOQA
return pl.environ.get('USER', None)
def _get_user(segment_info): # NOQA
return segment_info['environ'].get('USER', None)
def cpu_load_percent(pl, measure_interval=.5): # NOQA
'''Return the average CPU load as a percentage.
@ -587,7 +591,7 @@ username = False
_geteuid = getattr(os, 'geteuid', lambda: 1)
def user(pl):
def user(pl, segment_info=None):
'''Return the current user.
Highlights the user with the ``superuser`` if the effective user ID is 0.
@ -605,6 +609,8 @@ def user(pl):
'contents': username,
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
}]
if 'psutil' in globals():
user = requires_segment_info(user)
if os.path.exists('/proc/uptime'):
@ -765,9 +771,10 @@ Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_lo
''')
def virtualenv(pl):
@requires_segment_info
def virtualenv(pl, segment_info):
'''Return the name of the current Python virtualenv.'''
return os.path.basename(pl.environ.get('VIRTUAL_ENV', '')) or None
return os.path.basename(segment_info['environ'].get('VIRTUAL_ENV', '')) or None
_IMAPKey = namedtuple('Key', 'username password server port folder')

View File

@ -5,4 +5,4 @@ from powerline.theme import requires_segment_info
@requires_segment_info
def prompt_count(pl, segment_info):
return str(segment_info.prompt_count)
return str(segment_info['ipython'].prompt_count)

View File

@ -9,9 +9,9 @@ def last_status(pl, segment_info):
Highlight groups used: ``exit_fail``
'''
if not segment_info.last_exit_code:
if not segment_info['args'].last_exit_code:
return None
return [{'contents': str(segment_info.last_exit_code), 'highlight_group': 'exit_fail'}]
return [{'contents': str(segment_info['args'].last_exit_code), 'highlight_group': 'exit_fail'}]
@requires_segment_info
@ -20,8 +20,9 @@ def last_pipe_status(pl, segment_info):
Highlight groups used: ``exit_fail``, ``exit_success``
'''
if any(segment_info.last_pipe_status):
last_pipe_status = segment_info['args'].last_pipe_status
if any(last_pipe_status):
return [{"contents": str(status), "highlight_group": "exit_fail" if status else "exit_success"}
for status in segment_info.last_pipe_status]
for status in last_pipe_status]
else:
return None

View File

@ -13,11 +13,12 @@ except ImportError:
if __name__ == '__main__':
args = get_argparser(description=__doc__).parse_args()
kwargs = {}
if 'PWD' in os.environ:
kwargs['getcwd'] = lambda: os.environ['PWD']
powerline = ShellPowerline(args, run_once=True, **kwargs)
rendered = powerline.render(width=args.width, side=args.side, segment_info=args)
rendered = powerline.render(
width=args.width,
side=args.side,
segment_info={'args': args, 'environ': os.environ},
)
try:
sys.stdout.write(rendered)
except UnicodeEncodeError:

View File

@ -1,6 +1,7 @@
# vim:fileencoding=utf-8:noet
import imp
import sys
import os
class Pl(object):
@ -8,18 +9,9 @@ class Pl(object):
self.errors = []
self.warns = []
self.debugs = []
self._cwd = None
self.prefix = None
self.environ = {}
self.home = None
self.use_daemon_threads = True
def getcwd(self):
if isinstance(self._cwd, Exception):
raise self._cwd
else:
return self._cwd
for meth in ('error', 'warn', 'debug'):
exec (('def {0}(self, msg, *args, **kwargs):\n'
' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n').format(meth))
@ -133,9 +125,7 @@ class ItemReplace(object):
self.d[self.key] = self.old
def replace_env(key, new, d=None):
r = None
if not d:
r = Pl()
d = r.environ
return ItemReplace(d, key, new, r)
def replace_env(key, new, environ=None, **kwargs):
r = kwargs.copy()
r['environ'] = environ or {}
return ItemReplace(r['environ'], key, new, r)

View File

@ -65,17 +65,17 @@ class TestConfig(TestCase):
from powerline.shell import ShellPowerline
args = Args(last_pipe_status=[1, 0], ext=['shell'], renderer_module='zsh_prompt')
with ShellPowerline(args, run_once=False) as powerline:
powerline.render(segment_info=args)
powerline.render(segment_info={'args': args})
with ShellPowerline(args, run_once=False) as powerline:
powerline.render(segment_info=args)
powerline.render(segment_info={'args': args})
def test_bash(self):
from powerline.shell import ShellPowerline
args = Args(last_exit_code=1, ext=['shell'], renderer_module='bash_prompt', config=[('ext', {'shell': {'theme': 'default_leftonly'}})])
with ShellPowerline(args, run_once=False) as powerline:
powerline.render(segment_info=args)
powerline.render(segment_info={'args': args})
with ShellPowerline(args, run_once=False) as powerline:
powerline.render(segment_info=args)
powerline.render(segment_info={'args': args})
def test_ipython(self):
from powerline.ipython import IpythonPowerline

View File

@ -14,15 +14,22 @@ vim = None
class TestShell(TestCase):
def test_last_status(self):
pl = Pl()
self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=10)),
segment_info = {'args': Args(last_exit_code=10)}
self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info),
[{'contents': '10', 'highlight_group': 'exit_fail'}])
self.assertEqual(shell.last_status(pl=pl, segment_info=Args(last_exit_code=None)), None)
segment_info['args'].last_exit_code=0
self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), None)
segment_info['args'].last_exit_code=None
self.assertEqual(shell.last_status(pl=pl, segment_info=segment_info), None)
def test_last_pipe_status(self):
pl = Pl()
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[])), None)
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 0, 0])), None)
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=Args(last_pipe_status=[0, 2, 0])),
segment_info = {'args': Args(last_pipe_status=[])}
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None)
segment_info['args'].last_pipe_status=[0, 0, 0]
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info), None)
segment_info['args'].last_pipe_status=[0, 2, 0]
self.assertEqual(shell.last_pipe_status(pl=pl, segment_info=segment_info),
[{'contents': '0', 'highlight_group': 'exit_success'},
{'contents': '2', 'highlight_group': 'exit_fail'},
{'contents': '0', 'highlight_group': 'exit_success'}])
@ -30,77 +37,93 @@ class TestShell(TestCase):
class TestCommon(TestCase):
def test_hostname(self):
with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as pl:
pl = Pl()
with replace_env('SSH_CLIENT', '192.168.0.12 40921 22') as segment_info:
with replace_module_module(common, 'socket', gethostname=lambda: 'abc'):
self.assertEqual(common.hostname(pl=pl), 'abc')
self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), 'abc')
pl.environ.pop('SSH_CLIENT')
self.assertEqual(common.hostname(pl=pl), 'abc')
self.assertEqual(common.hostname(pl=pl, only_if_ssh=True), None)
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc')
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), 'abc')
segment_info['environ'].pop('SSH_CLIENT')
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info), 'abc')
self.assertEqual(common.hostname(pl=pl, segment_info=segment_info, only_if_ssh=True), None)
def test_user(self):
new_os = new_module('os', getpid=lambda: 1)
new_psutil = new_module('psutil', Process=lambda pid: Args(username='def'))
with replace_env('USER', 'def') as pl:
pl = Pl()
with replace_env('USER', 'def') as segment_info:
with replace_attr(common, 'os', new_os):
with replace_attr(common, 'psutil', new_psutil):
with replace_attr(common, '_geteuid', lambda: 5):
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': 'user'}])
self.assertEqual(common.user(pl=pl, segment_info=segment_info), [
{'contents': 'def', 'highlight_group': 'user'}
])
with replace_attr(common, '_geteuid', lambda: 0):
self.assertEqual(common.user(pl=pl), [{'contents': 'def', 'highlight_group': ['superuser', 'user']}])
self.assertEqual(common.user(pl=pl, segment_info=segment_info), [
{'contents': 'def', 'highlight_group': ['superuser', 'user']}
])
def test_branch(self):
pl = Pl()
pl._cwd = os.getcwd()
segment_info = {'getcwd': os.getcwd}
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')):
self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
self.assertEqual(common.branch(pl=pl, status_colors=True),
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), 'tests')
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}])
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')):
self.assertEqual(common.branch(pl=pl, status_colors=False), 'tests')
self.assertEqual(common.branch(pl=pl, status_colors=True),
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), 'tests')
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}])
self.assertEqual(common.branch(pl=pl), 'tests')
self.assertEqual(common.branch(pl=pl, segment_info=segment_info), 'tests')
with replace_attr(common, 'guess', lambda path: None):
self.assertEqual(common.branch(pl=pl), None)
self.assertEqual(common.branch(pl=pl, segment_info=segment_info), None)
def test_cwd(self):
new_os = new_module('os', path=os.path, sep='/')
pl = Pl()
cwd = [None]
def getcwd():
wd = cwd[0]
if isinstance(wd, Exception):
raise wd
else:
return wd
segment_info = {'getcwd': getcwd, 'home': None}
with replace_attr(common, 'os', new_os):
pl._cwd = '/abc/def/ghi/foo/bar'
self.assertEqual(common.cwd(pl=pl),
cwd[0] = '/abc/def/ghi/foo/bar'
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info),
[{'contents': '/', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'abc', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'def', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'ghi', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
pl.home = '/abc/def/ghi'
self.assertEqual(common.cwd(pl=pl),
segment_info['home'] = '/abc/def/ghi'
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=3),
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=3),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'foo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=1),
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=1),
[{'contents': '', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2),
[{'contents': '~', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'fo', 'divider_highlight_group': 'cwd:divider'},
{'contents': 'bar', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
ose = OSError()
ose.errno = 2
pl._cwd = ose
self.assertEqual(common.cwd(pl=pl, dir_limit_depth=2, dir_shorten_len=2),
cwd[0] = ose
self.assertEqual(common.cwd(pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2),
[{'contents': '[not found]', 'divider_highlight_group': 'cwd:divider', 'highlight_group': ['cwd:current_folder', 'cwd']}])
pl._cwd = OSError()
self.assertRaises(OSError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
pl._cwd = ValueError()
self.assertRaises(ValueError, common.cwd, pl=pl, dir_limit_depth=2, dir_shorten_len=2)
cwd[0] = OSError()
self.assertRaises(OSError, common.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2)
cwd[0] = ValueError()
self.assertRaises(ValueError, common.cwd, pl=pl, segment_info=segment_info, dir_limit_depth=2, dir_shorten_len=2)
def test_date(self):
pl = Pl()
@ -257,10 +280,11 @@ class TestCommon(TestCase):
common.network_load.shutdown()
def test_virtualenv(self):
with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as pl:
self.assertEqual(common.virtualenv(pl=pl), 'ghi')
pl.environ.pop('VIRTUAL_ENV')
self.assertEqual(common.virtualenv(pl=pl), None)
pl = Pl()
with replace_env('VIRTUAL_ENV', '/abc/def/ghi') as segment_info:
self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), 'ghi')
segment_info['environ'].pop('VIRTUAL_ENV')
self.assertEqual(common.virtualenv(pl=pl, segment_info=segment_info), None)
def test_email_imap_alert(self):
# TODO