Move tree_watcher to powerline/lib/watcher and split it

This commit is contained in:
ZyX 2014-06-28 22:59:46 +04:00
parent cb41ce40d2
commit 2faa2a254f
6 changed files with 241 additions and 231 deletions

View File

@ -1,217 +0,0 @@
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, absolute_import, print_function)
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import sys
import os
import errno
from time import sleep
from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotify, INotifyError
class NoSuchDir(ValueError):
pass
class BaseDirChanged(ValueError):
pass
class DirTooLarge(ValueError):
def __init__(self, bdir):
ValueError.__init__(self, 'The directory {0} is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'.format(bdir))
def realpath(path):
return os.path.abspath(os.path.realpath(path))
class INotifyTreeWatcher(INotify):
is_dummy = False
def __init__(self, basedir, ignore_event=None):
super(INotifyTreeWatcher, self).__init__()
self.basedir = realpath(basedir)
self.watch_tree()
self.modified = True
self.ignore_event = (lambda path, name: False) if ignore_event is None else ignore_event
def watch_tree(self):
self.watched_dirs = {}
self.watched_rmap = {}
try:
self.add_watches(self.basedir)
except OSError as e:
if e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
def add_watches(self, base, top_level=True):
''' Add watches for this directory and all its descendant directories,
recursively. '''
base = realpath(base)
# There may exist a link which leads to an endless
# add_watches loop or to maximum recursion depth exceeded
if not top_level and base in self.watched_dirs:
return
try:
is_dir = self.add_watch(base)
except OSError as e:
if e.errno == errno.ENOENT:
# The entry could have been deleted between listdir() and
# add_watch().
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
if e.errno == errno.EACCES:
# We silently ignore entries for which we dont have permission,
# unless they are the top level dir
if top_level:
raise NoSuchDir('You do not have permission to monitor {0}'.format(base))
return
raise
else:
if is_dir:
try:
files = os.listdir(base)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.ENOENT):
# The dir was deleted/replaced between the add_watch()
# and listdir()
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
raise
for x in files:
self.add_watches(os.path.join(base, x), top_level=False)
elif top_level:
# The top level dir is a file, not good.
raise NoSuchDir('The dir {0} does not exist'.format(base))
def add_watch(self, path):
import ctypes
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
wd = self._add_watch(
self._inotify_fd, ctypes.c_char_p(bpath),
# Ignore symlinks and watch only directories
self.DONT_FOLLOW | self.ONLYDIR |
self.MODIFY | self.CREATE | self.DELETE |
self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO |
self.ATTRIB | self.DELETE_SELF
)
if wd == -1:
eno = ctypes.get_errno()
if eno == errno.ENOTDIR:
return False
raise OSError(eno, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(eno)))
self.watched_dirs[path] = wd
self.watched_rmap[wd] = path
return True
def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked dirs.
self.watch_tree()
self.modified = True
return
path = self.watched_rmap.get(wd, None)
if path is not None:
if not self.ignore_event(path, name):
self.modified = True
if mask & self.CREATE:
# A new sub-directory might have been created, monitor it.
try:
self.add_watch(os.path.join(path, name))
except OSError as e:
if e.errno == errno.ENOENT:
# Deleted before add_watch()
pass
elif e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
else:
raise
if (mask & self.DELETE_SELF or mask & self.MOVE_SELF) and path == self.basedir:
raise BaseDirChanged('The directory %s was moved/deleted' % path)
def __call__(self):
self.read()
ret = self.modified
self.modified = False
return ret
class DummyTreeWatcher(object):
is_dummy = True
def __init__(self, basedir):
self.basedir = realpath(basedir)
def __call__(self):
return False
class TreeWatcher(object):
def __init__(self, expire_time=10):
self.watches = {}
self.last_query_times = {}
self.expire_time = expire_time * 60
def watch(self, path, logger=None, ignore_event=None):
path = realpath(path)
try:
w = INotifyTreeWatcher(path, ignore_event=ignore_event)
except (INotifyError, DirTooLarge) as e:
if logger is not None and not isinstance(e, INotifyError):
logger.warn('Failed to watch path: {0} with error: {1}'.format(path, e))
w = DummyTreeWatcher(path)
self.watches[path] = w
return w
def expire_old_queries(self):
pop = []
now = monotonic()
for path, lt in self.last_query_times.items():
if now - lt > self.expire_time:
pop.append(path)
for path in pop:
del self.last_query_times[path]
def __call__(self, path, logger=None, ignore_event=None):
path = realpath(path)
self.expire_old_queries()
self.last_query_times[path] = monotonic()
w = self.watches.get(path, None)
if w is None:
try:
self.watch(path, logger=logger, ignore_event=ignore_event)
except NoSuchDir:
pass
return True
try:
return w()
except BaseDirChanged:
self.watches.pop(path, None)
return True
except DirTooLarge as e:
if logger is not None:
logger.warn(str(e))
self.watches[path] = DummyTreeWatcher(path)
return False
if __name__ == '__main__':
w = INotifyTreeWatcher(sys.argv[-1])
w()
print ('Monitoring', sys.argv[-1], 'press Ctrl-C to stop')
try:
while True:
if w():
print (sys.argv[-1], 'changed')
sleep(1)
except KeyboardInterrupt:
raise SystemExit(0)

View File

@ -6,6 +6,8 @@ import errno
from threading import Lock from threading import Lock
from collections import defaultdict from collections import defaultdict
from powerline.lib.watcher import create_tree_watcher
def generate_directories(path): def generate_directories(path):
if os.path.isdir(path): if os.path.isdir(path):
@ -178,9 +180,9 @@ def get_file_status(directory, dirstate_file, file_path, ignore_file_name, get_f
class TreeStatusCache(dict): class TreeStatusCache(dict):
def __init__(self): def __init__(self, pl):
from powerline.lib.tree_watcher import TreeWatcher self.tw = create_tree_watcher(pl)
self.tw = TreeWatcher() self.pl = pl
def cache_and_get(self, key, status): def cache_and_get(self, key, status):
ans = self.get(key, self) ans = self.get(key, self)
@ -188,24 +190,24 @@ class TreeStatusCache(dict):
ans = self[key] = status() ans = self[key] = status()
return ans return ans
def __call__(self, repo, logger): def __call__(self, repo):
key = repo.directory key = repo.directory
try: try:
if self.tw(key, logger=logger, ignore_event=getattr(repo, 'ignore_event', None)): if self.tw(key, ignore_event=getattr(repo, 'ignore_event', None)):
self.pop(key, None) self.pop(key, None)
except OSError as e: except OSError as e:
logger.warn('Failed to check %s for changes, with error: %s' % key, e) self.pl.warn('Failed to check %s for changes, with error: %s' % key, e)
return self.cache_and_get(key, repo.status) return self.cache_and_get(key, repo.status)
_tree_status_cache = None _tree_status_cache = None
def tree_status(repo, logger): def tree_status(repo, pl):
global _tree_status_cache global _tree_status_cache
if _tree_status_cache is None: if _tree_status_cache is None:
_tree_status_cache = TreeStatusCache() _tree_status_cache = TreeStatusCache(pl)
return _tree_status_cache(repo, logger) return _tree_status_cache(repo)
vcs_props = ( vcs_props = (

View File

@ -4,7 +4,9 @@ from __future__ import unicode_literals, absolute_import
import sys import sys
from powerline.lib.watcher.stat import StatFileWatcher from powerline.lib.watcher.stat import StatFileWatcher
from powerline.lib.watcher.inotify import INotifyFileWatcher, INotifyError from powerline.lib.watcher.inotify import INotifyFileWatcher
from powerline.lib.watcher.tree import TreeWatcher
from powerline.lib.inotify import INotifyError
def create_file_watcher(pl, watcher_type='auto', expire_time=10): def create_file_watcher(pl, watcher_type='auto', expire_time=10):
@ -47,3 +49,18 @@ def create_file_watcher(pl, watcher_type='auto', expire_time=10):
pl.debug('Using stat-based watcher') pl.debug('Using stat-based watcher')
return StatFileWatcher() return StatFileWatcher()
def create_tree_watcher(pl, watcher_type='auto', expire_time=10):
'''Create an object that can watch for changes in specified directories
:param PowerlineLogger pl:
Logger.
:param str watcher_type:
Watcher type. Currently the only supported types are ``inotify`` (linux
only), ``dummy`` and ``auto``.
:param int expire_time:
Number of minutes since last ``.__call__()`` before inotify watcher will
stop watching given file.
'''
return TreeWatcher(pl, watcher_type, expire_time)

View File

@ -135,3 +135,129 @@ class INotifyFileWatcher(INotify):
except OSError: except OSError:
pass pass
super(INotifyFileWatcher, self).close() super(INotifyFileWatcher, self).close()
class NoSuchDir(ValueError):
pass
class BaseDirChanged(ValueError):
pass
class DirTooLarge(ValueError):
def __init__(self, bdir):
ValueError.__init__(self, 'The directory {0} is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'.format(bdir))
class INotifyTreeWatcher(INotify):
is_dummy = False
def __init__(self, basedir, ignore_event=None):
super(INotifyTreeWatcher, self).__init__()
self.basedir = realpath(basedir)
self.watch_tree()
self.modified = True
self.ignore_event = (lambda path, name: False) if ignore_event is None else ignore_event
def watch_tree(self):
self.watched_dirs = {}
self.watched_rmap = {}
try:
self.add_watches(self.basedir)
except OSError as e:
if e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
def add_watches(self, base, top_level=True):
''' Add watches for this directory and all its descendant directories,
recursively. '''
base = realpath(base)
# There may exist a link which leads to an endless
# add_watches loop or to maximum recursion depth exceeded
if not top_level and base in self.watched_dirs:
return
try:
is_dir = self.add_watch(base)
except OSError as e:
if e.errno == errno.ENOENT:
# The entry could have been deleted between listdir() and
# add_watch().
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
if e.errno == errno.EACCES:
# We silently ignore entries for which we dont have permission,
# unless they are the top level dir
if top_level:
raise NoSuchDir('You do not have permission to monitor {0}'.format(base))
return
raise
else:
if is_dir:
try:
files = os.listdir(base)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.ENOENT):
# The dir was deleted/replaced between the add_watch()
# and listdir()
if top_level:
raise NoSuchDir('The dir {0} does not exist'.format(base))
return
raise
for x in files:
self.add_watches(os.path.join(base, x), top_level=False)
elif top_level:
# The top level dir is a file, not good.
raise NoSuchDir('The dir {0} does not exist'.format(base))
def add_watch(self, path):
import ctypes
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
wd = self._add_watch(self._inotify_fd, ctypes.c_char_p(bpath),
# Ignore symlinks and watch only directories
self.DONT_FOLLOW | self.ONLYDIR |
self.MODIFY | self.CREATE | self.DELETE |
self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO |
self.ATTRIB | self.DELETE_SELF)
if wd == -1:
eno = ctypes.get_errno()
if eno == errno.ENOTDIR:
return False
raise OSError(eno, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(eno)))
self.watched_dirs[path] = wd
self.watched_rmap[wd] = path
return True
def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked dirs.
self.watch_tree()
self.modified = True
return
path = self.watched_rmap.get(wd, None)
if path is not None:
if not self.ignore_event(path, name):
self.modified = True
if mask & self.CREATE:
# A new sub-directory might have been created, monitor it.
try:
self.add_watch(os.path.join(path, name))
except OSError as e:
if e.errno == errno.ENOENT:
# Deleted before add_watch()
pass
elif e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
else:
raise
if (mask & self.DELETE_SELF or mask & self.MOVE_SELF) and path == self.basedir:
raise BaseDirChanged('The directory %s was moved/deleted' % path)
def __call__(self):
self.read()
ret = self.modified
self.modified = False
return ret

View File

@ -0,0 +1,83 @@
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, absolute_import, print_function)
import sys
from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotifyError
from powerline.lib.path import realpath
from powerline.lib.watcher.inotify import INotifyTreeWatcher, DirTooLarge, NoSuchDir, BaseDirChanged
class DummyTreeWatcher(object):
is_dummy = True
def __init__(self, basedir):
self.basedir = realpath(basedir)
def __call__(self):
return False
class TreeWatcher(object):
def __init__(self, pl, watcher_type, expire_time):
self.watches = {}
self.last_query_times = {}
self.expire_time = expire_time * 60
self.pl = pl
self.watcher_type = watcher_type
def get_watcher(self, path, ignore_event):
if self.watcher_type == 'inotify':
return INotifyTreeWatcher(path, ignore_event=ignore_event)
if self.watcher_type == 'dummy':
return DummyTreeWatcher(path)
# FIXME
if self.watcher_type == 'stat':
return DummyTreeWatcher(path)
if self.watcher_type == 'auto':
if sys.platform.startswith('linux'):
try:
return INotifyTreeWatcher(path, ignore_event=ignore_event)
except (INotifyError, DirTooLarge) as e:
if not isinstance(e, INotifyError):
self.pl.warn('Failed to watch path: {0} with error: {1}'.format(path, e))
return DummyTreeWatcher(path)
else:
raise ValueError('Unknown watcher type: {0}'.format(self.watcher_type))
def watch(self, path, ignore_event=None):
path = realpath(path)
w = self.get_watcher(path, ignore_event)
self.watches[path] = w
return w
def expire_old_queries(self):
pop = []
now = monotonic()
for path, lt in self.last_query_times.items():
if now - lt > self.expire_time:
pop.append(path)
for path in pop:
del self.last_query_times[path]
def __call__(self, path, ignore_event=None):
path = realpath(path)
self.expire_old_queries()
self.last_query_times[path] = monotonic()
w = self.watches.get(path, None)
if w is None:
try:
self.watch(path, ignore_event=ignore_event)
except NoSuchDir:
pass
return True
try:
return w()
except BaseDirChanged:
self.watches.pop(path, None)
return True
except DirTooLarge as e:
self.pl.warn(str(e))
self.watches[path] = DummyTreeWatcher(path)
return False

View File

@ -6,7 +6,7 @@ from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.vcs import guess, get_fallback_create_watcher from powerline.lib.vcs import guess, get_fallback_create_watcher
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic from powerline.lib.monotonic import monotonic
from powerline.lib.watcher import create_file_watcher, INotifyError from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError
from powerline.lib.vcs.git import git_directory from powerline.lib.vcs.git import git_directory
from powerline import get_fallback_logger from powerline import get_fallback_logger
import threading import threading
@ -14,6 +14,7 @@ import os
import sys import sys
import re import re
import platform import platform
import shutil
from time import sleep from time import sleep
from subprocess import call, PIPE from subprocess import call, PIPE
from functools import partial from functools import partial
@ -433,13 +434,11 @@ class TestFilesystemWatchers(TestCase):
self.do_test_for_change(w, f2) self.do_test_for_change(w, f2)
def test_tree_watcher(self): def test_tree_watcher(self):
from powerline.lib.tree_watcher import TreeWatcher tw = create_tree_watcher(get_fallback_logger())
tw = TreeWatcher()
subdir = os.path.join(INOTIFY_DIR, 'subdir') subdir = os.path.join(INOTIFY_DIR, 'subdir')
os.mkdir(subdir) os.mkdir(subdir)
if tw.watch(INOTIFY_DIR).is_dummy: if tw.watch(INOTIFY_DIR).is_dummy:
raise SkipTest('No tree watcher available') raise SkipTest('No tree watcher available')
import shutil
self.assertTrue(tw(INOTIFY_DIR)) self.assertTrue(tw(INOTIFY_DIR))
self.assertFalse(tw(INOTIFY_DIR)) self.assertFalse(tw(INOTIFY_DIR))
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR) changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)