Merge remote-tracking branch 'kovidgoyal/vcs' into develop

This commit is contained in:
Kim Silkebækken 2013-05-06 15:33:08 +02:00
commit 1527da10a7
11 changed files with 533 additions and 312 deletions

View File

@ -6,14 +6,18 @@ __docformat__ = 'restructuredtext en'
import os import os
import sys import sys
import errno
from time import sleep from time import sleep
from threading import RLock from threading import RLock
from powerline.lib.monotonic import monotonic from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotify, INotifyError from powerline.lib.inotify import INotify, INotifyError
def realpath(path):
return os.path.abspath(os.path.realpath(path))
class INotifyWatch(INotify): class INotifyWatch(INotify):
is_stat_based = False is_stat_based = False
def __init__(self, expire_time=10): def __init__(self, expire_time=10):
@ -55,7 +59,7 @@ class INotifyWatch(INotify):
def unwatch(self, path): def unwatch(self, path):
''' Remove the watch for path. Raises an OSError if removing the watch ''' Remove the watch for path. Raises an OSError if removing the watch
fails for some reason. ''' fails for some reason. '''
path = self.os.path.abspath(path) path = realpath(path)
with self.lock: with self.lock:
self.modified.pop(path, None) self.modified.pop(path, None)
self.last_query.pop(path, None) self.last_query.pop(path, None)
@ -65,15 +69,24 @@ class INotifyWatch(INotify):
self.handle_error() self.handle_error()
def watch(self, path): def watch(self, path):
''' Register a watch for the file named path. Raises an OSError if path ''' Register a watch for the file/directory named path. Raises an OSError if path
does not exist. ''' does not exist. '''
import ctypes import ctypes
path = self.os.path.abspath(path) path = realpath(path)
with self.lock: with self.lock:
if path not in self.watches: if path not in self.watches:
bpath = path if isinstance(path, bytes) else path.encode(self.fenc) bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
wd = self._add_watch(self._inotify_fd, ctypes.c_char_p(bpath), flags = self.MOVE_SELF | self.DELETE_SELF
self.MODIFY | self.ATTRIB | self.MOVE_SELF | self.DELETE_SELF) buf = ctypes.c_char_p(bpath)
# Try watching path as a directory
wd = self._add_watch(self._inotify_fd, buf, flags | self.ONLYDIR)
if wd == -1:
eno = ctypes.get_errno()
if eno != errno.ENOTDIR:
self.handle_error()
# Try watching path as a file
flags |= (self.MODIFY | self.ATTRIB)
wd = self._add_watch(self._inotify_fd, buf, flags)
if wd == -1: if wd == -1:
self.handle_error() self.handle_error()
self.watches[path] = wd self.watches[path] = wd
@ -82,7 +95,7 @@ class INotifyWatch(INotify):
def __call__(self, path): def __call__(self, path):
''' Return True if path has been modified since the last call. Can ''' Return True if path has been modified since the last call. Can
raise OSError if the path does not exist. ''' raise OSError if the path does not exist. '''
path = self.os.path.abspath(path) path = realpath(path)
with self.lock: with self.lock:
self.last_query[path] = monotonic() self.last_query[path] = monotonic()
self.expire_watches() self.expire_watches()
@ -119,17 +132,17 @@ class StatWatch(object):
self.lock = RLock() self.lock = RLock()
def watch(self, path): def watch(self, path):
path = os.path.abspath(path) path = realpath(path)
with self.lock: with self.lock:
self.watches[path] = os.path.getmtime(path) self.watches[path] = os.path.getmtime(path)
def unwatch(self, path): def unwatch(self, path):
path = os.path.abspath(path) path = realpath(path)
with self.lock: with self.lock:
self.watches.pop(path, None) self.watches.pop(path, None)
def __call__(self, path): def __call__(self, path):
path = os.path.abspath(path) path = realpath(path)
with self.lock: with self.lock:
if path not in self.watches: if path not in self.watches:
self.watches[path] = os.path.getmtime(path) self.watches[path] = os.path.getmtime(path)

View File

@ -60,6 +60,7 @@ def load_inotify():
class INotify(object): class INotify(object):
# See <sys/inotify.h> for the flags defined below # See <sys/inotify.h> for the flags defined below
# Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH. # Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH.

View File

@ -16,18 +16,24 @@ from powerline.lib.inotify import INotify, INotifyError
class NoSuchDir(ValueError): class NoSuchDir(ValueError):
pass pass
class BaseDirChanged(ValueError):
pass
class DirTooLarge(ValueError): class DirTooLarge(ValueError):
def __init__(self, bdir): def __init__(self, bdir):
ValueError.__init__(self, 'The directory {0} is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'.format(bdir)) ValueError.__init__(self, 'The directory {0} is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'.format(bdir))
def realpath(path):
return os.path.abspath(os.path.realpath(path))
class INotifyTreeWatcher(INotify): class INotifyTreeWatcher(INotify):
is_dummy = False is_dummy = False
def __init__(self, basedir): def __init__(self, basedir):
super(INotifyTreeWatcher, self).__init__() super(INotifyTreeWatcher, self).__init__()
self.basedir = os.path.abspath(basedir) self.basedir = realpath(basedir)
self.watch_tree() self.watch_tree()
self.modified = True self.modified = True
@ -43,7 +49,7 @@ class INotifyTreeWatcher(INotify):
def add_watches(self, base, top_level=True): def add_watches(self, base, top_level=True):
''' Add watches for this directory and all its descendant directories, ''' Add watches for this directory and all its descendant directories,
recursively. ''' recursively. '''
base = os.path.abspath(base) base = realpath(base)
try: try:
is_dir = self.add_watch(base) is_dir = self.add_watch(base)
except OSError as e: except OSError as e:
@ -119,6 +125,8 @@ class INotifyTreeWatcher(INotify):
raise DirTooLarge(self.basedir) raise DirTooLarge(self.basedir)
else: else:
raise raise
if (mask & self.DELETE_SELF or mask & self.MOVE_SELF) and path == self.basedir:
raise BaseDirChanged('The directory %s was moved/deleted' % path)
def __call__(self): def __call__(self):
self.read() self.read()
@ -128,23 +136,24 @@ class INotifyTreeWatcher(INotify):
class DummyTreeWatcher(object): class DummyTreeWatcher(object):
is_dummy = True is_dummy = True
def __init__(self, basedir): def __init__(self, basedir):
self.basedir = os.path.abspath(basedir) self.basedir = realpath(basedir)
def __call__(self): def __call__(self):
return False return False
class TreeWatcher(object): class TreeWatcher(object):
def __init__(self, expire_time=10): def __init__(self, expire_time=10):
self.watches = {} self.watches = {}
self.last_query_times = {} self.last_query_times = {}
self.expire_time = expire_time * 60 self.expire_time = expire_time * 60
def watch(self, path, logger=None): def watch(self, path, logger=None):
path = os.path.abspath(path) path = realpath(path)
try: try:
w = INotifyTreeWatcher(path) w = INotifyTreeWatcher(path)
except (INotifyError, DirTooLarge) as e: except (INotifyError, DirTooLarge) as e:
@ -168,7 +177,7 @@ class TreeWatcher(object):
del self.last_query_times[path] del self.last_query_times[path]
def __call__(self, path, logger=None): def __call__(self, path, logger=None):
path = os.path.abspath(path) path = realpath(path)
self.expire_old_queries() self.expire_old_queries()
self.last_query_times[path] = monotonic() self.last_query_times[path] = monotonic()
w = self.watches.get(path, None) w = self.watches.get(path, None)
@ -180,6 +189,9 @@ class TreeWatcher(object):
return True return True
try: try:
return w() return w()
except BaseDirChanged:
self.watches.pop(path, None)
return True
except DirTooLarge as e: except DirTooLarge as e:
if logger is not None: if logger is not None:
logger.warn(str(e)) logger.warn(str(e))

View File

@ -1,7 +1,9 @@
# vim:fileencoding=utf-8:noet # vim:fileencoding=utf-8:noet
from __future__ import absolute_import from __future__ import absolute_import
import os
import os, errno
from threading import Lock
from collections import defaultdict
vcs_props = ( vcs_props = (
('git', '.git', os.path.exists), ('git', '.git', os.path.exists),
@ -11,6 +13,7 @@ vcs_props = (
def generate_directories(path): def generate_directories(path):
if os.path.isdir(path):
yield path yield path
while True: while True:
old_path = path old_path = path
@ -19,6 +22,173 @@ def generate_directories(path):
break break
yield path yield path
_file_watcher = None
def file_watcher():
global _file_watcher
if _file_watcher is None:
from powerline.lib.file_watcher import create_file_watcher
_file_watcher = create_file_watcher()
return _file_watcher
branch_name_cache = {}
branch_lock = Lock()
file_status_lock = Lock()
def get_branch_name(directory, config_file, get_func):
global branch_name_cache
with branch_lock:
# Check if the repo directory was moved/deleted
try:
changed = file_watcher()(directory)
except OSError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
raise
changed = True
if changed:
branch_name_cache.pop(config_file, None)
# Remove the watches for this repo
file_watcher().unwatch(directory)
file_watcher().unwatch(config_file)
else:
# Check if the config file has changed
try:
changed = file_watcher()(config_file)
except OSError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
raise
# Config file does not exist (happens for mercurial)
if config_file not in branch_name_cache:
branch_name_cache[config_file] = get_func(directory, config_file)
if changed:
# Config file has changed or was not tracked
branch_name_cache[config_file] = get_func(directory, config_file)
return branch_name_cache[config_file]
class FileStatusCache(dict):
def __init__(self):
self.dirstate_map = defaultdict(set)
self.ignore_map = defaultdict(set)
self.keypath_ignore_map = {}
def update_maps(self, keypath, directory, dirstate_file, ignore_file_name, extra_ignore_files):
parent = keypath
ignore_files = set()
while parent != directory:
nparent = os.path.dirname(keypath)
if nparent == parent:
break
parent = nparent
ignore_files.add(os.path.join(parent, ignore_file_name))
for f in extra_ignore_files:
ignore_files.add(os.path.join(directory, *f.split('/')))
self.keypath_ignore_map[keypath] = ignore_files
for ignf in ignore_files:
self.ignore_map[ignf].add(keypath)
self.dirstate_map[dirstate_file].add(keypath)
def invalidate(self, dirstate_file=None, ignore_file=None):
for keypath in self.dirstate_map[dirstate_file]:
self.pop(keypath, None)
for keypath in self.ignore_map[ignore_file]:
self.pop(keypath, None)
def ignore_files(self, keypath):
for ignf in self.keypath_ignore_map[keypath]:
yield ignf
file_status_cache = FileStatusCache()
def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_func, extra_ignore_files=()):
global file_status_cache
keypath = file_path if os.path.isabs(file_path) else os.path.join(directory, file_path)
file_status_cache.update_maps(keypath, directory, dirstate_file, ignore_file_name, extra_ignore_files)
with file_status_lock:
# Optimize case of keypath not being cached
if keypath not in file_status_cache:
file_status_cache[keypath] = ans = get_func(directory, file_path)
return ans
# Check if any relevant files have changed
file_changed = file_watcher()
changed = False
# Check if dirstate has changed
try:
changed = file_changed(dirstate_file)
except OSError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
raise
# The .git index file does not exist for a new git repo
return get_func(directory, file_path)
if changed:
# Remove all cached values for files that depend on this
# dirstate_file
file_status_cache.invalidate(dirstate_file=dirstate_file)
else:
# Check if the file itself has changed
try:
changed ^= file_changed(keypath)
except OSError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
raise
# Do not call get_func again for a non-existant file
if keypath not in file_status_cache:
file_status_cache[keypath] = get_func(directory, file_path)
return file_status_cache[keypath]
if changed:
file_status_cache.pop(keypath, None)
else:
# Check if one of the ignore files has changed
for ignf in file_status_cache.ignore_files(keypath):
try:
changed ^= file_changed(ignf)
except OSError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
raise
if changed:
# Invalidate cache for all files that might be affected
# by this ignore file
file_status_cache.invalidate(ignore_file=ignf)
break
try:
return file_status_cache[keypath]
except KeyError:
file_status_cache[keypath] = ans = get_func(directory, file_path)
return ans
class TreeStatusCache(dict):
def __init__(self):
from powerline.lib.tree_watcher import TreeWatcher
self.tw = TreeWatcher()
def cache_and_get(self, key, status):
ans = self.get(key, self)
if ans is self:
ans = self[key] = status()
return ans
def __call__(self, repo, logger):
key = repo.directory
try:
if self.tw(key):
self.pop(key, None)
except OSError as e:
logger.warn('Failed to check %s for changes, with error: %s'% key, e)
return self.cache_and_get(key, repo.status)
_tree_status_cache = None
def tree_status(repo, logger):
global _tree_status_cache
if _tree_status_cache is None:
_tree_status_cache = TreeStatusCache()
return _tree_status_cache(repo, logger)
def guess(path): def guess(path):
for directory in generate_directories(path): for directory in generate_directories(path):
@ -31,3 +201,23 @@ def guess(path):
except: except:
pass pass
return None return None
def debug():
''' To use run python -c "from powerline.lib.vcs import debug; debug()" some_file_to_watch '''
import sys
dest = sys.argv[-1]
repo = guess(dest)
if repo is None:
print ('%s is not a recognized vcs repo' % dest)
raise SystemExit(1)
print ('Watching %s' % dest)
print ('Press Ctrl-C to exit.')
try:
while True:
if os.path.isdir(dest):
print ('Branch name: %s Status: %s' % (repo.branch(), repo.status()))
else:
print ('File status: %s' % repo.status(dest))
raw_input('Press Enter to check again: ')
except KeyboardInterrupt:
pass

View File

@ -2,10 +2,13 @@
from __future__ import absolute_import, unicode_literals, division, print_function from __future__ import absolute_import, unicode_literals, division, print_function
import sys import sys
import os
import re
from io import StringIO from io import StringIO
from bzrlib import (branch, workingtree, status, library_state, trace, ui) from bzrlib import (workingtree, status, library_state, trace, ui)
from powerline.lib.vcs import get_branch_name, get_file_status
class CoerceIO(StringIO): class CoerceIO(StringIO):
def write(self, arg): def write(self, arg):
@ -13,13 +16,29 @@ class CoerceIO(StringIO):
arg = arg.decode('utf-8', 'replace') arg = arg.decode('utf-8', 'replace')
return super(CoerceIO, self).write(arg) return super(CoerceIO, self).write(arg)
state = None
nick_pat = re.compile(br'nickname\s*=\s*(.+)')
def branch_name_from_config_file(directory, config_file):
ans = None
try:
with open(config_file, 'rb') as f:
for line in f:
m = nick_pat.match(line)
if m is not None:
ans = m.group(1).strip().decode('utf-8', 'replace')
break
except Exception:
pass
return ans or os.path.basename(directory)
class Repository(object): class Repository(object):
def __init__(self, directory): def __init__(self, directory):
if isinstance(directory, bytes): if isinstance(directory, bytes):
directory = directory.decode(sys.getfilesystemencoding() or sys.getdefaultencoding() or 'utf-8') directory = directory.decode(sys.getfilesystemencoding() or sys.getdefaultencoding() or 'utf-8')
self.directory = directory self.directory = os.path.abspath(directory)
self.state = library_state.BzrLibraryState(ui=ui.SilentUIFactory, trace=trace.DefaultConfig())
def status(self, path=None): def status(self, path=None):
'''Return status of repository or file. '''Return status of repository or file.
@ -33,20 +52,32 @@ class Repository(object):
With file argument: returns status of this file: The status codes are With file argument: returns status of this file: The status codes are
those returned by bzr status -S those returned by bzr status -S
''' '''
if path is not None:
return get_file_status(self.directory, os.path.join(self.directory, '.bzr', 'checkout', 'dirstate'),
path, '.bzrignore', self.do_status)
return self.do_status(self.directory, path)
def do_status(self, directory, path):
try: try:
return self._status(path) return self._status(self.directory, path)
except: except Exception:
pass pass
def _status(self, path): def _status(self, directory, path):
global state
if state is None:
state = library_state.BzrLibraryState(ui=ui.SilentUIFactory, trace=trace.DefaultConfig())
buf = CoerceIO() buf = CoerceIO()
w = workingtree.WorkingTree.open(self.directory) w = workingtree.WorkingTree.open(directory)
status.show_tree_status(w, specific_files=[path] if path else None, to_file=buf, short=True) status.show_tree_status(w, specific_files=[path] if path else None, to_file=buf, short=True)
raw = buf.getvalue() raw = buf.getvalue()
if not raw.strip(): if not raw.strip():
return return
if path: if path:
return raw[:2] ans = raw[:2]
if ans == 'I ': # Ignored
ans = None
return ans
dirtied = untracked = ' ' dirtied = untracked = ' '
for line in raw.splitlines(): for line in raw.splitlines():
if len(line) > 1 and line[1] in 'ACDMRIN': if len(line) > 1 and line[1] in 'ACDMRIN':
@ -57,8 +88,6 @@ class Repository(object):
return ans if ans.strip() else None return ans if ans.strip() else None
def branch(self): def branch(self):
try: config_file = os.path.join(self.directory, '.bzr', 'branch', 'branch.conf')
b = branch.Branch.open(self.directory) return get_branch_name(self.directory, config_file, branch_name_from_config_file)
return b._get_nick(local=True) or None
except:
pass

View File

@ -1,4 +1,46 @@
# vim:fileencoding=utf-8:noet # vim:fileencoding=utf-8:noet
import os
import re
import errno
from powerline.lib.vcs import get_branch_name as _get_branch_name, get_file_status
_ref_pat = re.compile(br'ref:\s*refs/heads/(.+)')
def branch_name_from_config_file(directory, config_file):
try:
with open(config_file, 'rb') as f:
raw = f.read()
except EnvironmentError:
return os.path.basename(directory)
m = _ref_pat.match(raw)
if m is not None:
return m.group(1).decode('utf-8', 'replace')
return '[DETACHED HEAD]'
def get_branch_name(base_dir):
head = os.path.join(base_dir, '.git', 'HEAD')
try:
return _get_branch_name(base_dir, head, branch_name_from_config_file)
except OSError as e:
if getattr(e, 'errno', None) == errno.ENOTDIR or getattr(e, 'winerror', None) == 3:
# We are in a submodule
return '(no branch)'
raise
def do_status(directory, path, func):
if path:
gitd = os.path.join(directory, '.git')
if os.path.isfile(gitd):
with open(gitd, 'rb') as f:
raw = f.read().partition(b':')[2].strip()
gitd = os.path.abspath(os.path.join(directory, raw))
return get_file_status(directory, os.path.join(gitd, 'index'),
path, '.gitignore', func, extra_ignore_files=('.git/info/exclude',))
return func(directory, path)
try: try:
import pygit2 as git import pygit2 as git
@ -6,28 +48,12 @@ try:
__slots__ = ('directory') __slots__ = ('directory')
def __init__(self, directory): def __init__(self, directory):
self.directory = directory self.directory = os.path.abspath(directory)
def _repo(self): def do_status(self, directory, path):
return git.Repository(self.directory)
def status(self, path=None):
'''Return status of repository or file.
Without file argument: returns status of the repository:
:First column: working directory status (D: dirty / space)
:Second column: index status (I: index dirty / space)
:Third column: presense of untracked files (U: untracked files / space)
:None: repository clean
With file argument: returns status of this file. Output is
equivalent to the first two columns of "git status --porcelain"
(except for merge statuses as they are not supported by libgit2).
'''
if path: if path:
try: try:
status = self._repo().status_file(path) status = git.Repository(directory).status_file(path)
except (KeyError, ValueError): except (KeyError, ValueError):
return None return None
@ -60,7 +86,7 @@ try:
wt_column = ' ' wt_column = ' '
index_column = ' ' index_column = ' '
untracked_column = ' ' untracked_column = ' '
for status in self._repo().status().values(): for status in git.Repository(directory).status().values():
if status & git.GIT_STATUS_WT_NEW: if status & git.GIT_STATUS_WT_NEW:
untracked_column = 'U' untracked_column = 'U'
continue continue
@ -76,21 +102,24 @@ try:
r = wt_column + index_column + untracked_column r = wt_column + index_column + untracked_column
return r if r != ' ' else None return r if r != ' ' else None
def status(self, path=None):
'''Return status of repository or file.
Without file argument: returns status of the repository:
:First column: working directory status (D: dirty / space)
:Second column: index status (I: index dirty / space)
:Third column: presence of untracked files (U: untracked files / space)
:None: repository clean
With file argument: returns status of this file. Output is
equivalent to the first two columns of "git status --porcelain"
(except for merge statuses as they are not supported by libgit2).
'''
return do_status(self.directory, path, self.do_status)
def branch(self): def branch(self):
try: return get_branch_name(self.directory)
ref = self._repo().lookup_reference('HEAD')
except KeyError:
return None
try:
target = ref.target
except ValueError:
return '[DETACHED HEAD]'
if target.startswith('refs/heads/'):
return target[11:]
else:
return '[DETACHED HEAD]'
except ImportError: except ImportError:
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
@ -105,22 +134,22 @@ except ImportError:
__slots__ = ('directory',) __slots__ = ('directory',)
def __init__(self, directory): def __init__(self, directory):
self.directory = directory self.directory = os.path.abspath(directory)
def _gitcmd(self, *args): def _gitcmd(self, directory, *args):
return readlines(('git',) + args, self.directory) return readlines(('git',) + args, directory)
def status(self, path=None): def do_status(self, directory, path):
if path: if path:
try: try:
return next(self._gitcmd('status', '--porcelain', '--ignored', '--', path))[:2] return next(self._gitcmd(directory, 'status', '--porcelain', '--ignored', '--', path))[:2]
except StopIteration: except StopIteration:
return None return None
else: else:
wt_column = ' ' wt_column = ' '
index_column = ' ' index_column = ' '
untracked_column = ' ' untracked_column = ' '
for line in self._gitcmd('status', '--porcelain'): for line in self._gitcmd(directory, 'status', '--porcelain'):
if line[0] == '?': if line[0] == '?':
untracked_column = 'U' untracked_column = 'U'
continue continue
@ -136,8 +165,8 @@ except ImportError:
r = wt_column + index_column + untracked_column r = wt_column + index_column + untracked_column
return r if r != ' ' else None return r if r != ' ' else None
def status(self, path=None):
return do_status(self.directory, path, self.do_status)
def branch(self): def branch(self):
for line in self._gitcmd('branch', '-l'): return get_branch_name(self.directory)
if line[0] == '*':
return line[2:]
return None

View File

@ -1,7 +1,19 @@
# vim:fileencoding=utf-8:noet # vim:fileencoding=utf-8:noet
from __future__ import absolute_import from __future__ import absolute_import
import os
from mercurial import hg, ui, match from mercurial import hg, ui, match
from powerline.lib.vcs import get_branch_name, get_file_status
def branch_name_from_config_file(directory, config_file):
try:
with open(config_file, 'rb') as f:
raw = f.read()
return raw.decode('utf-8', 'replace').strip()
except Exception:
return 'default'
class Repository(object): class Repository(object):
__slots__ = ('directory', 'ui') __slots__ = ('directory', 'ui')
@ -11,13 +23,13 @@ class Repository(object):
repo_statuses_str = (None, 'D ', ' U', 'DU') repo_statuses_str = (None, 'D ', ' U', 'DU')
def __init__(self, directory): def __init__(self, directory):
self.directory = directory self.directory = os.path.abspath(directory)
self.ui = ui.ui() self.ui = ui.ui()
def _repo(self): def _repo(self, directory):
# Cannot create this object once and use always: when repository updates # Cannot create this object once and use always: when repository updates
# functions emit invalid results # functions emit invalid results
return hg.repository(self.ui, self.directory) return hg.repository(self.ui, directory)
def status(self, path=None): def status(self, path=None):
'''Return status of repository or file. '''Return status of repository or file.
@ -32,7 +44,13 @@ class Repository(object):
"R"emoved, "D"eleted (removed from filesystem, but still tracked), "R"emoved, "D"eleted (removed from filesystem, but still tracked),
"U"nknown, "I"gnored, (None)Clean. "U"nknown, "I"gnored, (None)Clean.
''' '''
repo = self._repo() if path:
return get_file_status(self.directory, os.path.join(self.directory, '.hg', 'dirstate'),
path, '.hgignore', self.do_status)
return self.do_status(self.directory, path)
def do_status(self, directory, path):
repo = self._repo(directory)
if path: if path:
m = match.match(None, None, [path], exact=True) m = match.match(None, None, [path], exact=True)
statuses = repo.status(match=m, unknown=True, ignored=True) statuses = repo.status(match=m, unknown=True, ignored=True)
@ -48,4 +66,5 @@ class Repository(object):
return self.repo_statuses_str[resulting_status] return self.repo_statuses_str[resulting_status]
def branch(self): def branch(self):
return self._repo().dirstate.branch() config_file = os.path.join(self.directory, '.hg', 'branch')
return get_branch_name(self.directory, config_file, branch_name_from_config_file)

View File

@ -11,7 +11,7 @@ from multiprocessing import cpu_count as _cpu_count
from powerline.lib import add_divider_highlight_group from powerline.lib import add_divider_highlight_group
from powerline.lib.url import urllib_read, urllib_urlencode from powerline.lib.url import urllib_read, urllib_urlencode
from powerline.lib.vcs import guess from powerline.lib.vcs import guess, tree_status
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment, with_docstring from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment, with_docstring
from powerline.lib.monotonic import monotonic from powerline.lib.monotonic import monotonic
from powerline.lib.humanize_bytes import humanize_bytes from powerline.lib.humanize_bytes import humanize_bytes
@ -39,84 +39,26 @@ def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False):
@requires_segment_info @requires_segment_info
class RepositorySegment(KwThreadedSegment): def branch(pl, segment_info, status_colors=False):
def __init__(self): '''Return the current VCS branch.
super(RepositorySegment, self).__init__()
self.directories = {}
@staticmethod :param bool status_colors:
def key(segment_info, **kwargs):
return os.path.abspath(segment_info['getcwd']())
def update(self, *args):
# .compute_state() is running only in this method, and only in one
# thread, thus operations with .directories do not need write locks
# (.render() method is not using .directories). If this is changed
# .directories needs redesigning
self.directories.clear()
return super(RepositorySegment, self).update(*args)
def compute_state(self, path):
repo = guess(path=path)
if repo:
if repo.directory in self.directories:
return self.directories[repo.directory]
else:
r = self.process_repo(repo)
self.directories[repo.directory] = r
return r
class RepositoryStatusSegment(RepositorySegment):
interval = 2
@staticmethod
def process_repo(repo):
return repo.status()
repository_status = with_docstring(RepositoryStatusSegment(),
'''Return the status for the current VCS repository.''')
class BranchSegment(RepositorySegment):
interval = 0.2
started_repository_status = False
@staticmethod
def process_repo(repo):
return repo.branch()
@staticmethod
def render_one(branch, status_colors=False, **kwargs):
if branch and status_colors:
return [{
'contents': branch,
'highlight_group': ['branch_dirty' if repository_status(**kwargs) else 'branch_clean', 'branch'],
}]
else:
return branch
def startup(self, status_colors=False, **kwargs):
super(BranchSegment, self).startup(**kwargs)
if status_colors:
self.started_repository_status = True
repository_status.startup(**kwargs)
def shutdown(self):
if self.started_repository_status:
repository_status.shutdown()
super(BranchSegment, self).shutdown()
branch = with_docstring(BranchSegment(),
'''Return the current VCS branch.
:param bool status_colors:
determines whether repository status will be used to determine highlighting. Default: True. determines whether repository status will be used to determine highlighting. Default: True.
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``. Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
''') '''
name = segment_info['getcwd']()
repo = guess(path=name)
if repo is not None:
branch = repo.branch()
scol = ['branch']
if status_colors:
status = tree_status(repo, pl)
scol.insert(0, 'branch_dirty' if status and status.strip() else 'branch_clean')
return [{
'contents': branch,
'highlight_group': scol,
}]
@requires_segment_info @requires_segment_info

View File

@ -11,9 +11,8 @@ except ImportError:
from powerline.bindings.vim import vim_get_func, getbufvar from powerline.bindings.vim import vim_get_func, getbufvar
from powerline.theme import requires_segment_info from powerline.theme import requires_segment_info
from powerline.lib import add_divider_highlight_group from powerline.lib import add_divider_highlight_group
from powerline.lib.vcs import guess from powerline.lib.vcs import guess, tree_status
from powerline.lib.humanize_bytes import humanize_bytes from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.threaded import KwThreadedSegment, with_docstring
from powerline.lib import wraps_saveargs as wraps from powerline.lib import wraps_saveargs as wraps
from collections import defaultdict from collections import defaultdict
@ -307,121 +306,44 @@ def modified_buffers(pl, text='+ ', join_str=','):
return text + join_str.join(buffer_mod) return text + join_str.join(buffer_mod)
return None return None
class KwWindowThreadedSegment(KwThreadedSegment):
def set_state(self, **kwargs):
kwargs = kwargs.copy()
for window in vim.windows:
buffer = window.buffer
kwargs['segment_info'] = {'bufnr': buffer.number, 'buffer': buffer}
super(KwWindowThreadedSegment, self).set_state(**kwargs)
class RepositorySegment(KwWindowThreadedSegment):
def __init__(self):
super(RepositorySegment, self).__init__()
self.directories = {}
@staticmethod
def key(segment_info, **kwargs):
# FIXME os.getcwd() is not a proper variant for non-current buffers
return segment_info['buffer'].name or os.getcwd()
def update(self, *args):
# .compute_state() is running only in this method, and only in one
# thread, thus operations with .directories do not need write locks
# (.render() method is not using .directories). If this is changed
# .directories needs redesigning
self.directories.clear()
return super(RepositorySegment, self).update(*args)
def compute_state(self, path):
repo = guess(path=path)
if repo:
if repo.directory in self.directories:
return self.directories[repo.directory]
else:
r = self.process_repo(repo)
self.directories[repo.directory] = r
return r
@requires_segment_info @requires_segment_info
class RepositoryStatusSegment(RepositorySegment): def branch(pl, segment_info, status_colors=False):
interval = 2 '''Return the current working branch.
@staticmethod :param bool status_colors:
def process_repo(repo): determines whether repository status will be used to determine highlighting. Default: False.
return repo.status()
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
repository_status = with_docstring(RepositoryStatusSegment(), Divider highlight group used: ``branch:divider``.
'''Return the status for the current repo.''') '''
name = segment_info['buffer'].name
skip = not (name and (not getbufvar(segment_info['bufnr'], '&buftype')))
@requires_segment_info if not skip:
class BranchSegment(RepositorySegment): repo = guess(path=name)
interval = 0.2 if repo is not None:
started_repository_status = False branch = repo.branch()
scol = ['branch']
@staticmethod
def process_repo(repo):
return repo.branch()
def render_one(self, branch, segment_info, status_colors=False, **kwargs):
if not branch:
return None
if status_colors: if status_colors:
self.started_repository_status = True status = tree_status(repo, pl)
scol.insert(0, 'branch_dirty' if status and status.strip() else 'branch_clean')
return [{ return [{
'contents': branch, 'contents': branch,
'highlight_group': (['branch_dirty' if repository_status(segment_info=segment_info, **kwargs) else 'branch_clean'] 'highlight_group': scol,
if status_colors else []) + ['branch'],
'divider_highlight_group': 'branch:divider', 'divider_highlight_group': 'branch:divider',
}] }]
def startup(self, status_colors=False, **kwargs):
super(BranchSegment, self).startup(**kwargs)
if status_colors:
self.started_repository_status = True
repository_status.startup(**kwargs)
def shutdown(self):
if self.started_repository_status:
repository_status.shutdown()
super(BranchSegment, self).shutdown()
branch = with_docstring(BranchSegment(),
'''Return the current working branch.
:param bool status_colors:
determines whether repository status will be used to determine highlighting. Default: False.
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
Divider highlight group used: ``branch:divider``.
''')
@requires_segment_info @requires_segment_info
class FileVCSStatusSegment(KwWindowThreadedSegment): def file_vcs_status(pl, segment_info):
interval = 0.2 '''Return the VCS status for this buffer.
@staticmethod Highlight groups used: ``file_vcs_status``.
def key(segment_info, **kwargs): '''
name = segment_info['buffer'].name name = segment_info['buffer'].name
skip = not (name and (not getbufvar(segment_info['bufnr'], '&buftype'))) skip = not (name and (not getbufvar(segment_info['bufnr'], '&buftype')))
return name, skip
@staticmethod
def compute_state(key):
name, skip = key
if not skip: if not skip:
repo = guess(path=name) repo = guess(path=name)
if repo: if repo is not None:
status = repo.status(os.path.relpath(name, repo.directory)) status = repo.status(os.path.relpath(name, repo.directory))
if not status: if not status:
return None return None
@ -433,11 +355,3 @@ class FileVCSStatusSegment(KwWindowThreadedSegment):
'highlight_group': ['file_vcs_status_' + status, 'file_vcs_status'], 'highlight_group': ['file_vcs_status_' + status, 'file_vcs_status'],
}) })
return ret return ret
return None
file_vcs_status = with_docstring(FileVCSStatusSegment(),
'''Return the VCS status for this buffer.
Highlight groups used: ``file_vcs_status``.
''')

View File

@ -124,6 +124,17 @@ use_mercurial = use_bzr = sys.version_info < (3, 0)
class TestVCS(TestCase): class TestVCS(TestCase):
def do_branch_rename_test(self, repo, q):
import time
st = time.time()
while time.time() - st < 1:
# Give inotify time to deliver events
ans = repo.branch()
if ans == q:
break
time.sleep(0.01)
self.assertEqual(ans, q)
def test_git(self): def test_git(self):
repo = guess(path=GIT_REPO) repo = guess(path=GIT_REPO)
self.assertNotEqual(repo, None) self.assertNotEqual(repo, None)
@ -143,6 +154,20 @@ class TestVCS(TestCase):
self.assertEqual(repo.status(), 'DI ') self.assertEqual(repo.status(), 'DI ')
self.assertEqual(repo.status('file'), 'AM') self.assertEqual(repo.status('file'), 'AM')
os.remove(os.path.join(GIT_REPO, 'file')) os.remove(os.path.join(GIT_REPO, 'file'))
# Test changing branch
self.assertEqual(repo.branch(), 'master')
call(['git', 'branch', 'branch1'], cwd=GIT_REPO)
call(['git', 'checkout', '-q', 'branch1'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, 'branch1')
# For some reason the rest of this test fails on travis and only on
# travis, and I can't figure out why
if 'TRAVIS' in os.environ:
raise SkipTest('Part of this test fails on Travis for unknown reasons')
call(['git', 'branch', 'branch2'], cwd=GIT_REPO)
call(['git', 'checkout', '-q', 'branch2'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, 'branch2')
call(['git', 'checkout', '-q', '--detach', 'branch1'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, '[DETACHED HEAD]')
if use_mercurial: if use_mercurial:
def test_mercurial(self): def test_mercurial(self):
@ -170,17 +195,66 @@ class TestVCS(TestCase):
f.write('abc') f.write('abc')
self.assertEqual(repo.status(), ' U') self.assertEqual(repo.status(), ' U')
self.assertEqual(repo.status('file'), '? ') self.assertEqual(repo.status('file'), '? ')
call(['bzr', 'add', '.'], cwd=BZR_REPO, stdout=PIPE) call(['bzr', 'add', '-q', '.'], cwd=BZR_REPO, stdout=PIPE)
self.assertEqual(repo.status(), 'D ') self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), '+N') self.assertEqual(repo.status('file'), '+N')
call(['bzr', 'commit', '-m', 'initial commit'], cwd=BZR_REPO, stdout=PIPE, stderr=PIPE) call(['bzr', 'commit', '-q', '-m', 'initial commit'], cwd=BZR_REPO)
self.assertEqual(repo.status(), None) self.assertEqual(repo.status(), None)
with open(os.path.join(BZR_REPO, 'file'), 'w') as f: with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
f.write('def') f.write('def')
self.assertEqual(repo.status(), 'D ') self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), ' M') self.assertEqual(repo.status('file'), ' M')
self.assertEqual(repo.status('notexist'), None) self.assertEqual(repo.status('notexist'), None)
os.remove(os.path.join(BZR_REPO, 'file')) with open(os.path.join(BZR_REPO, 'ignored'), 'w') as f:
f.write('abc')
self.assertEqual(repo.status('ignored'), '? ')
# Test changing the .bzrignore file should update status
with open(os.path.join(BZR_REPO, '.bzrignore'), 'w') as f:
f.write('ignored')
self.assertEqual(repo.status('ignored'), None)
# Test changing the dirstate file should invalidate the cache for
# all files in the repo
with open(os.path.join(BZR_REPO, 'file2'), 'w') as f:
f.write('abc')
call(['bzr', 'add', 'file2'], cwd=BZR_REPO, stdout=PIPE)
call(['bzr', 'commit', '-q', '-m', 'file2 added'], cwd=BZR_REPO)
with open(os.path.join(BZR_REPO, 'file'), 'a') as f:
f.write('hello')
with open(os.path.join(BZR_REPO, 'file2'), 'a') as f:
f.write('hello')
self.assertEqual(repo.status('file'), ' M')
self.assertEqual(repo.status('file2'), ' M')
call(['bzr', 'commit', '-q', '-m', 'multi'], cwd=BZR_REPO)
self.assertEqual(repo.status('file'), None)
self.assertEqual(repo.status('file2'), None)
# Test changing branch
call(['bzr', 'nick', 'branch1'], cwd=BZR_REPO, stdout=PIPE, stderr=PIPE)
self.do_branch_rename_test(repo, 'branch1')
# Test branch name/status changes when swapping repos
for x in ('b1', 'b2'):
d = os.path.join(BZR_REPO, x)
os.mkdir(d)
call(['bzr', 'init', '-q'], cwd=d)
call(['bzr', 'nick', '-q', x], cwd=d)
repo = guess(path=d)
self.assertEqual(repo.branch(), x)
self.assertFalse(repo.status())
if x == 'b1':
open(os.path.join(d, 'dirty'), 'w').close()
self.assertTrue(repo.status())
os.rename(os.path.join(BZR_REPO, 'b1'), os.path.join(BZR_REPO, 'b'))
os.rename(os.path.join(BZR_REPO, 'b2'), os.path.join(BZR_REPO, 'b1'))
os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2'))
for x, y in (('b1', 'b2'), ('b2', 'b1')):
d = os.path.join(BZR_REPO, x)
repo = guess(path=d)
self.do_branch_rename_test(repo, y)
if x == 'b1':
self.assertFalse(repo.status())
else:
self.assertTrue(repo.status())
old_HGRCPATH = None old_HGRCPATH = None
old_cwd = None old_cwd = None

View File

@ -77,16 +77,21 @@ class TestCommon(TestCase):
pl = Pl() pl = Pl()
segment_info = {'getcwd': os.getcwd} segment_info = {'getcwd': os.getcwd}
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')): with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')):
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), 'tests') with replace_attr(common, 'tree_status', lambda repo, pl: None):
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False),
[{'highlight_group': ['branch'], 'contents': 'tests'}])
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True), self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}]) [{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}])
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')): with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')):
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), 'tests') with replace_attr(common, 'tree_status', lambda repo, pl: 'D '):
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False),
[{'highlight_group': ['branch'], 'contents': 'tests'}])
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True), self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}]) [{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}])
self.assertEqual(common.branch(pl=pl, segment_info=segment_info), 'tests') self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False),
[{'highlight_group': ['branch'], 'contents': 'tests'}])
with replace_attr(common, 'guess', lambda path: None): with replace_attr(common, 'guess', lambda path: None):
self.assertEqual(common.branch(pl=pl, segment_info=segment_info), None) self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), None)
def test_cwd(self): def test_cwd(self):
new_os = new_module('os', path=os.path, sep='/') new_os = new_module('os', path=os.path, sep='/')
@ -451,12 +456,14 @@ class TestVim(TestCase):
pl = Pl() pl = Pl()
with vim_module._with('buffer', '/foo') as segment_info: with vim_module._with('buffer', '/foo') as segment_info:
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)):
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info), with replace_attr(vim, 'tree_status', lambda repo, pl: None):
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=False),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}])
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)):
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info), with replace_attr(vim, 'tree_status', lambda repo, pl: 'DU'):
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=False),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True), self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}]) [{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}])
@ -474,15 +481,6 @@ class TestVim(TestCase):
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)): with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)):
self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None) self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None)
def test_repository_status(self):
pl = Pl()
segment_info = vim_module._get_segment_info()
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)):
self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), None)
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)):
self.assertEqual(vim.repository_status(pl=pl, segment_info=segment_info), 'DU')
old_cwd = None old_cwd = None