Merge pull request #1077 from ZyX-I/fix-non-unicode-vim

Fix non-unicode strings and locales handling
This commit is contained in:
Nikolai Aleksandrovich Pavlov 2014-09-14 20:07:49 +04:00
commit 2f81af5e59
26 changed files with 418 additions and 191 deletions

View File

@ -11,7 +11,7 @@ sys.path.insert(0, os.path.abspath(os.getcwd()))
extensions = ['powerline_autodoc', 'sphinx.ext.todo', 'sphinx.ext.coverage', 'sphinx.ext.viewcode']
source_suffix = '.rst'
master_doc = 'index'
project = u'Powerline'
project = 'Powerline'
version = 'beta'
release = 'beta'
exclude_patterns = ['_build']

View File

@ -371,6 +371,10 @@ Vim ``segment_info`` argument is a dictionary with the following keys:
``mode``
Current mode.
``encoding``
Value of ``&encoding`` from the time when powerline was initialized. It
should be used to convert return values.
.. note::
Your segment generally should not assume that it is run for the current
window, current buffer or current tabpage. “Current window” and “current

View File

@ -7,43 +7,127 @@ import codecs
try:
import vim
except ImportError:
vim = {}
vim = object()
if not hasattr(vim, 'bindeval'):
import json
from powerline.lib.unicode import unicode
try:
vim_encoding = vim.eval('&encoding')
except AttributeError:
vim_encoding = 'utf-8'
python_to_vim_types = {
unicode: (
lambda o: b'\'' + (o.translate({
ord('\''): '\'\'',
}).encode(vim_encoding)) + b'\''
),
bytes: (lambda o: b'\'' + o.replace(b'\'', b'\'\'') + b'\''),
int: (str if str is bytes else (lambda o: unicode(o).encode('ascii'))),
}
python_to_vim_types[float] = python_to_vim_types[int]
def python_to_vim(o):
return python_to_vim_types[type(o)](o)
if sys.version_info < (3,):
def str_to_bytes(s):
return s
def unicode_eval(expr):
ret = vim.eval(expr)
return ret.decode(vim_encoding, 'powerline_vim_strtrans_error')
else:
def str_to_bytes(s):
return s.encode(vim_encoding)
def unicode_eval(expr):
return vim.eval(expr)
def safe_bytes_eval(expr):
return bytes(bytearray((
int(chunk) for chunk in (
vim.eval(
b'substitute(' + expr + b', ' +
b'\'^.*$\', \'\\=join(map(range(len(submatch(0))), ' +
b'"char2nr(submatch(0)[v:val])"))\', "")'
).split()
)
)))
def eval_bytes(expr):
try:
return str_to_bytes(vim.eval(expr))
except UnicodeDecodeError:
return safe_bytes_eval(expr)
def eval_unicode(expr):
try:
return unicode_eval(expr)
except UnicodeDecodeError:
return safe_bytes_eval(expr).decode(vim_encoding, 'powerline_vim_strtrans_error')
if hasattr(vim, 'bindeval'):
rettype_func = {
None: lambda f: f,
'unicode': (
lambda f: (
lambda *args, **kwargs: (
f(*args, **kwargs).decode(
vim_encoding, 'powerline_vim_strtrans_error'
))))
}
rettype_func['int'] = rettype_func['bytes'] = rettype_func[None]
rettype_func['str'] = rettype_func['bytes'] if str is bytes else rettype_func['unicode']
def vim_get_func(f, rettype=None):
'''Return a vim function binding.'''
try:
func = vim.bindeval('function("' + f + '")')
if sys.version_info >= (3,) and rettype is str:
return (lambda *args, **kwargs: func(*args, **kwargs).decode('utf-8', errors='replace'))
return func
except vim.error:
return None
else:
return rettype_func[rettype](func)
else:
rettype_eval = {
None: getattr(vim, 'eval', None),
'int': lambda expr: int(vim.eval(expr)),
'bytes': eval_bytes,
'unicode': eval_unicode,
}
rettype_eval['str'] = rettype_eval[None]
class VimFunc(object):
'''Evaluate a vim function using vim.eval().
This is a fallback class for older vim versions.
'''
__slots__ = ('f', 'rettype')
__slots__ = ('f', 'eval')
def __init__(self, f, rettype=None):
self.f = f
self.rettype = rettype
self.f = f.encode('utf-8')
self.eval = rettype_eval[rettype]
def __call__(self, *args):
r = vim.eval(self.f + '(' + json.dumps(args)[1:-1] + ')')
if self.rettype:
return self.rettype(r)
return r
return self.eval(self.f + b'(' + (b','.join((
python_to_vim(o) for o in args
))) + b')')
vim_get_func = VimFunc
if type(vim) is object:
vim_get_func = lambda *args, **kwargs: None
_getbufvar = vim_get_func('getbufvar')
@ -52,7 +136,10 @@ _getbufvar = vim_get_func('getbufvar')
if hasattr(vim, 'vvars') and vim.vvars['version'] > 703:
_vim_to_python_types = {
getattr(vim, 'Dictionary', None) or type(vim.bindeval('{}')):
lambda value: dict(((key, _vim_to_python(value[key])) for key in value.keys())),
lambda value: dict((
(_vim_to_python(k), _vim_to_python(v))
for k, v in value.items()
)),
getattr(vim, 'List', None) or type(vim.bindeval('[]')):
lambda value: [_vim_to_python(item) for item in value],
getattr(vim, 'Function', None) or type(vim.bindeval('function("mode")')):
@ -74,7 +161,7 @@ else:
list: (lambda value: [_vim_to_python(i) for i in value]),
}
_vim_exists = vim_get_func('exists', rettype=int)
_vim_exists = vim_get_func('exists', rettype='int')
def vim_getvar(varname):
varname = 'g:' + varname
@ -102,7 +189,7 @@ else:
if sys.version_info < (3,):
getbufvar = _getbufvar
else:
_vim_to_python_types[bytes] = lambda value: value.decode('utf-8')
_vim_to_python_types[bytes] = lambda value: value.decode(vim_encoding)
def getbufvar(*args):
return _vim_to_python(_getbufvar(*args))
@ -133,7 +220,7 @@ else:
def vim_setoption(option, value):
vim.command('let &g:{option} = {value}'.format(
option=option, value=json.encode(value)))
option=option, value=python_to_vim(value)))
if hasattr(vim, 'tabpages'):
@ -256,29 +343,27 @@ class VimEnviron(object):
if sys.version_info < (3,):
def buffer_name(buf):
return buf.name
def buffer_name(segment_info):
return segment_info['buffer'].name
else:
vim_bufname = vim_get_func('bufname')
vim_bufname = vim_get_func('bufname', rettype='bytes')
def buffer_name(buf):
def buffer_name(segment_info):
try:
name = buf.name
name = segment_info['buffer'].name
except UnicodeDecodeError:
return vim_bufname(buf.number)
return vim_bufname(segment_info['bufnr'])
else:
return name.encode('utf-8') if name else None
return name.encode(segment_info['encoding']) if name else None
vim_strtrans = vim_get_func('strtrans')
vim_strtrans = vim_get_func('strtrans', rettype='unicode')
def powerline_vim_strtrans_error(e):
if not isinstance(e, UnicodeDecodeError):
raise NotImplementedError
# Assuming &encoding is utf-8 strtrans should not return anything but ASCII
# under current circumstances
text = vim_strtrans(e.object[e.start:e.end]).decode()
text = vim_strtrans(e.object[e.start:e.end])
return (text, e.end)

View File

@ -4,6 +4,7 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
import atexit
from weakref import WeakValueDictionary, ref
from locale import getpreferredencoding
import zsh
@ -65,7 +66,7 @@ class Args(object):
def string(s):
if type(s) is bytes:
return s.decode('utf-8', 'replace')
return s.decode(getpreferredencoding(), 'replace')
else:
return str(s)
@ -154,9 +155,9 @@ class Prompt(object):
)
if type(r) is not str:
if type(r) is bytes:
return r.decode('utf-8')
return r.decode(getpreferredencoding(), 'replace')
else:
return r.encode('utf-8')
return r.encode(getpreferredencoding(), 'replace')
return r
def __del__(self):

View File

@ -123,8 +123,6 @@ class INotify(object):
self._buf = ctypes.create_string_buffer(5000)
self.fenc = sys.getfilesystemencoding() or 'utf-8'
self.hdr = struct.Struct(b'iIII')
if self.fenc == 'ascii':
self.fenc = 'utf-8'
# We keep a reference to os to prevent it from being deleted
# during interpreter shutdown, which would lead to errors in the
# __del__ method
@ -176,7 +174,7 @@ class INotify(object):
pos += self.hdr.size
name = None
if get_name:
name = raw[pos:pos + name_len].rstrip(b'\0').decode(self.fenc)
name = raw[pos:pos + name_len].rstrip(b'\0')
pos += name_len
self.process_event(wd, mask, cookie, name)

View File

@ -6,3 +6,13 @@ import os
def realpath(path):
return os.path.abspath(os.path.realpath(path))
def join(*components):
if any((isinstance(p, bytes) for p in components)):
return os.path.join(*[
p if isinstance(p, bytes) else p.encode('ascii')
for p in components
])
else:
return os.path.join(*components)

View File

@ -52,6 +52,23 @@ def powerline_decode_error(e):
codecs.register_error('powerline_decode_error', powerline_decode_error)
last_swe_idx = 0
def register_strwidth_error(strwidth):
global last_swe_idx
last_swe_idx += 1
def powerline_encode_strwidth_error(e):
if not isinstance(e, UnicodeEncodeError):
raise NotImplementedError
return ('?' * strwidth(e.object[e.start:e.end]), e.end)
ename = 'powerline_encode_strwidth_error_{0}'.format(last_swe_idx)
codecs.register_error(ename, powerline_encode_strwidth_error)
return ename
def out_u(s):
'''Return unicode string suitable for displaying

View File

@ -8,6 +8,7 @@ from threading import Lock
from collections import defaultdict
from powerline.lib.watcher import create_tree_watcher
from powerline.lib.unicode import out_u
def generate_directories(path):
@ -75,10 +76,10 @@ def get_branch_name(directory, config_file, get_func, create_watcher):
raise
# Config file does not exist (happens for mercurial)
if config_file not in branch_name_cache:
branch_name_cache[config_file] = get_func(directory, config_file)
branch_name_cache[config_file] = out_u(get_func(directory, config_file))
if changed:
# Config file has changed or was not tracked
branch_name_cache[config_file] = get_func(directory, config_file)
branch_name_cache[config_file] = out_u(get_func(directory, config_file))
return branch_name_cache[config_file]
@ -218,9 +219,15 @@ vcs_props = (
)
vcs_props_bytes = [
(vcs, vcs_dir.encode('ascii'), check)
for vcs, vcs_dir, check in vcs_props
]
def guess(path, create_watcher):
for directory in generate_directories(path):
for vcs, vcs_dir, check in vcs_props:
for vcs, vcs_dir, check in (vcs_props_bytes if isinstance(path, bytes) else vcs_props):
repo_dir = os.path.join(directory, vcs_dir)
if check(repo_dir):
if os.path.isdir(repo_dir) and not os.access(repo_dir, os.X_OK):

View File

@ -1,21 +1,22 @@
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, division, absolute_import, print_function)
import sys
import os
import re
from io import StringIO
from locale import getpreferredencoding
from bzrlib import (workingtree, status, library_state, trace, ui)
from powerline.lib.vcs import get_branch_name, get_file_status
from powerline.lib.path import join
class CoerceIO(StringIO):
def write(self, arg):
if isinstance(arg, bytes):
arg = arg.decode('utf-8', 'replace')
arg = arg.decode(getpreferredencoding(), 'replace')
return super(CoerceIO, self).write(arg)
@ -29,7 +30,7 @@ def branch_name_from_config_file(directory, config_file):
for line in f:
m = nick_pat.match(line)
if m is not None:
ans = m.group(1).strip().decode('utf-8', 'replace')
ans = m.group(1).strip().decode(getpreferredencoding(), 'replace')
break
except Exception:
pass
@ -41,8 +42,6 @@ state = None
class Repository(object):
def __init__(self, directory, create_watcher):
if isinstance(directory, bytes):
directory = directory.decode(sys.getfilesystemencoding() or sys.getdefaultencoding() or 'utf-8')
self.directory = os.path.abspath(directory)
self.create_watcher = create_watcher
@ -61,7 +60,7 @@ class Repository(object):
if path is not None:
return get_file_status(
directory=self.directory,
dirstate_file=os.path.join(self.directory, '.bzr', 'checkout', 'dirstate'),
dirstate_file=join(self.directory, '.bzr', 'checkout', 'dirstate'),
file_path=path,
ignore_file_name='.bzrignore',
get_func=self.do_status,
@ -100,7 +99,7 @@ class Repository(object):
return ans if ans.strip() else None
def branch(self):
config_file = os.path.join(self.directory, '.bzr', 'branch', 'branch.conf')
config_file = join(self.directory, '.bzr', 'branch', 'branch.conf')
return get_branch_name(
directory=self.directory,
config_file=config_file,

View File

@ -5,8 +5,11 @@ import os
import sys
import re
from locale import getpreferredencoding
from powerline.lib.vcs import get_branch_name, get_file_status
from powerline.lib.shell import readlines
from powerline.lib.path import join
_ref_pat = re.compile(br'ref:\s*refs/heads/(.+)')
@ -20,20 +23,22 @@ def branch_name_from_config_file(directory, config_file):
return os.path.basename(directory)
m = _ref_pat.match(raw)
if m is not None:
return m.group(1).decode('utf-8', 'replace')
return m.group(1).decode(getpreferredencoding(), 'replace')
return raw[:7]
def git_directory(directory):
path = os.path.join(directory, '.git')
path = join(directory, '.git')
if os.path.isfile(path):
with open(path, 'rb') as f:
raw = f.read()
if not raw.startswith(b'gitdir: '):
raise IOError('invalid gitfile format')
raw = raw[8:].decode(sys.getfilesystemencoding() or 'utf-8')
if raw[-1] == '\n':
raw = raw[8:]
if raw[-1:] == b'\n':
raw = raw[:-1]
if not isinstance(path, bytes):
raw = raw.decode(sys.getfilesystemencoding() or 'utf-8')
if not raw:
raise IOError('no path in gitfile')
return os.path.abspath(os.path.join(directory, raw))
@ -69,18 +74,18 @@ class GitRepository(object):
# for some reason I cannot be bothered to figure out.
return get_file_status(
directory=self.directory,
dirstate_file=os.path.join(gitd, 'index'),
dirstate_file=join(gitd, 'index'),
file_path=path,
ignore_file_name='.gitignore',
get_func=self.do_status,
create_watcher=self.create_watcher,
extra_ignore_files=tuple(os.path.join(gitd, x) for x in ('logs/HEAD', 'info/exclude')),
extra_ignore_files=tuple(join(gitd, x) for x in ('logs/HEAD', 'info/exclude')),
)
return self.do_status(self.directory, path)
def branch(self):
directory = git_directory(self.directory)
head = os.path.join(directory, 'HEAD')
head = join(directory, 'HEAD')
return get_branch_name(
directory=directory,
config_file=head,

View File

@ -3,16 +3,19 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
import os
from locale import getpreferredencoding
from mercurial import hg, ui, match
from powerline.lib.vcs import get_branch_name, get_file_status
from powerline.lib.path import join
def branch_name_from_config_file(directory, config_file):
try:
with open(config_file, 'rb') as f:
raw = f.read()
return raw.decode('utf-8', 'replace').strip()
return raw.decode(getpreferredencoding(), 'replace').strip()
except Exception:
return 'default'
@ -50,7 +53,7 @@ class Repository(object):
if path:
return get_file_status(
directory=self.directory,
dirstate_file=os.path.join(self.directory, '.hg', 'dirstate'),
dirstate_file=join(self.directory, '.hg', 'dirstate'),
file_path=path,
ignore_file_name='.hgignore',
get_func=self.do_status,
@ -75,7 +78,7 @@ class Repository(object):
return self.repo_statuses_str[resulting_status]
def branch(self):
config_file = os.path.join(self.directory, '.hg', 'branch')
config_file = join(self.directory, '.hg', 'branch')
return get_branch_name(
directory=self.directory,
config_file=config_file,

View File

@ -247,6 +247,8 @@ class INotifyTreeWatcher(INotify):
if mask & self.CREATE:
# A new sub-directory might have been created, monitor it.
try:
if not isinstance(path, bytes):
name = name.decode(self.fenc)
self.add_watch(os.path.join(path, name))
except OSError as e:
if e.errno == errno.ENOENT:

View File

@ -2,6 +2,7 @@
from __future__ import (unicode_literals, division, absolute_import, print_function)
import os
import sys
from collections import defaultdict
from threading import RLock
@ -56,15 +57,24 @@ def start_uv_thread():
return _uv_thread.uv_loop
def normpath(path, fenc):
path = realpath(path)
if isinstance(path, bytes):
return path.decode(fenc)
else:
return path
class UvWatcher(object):
def __init__(self):
import_pyuv()
self.watches = {}
self.lock = RLock()
self.loop = start_uv_thread()
self.fenc = sys.getfilesystemencoding() or 'utf-8'
def watch(self, path):
path = realpath(path)
path = normpath(path, self.fenc)
with self.lock:
if path not in self.watches:
try:
@ -82,7 +92,7 @@ class UvWatcher(object):
raise
def unwatch(self, path):
path = realpath(path)
path = normpath(path, self.fenc)
with self.lock:
try:
watch = self.watches.pop(path)
@ -92,7 +102,7 @@ class UvWatcher(object):
def is_watching(self, path):
with self.lock:
return realpath(path) in self.watches
return normpath(path, self.fenc) in self.watches
def __del__(self):
try:
@ -122,7 +132,7 @@ class UvFileWatcher(UvWatcher):
self.events.pop(path, None)
def __call__(self, path):
path = realpath(path)
path = normpath(path, self.fenc)
with self.lock:
events = self.events.pop(path, None)
if events:
@ -139,14 +149,15 @@ class UvTreeWatcher(UvWatcher):
def __init__(self, basedir, ignore_event=None):
super(UvTreeWatcher, self).__init__()
self.ignore_event = ignore_event or (lambda path, name: False)
self.basedir = realpath(basedir)
self.basedir = normpath(basedir, self.fenc)
self.modified = True
self.watch_directory(self.basedir)
def watch_directory(self, path):
os.path.walk(realpath(path), self.watch_one_directory, None)
for root, dirs, files in os.walk(normpath(path, self.fenc)):
self.watch_one_directory(root)
def watch_one_directory(self, arg, dirname, fnames):
def watch_one_directory(self, dirname):
try:
self.watch(dirname)
except OSError:

View File

@ -3,7 +3,7 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
import os
from powerline.bindings.vim import vim_getbufoption
from powerline.bindings.vim import vim_getbufoption, buffer_name
def help(matcher_info):
@ -11,8 +11,8 @@ def help(matcher_info):
def cmdwin(matcher_info):
name = matcher_info['buffer'].name
return name and os.path.basename(name) == '[Command Line]'
name = buffer_name(matcher_info)
return name and os.path.basename(name) == b'[Command Line]'
def quickfix(matcher_info):

View File

@ -3,10 +3,13 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
import os
from powerline.bindings.vim import buffer_name
try:
import vim
except ImportError:
vim = object()
pass
else:
vim.command('''
function! Powerline_plugin_ctrlp_main(...)
@ -26,5 +29,5 @@ else:
def ctrlp(matcher_info):
name = matcher_info['buffer'].name
return name and os.path.basename(name) == 'ControlP'
name = buffer_name(matcher_info)
return name and os.path.basename(name) == b'ControlP'

View File

@ -3,12 +3,14 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
import os
from powerline.bindings.vim import buffer_name
def gundo(matcher_info):
name = matcher_info['buffer'].name
return name and os.path.basename(name) == '__Gundo__'
name = buffer_name(matcher_info)
return name and os.path.basename(name) == b'__Gundo__'
def gundo_preview(matcher_info):
name = matcher_info['buffer'].name
return name and os.path.basename(name) == '__Gundo_Preview__'
name = buffer_name(matcher_info)
return name and os.path.basename(name) == b'__Gundo_Preview__'

View File

@ -4,7 +4,12 @@ from __future__ import (unicode_literals, division, absolute_import, print_funct
import os
import re
from powerline.bindings.vim import buffer_name
NERD_TREE_RE = re.compile(b'NERD_TREE_\\d+')
def nerdtree(matcher_info):
name = matcher_info['buffer'].name
return name and re.match(r'NERD_tree_\d+', os.path.basename(name))
name = buffer_name(matcher_info)
return name and NERD_TREE_RE.match(os.path.basename(name))

View File

@ -9,10 +9,10 @@ from powerline.bindings.vim import vim_get_func, vim_getoption, environ, current
from powerline.renderer import Renderer
from powerline.colorscheme import ATTR_BOLD, ATTR_ITALIC, ATTR_UNDERLINE
from powerline.theme import Theme
from powerline.lib.unicode import unichr
from powerline.lib.unicode import unichr, register_strwidth_error
vim_mode = vim_get_func('mode', rettype=str)
vim_mode = vim_get_func('mode', rettype='unicode')
if int(vim.eval('v:version')) >= 702:
_vim_mode = vim_mode
vim_mode = lambda: _vim_mode(1)
@ -41,6 +41,8 @@ class VimRenderer(Renderer):
super(VimRenderer, self).__init__(*args, **kwargs)
self.hl_groups = {}
self.prev_highlight = None
self.strwidth_error_name = register_strwidth_error(self.strwidth)
self.encoding = vim.eval('&encoding')
def shutdown(self):
self.theme.shutdown()
@ -71,11 +73,10 @@ class VimRenderer(Renderer):
if hasattr(vim, 'strwidth'):
if sys.version_info < (3,):
@staticmethod
def strwidth(string):
def strwidth(self, string):
# Does not work with tabs, but neither is strwidth from default
# renderer
return vim.strwidth(string.encode('utf-8'))
return vim.strwidth(string.encode(self.encoding, 'replace'))
else:
@staticmethod
def strwidth(string):
@ -101,6 +102,7 @@ class VimRenderer(Renderer):
winnr=winnr,
buffer=window.buffer,
tabpage=current_tabpage(),
encoding=self.encoding,
)
segment_info['tabnr'] = segment_info['tabpage'].number
segment_info['bufnr'] = segment_info['buffer'].number
@ -115,6 +117,7 @@ class VimRenderer(Renderer):
segment_info=segment_info,
matcher_info=(None if is_tabline else segment_info),
)
statusline = statusline.encode(self.encoding, self.strwidth_error_name)
return statusline
def reset_highlight(self):

View File

@ -28,13 +28,11 @@ except ImportError:
vim_funcs = {
'virtcol': vim_get_func('virtcol', rettype=int),
'virtcol': vim_get_func('virtcol', rettype='int'),
'getpos': vim_get_func('getpos'),
'fnamemodify': vim_get_func('fnamemodify'),
'expand': vim_get_func('expand'),
'bufnr': vim_get_func('bufnr', rettype=int),
'line2byte': vim_get_func('line2byte', rettype=int),
'line': vim_get_func('line', rettype=int),
'fnamemodify': vim_get_func('fnamemodify', rettype='bytes'),
'line2byte': vim_get_func('line2byte', rettype='int'),
'line': vim_get_func('line', rettype='int'),
}
vim_modes = {
@ -225,7 +223,7 @@ def file_scheme(pl, segment_info):
name will look like :file:`zipfile:/path/to/archive.zip::file.txt`.
``file_scheme`` segment will catch ``zipfile`` part here.
'''
name = buffer_name(segment_info['buffer'])
name = buffer_name(segment_info)
if not name:
return None
match = SCHEME_RE.match(name)
@ -254,7 +252,7 @@ def file_directory(pl, segment_info, remove_scheme=True, shorten_user=True, shor
Shorten all directories in :file:`/home/` to :file:`~user/` instead of
:file:`/home/user/`. Does not work for files with scheme present.
'''
name = buffer_name(segment_info['buffer'])
name = buffer_name(segment_info)
if not name:
return None
match = SCHEME_RE.match(name)
@ -271,7 +269,7 @@ def file_directory(pl, segment_info, remove_scheme=True, shorten_user=True, shor
return None
if shorten_home and file_directory.startswith('/home/'):
file_directory = b'~' + file_directory[6:]
file_directory = file_directory.decode('utf-8', 'powerline_vim_strtrans_error')
file_directory = file_directory.decode(segment_info['encoding'], 'powerline_vim_strtrans_error')
return file_directory + os.sep
@ -286,7 +284,7 @@ def file_name(pl, segment_info, display_no_file=False, no_file_text='[No file]')
Highlight groups used: ``file_name_no_file`` or ``file_name``, ``file_name``.
'''
name = buffer_name(segment_info['buffer'])
name = buffer_name(segment_info)
if not name:
if display_no_file:
return [{
@ -295,7 +293,7 @@ def file_name(pl, segment_info, display_no_file=False, no_file_text='[No file]')
}]
else:
return None
return os.path.basename(name).decode('utf-8', 'powerline_vim_strtrans_error')
return os.path.basename(name).decode(segment_info['encoding'], 'powerline_vim_strtrans_error')
@window_cached
@ -470,10 +468,13 @@ def modified_buffers(pl, text='+ ', join_str=','):
:param str join_str:
string to use for joining the modified buffer list
'''
buffer_len = vim_funcs['bufnr']('$')
buffer_mod = [str(bufnr) for bufnr in range(1, buffer_len + 1) if int(getbufvar(bufnr, '&modified') or 0)]
if buffer_mod:
return text + join_str.join(buffer_mod)
buffer_mod_text = join_str.join((
str(buffer.number)
for buffer in vim.buffers
if int(vim_getbufoption({'buffer': buffer, 'bufnr': buffer.number}, 'modified'))
))
if buffer_mod_text:
return text + buffer_mod_text
return None
@ -489,7 +490,7 @@ def branch(pl, segment_info, create_watcher, status_colors=False):
Divider highlight group used: ``branch:divider``.
'''
name = segment_info['buffer'].name
name = buffer_name(segment_info)
skip = not (name and (not vim_getbufoption(segment_info, 'buftype')))
if not skip:
repo = guess(path=name, create_watcher=create_watcher)
@ -513,7 +514,7 @@ def file_vcs_status(pl, segment_info, create_watcher):
Highlight groups used: ``file_vcs_status``.
'''
name = segment_info['buffer'].name
name = buffer_name(segment_info)
skip = not (name and (not vim_getbufoption(segment_info, 'buftype')))
if not skip:
repo = guess(path=name, create_watcher=create_watcher)

View File

@ -2,6 +2,7 @@
from __future__ import (unicode_literals, division, absolute_import, print_function)
import sys
import json
from itertools import count
@ -11,9 +12,6 @@ from powerline.bindings.vim import vim_get_func, vim_getvar
from powerline import Powerline, FailedUnicode
from powerline.lib import mergedicts
if not hasattr(vim, 'bindeval'):
import json
def _override_from(config, override_varname):
try:
@ -134,7 +132,7 @@ class VimPowerline(Powerline):
set_pycmd(pycmd)
# pyeval() and vim.bindeval were both introduced in one patch
if not hasattr(vim, 'bindeval') and can_replace_pyeval:
if (not hasattr(vim, 'bindeval') and can_replace_pyeval) or pyeval == 'PowerlinePyeval':
vim.command(('''
function! PowerlinePyeval(e)
{pycmd} powerline.do_pyeval()
@ -148,23 +146,30 @@ class VimPowerline(Powerline):
self.update_renderer()
__main__.powerline = self
if (
bool(int(vim.eval("has('gui_running') && argc() == 0")))
and not vim.current.buffer.name
and len(vim.windows) == 1
):
# Hack to show startup screen. Problems in GUI:
# - Defining local value of &statusline option while computing global
# value purges startup screen.
# - Defining highlight group while computing statusline purges startup
# screen.
# This hack removes the “while computing statusline” part: both things
# are defined, but they are defined right now.
#
# The above condition disables this hack if no GUI is running, Vim did
# not open any files and there is only one window. Without GUI
# everything works, in other cases startup screen is not shown.
self.new_window()
try:
if (
bool(int(vim.eval("has('gui_running') && argc() == 0")))
and not vim.current.buffer.name
and len(vim.windows) == 1
):
# Hack to show startup screen. Problems in GUI:
# - Defining local value of &statusline option while computing
# global value purges startup screen.
# - Defining highlight group while computing statusline purges
# startup screen.
# This hack removes the “while computing statusline” part: both
# things are defined, but they are defined right now.
#
# The above condition disables this hack if no GUI is running,
# Vim did not open any files and there is only one window.
# Without GUI everything works, in other cases startup screen is
# not shown.
self.new_window()
except UnicodeDecodeError:
# vim.current.buffer.name may raise UnicodeDecodeError when using
# Python-3*. Fortunately, this means that current buffer is not
# empty buffer, so the above condition should be False.
pass
# Cannot have this in one line due to weird newline handling (in :execute
# context newline is considered part of the command in just the same cases
@ -245,14 +250,15 @@ class VimPowerline(Powerline):
def new_window(self):
return self.render(*self.win_idx(None))
if not hasattr(vim, 'bindeval'):
# Method for PowerlinePyeval function. Is here to reduce the number of
# requirements to __main__ globals to just one powerline object
# (previously it required as well vim and json)
@staticmethod
def do_pyeval():
import __main__
vim.command('return ' + json.dumps(eval(vim.eval('a:e'), __main__.__dict__)))
@staticmethod
def do_pyeval():
'''Evaluate python string passed to PowerlinePyeval
Is here to reduce the number of requirements to __main__ globals to just
one powerline object (previously it required as well vim and json).
'''
import __main__
vim.command('return ' + json.dumps(eval(vim.eval('a:e'), __main__.__dict__)))
def setup_components(self, components):
if components is None:

View File

@ -653,9 +653,9 @@ class TestVim(TestCase):
window = vim_module.current.window
window_id = 1
winnr = window.number
self.assertEqual(powerline.render(window, window_id, winnr), '%#Pl_3_8404992_4_192_underline#\xa0abc%#Pl_4_192_NONE_None_NONE#>>')
self.assertEqual(powerline.render(window, window_id, winnr), b'%#Pl_3_8404992_4_192_underline#\xc2\xa0abc%#Pl_4_192_NONE_None_NONE#>>')
vim_module._environ['TEST'] = 'def'
self.assertEqual(powerline.render(window, window_id, winnr), '%#Pl_3_8404992_4_192_underline#\xa0def%#Pl_4_192_NONE_None_NONE#>>')
self.assertEqual(powerline.render(window, window_id, winnr), b'%#Pl_3_8404992_4_192_underline#\xc2\xa0def%#Pl_4_192_NONE_None_NONE#>>')
def test_local_themes(self):
# Regression test: VimPowerline.add_local_theme did not work properly.
@ -682,7 +682,7 @@ class TestVim(TestCase):
window = vim_module.current.window
window_id = 1
winnr = window.number
self.assertEqual(powerline.render(window, window_id, winnr), '%#Pl_5_12583104_6_32896_NONE#\xa0\u201cbar\u201d%#Pl_6_32896_NONE_None_NONE#>>')
self.assertEqual(powerline.render(window, window_id, winnr), b'%#Pl_5_12583104_6_32896_NONE#\xc2\xa0\xe2\x80\x9cbar\xe2\x80\x9d%#Pl_6_32896_NONE_None_NONE#>>')
@classmethod
def setUpClass(cls):

View File

@ -0,0 +1,19 @@
set encoding=utf-8
let g:powerline_config_paths = [expand('<sfile>:p:h:h') . '/powerline/config_files']
set laststatus=2
set showtabline=2
edit `="\xFF"`
redir => g:messages
try
source powerline/bindings/vim/plugin/powerline.vim
redrawstatus!
catch
call writefile(['Unexpected exception', v:exception], 'message.fail')
cquit
endtry
redir END
if g:messages =~# '\v\S'
call writefile(['Unexpected messages'] + split(g:messages, "\n", 1), 'message.fail')
cquit
endif
qall!

View File

@ -445,7 +445,7 @@ class TestVCS(TestCase):
call(['git', 'checkout', '-q', 'branch2'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, 'branch2')
call(['git', 'checkout', '-q', '--detach', 'branch1'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, lambda b: re.match(br'^[a-f0-9]+$', b))
self.do_branch_rename_test(repo, lambda b: re.match(r'^[a-f0-9]+$', b))
finally:
call(['git', 'checkout', '-q', 'master'], cwd=GIT_REPO)

View File

@ -8,6 +8,7 @@ from functools import partial
from powerline.segments import shell, tmux, common
from powerline.lib.vcs import get_fallback_create_watcher
from powerline.lib.unicode import out_u
import tests.vim as vim_module
@ -18,10 +19,10 @@ from tests import TestCase, SkipTest
def get_dummy_guess(**kwargs):
if 'directory' in kwargs:
def guess(path, create_watcher):
return Args(branch=lambda: os.path.basename(path), **kwargs)
return Args(branch=lambda: out_u(os.path.basename(path)), **kwargs)
else:
def guess(path, create_watcher):
return Args(branch=lambda: os.path.basename(path), directory=path, **kwargs)
return Args(branch=lambda: out_u(os.path.basename(path)), directory=path, **kwargs)
return guess

View File

@ -27,6 +27,67 @@ def clear_dir(dir):
os.rmdir(os.path.join(root, d))
def set_watcher_tests(l):
byte_tests = (('bytes', True), ('unicode', False))
for btn, use_bytes in byte_tests:
def test_inotify_file_watcher(self, use_bytes=use_bytes):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('This test is not suitable for a stat based file watcher')
self.do_test_file_watcher(w, use_bytes)
def test_uv_file_watcher(self, use_bytes=use_bytes):
raise SkipTest('Uv watcher tests are not stable')
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
except UvNotFound:
raise SkipTest('Pyuv is not available')
self.do_test_file_watcher(w, use_bytes)
def test_inotify_tree_watcher(self, use_bytes=use_bytes):
try:
tw = create_tree_watcher(get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('INotify is not available')
self.do_test_tree_watcher(tw, use_bytes)
def test_uv_tree_watcher(self, use_bytes=use_bytes):
raise SkipTest('Uv watcher tests are not stable')
try:
tw = create_tree_watcher(get_fallback_logger(), 'uv')
except UvNotFound:
raise SkipTest('Pyuv is not available')
self.do_test_tree_watcher(tw, use_bytes)
def test_inotify_file_watcher_is_watching(self, use_bytes=use_bytes):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('INotify is not available')
self.do_test_file_watcher_is_watching(w, use_bytes)
def test_stat_file_watcher_is_watching(self, use_bytes=use_bytes):
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='stat')
self.do_test_file_watcher_is_watching(w, use_bytes)
def test_uv_file_watcher_is_watching(self, use_bytes=use_bytes):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
except UvNotFound:
raise SkipTest('Pyuv is not available')
self.do_test_file_watcher_is_watching(w, use_bytes)
for wt in ('uv', 'inotify'):
l['test_{0}_file_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_file_watcher'.format(wt)]
l['test_{0}_tree_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_tree_watcher'.format(wt)]
l['test_{0}_file_watcher_is_watching_{1}'.format(wt, btn)] = (
locals()['test_{0}_file_watcher_is_watching'.format(wt)])
l['test_{0}_file_watcher_is_watching_{1}'.format('stat', btn)] = (
locals()['test_{0}_file_watcher_is_watching'.format('stat')])
class TestFilesystemWatchers(TestCase):
def do_test_for_change(self, watcher, path):
st = monotonic()
@ -36,21 +97,19 @@ class TestFilesystemWatchers(TestCase):
sleep(0.1)
self.fail('The change to {0} was not detected'.format(path))
def test_file_watcher(self):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('This test is not suitable for a stat based file watcher')
return self.do_test_file_watcher(w)
def do_test_file_watcher(self, w):
def do_test_file_watcher(self, w, use_bytes=False):
try:
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
ne = os.path.join(INOTIFY_DIR, 'notexists')
if use_bytes:
f1 = f1.encode('utf-8')
f2 = f2.encode('utf-8')
f3 = f3.encode('utf-8')
ne = ne.encode('utf-8')
with open(f1, 'wb'):
with open(f2, 'wb'):
with open(f3, 'wb'):
pass
ne = os.path.join(INOTIFY_DIR, 'notexists')
self.assertRaises(OSError, w, ne)
self.assertTrue(w(f1))
self.assertTrue(w(f2))
@ -87,87 +146,70 @@ class TestFilesystemWatchers(TestCase):
finally:
clear_dir(INOTIFY_DIR)
def test_uv_file_watcher(self):
raise SkipTest('Uv watcher tests are not stable')
def do_test_tree_watcher(self, tw, use_bytes=False):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
except UvNotFound:
raise SkipTest('Pyuv is not available')
return self.do_test_file_watcher(w)
def test_tree_watcher(self):
tw = create_tree_watcher(get_fallback_logger())
return self.do_test_tree_watcher(tw)
def do_test_tree_watcher(self, tw):
try:
subdir = os.path.join(INOTIFY_DIR, 'subdir')
inotify_dir = INOTIFY_DIR
subdir = os.path.join(inotify_dir, 'subdir')
t1 = os.path.join(inotify_dir, 'tree1')
ts1 = os.path.join(subdir, 'tree1')
suffix = '1'
f = os.path.join(subdir, 'f')
if use_bytes:
inotify_dir = inotify_dir.encode('utf-8')
subdir = subdir.encode('utf-8')
t1 = t1.encode('utf-8')
ts1 = ts1.encode('utf-8')
suffix = suffix.encode('utf-8')
f = f.encode('utf-8')
os.mkdir(subdir)
try:
if tw.watch(INOTIFY_DIR).is_dummy:
if tw.watch(inotify_dir).is_dummy:
raise SkipTest('No tree watcher available')
except UvNotFound:
raise SkipTest('Pyuv is not available')
self.assertTrue(tw(INOTIFY_DIR))
self.assertFalse(tw(INOTIFY_DIR))
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
except INotifyError:
raise SkipTest('INotify is not available')
self.assertTrue(tw(inotify_dir))
self.assertFalse(tw(inotify_dir))
changed = partial(self.do_test_for_change, tw, inotify_dir)
open(t1, 'w').close()
changed()
open(os.path.join(subdir, 'tree1'), 'w').close()
open(ts1, 'w').close()
changed()
os.unlink(os.path.join(subdir, 'tree1'))
os.unlink(ts1)
changed()
os.rmdir(subdir)
changed()
os.mkdir(subdir)
changed()
os.rename(subdir, subdir + '1')
os.rename(subdir, subdir + suffix)
changed()
shutil.rmtree(subdir + '1')
shutil.rmtree(subdir + suffix)
changed()
os.mkdir(subdir)
f = os.path.join(subdir, 'f')
open(f, 'w').close()
changed()
with open(f, 'a') as s:
s.write(' ')
changed()
os.rename(f, f + '1')
os.rename(f, f + suffix)
changed()
finally:
clear_dir(INOTIFY_DIR)
clear_dir(inotify_dir)
def test_uv_tree_watcher(self):
raise SkipTest('Uv watcher tests are not stable')
tw = create_tree_watcher(get_fallback_logger(), 'uv')
return self.do_test_tree_watcher(tw)
def test_inotify_file_watcher_is_watching(self):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('INotify is not available')
return self.do_test_file_watcher_is_watching(w)
def test_stat_file_watcher_is_watching(self):
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='stat')
return self.do_test_file_watcher_is_watching(w)
def test_uv_file_watcher_is_watching(self):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
except UvNotFound:
raise SkipTest('Pyuv is not available')
return self.do_test_file_watcher_is_watching(w)
def do_test_file_watcher_is_watching(self, w):
def do_test_file_watcher_is_watching(self, w, use_bytes=False):
try:
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
ne = os.path.join(INOTIFY_DIR, 'notexists')
if use_bytes:
f1 = f1.encode('utf-8')
f2 = f2.encode('utf-8')
f3 = f3.encode('utf-8')
ne = ne.encode('utf-8')
with open(f1, 'wb'):
with open(f2, 'wb'):
with open(f3, 'wb'):
pass
ne = os.path.join(INOTIFY_DIR, 'notexists')
self.assertRaises(OSError, w, ne)
try:
w(ne)
@ -180,6 +222,8 @@ class TestFilesystemWatchers(TestCase):
finally:
clear_dir(INOTIFY_DIR)
set_watcher_tests(locals())
old_cwd = None

View File

@ -651,6 +651,7 @@ def _get_segment_info():
'tabnr': tabpage.number,
'window_id': window._window_id,
'mode': mode,
'encoding': options['encoding'],
}