From 42ee82c1de84427689b842edbf9b87a65e3f06c0 Mon Sep 17 00:00:00 2001 From: ZyX Date: Sat, 6 Apr 2013 18:48:04 +0400 Subject: [PATCH] Update *watcher stuff from @kovidgoyal develop branch --- powerline/lib/file_watcher.py | 168 ++++------------------------- powerline/lib/inotify.py | 175 ++++++++++++++++++++++++++++++ powerline/lib/tree_watcher.py | 197 ++++++++++++++++++++++++++++++++++ tests/test_lib.py | 43 +++++++- 4 files changed, 432 insertions(+), 151 deletions(-) create mode 100644 powerline/lib/inotify.py create mode 100644 powerline/lib/tree_watcher.py diff --git a/powerline/lib/file_watcher.py b/powerline/lib/file_watcher.py index 83d679ee..34616e90 100644 --- a/powerline/lib/file_watcher.py +++ b/powerline/lib/file_watcher.py @@ -6,120 +6,23 @@ __docformat__ = 'restructuredtext en' import os import sys -import errno from time import sleep from threading import RLock from powerline.lib.monotonic import monotonic +from powerline.lib.inotify import INotify, INotifyError -class INotifyError(Exception): - pass - - -class INotifyWatch(object): +class INotifyWatch(INotify): is_stat_based = False - # See for the flags defined below - - # Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH. - ACCESS = 0x00000001 # File was accessed. - MODIFY = 0x00000002 # File was modified. - ATTRIB = 0x00000004 # Metadata changed. - CLOSE_WRITE = 0x00000008 # Writtable file was closed. - CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed. - OPEN = 0x00000020 # File was opened. - MOVED_FROM = 0x00000040 # File was moved from X. - MOVED_TO = 0x00000080 # File was moved to Y. - CREATE = 0x00000100 # Subfile was created. - DELETE = 0x00000200 # Subfile was deleted. - DELETE_SELF = 0x00000400 # Self was deleted. - MOVE_SELF = 0x00000800 # Self was moved. - - # Events sent by the kernel. - UNMOUNT = 0x00002000 # Backing fs was unmounted. - Q_OVERFLOW = 0x00004000 # Event queued overflowed. - IGNORED = 0x00008000 # File was ignored. - - # Helper events. - CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close. - MOVE = (MOVED_FROM | MOVED_TO) # Moves. - - # Special flags. - ONLYDIR = 0x01000000 # Only watch the path if it is a directory. - DONT_FOLLOW = 0x02000000 # Do not follow a sym link. - EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects. - MASK_ADD = 0x20000000 # Add to the mask of an already existing watch. - ISDIR = 0x40000000 # Event occurred against dir. - ONESHOT = 0x80000000 # Only send event once. - - # All events which a program can wait on. - ALL_EVENTS = (ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE | - OPEN | MOVED_FROM | MOVED_TO | CREATE | DELETE | - DELETE_SELF | MOVE_SELF) - - # See - CLOEXEC = 0x80000 - NONBLOCK = 0x800 - - def __init__(self, inotify_fd, add_watch, rm_watch, read, expire_time=10): - import ctypes - import struct - self._add_watch, self._rm_watch = add_watch, rm_watch - self._read = read - # We keep a reference to os to prevent it from being deleted - # during interpreter shutdown, which would lead to errors in the - # __del__ method - self.os = os + def __init__(self, expire_time=10): + super(INotifyWatch, self).__init__() self.watches = {} self.modified = {} self.last_query = {} - self._buf = ctypes.create_string_buffer(5000) - self.fenc = sys.getfilesystemencoding() or 'utf-8' - self.hdr = struct.Struct(b'iIII') - if self.fenc == 'ascii': - self.fenc = 'utf-8' self.lock = RLock() self.expire_time = expire_time * 60 - self._inotify_fd = inotify_fd - - def handle_error(self): - import ctypes - eno = ctypes.get_errno() - raise OSError(eno, self.os.strerror(eno)) - - def __del__(self): - # This method can be called during interpreter shutdown, which means we - # must do the absolute minimum here. Note that there could be running - # daemon threads that are trying to call other methods on this object. - try: - self.os.close(self._inotify_fd) - except (AttributeError, TypeError): - pass - - def read(self): - import ctypes - buf = [] - while True: - num = self._read(self._inotify_fd, self._buf, len(self._buf)) - if num == 0: - break - if num < 0: - en = ctypes.get_errno() - if en == errno.EAGAIN: - break # No more data - if en == errno.EINTR: - continue # Interrupted, try again - raise OSError(en, self.os.strerror(en)) - buf.append(self._buf.raw[:num]) - raw = b''.join(buf) - pos = 0 - lraw = len(raw) - while lraw - pos >= self.hdr.size: - wd, mask, cookie, name_len = self.hdr.unpack_from(raw, pos) - # We dont care about names as we only watch files - pos += self.hdr.size + name_len - self.process_event(wd, mask, cookie) def expire_watches(self): now = monotonic() @@ -127,7 +30,19 @@ class INotifyWatch(object): if last_query - now > self.expire_time: self.unwatch(path) - def process_event(self, wd, mask, cookie): + def process_event(self, wd, mask, cookie, name): + if wd == -1 and (mask & self.Q_OVERFLOW): + # We missed some INOTIFY events, so we dont + # know the state of any tracked files. + for path in tuple(self.modified): + if os.path.exists(path): + self.modified[path] = True + else: + self.watches.pop(path, None) + self.modified.pop(path, None) + self.last_query.pop(path, None) + return + for path, num in tuple(self.watches.items()): if num == wd: if mask & self.IGNORED: @@ -176,7 +91,7 @@ class INotifyWatch(object): # exist/you dont have permission self.watch(path) return True - self.read() + self.read(get_name=False) if path not in self.modified: # An ignored event was received which means the path has been # automatically unwatched @@ -193,50 +108,7 @@ class INotifyWatch(object): self.unwatch(path) except OSError: pass - if hasattr(self, '_inotify_fd'): - self.os.close(self._inotify_fd) - del self.os - del self._add_watch - del self._rm_watch - del self._inotify_fd - - -def get_inotify(expire_time=10): - ''' Initialize the inotify based file watcher ''' - import ctypes - if not hasattr(ctypes, 'c_ssize_t'): - raise INotifyError('You need python >= 2.7 to use inotify') - from ctypes.util import find_library - name = find_library('c') - if not name: - raise INotifyError('Cannot find C library') - libc = ctypes.CDLL(name, use_errno=True) - for function in ("inotify_add_watch", "inotify_init1", "inotify_rm_watch"): - if not hasattr(libc, function): - raise INotifyError('libc is too old') - # inotify_init1() - prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, use_errno=True) - init1 = prototype(('inotify_init1', libc), ((1, "flags", 0),)) - - # inotify_add_watch() - prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32, use_errno=True) - add_watch = prototype(('inotify_add_watch', libc), ( - (1, "fd"), (1, "pathname"), (1, "mask")), use_errno=True) - - # inotify_rm_watch() - prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int, use_errno=True) - rm_watch = prototype(('inotify_rm_watch', libc), ( - (1, "fd"), (1, "wd")), use_errno=True) - - # read() - prototype = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t, use_errno=True) - read = prototype(('read', libc), ( - (1, "fd"), (1, "buf"), (1, "count")), use_errno=True) - - inotify_fd = init1(INotifyWatch.CLOEXEC | INotifyWatch.NONBLOCK) - if inotify_fd == -1: - raise INotifyError(os.strerror(ctypes.get_errno())) - return INotifyWatch(inotify_fd, add_watch, rm_watch, read, expire_time=expire_time) + super(INotifyWatch, self).close() class StatWatch(object): @@ -289,7 +161,7 @@ def create_file_watcher(use_stat=False, expire_time=10): if use_stat: return StatWatch() try: - return get_inotify(expire_time=expire_time) + return INotifyWatch(expire_time=expire_time) except INotifyError: pass return StatWatch() diff --git a/powerline/lib/inotify.py b/powerline/lib/inotify.py new file mode 100644 index 00000000..ecfdd1f8 --- /dev/null +++ b/powerline/lib/inotify.py @@ -0,0 +1,175 @@ +# vim:fileencoding=UTF-8:noet +from __future__ import unicode_literals, absolute_import + +__copyright__ = '2013, Kovid Goyal ' +__docformat__ = 'restructuredtext en' + +import sys, os, errno + +class INotifyError(Exception): + pass + +_inotify = None + +def load_inotify(): + ''' Initialize the inotify library ''' + global _inotify + if _inotify is None: + if hasattr(sys, 'getwindowsversion'): + # On windows abort before loading the C library. Windows has + # multiple, incompatible C runtimes, and we have no way of knowing + # if the one chosen by ctypes is compatible with the currently + # loaded one. + raise INotifyError('INotify not available on windows') + import ctypes + if not hasattr(ctypes, 'c_ssize_t'): + raise INotifyError('You need python >= 2.7 to use inotify') + from ctypes.util import find_library + name = find_library('c') + if not name: + raise INotifyError('Cannot find C library') + libc = ctypes.CDLL(name, use_errno=True) + for function in ("inotify_add_watch", "inotify_init1", "inotify_rm_watch"): + if not hasattr(libc, function): + raise INotifyError('libc is too old') + # inotify_init1() + prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, use_errno=True) + init1 = prototype(('inotify_init1', libc), ((1, "flags", 0),)) + + # inotify_add_watch() + prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32, use_errno=True) + add_watch = prototype(('inotify_add_watch', libc), ( + (1, "fd"), (1, "pathname"), (1, "mask")), use_errno=True) + + # inotify_rm_watch() + prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int, use_errno=True) + rm_watch = prototype(('inotify_rm_watch', libc), ( + (1, "fd"), (1, "wd")), use_errno=True) + + # read() + prototype = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t, use_errno=True) + read = prototype(('read', libc), ( + (1, "fd"), (1, "buf"), (1, "count")), use_errno=True) + _inotify = (init1, add_watch, rm_watch, read) + return _inotify + +class INotify(object): + + # See for the flags defined below + + # Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH. + ACCESS = 0x00000001 # File was accessed. + MODIFY = 0x00000002 # File was modified. + ATTRIB = 0x00000004 # Metadata changed. + CLOSE_WRITE = 0x00000008 # Writtable file was closed. + CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed. + OPEN = 0x00000020 # File was opened. + MOVED_FROM = 0x00000040 # File was moved from X. + MOVED_TO = 0x00000080 # File was moved to Y. + CREATE = 0x00000100 # Subfile was created. + DELETE = 0x00000200 # Subfile was deleted. + DELETE_SELF = 0x00000400 # Self was deleted. + MOVE_SELF = 0x00000800 # Self was moved. + + # Events sent by the kernel. + UNMOUNT = 0x00002000 # Backing fs was unmounted. + Q_OVERFLOW = 0x00004000 # Event queued overflowed. + IGNORED = 0x00008000 # File was ignored. + + # Helper events. + CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close. + MOVE = (MOVED_FROM | MOVED_TO) # Moves. + + # Special flags. + ONLYDIR = 0x01000000 # Only watch the path if it is a directory. + DONT_FOLLOW = 0x02000000 # Do not follow a sym link. + EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects. + MASK_ADD = 0x20000000 # Add to the mask of an already existing watch. + ISDIR = 0x40000000 # Event occurred against dir. + ONESHOT = 0x80000000 # Only send event once. + + # All events which a program can wait on. + ALL_EVENTS = (ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE | + OPEN | MOVED_FROM | MOVED_TO | CREATE | DELETE | + DELETE_SELF | MOVE_SELF) + + # See + CLOEXEC = 0x80000 + NONBLOCK = 0x800 + + def __init__(self, cloexec=True, nonblock=True): + import ctypes, struct + self._init1, self._add_watch, self._rm_watch, self._read = load_inotify() + flags = 0 + if cloexec: + flags |= self.CLOEXEC + if nonblock: + flags |= self.NONBLOCK + self._inotify_fd = self._init1(flags) + if self._inotify_fd == -1: + raise INotifyError(os.strerror(ctypes.get_errno())) + + self._buf = ctypes.create_string_buffer(5000) + self.fenc = sys.getfilesystemencoding() or 'utf-8' + self.hdr = struct.Struct(b'iIII') + if self.fenc == 'ascii': + self.fenc = 'utf-8' + # We keep a reference to os to prevent it from being deleted + # during interpreter shutdown, which would lead to errors in the + # __del__ method + self.os = os + + def handle_error(self): + import ctypes + eno = ctypes.get_errno() + raise OSError(eno, self.os.strerror(eno)) + + def __del__(self): + # This method can be called during interpreter shutdown, which means we + # must do the absolute minimum here. Note that there could be running + # daemon threads that are trying to call other methods on this object. + try: + self.os.close(self._inotify_fd) + except (AttributeError, TypeError): + pass + + def close(self): + if hasattr(self, '_inotify_fd'): + self.os.close(self._inotify_fd) + del self.os + del self._add_watch + del self._rm_watch + del self._inotify_fd + + def read(self, get_name=True): + import ctypes + buf = [] + while True: + num = self._read(self._inotify_fd, self._buf, len(self._buf)) + if num == 0: + break + if num < 0: + en = ctypes.get_errno() + if en == errno.EAGAIN: + break # No more data + if en == errno.EINTR: + continue # Interrupted, try again + raise OSError(en, self.os.strerror(en)) + buf.append(self._buf.raw[:num]) + raw = b''.join(buf) + pos = 0 + lraw = len(raw) + while lraw - pos >= self.hdr.size: + wd, mask, cookie, name_len = self.hdr.unpack_from(raw, pos) + pos += self.hdr.size + name = None + if get_name: + name = raw[pos:pos+name_len].rstrip(b'\0').decode(self.fenc) + pos += name_len + self.process_event(wd, mask, cookie, name) + + def process_event(self, *args): + raise NotImplementedError() + + + diff --git a/powerline/lib/tree_watcher.py b/powerline/lib/tree_watcher.py new file mode 100644 index 00000000..1d2070c5 --- /dev/null +++ b/powerline/lib/tree_watcher.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python +# vim:fileencoding=UTF-8:noet +from __future__ import (unicode_literals, absolute_import, print_function) + +__copyright__ = '2013, Kovid Goyal ' +__docformat__ = 'restructuredtext en' + +import sys, os, errno +from time import sleep +from powerline.lib.monotonic import monotonic + +from powerline.lib.inotify import INotify, INotifyError + +class NoSuchDir(ValueError): + pass + +class DirTooLarge(ValueError): + + def __init__(self, bdir): + ValueError.__init__(self, 'The directory %s is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'%bdir) + +class INotifyTreeWatcher(INotify): + + is_dummy = False + + def __init__(self, basedir): + super(INotifyTreeWatcher, self).__init__() + self.basedir = os.path.abspath(basedir) + self.watch_tree() + self.modified = True + + def watch_tree(self): + self.watched_dirs = {} + self.watched_rmap = {} + try: + self.add_watches(self.basedir) + except OSError as e: + if e.errno == errno.ENOSPC: + raise DirTooLarge(self.basedir) + + def add_watches(self, base, top_level=True): + ''' Add watches for this directory and all its descendant directories, + recursively. ''' + base = os.path.abspath(base) + try: + is_dir = self.add_watch(base) + except OSError as e: + if e.errno == errno.ENOENT: + # The entry could have been deleted between listdir() and + # add_watch(). + if top_level: + raise NoSuchDir('The dir %s does not exist'%base) + return + if e.errno == errno.EACCES: + # We silently ignore entries for which we dont have permission, + # unless they are the top level dir + if top_level: + raise NoSuchDir('You do not have permission to monitor %s'%base) + return + raise + else: + if is_dir: + try: + files = os.listdir(base) + except OSError as e: + if e.errno in (errno.ENOTDIR, errno.ENOENT): + # The dir was deleted/replaced between the add_watch() + # and listdir() + if top_level: + raise NoSuchDir('The dir %s does not exist'%base) + return + raise + for x in files: + self.add_watches(os.path.join(base, x), top_level=False) + elif top_level: + # The top level dir is a file, not good. + raise NoSuchDir('The dir %s does not exist'%base) + + def add_watch(self, path): + import ctypes + bpath = path if isinstance(path, bytes) else path.encode(self.fenc) + wd = self._add_watch(self._inotify_fd, ctypes.c_char_p(bpath), + self.DONT_FOLLOW | self.ONLYDIR | # Ignore symlinks and watch only directories + self.MODIFY | self.CREATE | self.DELETE | + self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO | + self.ATTRIB | self.MOVE_SELF | self.DELETE_SELF) + if wd == -1: + eno = ctypes.get_errno() + if eno == errno.ENOTDIR: + return False + raise OSError(eno, 'Failed to add watch for: %s: %s'%(path, self.os.strerror(eno))) + self.watched_dirs[path] = wd + self.watched_rmap[wd] = path + return True + + def process_event(self, wd, mask, cookie, name): + if wd == -1 and (mask & self.Q_OVERFLOW): + # We missed some INOTIFY events, so we dont + # know the state of any tracked dirs. + self.watch_tree() + self.modified = True + return + path = self.watched_rmap.get(wd, None) + if path is not None: + self.modified = True + if mask & self.CREATE: + # A new sub-directory might have been created, monitor it. + try: + self.add_watch(os.path.join(path, name)) + except OSError as e: + if e.errno == errno.ENOENT: + # Deleted before add_watch() + pass + elif e.errno == errno.ENOSPC: + raise DirTooLarge(self.basedir) + else: + raise + + def __call__(self): + self.read() + ret = self.modified + self.modified = False + return ret + +class DummyTreeWatcher(object): + + is_dummy = True + + def __init__(self, basedir): + self.basedir = os.path.abspath(basedir) + + def __call__(self): + return False + +class TreeWatcher(object): + + def __init__(self, expire_time=10): + self.watches = {} + self.last_query_times = {} + self.expire_time = expire_time * 60 + + def watch(self, path, logger=None): + path = os.path.abspath(path) + try: + w = INotifyTreeWatcher(path) + except (INotifyError, DirTooLarge) as e: + if logger is not None: + logger.warn('Failed to watch path: %s with error: %s'%(path, e)) + w = DummyTreeWatcher(path) + self.watches[path] = w + return w + + def is_actually_watched(self, path): + w = self.watches.get(path, None) + return not getattr(w, 'is_dummy', True) + + def expire_old_queries(self): + pop = [] + now = monotonic() + for path, lt in self.last_query_times.items(): + if now - lt > self.expire_time: + pop.append(path) + for path in pop: + del self.last_query_times[path] + + def __call__(self, path, logger=None): + path = os.path.abspath(path) + self.expire_old_queries() + self.last_query_times[path] = monotonic() + w = self.watches.get(path, None) + if w is None: + try: + self.watch(path) + except NoSuchDir: + pass + return True + try: + return w() + except DirTooLarge as e: + if logger is not None: + logger.warn(str(e)) + self.watches[path] = DummyTreeWatcher(path) + return False + +if __name__ == '__main__': + w = INotifyTreeWatcher(sys.argv[-1]) + w() + print ('Monitoring', sys.argv[-1], 'press Ctrl-C to stop') + try: + while True: + if w(): + print (sys.argv[-1], 'changed') + sleep(1) + except KeyboardInterrupt: + raise SystemExit(0) + + diff --git a/tests/test_lib.py b/tests/test_lib.py index 1983e6c8..b51dac83 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -5,6 +5,8 @@ from powerline.lib.vcs import guess from subprocess import call, PIPE import os import sys +from unittest.case import SkipTest +from functools import partial from tests import TestCase @@ -36,6 +38,8 @@ class TestLib(TestCase): self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB') self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB') +class TestFilesystemWatchers(TestCase): + def do_test_for_change(self, watcher, path): import time st = time.time() @@ -49,9 +53,7 @@ class TestLib(TestCase): from powerline.lib.file_watcher import create_file_watcher w = create_file_watcher(use_stat=False) if w.is_stat_based: - # The granularity of mtime (1 second) means that we cannot use the - # same tests for inotify and StatWatch. - return + raise SkipTest('This test is not suitable for a stat based file watcher') f1, f2 = os.path.join(INOTIFY_DIR, 'file1'), os.path.join(INOTIFY_DIR, 'file2') with open(f1, 'wb'): with open(f2, 'wb'): @@ -84,6 +86,41 @@ class TestLib(TestCase): os.unlink(f1) self.do_test_for_change(w, f1) + def test_tree_watcher(self): + from powerline.lib.tree_watcher import TreeWatcher + tw = TreeWatcher() + subdir = os.path.join(INOTIFY_DIR, 'subdir') + os.mkdir(subdir) + if tw.watch(INOTIFY_DIR).is_dummy: + raise SkipTest('No tree watcher available') + import shutil + self.assertTrue(tw(INOTIFY_DIR)) + self.assertFalse(tw(INOTIFY_DIR)) + changed = partial(self.do_test_for_change, tw, INOTIFY_DIR) + open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close() + changed() + open(os.path.join(subdir, 'tree1'), 'w').close() + changed() + os.unlink(os.path.join(subdir, 'tree1')) + changed() + os.rmdir(subdir) + changed() + os.mkdir(subdir) + changed() + os.rename(subdir, subdir+'1') + changed() + shutil.rmtree(subdir+'1') + changed() + os.mkdir(subdir) + f = os.path.join(subdir, 'f') + open(f, 'w').close() + changed() + with open(f, 'a') as s: + s.write(' ') + changed() + os.rename(f, f+'1') + changed() + use_mercurial = use_bzr = sys.version_info < (3, 0)