Merge pull request #994 from ZyX-I/refactor-watcher

Refactor watchers code
This commit is contained in:
Nikolai Aleksandrovich Pavlov 2014-08-16 15:16:51 +04:00
commit c2c2d7efca
13 changed files with 613 additions and 573 deletions

View File

@ -1,7 +1,7 @@
# vim:fileencoding=utf-8:noet
from powerline.lib.threaded import MultiRunnedThread
from powerline.lib.file_watcher import create_file_watcher
from powerline.lib.watcher import create_file_watcher
from copy import deepcopy
from threading import Event, Lock

View File

@ -1,238 +0,0 @@
# vim:fileencoding=utf-8:noet
from __future__ import unicode_literals, absolute_import
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os
import sys
import errno
from time import sleep
from threading import RLock
from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotify, INotifyError
def realpath(path):
return os.path.abspath(os.path.realpath(path))
class INotifyWatch(INotify):
def __init__(self, expire_time=10):
super(INotifyWatch, self).__init__()
self.watches = {}
self.modified = {}
self.last_query = {}
self.lock = RLock()
self.expire_time = expire_time * 60
def expire_watches(self):
now = monotonic()
for path, last_query in tuple(self.last_query.items()):
if last_query - now > self.expire_time:
self.unwatch(path)
def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked files.
for path in tuple(self.modified):
if os.path.exists(path):
self.modified[path] = True
else:
self.watches.pop(path, None)
self.modified.pop(path, None)
self.last_query.pop(path, None)
return
for path, num in tuple(self.watches.items()):
if num == wd:
if mask & self.IGNORED:
self.watches.pop(path, None)
self.modified.pop(path, None)
self.last_query.pop(path, None)
else:
if mask & self.ATTRIB:
# The watched file could have had its inode changed, in
# which case we will not get any more events for this
# file, so re-register the watch. For example by some
# other file being renamed as this file.
try:
self.unwatch(path)
except OSError:
pass
try:
self.watch(path)
except OSError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
raise
else:
self.modified[path] = True
else:
self.modified[path] = True
def unwatch(self, path):
''' Remove the watch for path. Raises an OSError if removing the watch
fails for some reason. '''
path = realpath(path)
with self.lock:
self.modified.pop(path, None)
self.last_query.pop(path, None)
wd = self.watches.pop(path, None)
if wd is not None:
if self._rm_watch(self._inotify_fd, wd) != 0:
self.handle_error()
def watch(self, path):
''' Register a watch for the file/directory named path. Raises an OSError if path
does not exist. '''
import ctypes
path = realpath(path)
with self.lock:
if path not in self.watches:
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
flags = self.MOVE_SELF | self.DELETE_SELF
buf = ctypes.c_char_p(bpath)
# Try watching path as a directory
wd = self._add_watch(self._inotify_fd, buf, flags | self.ONLYDIR)
if wd == -1:
eno = ctypes.get_errno()
if eno != errno.ENOTDIR:
self.handle_error()
# Try watching path as a file
flags |= (self.MODIFY | self.ATTRIB)
wd = self._add_watch(self._inotify_fd, buf, flags)
if wd == -1:
self.handle_error()
self.watches[path] = wd
self.modified[path] = False
def is_watched(self, path):
with self.lock:
return realpath(path) in self.watches
def __call__(self, path):
''' Return True if path has been modified since the last call. Can
raise OSError if the path does not exist. '''
path = realpath(path)
with self.lock:
self.last_query[path] = monotonic()
self.expire_watches()
if path not in self.watches:
# Try to re-add the watch, it will fail if the file does not
# exist/you dont have permission
self.watch(path)
return True
self.read(get_name=False)
if path not in self.modified:
# An ignored event was received which means the path has been
# automatically unwatched
return True
ans = self.modified[path]
if ans:
self.modified[path] = False
return ans
def close(self):
with self.lock:
for path in tuple(self.watches):
try:
self.unwatch(path)
except OSError:
pass
super(INotifyWatch, self).close()
class StatWatch(object):
def __init__(self):
self.watches = {}
self.lock = RLock()
def watch(self, path):
path = realpath(path)
with self.lock:
self.watches[path] = os.path.getmtime(path)
def unwatch(self, path):
path = realpath(path)
with self.lock:
self.watches.pop(path, None)
def is_watched(self, path):
with self.lock:
return realpath(path) in self.watches
def __call__(self, path):
path = realpath(path)
with self.lock:
if path not in self.watches:
self.watches[path] = os.path.getmtime(path)
return True
mtime = os.path.getmtime(path)
if mtime != self.watches[path]:
self.watches[path] = mtime
return True
return False
def close(self):
with self.lock:
self.watches.clear()
def create_file_watcher(pl, watcher_type='auto', expire_time=10):
'''
Create an object that can watch for changes to specified files
Use ``.__call__()`` method of the returned object to start watching the file
or check whether file has changed since last call.
Use ``.unwatch()`` method of the returned object to stop watching the file.
Uses inotify if available, otherwise tracks mtimes. expire_time is the
number of minutes after the last query for a given path for the inotify
watch for that path to be automatically removed. This conserves kernel
resources.
:param PowerlineLogger pl:
Logger.
:param str watcher_type:
One of ``inotify`` (linux only), ``stat``, ``auto``. Determines what
watcher will be used. ``auto`` will use ``inotify`` if available.
:param int expire_time:
Number of minutes since last ``.__call__()`` before inotify watcher will
stop watching given file.
'''
if watcher_type == 'stat':
pl.debug('Using requested stat-based watcher', prefix='watcher')
return StatWatch()
if watcher_type == 'inotify':
# Explicitly selected inotify watcher: do not catch INotifyError then.
pl.debug('Using requested inotify watcher', prefix='watcher')
return INotifyWatch(expire_time=expire_time)
if sys.platform.startswith('linux'):
try:
pl.debug('Trying to use inotify watcher', prefix='watcher')
return INotifyWatch(expire_time=expire_time)
except INotifyError:
pl.info('Failed to create inotify watcher', prefix='watcher')
pl.debug('Using stat-based watcher')
return StatWatch()
if __name__ == '__main__':
from powerline import get_fallback_logger
watcher = create_file_watcher(get_fallback_logger())
print ('Using watcher: %s' % watcher.__class__.__name__)
print ('Watching %s, press Ctrl-C to quit' % sys.argv[-1])
watcher.watch(sys.argv[-1])
try:
while True:
if watcher(sys.argv[-1]):
print ('%s has changed' % sys.argv[-1])
sleep(1)
except KeyboardInterrupt:
pass
watcher.close()

View File

@ -7,6 +7,10 @@ __docformat__ = 'restructuredtext en'
import sys
import os
import errno
import ctypes
import struct
from ctypes.util import find_library
class INotifyError(Exception):
@ -28,10 +32,8 @@ def load_inotify():
raise INotifyError('INotify not available on windows')
if sys.platform == 'darwin':
raise INotifyError('INotify not available on OS X')
import ctypes
if not hasattr(ctypes, 'c_ssize_t'):
raise INotifyError('You need python >= 2.7 to use inotify')
from ctypes.util import find_library
name = find_library('c')
if not name:
raise INotifyError('Cannot find C library')
@ -107,8 +109,6 @@ class INotify(object):
NONBLOCK = 0x800
def __init__(self, cloexec=True, nonblock=True):
import ctypes
import struct
self._init1, self._add_watch, self._rm_watch, self._read = load_inotify()
flags = 0
if cloexec:
@ -130,7 +130,6 @@ class INotify(object):
self.os = os
def handle_error(self):
import ctypes
eno = ctypes.get_errno()
extra = ''
if eno == errno.ENOSPC:
@ -155,7 +154,6 @@ class INotify(object):
del self._inotify_fd
def read(self, get_name=True):
import ctypes
buf = []
while True:
num = self._read(self._inotify_fd, self._buf, len(self._buf))

8
powerline/lib/path.py Normal file
View File

@ -0,0 +1,8 @@
# vim:fileencoding=utf-8:noet
from __future__ import unicode_literals, absolute_import
import os
def realpath(path):
return os.path.abspath(os.path.realpath(path))

View File

@ -1,221 +0,0 @@
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, absolute_import, print_function)
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import sys
import os
import errno
from time import sleep
from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotify, INotifyError
class NoSuchDir(ValueError):
pass
class BaseDirChanged(ValueError):
pass
class DirTooLarge(ValueError):
def __init__(self, bdir):
ValueError.__init__(self, 'The directory {0} is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'.format(bdir))
def realpath(path):
return os.path.abspath(os.path.realpath(path))
class INotifyTreeWatcher(INotify):
is_dummy = False
def __init__(self, basedir, ignore_event=None):
super(INotifyTreeWatcher, self).__init__()
self.basedir = realpath(basedir)
self.watch_tree()
self.modified = True
self.ignore_event = (lambda path, name: False) if ignore_event is None else ignore_event
def watch_tree(self):
self.watched_dirs = {}
self.watched_rmap = {}
try:
self.add_watches(self.basedir)
except OSError as e:
if e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
def add_watches(self, base, top_level=True):
''' Add watches for this directory and all its descendant directories,
recursively. '''
base = realpath(base)
# There may exist a link which leads to an endless
# add_watches loop or to maximum recursion depth exceeded
if not top_level and base in self.watched_dirs:
return
try:
is_dir = self.add_watch(base)
except OSError as e:
if e.errno == errno.ENOENT:
# The entry could have been deleted between listdir() and
# add_watch().
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
if e.errno == errno.EACCES:
# We silently ignore entries for which we dont have permission,
# unless they are the top level dir
if top_level:
raise NoSuchDir('You do not have permission to monitor {0}'.format(base))
return
raise
else:
if is_dir:
try:
files = os.listdir(base)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.ENOENT):
# The dir was deleted/replaced between the add_watch()
# and listdir()
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
raise
for x in files:
self.add_watches(os.path.join(base, x), top_level=False)
elif top_level:
# The top level dir is a file, not good.
raise NoSuchDir('The dir {0} does not exist'.format(base))
def add_watch(self, path):
import ctypes
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
wd = self._add_watch(
self._inotify_fd, ctypes.c_char_p(bpath),
# Ignore symlinks and watch only directories
self.DONT_FOLLOW | self.ONLYDIR |
self.MODIFY | self.CREATE | self.DELETE |
self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO |
self.ATTRIB | self.DELETE_SELF
)
if wd == -1:
eno = ctypes.get_errno()
if eno == errno.ENOTDIR:
return False
raise OSError(eno, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(eno)))
self.watched_dirs[path] = wd
self.watched_rmap[wd] = path
return True
def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked dirs.
self.watch_tree()
self.modified = True
return
path = self.watched_rmap.get(wd, None)
if path is not None:
if not self.ignore_event(path, name):
self.modified = True
if mask & self.CREATE:
# A new sub-directory might have been created, monitor it.
try:
self.add_watch(os.path.join(path, name))
except OSError as e:
if e.errno == errno.ENOENT:
# Deleted before add_watch()
pass
elif e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
else:
raise
if (mask & self.DELETE_SELF or mask & self.MOVE_SELF) and path == self.basedir:
raise BaseDirChanged('The directory %s was moved/deleted' % path)
def __call__(self):
self.read()
ret = self.modified
self.modified = False
return ret
class DummyTreeWatcher(object):
is_dummy = True
def __init__(self, basedir):
self.basedir = realpath(basedir)
def __call__(self):
return False
class TreeWatcher(object):
def __init__(self, expire_time=10):
self.watches = {}
self.last_query_times = {}
self.expire_time = expire_time * 60
def watch(self, path, logger=None, ignore_event=None):
path = realpath(path)
try:
w = INotifyTreeWatcher(path, ignore_event=ignore_event)
except (INotifyError, DirTooLarge) as e:
if logger is not None and not isinstance(e, INotifyError):
logger.warn('Failed to watch path: {0} with error: {1}'.format(path, e))
w = DummyTreeWatcher(path)
self.watches[path] = w
return w
def is_actually_watched(self, path):
w = self.watches.get(path, None)
return not getattr(w, 'is_dummy', True)
def expire_old_queries(self):
pop = []
now = monotonic()
for path, lt in self.last_query_times.items():
if now - lt > self.expire_time:
pop.append(path)
for path in pop:
del self.last_query_times[path]
def __call__(self, path, logger=None, ignore_event=None):
path = realpath(path)
self.expire_old_queries()
self.last_query_times[path] = monotonic()
w = self.watches.get(path, None)
if w is None:
try:
self.watch(path, logger=logger, ignore_event=ignore_event)
except NoSuchDir:
pass
return True
try:
return w()
except BaseDirChanged:
self.watches.pop(path, None)
return True
except DirTooLarge as e:
if logger is not None:
logger.warn(str(e))
self.watches[path] = DummyTreeWatcher(path)
return False
if __name__ == '__main__':
w = INotifyTreeWatcher(sys.argv[-1])
w()
print ('Monitoring', sys.argv[-1], 'press Ctrl-C to stop')
try:
while True:
if w():
print (sys.argv[-1], 'changed')
sleep(1)
except KeyboardInterrupt:
raise SystemExit(0)

View File

@ -6,6 +6,8 @@ import errno
from threading import Lock
from collections import defaultdict
from powerline.lib.watcher import create_tree_watcher
def generate_directories(path):
if os.path.isdir(path):
@ -178,9 +180,9 @@ def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_f
class TreeStatusCache(dict):
def __init__(self):
from powerline.lib.tree_watcher import TreeWatcher
self.tw = TreeWatcher()
def __init__(self, pl):
self.tw = create_tree_watcher(pl)
self.pl = pl
def cache_and_get(self, key, status):
ans = self.get(key, self)
@ -188,24 +190,24 @@ class TreeStatusCache(dict):
ans = self[key] = status()
return ans
def __call__(self, repo, logger):
def __call__(self, repo):
key = repo.directory
try:
if self.tw(key, logger=logger, ignore_event=getattr(repo, 'ignore_event', None)):
if self.tw(key, ignore_event=getattr(repo, 'ignore_event', None)):
self.pop(key, None)
except OSError as e:
logger.warn('Failed to check %s for changes, with error: %s' % key, e)
self.pl.warn('Failed to check %s for changes, with error: %s' % key, e)
return self.cache_and_get(key, repo.status)
_tree_status_cache = None
def tree_status(repo, logger):
def tree_status(repo, pl):
global _tree_status_cache
if _tree_status_cache is None:
_tree_status_cache = TreeStatusCache()
return _tree_status_cache(repo, logger)
_tree_status_cache = TreeStatusCache(pl)
return _tree_status_cache(repo)
vcs_props = (
@ -232,7 +234,7 @@ def guess(path, create_watcher):
def get_fallback_create_watcher():
from powerline.lib.file_watcher import create_file_watcher
from powerline.lib.watcher import create_file_watcher
from powerline import get_fallback_logger
from functools import partial
return partial(create_file_watcher, get_fallback_logger(), 'auto')

View File

@ -0,0 +1,66 @@
# vim:fileencoding=utf-8:noet
from __future__ import unicode_literals, absolute_import
import sys
from powerline.lib.watcher.stat import StatFileWatcher
from powerline.lib.watcher.inotify import INotifyFileWatcher
from powerline.lib.watcher.tree import TreeWatcher
from powerline.lib.inotify import INotifyError
def create_file_watcher(pl, watcher_type='auto', expire_time=10):
'''
Create an object that can watch for changes to specified files
Use ``.__call__()`` method of the returned object to start watching the file
or check whether file has changed since last call.
Use ``.unwatch()`` method of the returned object to stop watching the file.
Uses inotify if available, otherwise tracks mtimes. expire_time is the
number of minutes after the last query for a given path for the inotify
watch for that path to be automatically removed. This conserves kernel
resources.
:param PowerlineLogger pl:
Logger.
:param str watcher_type:
One of ``inotify`` (linux only), ``stat``, ``auto``. Determines what
watcher will be used. ``auto`` will use ``inotify`` if available.
:param int expire_time:
Number of minutes since last ``.__call__()`` before inotify watcher will
stop watching given file.
'''
if watcher_type == 'stat':
pl.debug('Using requested stat-based watcher', prefix='watcher')
return StatFileWatcher()
if watcher_type == 'inotify':
# Explicitly selected inotify watcher: do not catch INotifyError then.
pl.debug('Using requested inotify watcher', prefix='watcher')
return INotifyFileWatcher(expire_time=expire_time)
if sys.platform.startswith('linux'):
try:
pl.debug('Trying to use inotify watcher', prefix='watcher')
return INotifyFileWatcher(expire_time=expire_time)
except INotifyError:
pl.info('Failed to create inotify watcher', prefix='watcher')
pl.debug('Using stat-based watcher')
return StatFileWatcher()
def create_tree_watcher(pl, watcher_type='auto', expire_time=10):
'''Create an object that can watch for changes in specified directories
:param PowerlineLogger pl:
Logger.
:param str watcher_type:
Watcher type. Currently the only supported types are ``inotify`` (linux
only), ``dummy`` and ``auto``.
:param int expire_time:
Number of minutes since last ``.__call__()`` before inotify watcher will
stop watching given file.
'''
return TreeWatcher(pl, watcher_type, expire_time)

View File

@ -0,0 +1,262 @@
# vim:fileencoding=utf-8:noet
from __future__ import unicode_literals, absolute_import
import errno
import os
import ctypes
from threading import RLock
from powerline.lib.inotify import INotify
from powerline.lib.monotonic import monotonic
from powerline.lib.path import realpath
class INotifyFileWatcher(INotify):
def __init__(self, expire_time=10):
super(INotifyFileWatcher, self).__init__()
self.watches = {}
self.modified = {}
self.last_query = {}
self.lock = RLock()
self.expire_time = expire_time * 60
def expire_watches(self):
now = monotonic()
for path, last_query in tuple(self.last_query.items()):
if last_query - now > self.expire_time:
self.unwatch(path)
def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked files.
for path in tuple(self.modified):
if os.path.exists(path):
self.modified[path] = True
else:
self.watches.pop(path, None)
self.modified.pop(path, None)
self.last_query.pop(path, None)
return
for path, num in tuple(self.watches.items()):
if num == wd:
if mask & self.IGNORED:
self.watches.pop(path, None)
self.modified.pop(path, None)
self.last_query.pop(path, None)
else:
if mask & self.ATTRIB:
# The watched file could have had its inode changed, in
# which case we will not get any more events for this
# file, so re-register the watch. For example by some
# other file being renamed as this file.
try:
self.unwatch(path)
except OSError:
pass
try:
self.watch(path)
except OSError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
raise
else:
self.modified[path] = True
else:
self.modified[path] = True
def unwatch(self, path):
''' Remove the watch for path. Raises an OSError if removing the watch
fails for some reason. '''
path = realpath(path)
with self.lock:
self.modified.pop(path, None)
self.last_query.pop(path, None)
wd = self.watches.pop(path, None)
if wd is not None:
if self._rm_watch(self._inotify_fd, wd) != 0:
self.handle_error()
def watch(self, path):
''' Register a watch for the file/directory named path. Raises an OSError if path
does not exist. '''
path = realpath(path)
with self.lock:
if path not in self.watches:
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
flags = self.MOVE_SELF | self.DELETE_SELF
buf = ctypes.c_char_p(bpath)
# Try watching path as a directory
wd = self._add_watch(self._inotify_fd, buf, flags | self.ONLYDIR)
if wd == -1:
eno = ctypes.get_errno()
if eno != errno.ENOTDIR:
self.handle_error()
# Try watching path as a file
flags |= (self.MODIFY | self.ATTRIB)
wd = self._add_watch(self._inotify_fd, buf, flags)
if wd == -1:
self.handle_error()
self.watches[path] = wd
self.modified[path] = False
def is_watched(self, path):
with self.lock:
return realpath(path) in self.watches
def __call__(self, path):
''' Return True if path has been modified since the last call. Can
raise OSError if the path does not exist. '''
path = realpath(path)
with self.lock:
self.last_query[path] = monotonic()
self.expire_watches()
if path not in self.watches:
# Try to re-add the watch, it will fail if the file does not
# exist/you dont have permission
self.watch(path)
return True
self.read(get_name=False)
if path not in self.modified:
# An ignored event was received which means the path has been
# automatically unwatched
return True
ans = self.modified[path]
if ans:
self.modified[path] = False
return ans
def close(self):
with self.lock:
for path in tuple(self.watches):
try:
self.unwatch(path)
except OSError:
pass
super(INotifyFileWatcher, self).close()
class NoSuchDir(ValueError):
pass
class BaseDirChanged(ValueError):
pass
class DirTooLarge(ValueError):
def __init__(self, bdir):
ValueError.__init__(self, 'The directory {0} is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'.format(bdir))
class INotifyTreeWatcher(INotify):
is_dummy = False
def __init__(self, basedir, ignore_event=None):
super(INotifyTreeWatcher, self).__init__()
self.basedir = realpath(basedir)
self.watch_tree()
self.modified = True
self.ignore_event = (lambda path, name: False) if ignore_event is None else ignore_event
def watch_tree(self):
self.watched_dirs = {}
self.watched_rmap = {}
try:
self.add_watches(self.basedir)
except OSError as e:
if e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
def add_watches(self, base, top_level=True):
''' Add watches for this directory and all its descendant directories,
recursively. '''
base = realpath(base)
# There may exist a link which leads to an endless
# add_watches loop or to maximum recursion depth exceeded
if not top_level and base in self.watched_dirs:
return
try:
is_dir = self.add_watch(base)
except OSError as e:
if e.errno == errno.ENOENT:
# The entry could have been deleted between listdir() and
# add_watch().
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
if e.errno == errno.EACCES:
# We silently ignore entries for which we dont have permission,
# unless they are the top level dir
if top_level:
raise NoSuchDir('You do not have permission to monitor {0}'.format(base))
return
raise
else:
if is_dir:
try:
files = os.listdir(base)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.ENOENT):
# The dir was deleted/replaced between the add_watch()
# and listdir()
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
raise
for x in files:
self.add_watches(os.path.join(base, x), top_level=False)
elif top_level:
# The top level dir is a file, not good.
raise NoSuchDir('The dir {0} does not exist'.format(base))
def add_watch(self, path):
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
wd = self._add_watch(self._inotify_fd, ctypes.c_char_p(bpath),
# Ignore symlinks and watch only directories
self.DONT_FOLLOW | self.ONLYDIR |
self.MODIFY | self.CREATE | self.DELETE |
self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO |
self.ATTRIB | self.DELETE_SELF)
if wd == -1:
eno = ctypes.get_errno()
if eno == errno.ENOTDIR:
return False
raise OSError(eno, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(eno)))
self.watched_dirs[path] = wd
self.watched_rmap[wd] = path
return True
def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked dirs.
self.watch_tree()
self.modified = True
return
path = self.watched_rmap.get(wd, None)
if path is not None:
if not self.ignore_event(path, name):
self.modified = True
if mask & self.CREATE:
# A new sub-directory might have been created, monitor it.
try:
self.add_watch(os.path.join(path, name))
except OSError as e:
if e.errno == errno.ENOENT:
# Deleted before add_watch()
pass
elif e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
else:
raise
if (mask & self.DELETE_SELF or mask & self.MOVE_SELF) and path == self.basedir:
raise BaseDirChanged('The directory %s was moved/deleted' % path)
def __call__(self):
self.read()
ret = self.modified
self.modified = False
return ret

View File

@ -0,0 +1,44 @@
# vim:fileencoding=utf-8:noet
from __future__ import unicode_literals, absolute_import
import os
from threading import RLock
from powerline.lib.path import realpath
class StatFileWatcher(object):
def __init__(self):
self.watches = {}
self.lock = RLock()
def watch(self, path):
path = realpath(path)
with self.lock:
self.watches[path] = os.path.getmtime(path)
def unwatch(self, path):
path = realpath(path)
with self.lock:
self.watches.pop(path, None)
def is_watched(self, path):
with self.lock:
return realpath(path) in self.watches
def __call__(self, path):
path = realpath(path)
with self.lock:
if path not in self.watches:
self.watches[path] = os.path.getmtime(path)
return True
mtime = os.path.getmtime(path)
if mtime != self.watches[path]:
self.watches[path] = mtime
return True
return False
def close(self):
with self.lock:
self.watches.clear()

View File

@ -0,0 +1,83 @@
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, absolute_import, print_function)
import sys
from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotifyError
from powerline.lib.path import realpath
from powerline.lib.watcher.inotify import INotifyTreeWatcher, DirTooLarge, NoSuchDir, BaseDirChanged
class DummyTreeWatcher(object):
is_dummy = True
def __init__(self, basedir):
self.basedir = realpath(basedir)
def __call__(self):
return False
class TreeWatcher(object):
def __init__(self, pl, watcher_type, expire_time):
self.watches = {}
self.last_query_times = {}
self.expire_time = expire_time * 60
self.pl = pl
self.watcher_type = watcher_type
def get_watcher(self, path, ignore_event):
if self.watcher_type == 'inotify':
return INotifyTreeWatcher(path, ignore_event=ignore_event)
if self.watcher_type == 'dummy':
return DummyTreeWatcher(path)
# FIXME
if self.watcher_type == 'stat':
return DummyTreeWatcher(path)
if self.watcher_type == 'auto':
if sys.platform.startswith('linux'):
try:
return INotifyTreeWatcher(path, ignore_event=ignore_event)
except (INotifyError, DirTooLarge) as e:
if not isinstance(e, INotifyError):
self.pl.warn('Failed to watch path: {0} with error: {1}'.format(path, e))
return DummyTreeWatcher(path)
else:
raise ValueError('Unknown watcher type: {0}'.format(self.watcher_type))
def watch(self, path, ignore_event=None):
path = realpath(path)
w = self.get_watcher(path, ignore_event)
self.watches[path] = w
return w
def expire_old_queries(self):
pop = []
now = monotonic()
for path, lt in self.last_query_times.items():
if now - lt > self.expire_time:
pop.append(path)
for path in pop:
del self.last_query_times[path]
def __call__(self, path, ignore_event=None):
path = realpath(path)
self.expire_old_queries()
self.last_query_times[path] = monotonic()
w = self.watches.get(path, None)
if w is None:
try:
self.watch(path, ignore_event=ignore_event)
except NoSuchDir:
pass
return True
try:
return w()
except BaseDirChanged:
self.watches.pop(path, None)
return True
except DirTooLarge as e:
self.pl.warn(str(e))
self.watches[path] = DummyTreeWatcher(path)
return False

View File

@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals, division, print_functi
import sys
from powerline.lib.file_watcher import create_file_watcher
from powerline.lib.watcher import create_file_watcher
def list_segment_key_values(segment, theme_configs, key, module=None, default=None):

View File

@ -6,9 +6,7 @@ from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.vcs import guess, get_fallback_create_watcher
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic
from powerline.lib.file_watcher import create_file_watcher, INotifyError
from powerline.lib.vcs.git import git_directory
from powerline import get_fallback_logger
import threading
import os
import sys
@ -16,9 +14,8 @@ import re
import platform
from time import sleep
from subprocess import call, PIPE
from functools import partial
from tests import TestCase, SkipTest
from tests.lib import Pl
from tests import TestCase
def thread_number():
@ -378,95 +375,6 @@ class TestLib(TestCase):
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
class TestFilesystemWatchers(TestCase):
def do_test_for_change(self, watcher, path):
st = monotonic()
while monotonic() - st < 1:
if watcher(path):
return
sleep(0.1)
self.fail('The change to {0} was not detected'.format(path))
def test_file_watcher(self):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('This test is not suitable for a stat based file watcher')
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
with open(f1, 'wb'):
with open(f2, 'wb'):
with open(f3, 'wb'):
pass
ne = os.path.join(INOTIFY_DIR, 'notexists')
self.assertRaises(OSError, w, ne)
self.assertTrue(w(f1))
self.assertTrue(w(f2))
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Repeat once
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that no false changes are reported
self.assertFalse(w(f1), 'Spurious change detected')
self.assertFalse(w(f2), 'Spurious change detected')
# Check that open the file with 'w' triggers a change
with open(f1, 'wb'):
with open(f2, 'wb'):
pass
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that writing to a file with 'a' triggers a change
with open(f1, 'ab') as f:
f.write(b'1')
self.do_test_for_change(w, f1)
# Check that deleting a file registers as a change
os.unlink(f1)
self.do_test_for_change(w, f1)
# Test that changing the inode of a file does not cause it to stop
# being watched
os.rename(f3, f2)
self.do_test_for_change(w, f2)
self.assertFalse(w(f2), 'Spurious change detected')
os.utime(f2, None)
self.do_test_for_change(w, f2)
def test_tree_watcher(self):
from powerline.lib.tree_watcher import TreeWatcher
tw = TreeWatcher()
subdir = os.path.join(INOTIFY_DIR, 'subdir')
os.mkdir(subdir)
if tw.watch(INOTIFY_DIR).is_dummy:
raise SkipTest('No tree watcher available')
import shutil
self.assertTrue(tw(INOTIFY_DIR))
self.assertFalse(tw(INOTIFY_DIR))
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
changed()
open(os.path.join(subdir, 'tree1'), 'w').close()
changed()
os.unlink(os.path.join(subdir, 'tree1'))
changed()
os.rmdir(subdir)
changed()
os.mkdir(subdir)
changed()
os.rename(subdir, subdir + '1')
changed()
shutil.rmtree(subdir + '1')
changed()
os.mkdir(subdir)
f = os.path.join(subdir, 'f')
open(f, 'w').close()
changed()
with open(f, 'a') as s:
s.write(' ')
changed()
os.rename(f, f + '1')
changed()
use_mercurial = use_bzr = (sys.version_info < (3, 0)
and platform.python_implementation() == 'CPython')
@ -636,7 +544,6 @@ old_cwd = None
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
def setUpModule():
@ -661,13 +568,12 @@ def setUpModule():
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
os.mkdir(INOTIFY_DIR)
def tearDownModule():
global old_cwd
global old_HGRCPATH
for repo_dir in [INOTIFY_DIR, GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
for repo_dir in [GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
for root, dirs, files in list(os.walk(repo_dir, topdown=False)):
for file in files:
os.remove(os.path.join(root, file))

130
tests/test_watcher.py Normal file
View File

@ -0,0 +1,130 @@
# vim:fileencoding=utf-8:noet
from __future__ import absolute_import, unicode_literals, print_function, division
from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError
from powerline import get_fallback_logger
from powerline.lib.monotonic import monotonic
import shutil
from time import sleep
from functools import partial
import os
from tests import TestCase, SkipTest
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
class TestFilesystemWatchers(TestCase):
def do_test_for_change(self, watcher, path):
st = monotonic()
while monotonic() - st < 1:
if watcher(path):
return
sleep(0.1)
self.fail('The change to {0} was not detected'.format(path))
def test_file_watcher(self):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('This test is not suitable for a stat based file watcher')
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
with open(f1, 'wb'):
with open(f2, 'wb'):
with open(f3, 'wb'):
pass
ne = os.path.join(INOTIFY_DIR, 'notexists')
self.assertRaises(OSError, w, ne)
self.assertTrue(w(f1))
self.assertTrue(w(f2))
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Repeat once
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that no false changes are reported
self.assertFalse(w(f1), 'Spurious change detected')
self.assertFalse(w(f2), 'Spurious change detected')
# Check that open the file with 'w' triggers a change
with open(f1, 'wb'):
with open(f2, 'wb'):
pass
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that writing to a file with 'a' triggers a change
with open(f1, 'ab') as f:
f.write(b'1')
self.do_test_for_change(w, f1)
# Check that deleting a file registers as a change
os.unlink(f1)
self.do_test_for_change(w, f1)
# Test that changing the inode of a file does not cause it to stop
# being watched
os.rename(f3, f2)
self.do_test_for_change(w, f2)
self.assertFalse(w(f2), 'Spurious change detected')
os.utime(f2, None)
self.do_test_for_change(w, f2)
def test_tree_watcher(self):
tw = create_tree_watcher(get_fallback_logger())
subdir = os.path.join(INOTIFY_DIR, 'subdir')
os.mkdir(subdir)
if tw.watch(INOTIFY_DIR).is_dummy:
raise SkipTest('No tree watcher available')
self.assertTrue(tw(INOTIFY_DIR))
self.assertFalse(tw(INOTIFY_DIR))
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
changed()
open(os.path.join(subdir, 'tree1'), 'w').close()
changed()
os.unlink(os.path.join(subdir, 'tree1'))
changed()
os.rmdir(subdir)
changed()
os.mkdir(subdir)
changed()
os.rename(subdir, subdir + '1')
changed()
shutil.rmtree(subdir + '1')
changed()
os.mkdir(subdir)
f = os.path.join(subdir, 'f')
open(f, 'w').close()
changed()
with open(f, 'a') as s:
s.write(' ')
changed()
os.rename(f, f + '1')
changed()
old_cwd = None
def setUpModule():
global old_cwd
old_cwd = os.getcwd()
os.chdir(os.path.dirname(__file__))
os.mkdir(INOTIFY_DIR)
def tearDownModule():
for d in [INOTIFY_DIR]:
for root, dirs, files in list(os.walk(d, topdown=False)):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
os.rmdir(os.path.join(root, dir))
os.rmdir(d)
os.chdir(old_cwd)
if __name__ == '__main__':
from tests import main
main()