Merge pull request #1093 from ZyX-I/fix-tests

Some test fixes
This commit is contained in:
Nikolai Aleksandrovich Pavlov 2014-09-21 16:03:59 +04:00
commit 3929ee17d6
6 changed files with 86 additions and 56 deletions

View File

@ -15,6 +15,10 @@ from powerline.lib import mergedicts
from powerline.lib.encoding import get_preferred_output_encoding
class NotInterceptedError(BaseException):
pass
def _config_loader_condition(path):
if path and os.path.isfile(path):
return path
@ -744,16 +748,12 @@ class Powerline(object):
self.update_renderer()
return self.renderer.render(*args, **kwargs)
except Exception as e:
exc = e
try:
self.exception('Failed to render: {0}', str(e))
except Exception as e:
# Updates e variable to new value, masking previous one.
# Normally it is the same exception (due to raise in case pl is
# unset), but it may also show error in logger. Note that latter
# is not logged by logger for obvious reasons, thus this also
# prevents us from seeing logger traceback.
pass
return FailedUnicode(safe_unicode(e))
exc = e
return FailedUnicode(safe_unicode(exc))
def render_above_lines(self, *args, **kwargs):
'''Like .render(), but for ``self.renderer.render_above_lines()``
@ -763,16 +763,12 @@ class Powerline(object):
for line in self.renderer.render_above_lines(*args, **kwargs):
yield line
except Exception as e:
exc = e
try:
self.exception('Failed to render: {0}', str(e))
except Exception as e:
# Updates e variable to new value, masking previous one.
# Normally it is the same exception (due to raise in case pl is
# unset), but it may also show error in logger. Note that latter
# is not logged by logger for obvious reasons, thus this also
# prevents us from seeing logger traceback.
pass
yield FailedUnicode(safe_unicode(e))
exc = e
yield FailedUnicode(safe_unicode(exc))
def setup(self, *args, **kwargs):
'''Setup the environment to use powerline.

View File

@ -24,10 +24,11 @@ class RewriteResult(object):
class IPythonPowerline(Powerline):
def init(self):
def init(self, **kwargs):
super(IPythonPowerline, self).init(
'ipython',
use_daemon_threads=True
use_daemon_threads=True,
**kwargs
)
def get_config_paths(self):

View File

@ -9,6 +9,7 @@ from collections import defaultdict
from powerline.lib.watcher import create_tree_watcher
from powerline.lib.unicode import out_u
from powerline.lib.path import join
def generate_directories(path):
@ -97,7 +98,7 @@ class FileStatusCache(dict):
if nparent == parent:
break
parent = nparent
ignore_files.add(os.path.join(parent, ignore_file_name))
ignore_files.add(join(parent, ignore_file_name))
for f in extra_ignore_files:
ignore_files.add(f)
self.keypath_ignore_map[keypath] = ignore_files
@ -121,7 +122,7 @@ file_status_cache = FileStatusCache()
def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_func, create_watcher, extra_ignore_files=()):
global file_status_cache
keypath = file_path if os.path.isabs(file_path) else os.path.join(directory, file_path)
keypath = file_path if os.path.isabs(file_path) else join(directory, file_path)
file_status_cache.update_maps(keypath, directory, dirstate_file, ignore_file_name, extra_ignore_files)
with file_status_lock:

View File

@ -122,15 +122,20 @@ try:
# psutil-2.0.0: psutil.Process.username is unbound method
if callable(psutil.Process.username):
def _get_user(segment_info):
def _get_user():
return psutil.Process(os.getpid()).username()
# pre psutil-2.0.0: psutil.Process.username has type property
else:
def _get_user(segment_info):
def _get_user():
return psutil.Process(os.getpid()).username
except ImportError:
def _get_user(segment_info):
return segment_info['environ'].get('USER', None)
try:
import pwd
except ImportError:
from getpass import getuser as _get_user
else:
def _get_user():
return pwd.getpwuid(os.geteuid()).pw_name
username = False
@ -138,7 +143,7 @@ username = False
_geteuid = getattr(os, 'geteuid', lambda: 1)
def user(pl, segment_info=None, hide_user=None):
def user(pl, hide_user=None):
'''Return the current user.
:param str hide_user:
@ -150,7 +155,7 @@ def user(pl, segment_info=None, hide_user=None):
'''
global username
if username is False:
username = _get_user(segment_info)
username = _get_user()
if username is None:
pl.warn('Failed to get username')
return None
@ -161,5 +166,3 @@ def user(pl, segment_info=None, hide_user=None):
'contents': username,
'highlight_group': ['user'] if euid != 0 else ['superuser', 'user'],
}]
if 'psutil' not in globals():
user = requires_segment_info(user)

View File

@ -7,17 +7,42 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
import sys
import os
import json
import logging
import tests.vim as vim_module
from tests.lib import Args, urllib_read, replace_attr
from tests import TestCase
from powerline import NotInterceptedError
from powerline.segments.common import wthr
VBLOCK = chr(ord('V') - 0x40)
SBLOCK = chr(ord('S') - 0x40)
class FailingLogger(logging.Logger):
def exception(self, *args, **kwargs):
super(FailingLogger, self).exception(*args, **kwargs)
raise NotInterceptedError('Unexpected exception occurred')
def get_logger(stream=None):
log_format = '%(asctime)s:%(levelname)s:%(message)s'
formatter = logging.Formatter(log_format)
level = logging.WARNING
handler = logging.StreamHandler(stream)
handler.setLevel(level)
handler.setFormatter(formatter)
logger = FailingLogger('powerline')
logger.setLevel(level)
logger.addHandler(handler)
return logger
class TestVimConfig(TestCase):
def test_vim(self):
from powerline.vim import VimPowerline
@ -40,7 +65,7 @@ class TestVimConfig(TestCase):
i = 0
with vim_module._with('split'):
with VimPowerline() as powerline:
with VimPowerline(logger=get_logger()) as powerline:
def check_output(mode, args, kwargs):
if mode == 'nc':
window = vim_module.windows[0]
@ -92,33 +117,33 @@ class TestConfig(TestCase):
reload(common)
from powerline.shell import ShellPowerline
with replace_attr(common, 'urllib_read', urllib_read):
with ShellPowerline(Args(ext=['tmux']), run_once=False) as powerline:
with ShellPowerline(Args(ext=['tmux']), logger=get_logger(), run_once=False) as powerline:
powerline.render()
with ShellPowerline(Args(ext=['tmux']), run_once=False) as powerline:
with ShellPowerline(Args(ext=['tmux']), logger=get_logger(), run_once=False) as powerline:
powerline.render()
def test_zsh(self):
from powerline.shell import ShellPowerline
args = Args(last_pipe_status=[1, 0], jobnum=0, ext=['shell'], renderer_module='.zsh')
segment_info = {'args': args}
with ShellPowerline(args, run_once=False) as powerline:
with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline:
powerline.render(segment_info=segment_info)
with ShellPowerline(args, run_once=False) as powerline:
with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline:
powerline.render(segment_info=segment_info)
segment_info['local_theme'] = 'select'
with ShellPowerline(args, run_once=False) as powerline:
with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline:
powerline.render(segment_info=segment_info)
segment_info['local_theme'] = 'continuation'
segment_info['parser_state'] = 'if cmdsubst'
with ShellPowerline(args, run_once=False) as powerline:
with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline:
powerline.render(segment_info=segment_info)
def test_bash(self):
from powerline.shell import ShellPowerline
args = Args(last_exit_code=1, jobnum=0, ext=['shell'], renderer_module='.bash', config={'ext': {'shell': {'theme': 'default_leftonly'}}})
with ShellPowerline(args, run_once=False) as powerline:
with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline:
powerline.render(segment_info={'args': args})
with ShellPowerline(args, run_once=False) as powerline:
with ShellPowerline(args, logger=get_logger(), run_once=False) as powerline:
powerline.render(segment_info={'args': args})
def test_ipython(self):
@ -131,11 +156,11 @@ class TestConfig(TestCase):
segment_info = Args(prompt_count=1)
with IpyPowerline() as powerline:
with IpyPowerline(logger=get_logger()) as powerline:
for prompt_type in ['in', 'in2']:
powerline.render(is_prompt=True, matcher_info=prompt_type, segment_info=segment_info)
powerline.render(is_prompt=True, matcher_info=prompt_type, segment_info=segment_info)
with IpyPowerline() as powerline:
with IpyPowerline(logger=get_logger()) as powerline:
for prompt_type in ['out', 'rewrite']:
powerline.render(is_prompt=False, matcher_info=prompt_type, segment_info=segment_info)
powerline.render(is_prompt=False, matcher_info=prompt_type, segment_info=segment_info)
@ -145,8 +170,8 @@ class TestConfig(TestCase):
from imp import reload
reload(common)
from powerline import Powerline
with replace_attr(common, 'urllib_read', urllib_read):
Powerline(ext='wm', renderer_module='pango_markup', run_once=True).render()
with replace_attr(wthr, 'urllib_read', urllib_read):
Powerline(logger=get_logger(), ext='wm', renderer_module='pango_markup', run_once=True).render()
reload(common)

View File

@ -5,6 +5,7 @@ import sys
import os
from functools import partial
from collections import namedtuple
from powerline.segments import shell, tmux, common
from powerline.lib.vcs import get_fallback_create_watcher
@ -448,27 +449,30 @@ class TestEnv(TestCommon):
def username(self):
return 'def'
if hasattr(common, 'psutil') and not callable(common.psutil.Process.username):
if hasattr(self.module, 'psutil') and not callable(self.module.psutil.Process.username):
username = property(username)
struct_passwd = namedtuple('struct_passwd', ('pw_name',))
new_psutil = new_module('psutil', Process=Process)
new_pwd = new_module('pwd', getpwuid=lambda uid: struct_passwd(pw_name='def'))
new_getpass = new_module('getpass', getuser=lambda: 'def')
pl = Pl()
with replace_env('USER', 'def') as segment_info:
common.username = False
with replace_attr(self.module, 'os', new_os):
with replace_attr(self.module, 'psutil', new_psutil):
with replace_attr(self.module, '_geteuid', lambda: 5):
self.assertEqual(common.user(pl=pl, segment_info=segment_info), [
{'contents': 'def', 'highlight_group': ['user']}
])
self.assertEqual(common.user(pl=pl, segment_info=segment_info, hide_user='abc'), [
{'contents': 'def', 'highlight_group': ['user']}
])
self.assertEqual(common.user(pl=pl, segment_info=segment_info, hide_user='def'), None)
with replace_attr(self.module, '_geteuid', lambda: 0):
self.assertEqual(common.user(pl=pl, segment_info=segment_info), [
{'contents': 'def', 'highlight_group': ['superuser', 'user']}
])
with replace_attr(self.module, 'pwd', new_pwd):
with replace_attr(self.module, 'getpass', new_getpass):
with replace_attr(self.module, 'os', new_os):
with replace_attr(self.module, 'psutil', new_psutil):
with replace_attr(self.module, '_geteuid', lambda: 5):
self.assertEqual(common.user(pl=pl), [
{'contents': 'def', 'highlight_group': ['user']}
])
self.assertEqual(common.user(pl=pl, hide_user='abc'), [
{'contents': 'def', 'highlight_group': ['user']}
])
self.assertEqual(common.user(pl=pl, hide_user='def'), None)
with replace_attr(self.module, '_geteuid', lambda: 0):
self.assertEqual(common.user(pl=pl), [
{'contents': 'def', 'highlight_group': ['superuser', 'user']}
])
def test_cwd(self):
new_os = new_module('os', path=os.path, sep='/')