mirror of
https://github.com/powerline/powerline.git
synced 2025-07-25 06:46:33 +02:00
Merge pull request #904 from ZyX-I/disable-inotify-option
Add watcher option
This commit is contained in:
commit
138611412e
@ -128,6 +128,19 @@ Common configuration is a subdictionary that is a value of ``common`` key in
|
|||||||
letters, Cyrillic letters). Valid values: any positive integer; it is
|
letters, Cyrillic letters). Valid values: any positive integer; it is
|
||||||
suggested that you only set it to 1 (default) or 2.
|
suggested that you only set it to 1 (default) or 2.
|
||||||
|
|
||||||
|
``watcher``
|
||||||
|
Select filesystem watcher. Variants are
|
||||||
|
|
||||||
|
======= ===================================
|
||||||
|
Variant Description
|
||||||
|
======= ===================================
|
||||||
|
auto Selects most performant watcher.
|
||||||
|
inotify Select inotify watcher. Linux only.
|
||||||
|
stat Select stat-based polling watcher.
|
||||||
|
======= ===================================
|
||||||
|
|
||||||
|
Default is ``auto``.
|
||||||
|
|
||||||
.. _config-common-additional_escapes:
|
.. _config-common-additional_escapes:
|
||||||
|
|
||||||
``additional_escapes``
|
``additional_escapes``
|
||||||
|
@ -63,7 +63,7 @@ class PowerlineLogger(object):
|
|||||||
_fallback_logger = None
|
_fallback_logger = None
|
||||||
|
|
||||||
|
|
||||||
def _get_fallback_logger():
|
def get_fallback_logger():
|
||||||
global _fallback_logger
|
global _fallback_logger
|
||||||
if _fallback_logger:
|
if _fallback_logger:
|
||||||
return _fallback_logger
|
return _fallback_logger
|
||||||
@ -179,15 +179,31 @@ class Powerline(object):
|
|||||||
self.common_config = config['common']
|
self.common_config = config['common']
|
||||||
if self.common_config != self.prev_common_config:
|
if self.common_config != self.prev_common_config:
|
||||||
common_config_differs = True
|
common_config_differs = True
|
||||||
|
|
||||||
self.prev_common_config = self.common_config
|
self.prev_common_config = self.common_config
|
||||||
self.common_config['paths'] = [os.path.expanduser(path) for path in self.common_config.get('paths', [])]
|
|
||||||
|
self.common_config.setdefault('paths', [])
|
||||||
|
self.common_config.setdefault('watcher', 'auto')
|
||||||
|
self.common_config.setdefault('log_level', 'WARNING')
|
||||||
|
self.common_config.setdefault('log_format', '%(asctime)s:%(levelname)s:%(message)s')
|
||||||
|
self.common_config.setdefault('term_truecolor', False)
|
||||||
|
self.common_config.setdefault('ambiwidth', 1)
|
||||||
|
self.common_config.setdefault('additional_escapes', None)
|
||||||
|
self.common_config.setdefault('reload_config', True)
|
||||||
|
self.common_config.setdefault('interval', None)
|
||||||
|
self.common_config.setdefault('log_file', None)
|
||||||
|
|
||||||
|
self.common_config['paths'] = [
|
||||||
|
os.path.expanduser(path) for path in self.common_config['paths']
|
||||||
|
]
|
||||||
|
|
||||||
self.import_paths = self.common_config['paths']
|
self.import_paths = self.common_config['paths']
|
||||||
|
|
||||||
if not self.logger:
|
if not self.logger:
|
||||||
log_format = self.common_config.get('log_format', '%(asctime)s:%(levelname)s:%(message)s')
|
log_format = self.common_config['log_format']
|
||||||
formatter = logging.Formatter(log_format)
|
formatter = logging.Formatter(log_format)
|
||||||
|
|
||||||
level = getattr(logging, self.common_config.get('log_level', 'WARNING'))
|
level = getattr(logging, self.common_config['log_level'])
|
||||||
handler = self.get_log_handler()
|
handler = self.get_log_handler()
|
||||||
handler.setLevel(level)
|
handler.setLevel(level)
|
||||||
handler.setFormatter(formatter)
|
handler.setFormatter(formatter)
|
||||||
@ -198,15 +214,17 @@ class Powerline(object):
|
|||||||
|
|
||||||
if not self.pl:
|
if not self.pl:
|
||||||
self.pl = PowerlineLogger(self.use_daemon_threads, self.logger, self.ext)
|
self.pl = PowerlineLogger(self.use_daemon_threads, self.logger, self.ext)
|
||||||
if not self.config_loader.pl:
|
self.config_loader.pl = self.pl
|
||||||
self.config_loader.pl = self.pl
|
|
||||||
|
if not self.run_once:
|
||||||
|
self.config_loader.set_watcher(self.common_config['watcher'])
|
||||||
|
|
||||||
self.renderer_options.update(
|
self.renderer_options.update(
|
||||||
pl=self.pl,
|
pl=self.pl,
|
||||||
term_truecolor=self.common_config.get('term_truecolor', False),
|
term_truecolor=self.common_config['term_truecolor'],
|
||||||
ambiwidth=self.common_config.get('ambiwidth', 1),
|
ambiwidth=self.common_config['ambiwidth'],
|
||||||
tmux_escape=self.common_config.get('additional_escapes') == 'tmux',
|
tmux_escape=self.common_config['additional_escapes'] == 'tmux',
|
||||||
screen_escape=self.common_config.get('additional_escapes') == 'screen',
|
screen_escape=self.common_config['additional_escapes'] == 'screen',
|
||||||
theme_kwargs={
|
theme_kwargs={
|
||||||
'ext': self.ext,
|
'ext': self.ext,
|
||||||
'common_config': self.common_config,
|
'common_config': self.common_config,
|
||||||
@ -215,8 +233,8 @@ class Powerline(object):
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
if not self.run_once and self.common_config.get('reload_config', True):
|
if not self.run_once and self.common_config['reload_config']:
|
||||||
interval = self.common_config.get('interval', None)
|
interval = self.common_config['interval']
|
||||||
self.config_loader.set_interval(interval)
|
self.config_loader.set_interval(interval)
|
||||||
self.run_loader_update = (interval is None)
|
self.run_loader_update = (interval is None)
|
||||||
if interval is not None and not self.config_loader.is_alive():
|
if interval is not None and not self.config_loader.is_alive():
|
||||||
@ -278,7 +296,7 @@ class Powerline(object):
|
|||||||
|
|
||||||
:return: logging.Handler subclass.
|
:return: logging.Handler subclass.
|
||||||
'''
|
'''
|
||||||
log_file = self.common_config.get('log_file', None)
|
log_file = self.common_config['log_file']
|
||||||
if log_file:
|
if log_file:
|
||||||
log_file = os.path.expanduser(log_file)
|
log_file = os.path.expanduser(log_file)
|
||||||
log_dir = os.path.dirname(log_file)
|
log_dir = os.path.dirname(log_file)
|
||||||
@ -473,5 +491,5 @@ class Powerline(object):
|
|||||||
def exception(self, msg, *args, **kwargs):
|
def exception(self, msg, *args, **kwargs):
|
||||||
if 'prefix' not in kwargs:
|
if 'prefix' not in kwargs:
|
||||||
kwargs['prefix'] = 'powerline'
|
kwargs['prefix'] = 'powerline'
|
||||||
pl = getattr(self, 'pl', None) or _get_fallback_logger()
|
pl = getattr(self, 'pl', None) or get_fallback_logger()
|
||||||
return pl.exception(msg, *args, **kwargs)
|
return pl.exception(msg, *args, **kwargs)
|
||||||
|
@ -28,14 +28,41 @@ class DummyWatcher(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class DeferredWatcher(object):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.args = args
|
||||||
|
self.kwargs = kwargs
|
||||||
|
self.calls = []
|
||||||
|
|
||||||
|
def __call__(self, *args, **kwargs):
|
||||||
|
self.calls.append(('__call__', args, kwargs))
|
||||||
|
|
||||||
|
def watch(self, *args, **kwargs):
|
||||||
|
self.calls.append(('watch', args, kwargs))
|
||||||
|
|
||||||
|
def unwatch(self, *args, **kwargs):
|
||||||
|
self.calls.append(('unwatch', args, kwargs))
|
||||||
|
|
||||||
|
def transfer_calls(self, watcher):
|
||||||
|
for attr, args, kwargs in self.calls:
|
||||||
|
getattr(watcher, attr)(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class ConfigLoader(MultiRunnedThread):
|
class ConfigLoader(MultiRunnedThread):
|
||||||
def __init__(self, shutdown_event=None, watcher=None, load=load_json_config, run_once=False):
|
def __init__(self, shutdown_event=None, watcher=None, watcher_type=None, load=load_json_config, run_once=False):
|
||||||
super(ConfigLoader, self).__init__()
|
super(ConfigLoader, self).__init__()
|
||||||
self.shutdown_event = shutdown_event or Event()
|
self.shutdown_event = shutdown_event or Event()
|
||||||
if run_once:
|
if run_once:
|
||||||
self.watcher = DummyWatcher()
|
self.watcher = DummyWatcher()
|
||||||
|
self.watcher_type = 'dummy'
|
||||||
else:
|
else:
|
||||||
self.watcher = watcher or create_file_watcher()
|
self.watcher = watcher or DeferredWatcher()
|
||||||
|
if watcher:
|
||||||
|
if not watcher_type:
|
||||||
|
raise ValueError('When specifying watcher you must also specify watcher type')
|
||||||
|
self.watcher_type = watcher_type
|
||||||
|
else:
|
||||||
|
self.watcher_type = 'deferred'
|
||||||
self._load = load
|
self._load = load
|
||||||
|
|
||||||
self.pl = None
|
self.pl = None
|
||||||
@ -47,6 +74,16 @@ class ConfigLoader(MultiRunnedThread):
|
|||||||
self.missing = defaultdict(set)
|
self.missing = defaultdict(set)
|
||||||
self.loaded = {}
|
self.loaded = {}
|
||||||
|
|
||||||
|
def set_watcher(self, watcher_type, force=False):
|
||||||
|
if watcher_type == self.watcher_type:
|
||||||
|
return
|
||||||
|
watcher = create_file_watcher(self.pl, watcher_type)
|
||||||
|
with self.lock:
|
||||||
|
if self.watcher_type == 'deferred':
|
||||||
|
self.watcher.transfer_calls(watcher)
|
||||||
|
self.watcher = watcher
|
||||||
|
self.watcher_type = watcher_type
|
||||||
|
|
||||||
def set_pl(self, pl):
|
def set_pl(self, pl):
|
||||||
self.pl = pl
|
self.pl = pl
|
||||||
|
|
||||||
|
@ -19,8 +19,6 @@ def realpath(path):
|
|||||||
|
|
||||||
|
|
||||||
class INotifyWatch(INotify):
|
class INotifyWatch(INotify):
|
||||||
is_stat_based = False
|
|
||||||
|
|
||||||
def __init__(self, expire_time=10):
|
def __init__(self, expire_time=10):
|
||||||
super(INotifyWatch, self).__init__()
|
super(INotifyWatch, self).__init__()
|
||||||
self.watches = {}
|
self.watches = {}
|
||||||
@ -147,8 +145,6 @@ class INotifyWatch(INotify):
|
|||||||
|
|
||||||
|
|
||||||
class StatWatch(object):
|
class StatWatch(object):
|
||||||
is_stat_based = True
|
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.watches = {}
|
self.watches = {}
|
||||||
self.lock = RLock()
|
self.lock = RLock()
|
||||||
@ -184,29 +180,51 @@ class StatWatch(object):
|
|||||||
self.watches.clear()
|
self.watches.clear()
|
||||||
|
|
||||||
|
|
||||||
def create_file_watcher(use_stat=False, expire_time=10):
|
def create_file_watcher(pl, watcher_type='auto', expire_time=10):
|
||||||
'''
|
'''
|
||||||
Create an object that can watch for changes to specified files. To use:
|
Create an object that can watch for changes to specified files
|
||||||
|
|
||||||
watcher = create_file_watcher()
|
Use ``.__call__()`` method of the returned object to start watching the file
|
||||||
watcher(path1) # Will return True if path1 has changed since the last time this was called. Always returns True the first time.
|
or check whether file has changed since last call.
|
||||||
watcher.unwatch(path1)
|
|
||||||
|
|
||||||
Uses inotify if available, otherwise tracks mtimes. expire_time is the
|
Use ``.unwatch()`` method of the returned object to stop watching the file.
|
||||||
number of minutes after the last query for a given path for the inotify
|
|
||||||
watch for that path to be automatically removed. This conserves kernel
|
Uses inotify if available, otherwise tracks mtimes. expire_time is the
|
||||||
|
number of minutes after the last query for a given path for the inotify
|
||||||
|
watch for that path to be automatically removed. This conserves kernel
|
||||||
resources.
|
resources.
|
||||||
|
|
||||||
|
:param PowerlineLogger pl:
|
||||||
|
Logger.
|
||||||
|
:param str watcher_type:
|
||||||
|
One of ``inotify`` (linux only), ``stat``, ``auto``. Determines what
|
||||||
|
watcher will be used. ``auto`` will use ``inotify`` if available.
|
||||||
|
:param int expire_time:
|
||||||
|
Number of minutes since last ``.__call__()`` before inotify watcher will
|
||||||
|
stop watching given file.
|
||||||
'''
|
'''
|
||||||
if use_stat:
|
if watcher_type == 'stat':
|
||||||
|
pl.debug('Using requested stat-based watcher', prefix='watcher')
|
||||||
return StatWatch()
|
return StatWatch()
|
||||||
try:
|
if watcher_type == 'inotify':
|
||||||
|
# Explicitly selected inotify watcher: do not catch INotifyError then.
|
||||||
|
pl.debug('Using requested inotify watcher', prefix='watcher')
|
||||||
return INotifyWatch(expire_time=expire_time)
|
return INotifyWatch(expire_time=expire_time)
|
||||||
except INotifyError:
|
|
||||||
pass
|
if sys.platform.startswith('linux'):
|
||||||
|
try:
|
||||||
|
pl.debug('Trying to use inotify watcher', prefix='watcher')
|
||||||
|
return INotifyWatch(expire_time=expire_time)
|
||||||
|
except INotifyError:
|
||||||
|
pl.info('Failed to create inotify watcher', prefix='watcher')
|
||||||
|
|
||||||
|
pl.debug('Using stat-based watcher')
|
||||||
return StatWatch()
|
return StatWatch()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
watcher = create_file_watcher()
|
from powerline import get_fallback_logger
|
||||||
|
watcher = create_file_watcher(get_fallback_logger())
|
||||||
print ('Using watcher: %s' % watcher.__class__.__name__)
|
print ('Using watcher: %s' % watcher.__class__.__name__)
|
||||||
print ('Watching %s, press Ctrl-C to quit' % sys.argv[-1])
|
print ('Watching %s, press Ctrl-C to quit' % sys.argv[-1])
|
||||||
watcher.watch(sys.argv[-1])
|
watcher.watch(sys.argv[-1])
|
||||||
|
@ -23,22 +23,20 @@ def generate_directories(path):
|
|||||||
_file_watcher = None
|
_file_watcher = None
|
||||||
|
|
||||||
|
|
||||||
def file_watcher():
|
def file_watcher(create_watcher):
|
||||||
global _file_watcher
|
global _file_watcher
|
||||||
if _file_watcher is None:
|
if _file_watcher is None:
|
||||||
from powerline.lib.file_watcher import create_file_watcher
|
_file_watcher = create_watcher()
|
||||||
_file_watcher = create_file_watcher()
|
|
||||||
return _file_watcher
|
return _file_watcher
|
||||||
|
|
||||||
|
|
||||||
_branch_watcher = None
|
_branch_watcher = None
|
||||||
|
|
||||||
|
|
||||||
def branch_watcher():
|
def branch_watcher(create_watcher):
|
||||||
global _branch_watcher
|
global _branch_watcher
|
||||||
if _branch_watcher is None:
|
if _branch_watcher is None:
|
||||||
from powerline.lib.file_watcher import create_file_watcher
|
_branch_watcher = create_watcher()
|
||||||
_branch_watcher = create_file_watcher()
|
|
||||||
return _branch_watcher
|
return _branch_watcher
|
||||||
|
|
||||||
|
|
||||||
@ -47,11 +45,11 @@ branch_lock = Lock()
|
|||||||
file_status_lock = Lock()
|
file_status_lock = Lock()
|
||||||
|
|
||||||
|
|
||||||
def get_branch_name(directory, config_file, get_func):
|
def get_branch_name(directory, config_file, get_func, create_watcher):
|
||||||
global branch_name_cache
|
global branch_name_cache
|
||||||
with branch_lock:
|
with branch_lock:
|
||||||
# Check if the repo directory was moved/deleted
|
# Check if the repo directory was moved/deleted
|
||||||
fw = branch_watcher()
|
fw = branch_watcher(create_watcher)
|
||||||
is_watched = fw.is_watched(directory)
|
is_watched = fw.is_watched(directory)
|
||||||
try:
|
try:
|
||||||
changed = fw(directory)
|
changed = fw(directory)
|
||||||
@ -117,7 +115,7 @@ class FileStatusCache(dict):
|
|||||||
file_status_cache = FileStatusCache()
|
file_status_cache = FileStatusCache()
|
||||||
|
|
||||||
|
|
||||||
def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_func, extra_ignore_files=()):
|
def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_func, create_watcher, extra_ignore_files=()):
|
||||||
global file_status_cache
|
global file_status_cache
|
||||||
keypath = file_path if os.path.isabs(file_path) else os.path.join(directory, file_path)
|
keypath = file_path if os.path.isabs(file_path) else os.path.join(directory, file_path)
|
||||||
file_status_cache.update_maps(keypath, directory, dirstate_file, ignore_file_name, extra_ignore_files)
|
file_status_cache.update_maps(keypath, directory, dirstate_file, ignore_file_name, extra_ignore_files)
|
||||||
@ -129,7 +127,7 @@ def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_f
|
|||||||
return ans
|
return ans
|
||||||
|
|
||||||
# Check if any relevant files have changed
|
# Check if any relevant files have changed
|
||||||
file_changed = file_watcher()
|
file_changed = file_watcher(create_watcher)
|
||||||
changed = False
|
changed = False
|
||||||
# Check if dirstate has changed
|
# Check if dirstate has changed
|
||||||
try:
|
try:
|
||||||
@ -217,7 +215,7 @@ vcs_props = (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def guess(path):
|
def guess(path, create_watcher):
|
||||||
for directory in generate_directories(path):
|
for directory in generate_directories(path):
|
||||||
for vcs, vcs_dir, check in vcs_props:
|
for vcs, vcs_dir, check in vcs_props:
|
||||||
repo_dir = os.path.join(directory, vcs_dir)
|
repo_dir = os.path.join(directory, vcs_dir)
|
||||||
@ -227,20 +225,28 @@ def guess(path):
|
|||||||
try:
|
try:
|
||||||
if vcs not in globals():
|
if vcs not in globals():
|
||||||
globals()[vcs] = getattr(__import__('powerline.lib.vcs', fromlist=[vcs]), vcs)
|
globals()[vcs] = getattr(__import__('powerline.lib.vcs', fromlist=[vcs]), vcs)
|
||||||
return globals()[vcs].Repository(directory)
|
return globals()[vcs].Repository(directory, create_watcher)
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_fallback_create_watcher():
|
||||||
|
from powerline.lib.file_watcher import create_file_watcher
|
||||||
|
from powerline import get_fallback_logger
|
||||||
|
from functools import partial
|
||||||
|
return partial(create_file_watcher, get_fallback_logger(), 'auto')
|
||||||
|
|
||||||
|
|
||||||
def debug():
|
def debug():
|
||||||
'''Test run guess(), repo.branch() and repo.status()
|
'''Test run guess(), repo.branch() and repo.status()
|
||||||
|
|
||||||
To use run python -c "from powerline.lib.vcs import debug; debug()"
|
To use::
|
||||||
some_file_to_watch '''
|
python -c "from powerline.lib.vcs import debug; debug()" some_file_to_watch.
|
||||||
|
'''
|
||||||
import sys
|
import sys
|
||||||
dest = sys.argv[-1]
|
dest = sys.argv[-1]
|
||||||
repo = guess(os.path.abspath(dest))
|
repo = guess(os.path.abspath(dest), get_fallback_create_watcher)
|
||||||
if repo is None:
|
if repo is None:
|
||||||
print ('%s is not a recognized vcs repo' % dest)
|
print ('%s is not a recognized vcs repo' % dest)
|
||||||
raise SystemExit(1)
|
raise SystemExit(1)
|
||||||
|
@ -39,10 +39,11 @@ state = None
|
|||||||
|
|
||||||
|
|
||||||
class Repository(object):
|
class Repository(object):
|
||||||
def __init__(self, directory):
|
def __init__(self, directory, create_watcher):
|
||||||
if isinstance(directory, bytes):
|
if isinstance(directory, bytes):
|
||||||
directory = directory.decode(sys.getfilesystemencoding() or sys.getdefaultencoding() or 'utf-8')
|
directory = directory.decode(sys.getfilesystemencoding() or sys.getdefaultencoding() or 'utf-8')
|
||||||
self.directory = os.path.abspath(directory)
|
self.directory = os.path.abspath(directory)
|
||||||
|
self.create_watcher = create_watcher
|
||||||
|
|
||||||
def status(self, path=None):
|
def status(self, path=None):
|
||||||
'''Return status of repository or file.
|
'''Return status of repository or file.
|
||||||
@ -57,8 +58,14 @@ class Repository(object):
|
|||||||
those returned by bzr status -S
|
those returned by bzr status -S
|
||||||
'''
|
'''
|
||||||
if path is not None:
|
if path is not None:
|
||||||
return get_file_status(self.directory, os.path.join(self.directory, '.bzr', 'checkout', 'dirstate'),
|
return get_file_status(
|
||||||
path, '.bzrignore', self.do_status)
|
directory=self.directory,
|
||||||
|
dirstate_file=os.path.join(self.directory, '.bzr', 'checkout', 'dirstate'),
|
||||||
|
file_path=path,
|
||||||
|
ignore_file_name='.bzrignore',
|
||||||
|
get_func=self.do_status,
|
||||||
|
create_watcher=self.create_watcher,
|
||||||
|
)
|
||||||
return self.do_status(self.directory, path)
|
return self.do_status(self.directory, path)
|
||||||
|
|
||||||
def do_status(self, directory, path):
|
def do_status(self, directory, path):
|
||||||
@ -93,4 +100,9 @@ class Repository(object):
|
|||||||
|
|
||||||
def branch(self):
|
def branch(self):
|
||||||
config_file = os.path.join(self.directory, '.bzr', 'branch', 'branch.conf')
|
config_file = os.path.join(self.directory, '.bzr', 'branch', 'branch.conf')
|
||||||
return get_branch_name(self.directory, config_file, branch_name_from_config_file)
|
return get_branch_name(
|
||||||
|
directory=self.directory,
|
||||||
|
config_file=config_file,
|
||||||
|
get_func=branch_name_from_config_file,
|
||||||
|
create_watcher=self.create_watcher,
|
||||||
|
)
|
||||||
|
@ -6,7 +6,7 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from powerline.lib.vcs import get_branch_name as _get_branch_name, get_file_status
|
from powerline.lib.vcs import get_branch_name, get_file_status
|
||||||
from powerline.lib.shell import readlines
|
from powerline.lib.shell import readlines
|
||||||
|
|
||||||
|
|
||||||
@ -40,32 +40,57 @@ def git_directory(directory):
|
|||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
def get_branch_name(base_dir):
|
class GitRepository(object):
|
||||||
head = os.path.join(git_directory(base_dir), 'HEAD')
|
__slots__ = ('directory', 'create_watcher')
|
||||||
return _get_branch_name(base_dir, head, branch_name_from_config_file)
|
|
||||||
|
|
||||||
|
def __init__(self, directory, create_watcher):
|
||||||
|
self.directory = os.path.abspath(directory)
|
||||||
|
self.create_watcher = create_watcher
|
||||||
|
|
||||||
def do_status(directory, path, func):
|
def status(self, path=None):
|
||||||
if path:
|
'''Return status of repository or file.
|
||||||
gitd = git_directory(directory)
|
|
||||||
# We need HEAD as without it using fugitive to commit causes the
|
Without file argument: returns status of the repository:
|
||||||
# current file's status (and only the current file) to not be updated
|
|
||||||
# for some reason I cannot be bothered to figure out.
|
:First column: working directory status (D: dirty / space)
|
||||||
return get_file_status(
|
:Second column: index status (I: index dirty / space)
|
||||||
directory, os.path.join(gitd, 'index'),
|
:Third column: presence of untracked files (U: untracked files / space)
|
||||||
path, '.gitignore', func, extra_ignore_files=tuple(os.path.join(gitd, x) for x in ('logs/HEAD', 'info/exclude')))
|
:None: repository clean
|
||||||
return func(directory, path)
|
|
||||||
|
With file argument: returns status of this file. Output is
|
||||||
|
equivalent to the first two columns of "git status --porcelain"
|
||||||
|
(except for merge statuses as they are not supported by libgit2).
|
||||||
|
'''
|
||||||
|
if path:
|
||||||
|
gitd = git_directory(self.directory)
|
||||||
|
# We need HEAD as without it using fugitive to commit causes the
|
||||||
|
# current file's status (and only the current file) to not be updated
|
||||||
|
# for some reason I cannot be bothered to figure out.
|
||||||
|
return get_file_status(
|
||||||
|
directory=self.directory,
|
||||||
|
dirstate_file=os.path.join(gitd, 'index'),
|
||||||
|
file_path=path,
|
||||||
|
ignore_file_name='.gitignore',
|
||||||
|
get_func=self.do_status,
|
||||||
|
create_watcher=self.create_watcher,
|
||||||
|
extra_ignore_files=tuple(os.path.join(gitd, x) for x in ('logs/HEAD', 'info/exclude')),
|
||||||
|
)
|
||||||
|
return self.do_status(self.directory, path)
|
||||||
|
|
||||||
|
def branch(self):
|
||||||
|
head = os.path.join(git_directory(self.directory), 'HEAD')
|
||||||
|
return get_branch_name(
|
||||||
|
directory=self.directory,
|
||||||
|
config_file=head,
|
||||||
|
get_func=branch_name_from_config_file,
|
||||||
|
create_watcher=self.create_watcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import pygit2 as git
|
import pygit2 as git
|
||||||
|
|
||||||
class Repository(object):
|
class Repository(GitRepository):
|
||||||
__slots__ = ('directory',)
|
|
||||||
|
|
||||||
def __init__(self, directory):
|
|
||||||
self.directory = os.path.abspath(directory)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def ignore_event(path, name):
|
def ignore_event(path, name):
|
||||||
return False
|
return False
|
||||||
@ -121,32 +146,8 @@ try:
|
|||||||
index_column = 'I'
|
index_column = 'I'
|
||||||
r = wt_column + index_column + untracked_column
|
r = wt_column + index_column + untracked_column
|
||||||
return r if r != ' ' else None
|
return r if r != ' ' else None
|
||||||
|
|
||||||
def status(self, path=None):
|
|
||||||
'''Return status of repository or file.
|
|
||||||
|
|
||||||
Without file argument: returns status of the repository:
|
|
||||||
|
|
||||||
:First column: working directory status (D: dirty / space)
|
|
||||||
:Second column: index status (I: index dirty / space)
|
|
||||||
:Third column: presence of untracked files (U: untracked files / space)
|
|
||||||
:None: repository clean
|
|
||||||
|
|
||||||
With file argument: returns status of this file. Output is
|
|
||||||
equivalent to the first two columns of "git status --porcelain"
|
|
||||||
(except for merge statuses as they are not supported by libgit2).
|
|
||||||
'''
|
|
||||||
return do_status(self.directory, path, self.do_status)
|
|
||||||
|
|
||||||
def branch(self):
|
|
||||||
return get_branch_name(self.directory)
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
class Repository(object):
|
class Repository(GitRepository):
|
||||||
__slots__ = ('directory',)
|
|
||||||
|
|
||||||
def __init__(self, directory):
|
|
||||||
self.directory = os.path.abspath(directory)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def ignore_event(path, name):
|
def ignore_event(path, name):
|
||||||
# Ignore changes to the index.lock file, since they happen
|
# Ignore changes to the index.lock file, since they happen
|
||||||
@ -182,9 +183,3 @@ except ImportError:
|
|||||||
|
|
||||||
r = wt_column + index_column + untracked_column
|
r = wt_column + index_column + untracked_column
|
||||||
return r if r != ' ' else None
|
return r if r != ' ' else None
|
||||||
|
|
||||||
def status(self, path=None):
|
|
||||||
return do_status(self.directory, path, self.do_status)
|
|
||||||
|
|
||||||
def branch(self):
|
|
||||||
return get_branch_name(self.directory)
|
|
||||||
|
@ -18,15 +18,16 @@ def branch_name_from_config_file(directory, config_file):
|
|||||||
|
|
||||||
|
|
||||||
class Repository(object):
|
class Repository(object):
|
||||||
__slots__ = ('directory', 'ui')
|
__slots__ = ('directory', 'ui', 'create_watcher')
|
||||||
|
|
||||||
statuses = 'MARDUI'
|
statuses = 'MARDUI'
|
||||||
repo_statuses = (1, 1, 1, 1, 2)
|
repo_statuses = (1, 1, 1, 1, 2)
|
||||||
repo_statuses_str = (None, 'D ', ' U', 'DU')
|
repo_statuses_str = (None, 'D ', ' U', 'DU')
|
||||||
|
|
||||||
def __init__(self, directory):
|
def __init__(self, directory, create_watcher):
|
||||||
self.directory = os.path.abspath(directory)
|
self.directory = os.path.abspath(directory)
|
||||||
self.ui = ui.ui()
|
self.ui = ui.ui()
|
||||||
|
self.create_watcher = create_watcher
|
||||||
|
|
||||||
def _repo(self, directory):
|
def _repo(self, directory):
|
||||||
# Cannot create this object once and use always: when repository updates
|
# Cannot create this object once and use always: when repository updates
|
||||||
@ -47,8 +48,14 @@ class Repository(object):
|
|||||||
"U"nknown, "I"gnored, (None)Clean.
|
"U"nknown, "I"gnored, (None)Clean.
|
||||||
'''
|
'''
|
||||||
if path:
|
if path:
|
||||||
return get_file_status(self.directory, os.path.join(self.directory, '.hg', 'dirstate'),
|
return get_file_status(
|
||||||
path, '.hgignore', self.do_status)
|
directory=self.directory,
|
||||||
|
dirstate_file=os.path.join(self.directory, '.hg', 'dirstate'),
|
||||||
|
file_path=path,
|
||||||
|
ignore_file_name='.hgignore',
|
||||||
|
get_func=self.do_status,
|
||||||
|
create_watcher=self.create_watcher,
|
||||||
|
)
|
||||||
return self.do_status(self.directory, path)
|
return self.do_status(self.directory, path)
|
||||||
|
|
||||||
def do_status(self, directory, path):
|
def do_status(self, directory, path):
|
||||||
@ -69,4 +76,9 @@ class Repository(object):
|
|||||||
|
|
||||||
def branch(self):
|
def branch(self):
|
||||||
config_file = os.path.join(self.directory, '.hg', 'branch')
|
config_file = os.path.join(self.directory, '.hg', 'branch')
|
||||||
return get_branch_name(self.directory, config_file, branch_name_from_config_file)
|
return get_branch_name(
|
||||||
|
directory=self.directory,
|
||||||
|
config_file=config_file,
|
||||||
|
get_func=branch_name_from_config_file,
|
||||||
|
create_watcher=self.create_watcher,
|
||||||
|
)
|
||||||
|
@ -460,13 +460,14 @@ main_spec = (Spec(
|
|||||||
# only for existence of the path, not for it being a directory
|
# only for existence of the path, not for it being a directory
|
||||||
paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(os.path.expanduser(value.value)))),
|
paths=Spec().list((lambda value, *args: (True, True, not os.path.exists(os.path.expanduser(value.value)))),
|
||||||
lambda value: 'path does not exist: {0}'.format(value)).optional(),
|
lambda value: 'path does not exist: {0}'.format(value)).optional(),
|
||||||
log_file=Spec().type(str).func(lambda value, *args: (True, True, not os.path.isdir(os.path.dirname(os.path.expanduser(value)))),
|
log_file=Spec().type(unicode).func(lambda value, *args: (True, True, not os.path.isdir(os.path.dirname(os.path.expanduser(value)))),
|
||||||
lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value))).optional(),
|
lambda value: 'directory does not exist: {0}'.format(os.path.dirname(value))).optional(),
|
||||||
log_level=Spec().re('^[A-Z]+$').func(lambda value, *args: (True, True, not hasattr(logging, value)),
|
log_level=Spec().re('^[A-Z]+$').func(lambda value, *args: (True, True, not hasattr(logging, value)),
|
||||||
lambda value: 'unknown debugging level {0}'.format(value)).optional(),
|
lambda value: 'unknown debugging level {0}'.format(value)).optional(),
|
||||||
log_format=Spec().type(str).optional(),
|
log_format=Spec().type(unicode).optional(),
|
||||||
interval=Spec().either(Spec().cmp('gt', 0.0), Spec().type(type(None))).optional(),
|
interval=Spec().either(Spec().cmp('gt', 0.0), Spec().type(type(None))).optional(),
|
||||||
reload_config=Spec().type(bool).optional(),
|
reload_config=Spec().type(bool).optional(),
|
||||||
|
watcher=Spec().type(unicode).oneof(set(('auto', 'inotify', 'stat'))).optional(),
|
||||||
).context_message('Error while loading common configuration (key {key})'),
|
).context_message('Error while loading common configuration (key {key})'),
|
||||||
ext=Spec(
|
ext=Spec(
|
||||||
vim=Spec(
|
vim=Spec(
|
||||||
|
@ -25,6 +25,8 @@ def getconfigargspec(obj):
|
|||||||
if (arg == 'self' or
|
if (arg == 'self' or
|
||||||
(arg == 'segment_info' and
|
(arg == 'segment_info' and
|
||||||
getattr(obj, 'powerline_requires_segment_info', None)) or
|
getattr(obj, 'powerline_requires_segment_info', None)) or
|
||||||
|
(arg == 'create_watcher' and
|
||||||
|
getattr(obj, 'powerline_requires_filesystem_watcher', None)) or
|
||||||
(arg == 'pl') or
|
(arg == 'pl') or
|
||||||
(method.startswith('render') and (1 if argspec.args[0] == 'self' else 0) + i == len(argspec.args)) or
|
(method.startswith('render') and (1 if argspec.args[0] == 'self' else 0) + i == len(argspec.args)) or
|
||||||
arg in args):
|
arg in args):
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
# vim:fileencoding=utf-8:noet
|
# vim:fileencoding=utf-8:noet
|
||||||
|
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import, unicode_literals, division, print_function
|
||||||
|
|
||||||
|
from powerline.lib.file_watcher import create_file_watcher
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
|
||||||
@ -52,15 +54,15 @@ def get_attr_func(contents_func, key, args):
|
|||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
if args is None:
|
if args is None:
|
||||||
return lambda : func()
|
return lambda: func()
|
||||||
else:
|
else:
|
||||||
return lambda pl, shutdown_event: func(pl=pl, shutdown_event=shutdown_event, **args)
|
return lambda pl, shutdown_event: func(pl=pl, shutdown_event=shutdown_event, **args)
|
||||||
|
|
||||||
|
|
||||||
def gen_segment_getter(pl, ext, path, theme_configs, default_module=None):
|
def gen_segment_getter(pl, ext, common_config, theme_configs, default_module=None):
|
||||||
data = {
|
data = {
|
||||||
'default_module': default_module or 'powerline.segments.' + ext,
|
'default_module': default_module or 'powerline.segments.' + ext,
|
||||||
'path': path,
|
'path': common_config['paths'],
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_key(segment, module, key, default=None):
|
def get_key(segment, module, key, default=None):
|
||||||
@ -90,6 +92,10 @@ def gen_segment_getter(pl, ext, path, theme_configs, default_module=None):
|
|||||||
startup_func = get_attr_func(_contents_func, 'startup', args)
|
startup_func = get_attr_func(_contents_func, 'startup', args)
|
||||||
shutdown_func = get_attr_func(_contents_func, 'shutdown', None)
|
shutdown_func = get_attr_func(_contents_func, 'shutdown', None)
|
||||||
|
|
||||||
|
if hasattr(_contents_func, 'powerline_requires_filesystem_watcher'):
|
||||||
|
create_watcher = lambda: create_file_watcher(pl, common_config['watcher'])
|
||||||
|
args['create_watcher'] = create_watcher
|
||||||
|
|
||||||
if hasattr(_contents_func, 'powerline_requires_segment_info'):
|
if hasattr(_contents_func, 'powerline_requires_segment_info'):
|
||||||
contents_func = lambda pl, segment_info: _contents_func(pl=pl, segment_info=segment_info, **args)
|
contents_func = lambda pl, segment_info: _contents_func(pl=pl, segment_info=segment_info, **args)
|
||||||
else:
|
else:
|
||||||
|
@ -18,7 +18,7 @@ from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment, with_docs
|
|||||||
from powerline.lib.monotonic import monotonic
|
from powerline.lib.monotonic import monotonic
|
||||||
from powerline.lib.humanize_bytes import humanize_bytes
|
from powerline.lib.humanize_bytes import humanize_bytes
|
||||||
from powerline.lib.unicode import u
|
from powerline.lib.unicode import u
|
||||||
from powerline.theme import requires_segment_info
|
from powerline.theme import requires_segment_info, requires_filesystem_watcher
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
|
||||||
|
|
||||||
@ -51,8 +51,9 @@ def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False):
|
|||||||
return socket.gethostname()
|
return socket.gethostname()
|
||||||
|
|
||||||
|
|
||||||
|
@requires_filesystem_watcher
|
||||||
@requires_segment_info
|
@requires_segment_info
|
||||||
def branch(pl, segment_info, status_colors=False):
|
def branch(pl, segment_info, create_watcher, status_colors=False):
|
||||||
'''Return the current VCS branch.
|
'''Return the current VCS branch.
|
||||||
|
|
||||||
:param bool status_colors:
|
:param bool status_colors:
|
||||||
@ -61,7 +62,7 @@ def branch(pl, segment_info, status_colors=False):
|
|||||||
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
|
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
|
||||||
'''
|
'''
|
||||||
name = segment_info['getcwd']()
|
name = segment_info['getcwd']()
|
||||||
repo = guess(path=name)
|
repo = guess(path=name, create_watcher=create_watcher)
|
||||||
if repo is not None:
|
if repo is not None:
|
||||||
branch = repo.branch()
|
branch = repo.branch()
|
||||||
scol = ['branch']
|
scol = ['branch']
|
||||||
|
@ -10,7 +10,7 @@ except ImportError:
|
|||||||
|
|
||||||
from powerline.bindings.vim import (vim_get_func, getbufvar, vim_getbufoption,
|
from powerline.bindings.vim import (vim_get_func, getbufvar, vim_getbufoption,
|
||||||
buffer_name, vim_getwinvar)
|
buffer_name, vim_getwinvar)
|
||||||
from powerline.theme import requires_segment_info
|
from powerline.theme import requires_segment_info, requires_filesystem_watcher
|
||||||
from powerline.lib import add_divider_highlight_group
|
from powerline.lib import add_divider_highlight_group
|
||||||
from powerline.lib.vcs import guess, tree_status
|
from powerline.lib.vcs import guess, tree_status
|
||||||
from powerline.lib.humanize_bytes import humanize_bytes
|
from powerline.lib.humanize_bytes import humanize_bytes
|
||||||
@ -368,8 +368,9 @@ def modified_buffers(pl, text='+ ', join_str=','):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@requires_filesystem_watcher
|
||||||
@requires_segment_info
|
@requires_segment_info
|
||||||
def branch(pl, segment_info, status_colors=False):
|
def branch(pl, segment_info, create_watcher, status_colors=False):
|
||||||
'''Return the current working branch.
|
'''Return the current working branch.
|
||||||
|
|
||||||
:param bool status_colors:
|
:param bool status_colors:
|
||||||
@ -382,7 +383,7 @@ def branch(pl, segment_info, status_colors=False):
|
|||||||
name = segment_info['buffer'].name
|
name = segment_info['buffer'].name
|
||||||
skip = not (name and (not vim_getbufoption(segment_info, 'buftype')))
|
skip = not (name and (not vim_getbufoption(segment_info, 'buftype')))
|
||||||
if not skip:
|
if not skip:
|
||||||
repo = guess(path=name)
|
repo = guess(path=name, create_watcher=create_watcher)
|
||||||
if repo is not None:
|
if repo is not None:
|
||||||
branch = repo.branch()
|
branch = repo.branch()
|
||||||
scol = ['branch']
|
scol = ['branch']
|
||||||
@ -396,8 +397,9 @@ def branch(pl, segment_info, status_colors=False):
|
|||||||
}]
|
}]
|
||||||
|
|
||||||
|
|
||||||
|
@requires_filesystem_watcher
|
||||||
@requires_segment_info
|
@requires_segment_info
|
||||||
def file_vcs_status(pl, segment_info):
|
def file_vcs_status(pl, segment_info, create_watcher):
|
||||||
'''Return the VCS status for this buffer.
|
'''Return the VCS status for this buffer.
|
||||||
|
|
||||||
Highlight groups used: ``file_vcs_status``.
|
Highlight groups used: ``file_vcs_status``.
|
||||||
@ -405,7 +407,7 @@ def file_vcs_status(pl, segment_info):
|
|||||||
name = segment_info['buffer'].name
|
name = segment_info['buffer'].name
|
||||||
skip = not (name and (not vim_getbufoption(segment_info, 'buftype')))
|
skip = not (name and (not vim_getbufoption(segment_info, 'buftype')))
|
||||||
if not skip:
|
if not skip:
|
||||||
repo = guess(path=name)
|
repo = guess(path=name, create_watcher=create_watcher)
|
||||||
if repo is not None:
|
if repo is not None:
|
||||||
status = repo.status(os.path.relpath(name, repo.directory))
|
status = repo.status(os.path.relpath(name, repo.directory))
|
||||||
if not status:
|
if not status:
|
||||||
|
@ -10,6 +10,11 @@ def requires_segment_info(func):
|
|||||||
return func
|
return func
|
||||||
|
|
||||||
|
|
||||||
|
def requires_filesystem_watcher(func):
|
||||||
|
func.powerline_requires_filesystem_watcher = True
|
||||||
|
return func
|
||||||
|
|
||||||
|
|
||||||
def new_empty_segment_line():
|
def new_empty_segment_line():
|
||||||
return {
|
return {
|
||||||
'left': [],
|
'left': [],
|
||||||
@ -37,7 +42,7 @@ class Theme(object):
|
|||||||
theme_configs = [theme_config]
|
theme_configs = [theme_config]
|
||||||
if top_theme_config:
|
if top_theme_config:
|
||||||
theme_configs.append(top_theme_config)
|
theme_configs.append(top_theme_config)
|
||||||
get_segment = gen_segment_getter(pl, ext, common_config['paths'], theme_configs, theme_config.get('default_module'))
|
get_segment = gen_segment_getter(pl, ext, common_config, theme_configs, theme_config.get('default_module'))
|
||||||
for segdict in itertools.chain((theme_config['segments'],),
|
for segdict in itertools.chain((theme_config['segments'],),
|
||||||
theme_config['segments'].get('above', ())):
|
theme_config['segments'].get('above', ())):
|
||||||
self.segments.append(new_empty_segment_line())
|
self.segments.append(new_empty_segment_line())
|
||||||
|
@ -138,7 +138,12 @@ def get_powerline_raw(PowerlineClass, **kwargs):
|
|||||||
else:
|
else:
|
||||||
renderer = SimpleRenderer
|
renderer = SimpleRenderer
|
||||||
pl = PowerlineClass(
|
pl = PowerlineClass(
|
||||||
config_loader=ConfigLoader(load=load_json_config, watcher=watcher, run_once=kwargs.get('run_once')),
|
config_loader=ConfigLoader(
|
||||||
|
load=load_json_config,
|
||||||
|
watcher=watcher,
|
||||||
|
watcher_type='test',
|
||||||
|
run_once=kwargs.get('run_once')
|
||||||
|
),
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
pl._watcher = watcher
|
pl._watcher = watcher
|
||||||
|
@ -23,6 +23,7 @@ config = {
|
|||||||
},
|
},
|
||||||
'spaces': 0,
|
'spaces': 0,
|
||||||
'interval': 0,
|
'interval': 0,
|
||||||
|
'watcher': 'test',
|
||||||
},
|
},
|
||||||
'ext': {
|
'ext': {
|
||||||
'test': {
|
'test': {
|
||||||
|
@ -4,7 +4,7 @@ import tests.vim as vim_module
|
|||||||
import powerline as powerline_module
|
import powerline as powerline_module
|
||||||
from tests import TestCase
|
from tests import TestCase
|
||||||
from tests.lib import replace_item
|
from tests.lib import replace_item
|
||||||
from tests.lib.config_mock import swap_attributes, get_powerline, pop_events
|
from tests.lib.config_mock import swap_attributes, get_powerline
|
||||||
from tests.lib.config_mock import get_powerline_raw
|
from tests.lib.config_mock import get_powerline_raw
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
@ -27,6 +27,7 @@ config = {
|
|||||||
},
|
},
|
||||||
'spaces': 0,
|
'spaces': 0,
|
||||||
'interval': 0,
|
'interval': 0,
|
||||||
|
'watcher': 'test',
|
||||||
},
|
},
|
||||||
'ext': {
|
'ext': {
|
||||||
'test': {
|
'test': {
|
||||||
|
@ -3,9 +3,11 @@ from __future__ import division
|
|||||||
|
|
||||||
from powerline.lib import mergedicts, add_divider_highlight_group, REMOVE_THIS_KEY
|
from powerline.lib import mergedicts, add_divider_highlight_group, REMOVE_THIS_KEY
|
||||||
from powerline.lib.humanize_bytes import humanize_bytes
|
from powerline.lib.humanize_bytes import humanize_bytes
|
||||||
from powerline.lib.vcs import guess
|
from powerline.lib.vcs import guess, get_fallback_create_watcher
|
||||||
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
|
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
|
||||||
from powerline.lib.monotonic import monotonic
|
from powerline.lib.monotonic import monotonic
|
||||||
|
from powerline.lib.file_watcher import create_file_watcher, INotifyError
|
||||||
|
from powerline import get_fallback_logger
|
||||||
import threading
|
import threading
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
@ -384,9 +386,9 @@ class TestFilesystemWatchers(TestCase):
|
|||||||
self.fail('The change to {0} was not detected'.format(path))
|
self.fail('The change to {0} was not detected'.format(path))
|
||||||
|
|
||||||
def test_file_watcher(self):
|
def test_file_watcher(self):
|
||||||
from powerline.lib.file_watcher import create_file_watcher
|
try:
|
||||||
w = create_file_watcher(use_stat=False)
|
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
|
||||||
if w.is_stat_based:
|
except INotifyError:
|
||||||
raise SkipTest('This test is not suitable for a stat based file watcher')
|
raise SkipTest('This test is not suitable for a stat based file watcher')
|
||||||
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
|
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
|
||||||
with open(f1, 'wb'):
|
with open(f1, 'wb'):
|
||||||
@ -485,7 +487,8 @@ class TestVCS(TestCase):
|
|||||||
self.assertEqual(ans, q)
|
self.assertEqual(ans, q)
|
||||||
|
|
||||||
def test_git(self):
|
def test_git(self):
|
||||||
repo = guess(path=GIT_REPO)
|
create_watcher = get_fallback_create_watcher()
|
||||||
|
repo = guess(path=GIT_REPO, create_watcher=create_watcher)
|
||||||
self.assertNotEqual(repo, None)
|
self.assertNotEqual(repo, None)
|
||||||
self.assertEqual(repo.branch(), 'master')
|
self.assertEqual(repo.branch(), 'master')
|
||||||
self.assertEqual(repo.status(), None)
|
self.assertEqual(repo.status(), None)
|
||||||
@ -520,7 +523,8 @@ class TestVCS(TestCase):
|
|||||||
|
|
||||||
if use_mercurial:
|
if use_mercurial:
|
||||||
def test_mercurial(self):
|
def test_mercurial(self):
|
||||||
repo = guess(path=HG_REPO)
|
create_watcher = get_fallback_create_watcher()
|
||||||
|
repo = guess(path=HG_REPO, create_watcher=create_watcher)
|
||||||
self.assertNotEqual(repo, None)
|
self.assertNotEqual(repo, None)
|
||||||
self.assertEqual(repo.branch(), 'default')
|
self.assertEqual(repo.branch(), 'default')
|
||||||
self.assertEqual(repo.status(), None)
|
self.assertEqual(repo.status(), None)
|
||||||
@ -536,7 +540,8 @@ class TestVCS(TestCase):
|
|||||||
|
|
||||||
if use_bzr:
|
if use_bzr:
|
||||||
def test_bzr(self):
|
def test_bzr(self):
|
||||||
repo = guess(path=BZR_REPO)
|
create_watcher = get_fallback_create_watcher()
|
||||||
|
repo = guess(path=BZR_REPO, create_watcher=create_watcher)
|
||||||
self.assertNotEqual(repo, None, 'No bzr repo found. Do you have bzr installed?')
|
self.assertNotEqual(repo, None, 'No bzr repo found. Do you have bzr installed?')
|
||||||
self.assertEqual(repo.branch(), 'test_powerline')
|
self.assertEqual(repo.branch(), 'test_powerline')
|
||||||
self.assertEqual(repo.status(), None)
|
self.assertEqual(repo.status(), None)
|
||||||
@ -587,7 +592,7 @@ class TestVCS(TestCase):
|
|||||||
os.mkdir(d)
|
os.mkdir(d)
|
||||||
call(['bzr', 'init', '-q'], cwd=d)
|
call(['bzr', 'init', '-q'], cwd=d)
|
||||||
call(['bzr', 'nick', '-q', x], cwd=d)
|
call(['bzr', 'nick', '-q', x], cwd=d)
|
||||||
repo = guess(path=d)
|
repo = guess(path=d, create_watcher=create_watcher)
|
||||||
self.assertEqual(repo.branch(), x)
|
self.assertEqual(repo.branch(), x)
|
||||||
self.assertFalse(repo.status())
|
self.assertFalse(repo.status())
|
||||||
if x == 'b1':
|
if x == 'b1':
|
||||||
@ -598,7 +603,7 @@ class TestVCS(TestCase):
|
|||||||
os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2'))
|
os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2'))
|
||||||
for x, y in (('b1', 'b2'), ('b2', 'b1')):
|
for x, y in (('b1', 'b2'), ('b2', 'b1')):
|
||||||
d = os.path.join(BZR_REPO, x)
|
d = os.path.join(BZR_REPO, x)
|
||||||
repo = guess(path=d)
|
repo = guess(path=d, create_watcher=create_watcher)
|
||||||
self.do_branch_rename_test(repo, y)
|
self.do_branch_rename_test(repo, y)
|
||||||
if x == 'b1':
|
if x == 'b1':
|
||||||
self.assertFalse(repo.status())
|
self.assertFalse(repo.status())
|
||||||
|
@ -3,9 +3,11 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
from powerline.segments import shell, common
|
from powerline.segments import shell, common
|
||||||
|
from powerline.lib.vcs import get_fallback_create_watcher
|
||||||
import tests.vim as vim_module
|
import tests.vim as vim_module
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
from functools import partial
|
||||||
from tests.lib import Args, urllib_read, replace_attr, new_module, replace_module_module, replace_env, Pl
|
from tests.lib import Args, urllib_read, replace_attr, new_module, replace_module_module, replace_env, Pl
|
||||||
from tests import TestCase
|
from tests import TestCase
|
||||||
|
|
||||||
@ -13,6 +15,16 @@ from tests import TestCase
|
|||||||
vim = None
|
vim = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_dummy_guess(**kwargs):
|
||||||
|
if 'directory' in kwargs:
|
||||||
|
def guess(path, create_watcher):
|
||||||
|
return Args(branch=lambda: os.path.basename(path), **kwargs)
|
||||||
|
else:
|
||||||
|
def guess(path, create_watcher):
|
||||||
|
return Args(branch=lambda: os.path.basename(path), directory=path, **kwargs)
|
||||||
|
return guess
|
||||||
|
|
||||||
|
|
||||||
class TestShell(TestCase):
|
class TestShell(TestCase):
|
||||||
def test_last_status(self):
|
def test_last_status(self):
|
||||||
pl = Pl()
|
pl = Pl()
|
||||||
@ -201,23 +213,25 @@ class TestCommon(TestCase):
|
|||||||
|
|
||||||
def test_branch(self):
|
def test_branch(self):
|
||||||
pl = Pl()
|
pl = Pl()
|
||||||
|
create_watcher = get_fallback_create_watcher()
|
||||||
segment_info = {'getcwd': os.getcwd}
|
segment_info = {'getcwd': os.getcwd}
|
||||||
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory='/tmp/tests')):
|
branch = partial(common.branch, pl=pl, create_watcher=create_watcher)
|
||||||
|
with replace_attr(common, 'guess', get_dummy_guess(status=lambda: None, directory='/tmp/tests')):
|
||||||
with replace_attr(common, 'tree_status', lambda repo, pl: None):
|
with replace_attr(common, 'tree_status', lambda repo, pl: None):
|
||||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=False),
|
||||||
[{'highlight_group': ['branch'], 'contents': 'tests'}])
|
[{'highlight_group': ['branch'], 'contents': 'tests'}])
|
||||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=True),
|
||||||
[{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}])
|
[{'contents': 'tests', 'highlight_group': ['branch_clean', 'branch']}])
|
||||||
with replace_attr(common, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'D ', directory='/tmp/tests')):
|
with replace_attr(common, 'guess', get_dummy_guess(status=lambda: 'D ', directory='/tmp/tests')):
|
||||||
with replace_attr(common, 'tree_status', lambda repo, pl: 'D '):
|
with replace_attr(common, 'tree_status', lambda repo, pl: 'D '):
|
||||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=False),
|
||||||
[{'highlight_group': ['branch'], 'contents': 'tests'}])
|
[{'highlight_group': ['branch'], 'contents': 'tests'}])
|
||||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=True),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=True),
|
||||||
[{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}])
|
[{'contents': 'tests', 'highlight_group': ['branch_dirty', 'branch']}])
|
||||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=False),
|
||||||
[{'highlight_group': ['branch'], 'contents': 'tests'}])
|
[{'highlight_group': ['branch'], 'contents': 'tests'}])
|
||||||
with replace_attr(common, 'guess', lambda path: None):
|
with replace_attr(common, 'guess', lambda path, create_watcher: None):
|
||||||
self.assertEqual(common.branch(pl=pl, segment_info=segment_info, status_colors=False), None)
|
self.assertEqual(branch(segment_info=segment_info, status_colors=False), None)
|
||||||
|
|
||||||
def test_cwd(self):
|
def test_cwd(self):
|
||||||
new_os = new_module('os', path=os.path, sep='/')
|
new_os = new_module('os', path=os.path, sep='/')
|
||||||
@ -705,32 +719,36 @@ class TestVim(TestCase):
|
|||||||
|
|
||||||
def test_branch(self):
|
def test_branch(self):
|
||||||
pl = Pl()
|
pl = Pl()
|
||||||
|
create_watcher = get_fallback_create_watcher()
|
||||||
|
branch = partial(vim.branch, pl=pl, create_watcher=create_watcher)
|
||||||
with vim_module._with('buffer', '/foo') as segment_info:
|
with vim_module._with('buffer', '/foo') as segment_info:
|
||||||
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: None, directory=path)):
|
with replace_attr(vim, 'guess', get_dummy_guess(status=lambda: None)):
|
||||||
with replace_attr(vim, 'tree_status', lambda repo, pl: None):
|
with replace_attr(vim, 'tree_status', lambda repo, pl: None):
|
||||||
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=False),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=False),
|
||||||
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
|
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
|
||||||
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=True),
|
||||||
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}])
|
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_clean', 'branch'], 'contents': 'foo'}])
|
||||||
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda: 'DU', directory=path)):
|
with replace_attr(vim, 'guess', get_dummy_guess(status=lambda: 'DU')):
|
||||||
with replace_attr(vim, 'tree_status', lambda repo, pl: 'DU'):
|
with replace_attr(vim, 'tree_status', lambda repo, pl: 'DU'):
|
||||||
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=False),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=False),
|
||||||
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
|
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch'], 'contents': 'foo'}])
|
||||||
self.assertEqual(vim.branch(pl=pl, segment_info=segment_info, status_colors=True),
|
self.assertEqual(branch(segment_info=segment_info, status_colors=True),
|
||||||
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}])
|
[{'divider_highlight_group': 'branch:divider', 'highlight_group': ['branch_dirty', 'branch'], 'contents': 'foo'}])
|
||||||
|
|
||||||
def test_file_vcs_status(self):
|
def test_file_vcs_status(self):
|
||||||
pl = Pl()
|
pl = Pl()
|
||||||
|
create_watcher = get_fallback_create_watcher()
|
||||||
|
file_vcs_status = partial(vim.file_vcs_status, pl=pl, create_watcher=create_watcher)
|
||||||
with vim_module._with('buffer', '/foo') as segment_info:
|
with vim_module._with('buffer', '/foo') as segment_info:
|
||||||
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)):
|
with replace_attr(vim, 'guess', get_dummy_guess(status=lambda file: 'M')):
|
||||||
self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info),
|
self.assertEqual(file_vcs_status(segment_info=segment_info),
|
||||||
[{'highlight_group': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'}])
|
[{'highlight_group': ['file_vcs_status_M', 'file_vcs_status'], 'contents': 'M'}])
|
||||||
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: None, directory=path)):
|
with replace_attr(vim, 'guess', get_dummy_guess(status=lambda file: None)):
|
||||||
self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None)
|
self.assertEqual(file_vcs_status(segment_info=segment_info), None)
|
||||||
with vim_module._with('buffer', '/bar') as segment_info:
|
with vim_module._with('buffer', '/bar') as segment_info:
|
||||||
with vim_module._with('bufoptions', buftype='nofile'):
|
with vim_module._with('bufoptions', buftype='nofile'):
|
||||||
with replace_attr(vim, 'guess', lambda path: Args(branch=lambda: os.path.basename(path), status=lambda file: 'M', directory=path)):
|
with replace_attr(vim, 'guess', get_dummy_guess(status=lambda file: 'M')):
|
||||||
self.assertEqual(vim.file_vcs_status(pl=pl, segment_info=segment_info), None)
|
self.assertEqual(file_vcs_status(segment_info=segment_info), None)
|
||||||
|
|
||||||
old_cwd = None
|
old_cwd = None
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user