Update *watcher stuff from @kovidgoyal develop branch

This commit is contained in:
ZyX 2013-04-06 18:48:04 +04:00
parent b17dab0cd4
commit 42ee82c1de
4 changed files with 432 additions and 151 deletions

View File

@ -6,120 +6,23 @@ __docformat__ = 'restructuredtext en'
import os import os
import sys import sys
import errno
from time import sleep from time import sleep
from threading import RLock from threading import RLock
from powerline.lib.monotonic import monotonic from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotify, INotifyError
class INotifyError(Exception): class INotifyWatch(INotify):
pass
class INotifyWatch(object):
is_stat_based = False is_stat_based = False
# See <sys/inotify.h> for the flags defined below def __init__(self, expire_time=10):
super(INotifyWatch, self).__init__()
# Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH.
ACCESS = 0x00000001 # File was accessed.
MODIFY = 0x00000002 # File was modified.
ATTRIB = 0x00000004 # Metadata changed.
CLOSE_WRITE = 0x00000008 # Writtable file was closed.
CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed.
OPEN = 0x00000020 # File was opened.
MOVED_FROM = 0x00000040 # File was moved from X.
MOVED_TO = 0x00000080 # File was moved to Y.
CREATE = 0x00000100 # Subfile was created.
DELETE = 0x00000200 # Subfile was deleted.
DELETE_SELF = 0x00000400 # Self was deleted.
MOVE_SELF = 0x00000800 # Self was moved.
# Events sent by the kernel.
UNMOUNT = 0x00002000 # Backing fs was unmounted.
Q_OVERFLOW = 0x00004000 # Event queued overflowed.
IGNORED = 0x00008000 # File was ignored.
# Helper events.
CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close.
MOVE = (MOVED_FROM | MOVED_TO) # Moves.
# Special flags.
ONLYDIR = 0x01000000 # Only watch the path if it is a directory.
DONT_FOLLOW = 0x02000000 # Do not follow a sym link.
EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects.
MASK_ADD = 0x20000000 # Add to the mask of an already existing watch.
ISDIR = 0x40000000 # Event occurred against dir.
ONESHOT = 0x80000000 # Only send event once.
# All events which a program can wait on.
ALL_EVENTS = (ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE |
OPEN | MOVED_FROM | MOVED_TO | CREATE | DELETE |
DELETE_SELF | MOVE_SELF)
# See <bits/inotify.h>
CLOEXEC = 0x80000
NONBLOCK = 0x800
def __init__(self, inotify_fd, add_watch, rm_watch, read, expire_time=10):
import ctypes
import struct
self._add_watch, self._rm_watch = add_watch, rm_watch
self._read = read
# We keep a reference to os to prevent it from being deleted
# during interpreter shutdown, which would lead to errors in the
# __del__ method
self.os = os
self.watches = {} self.watches = {}
self.modified = {} self.modified = {}
self.last_query = {} self.last_query = {}
self._buf = ctypes.create_string_buffer(5000)
self.fenc = sys.getfilesystemencoding() or 'utf-8'
self.hdr = struct.Struct(b'iIII')
if self.fenc == 'ascii':
self.fenc = 'utf-8'
self.lock = RLock() self.lock = RLock()
self.expire_time = expire_time * 60 self.expire_time = expire_time * 60
self._inotify_fd = inotify_fd
def handle_error(self):
import ctypes
eno = ctypes.get_errno()
raise OSError(eno, self.os.strerror(eno))
def __del__(self):
# This method can be called during interpreter shutdown, which means we
# must do the absolute minimum here. Note that there could be running
# daemon threads that are trying to call other methods on this object.
try:
self.os.close(self._inotify_fd)
except (AttributeError, TypeError):
pass
def read(self):
import ctypes
buf = []
while True:
num = self._read(self._inotify_fd, self._buf, len(self._buf))
if num == 0:
break
if num < 0:
en = ctypes.get_errno()
if en == errno.EAGAIN:
break # No more data
if en == errno.EINTR:
continue # Interrupted, try again
raise OSError(en, self.os.strerror(en))
buf.append(self._buf.raw[:num])
raw = b''.join(buf)
pos = 0
lraw = len(raw)
while lraw - pos >= self.hdr.size:
wd, mask, cookie, name_len = self.hdr.unpack_from(raw, pos)
# We dont care about names as we only watch files
pos += self.hdr.size + name_len
self.process_event(wd, mask, cookie)
def expire_watches(self): def expire_watches(self):
now = monotonic() now = monotonic()
@ -127,7 +30,19 @@ class INotifyWatch(object):
if last_query - now > self.expire_time: if last_query - now > self.expire_time:
self.unwatch(path) self.unwatch(path)
def process_event(self, wd, mask, cookie): def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked files.
for path in tuple(self.modified):
if os.path.exists(path):
self.modified[path] = True
else:
self.watches.pop(path, None)
self.modified.pop(path, None)
self.last_query.pop(path, None)
return
for path, num in tuple(self.watches.items()): for path, num in tuple(self.watches.items()):
if num == wd: if num == wd:
if mask & self.IGNORED: if mask & self.IGNORED:
@ -176,7 +91,7 @@ class INotifyWatch(object):
# exist/you dont have permission # exist/you dont have permission
self.watch(path) self.watch(path)
return True return True
self.read() self.read(get_name=False)
if path not in self.modified: if path not in self.modified:
# An ignored event was received which means the path has been # An ignored event was received which means the path has been
# automatically unwatched # automatically unwatched
@ -193,50 +108,7 @@ class INotifyWatch(object):
self.unwatch(path) self.unwatch(path)
except OSError: except OSError:
pass pass
if hasattr(self, '_inotify_fd'): super(INotifyWatch, self).close()
self.os.close(self._inotify_fd)
del self.os
del self._add_watch
del self._rm_watch
del self._inotify_fd
def get_inotify(expire_time=10):
''' Initialize the inotify based file watcher '''
import ctypes
if not hasattr(ctypes, 'c_ssize_t'):
raise INotifyError('You need python >= 2.7 to use inotify')
from ctypes.util import find_library
name = find_library('c')
if not name:
raise INotifyError('Cannot find C library')
libc = ctypes.CDLL(name, use_errno=True)
for function in ("inotify_add_watch", "inotify_init1", "inotify_rm_watch"):
if not hasattr(libc, function):
raise INotifyError('libc is too old')
# inotify_init1()
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, use_errno=True)
init1 = prototype(('inotify_init1', libc), ((1, "flags", 0),))
# inotify_add_watch()
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32, use_errno=True)
add_watch = prototype(('inotify_add_watch', libc), (
(1, "fd"), (1, "pathname"), (1, "mask")), use_errno=True)
# inotify_rm_watch()
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int, use_errno=True)
rm_watch = prototype(('inotify_rm_watch', libc), (
(1, "fd"), (1, "wd")), use_errno=True)
# read()
prototype = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t, use_errno=True)
read = prototype(('read', libc), (
(1, "fd"), (1, "buf"), (1, "count")), use_errno=True)
inotify_fd = init1(INotifyWatch.CLOEXEC | INotifyWatch.NONBLOCK)
if inotify_fd == -1:
raise INotifyError(os.strerror(ctypes.get_errno()))
return INotifyWatch(inotify_fd, add_watch, rm_watch, read, expire_time=expire_time)
class StatWatch(object): class StatWatch(object):
@ -289,7 +161,7 @@ def create_file_watcher(use_stat=False, expire_time=10):
if use_stat: if use_stat:
return StatWatch() return StatWatch()
try: try:
return get_inotify(expire_time=expire_time) return INotifyWatch(expire_time=expire_time)
except INotifyError: except INotifyError:
pass pass
return StatWatch() return StatWatch()

175
powerline/lib/inotify.py Normal file
View File

@ -0,0 +1,175 @@
# vim:fileencoding=UTF-8:noet
from __future__ import unicode_literals, absolute_import
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import sys, os, errno
class INotifyError(Exception):
pass
_inotify = None
def load_inotify():
''' Initialize the inotify library '''
global _inotify
if _inotify is None:
if hasattr(sys, 'getwindowsversion'):
# On windows abort before loading the C library. Windows has
# multiple, incompatible C runtimes, and we have no way of knowing
# if the one chosen by ctypes is compatible with the currently
# loaded one.
raise INotifyError('INotify not available on windows')
import ctypes
if not hasattr(ctypes, 'c_ssize_t'):
raise INotifyError('You need python >= 2.7 to use inotify')
from ctypes.util import find_library
name = find_library('c')
if not name:
raise INotifyError('Cannot find C library')
libc = ctypes.CDLL(name, use_errno=True)
for function in ("inotify_add_watch", "inotify_init1", "inotify_rm_watch"):
if not hasattr(libc, function):
raise INotifyError('libc is too old')
# inotify_init1()
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, use_errno=True)
init1 = prototype(('inotify_init1', libc), ((1, "flags", 0),))
# inotify_add_watch()
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32, use_errno=True)
add_watch = prototype(('inotify_add_watch', libc), (
(1, "fd"), (1, "pathname"), (1, "mask")), use_errno=True)
# inotify_rm_watch()
prototype = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int, use_errno=True)
rm_watch = prototype(('inotify_rm_watch', libc), (
(1, "fd"), (1, "wd")), use_errno=True)
# read()
prototype = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t, use_errno=True)
read = prototype(('read', libc), (
(1, "fd"), (1, "buf"), (1, "count")), use_errno=True)
_inotify = (init1, add_watch, rm_watch, read)
return _inotify
class INotify(object):
# See <sys/inotify.h> for the flags defined below
# Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH.
ACCESS = 0x00000001 # File was accessed.
MODIFY = 0x00000002 # File was modified.
ATTRIB = 0x00000004 # Metadata changed.
CLOSE_WRITE = 0x00000008 # Writtable file was closed.
CLOSE_NOWRITE = 0x00000010 # Unwrittable file closed.
OPEN = 0x00000020 # File was opened.
MOVED_FROM = 0x00000040 # File was moved from X.
MOVED_TO = 0x00000080 # File was moved to Y.
CREATE = 0x00000100 # Subfile was created.
DELETE = 0x00000200 # Subfile was deleted.
DELETE_SELF = 0x00000400 # Self was deleted.
MOVE_SELF = 0x00000800 # Self was moved.
# Events sent by the kernel.
UNMOUNT = 0x00002000 # Backing fs was unmounted.
Q_OVERFLOW = 0x00004000 # Event queued overflowed.
IGNORED = 0x00008000 # File was ignored.
# Helper events.
CLOSE = (CLOSE_WRITE | CLOSE_NOWRITE) # Close.
MOVE = (MOVED_FROM | MOVED_TO) # Moves.
# Special flags.
ONLYDIR = 0x01000000 # Only watch the path if it is a directory.
DONT_FOLLOW = 0x02000000 # Do not follow a sym link.
EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects.
MASK_ADD = 0x20000000 # Add to the mask of an already existing watch.
ISDIR = 0x40000000 # Event occurred against dir.
ONESHOT = 0x80000000 # Only send event once.
# All events which a program can wait on.
ALL_EVENTS = (ACCESS | MODIFY | ATTRIB | CLOSE_WRITE | CLOSE_NOWRITE |
OPEN | MOVED_FROM | MOVED_TO | CREATE | DELETE |
DELETE_SELF | MOVE_SELF)
# See <bits/inotify.h>
CLOEXEC = 0x80000
NONBLOCK = 0x800
def __init__(self, cloexec=True, nonblock=True):
import ctypes, struct
self._init1, self._add_watch, self._rm_watch, self._read = load_inotify()
flags = 0
if cloexec:
flags |= self.CLOEXEC
if nonblock:
flags |= self.NONBLOCK
self._inotify_fd = self._init1(flags)
if self._inotify_fd == -1:
raise INotifyError(os.strerror(ctypes.get_errno()))
self._buf = ctypes.create_string_buffer(5000)
self.fenc = sys.getfilesystemencoding() or 'utf-8'
self.hdr = struct.Struct(b'iIII')
if self.fenc == 'ascii':
self.fenc = 'utf-8'
# We keep a reference to os to prevent it from being deleted
# during interpreter shutdown, which would lead to errors in the
# __del__ method
self.os = os
def handle_error(self):
import ctypes
eno = ctypes.get_errno()
raise OSError(eno, self.os.strerror(eno))
def __del__(self):
# This method can be called during interpreter shutdown, which means we
# must do the absolute minimum here. Note that there could be running
# daemon threads that are trying to call other methods on this object.
try:
self.os.close(self._inotify_fd)
except (AttributeError, TypeError):
pass
def close(self):
if hasattr(self, '_inotify_fd'):
self.os.close(self._inotify_fd)
del self.os
del self._add_watch
del self._rm_watch
del self._inotify_fd
def read(self, get_name=True):
import ctypes
buf = []
while True:
num = self._read(self._inotify_fd, self._buf, len(self._buf))
if num == 0:
break
if num < 0:
en = ctypes.get_errno()
if en == errno.EAGAIN:
break # No more data
if en == errno.EINTR:
continue # Interrupted, try again
raise OSError(en, self.os.strerror(en))
buf.append(self._buf.raw[:num])
raw = b''.join(buf)
pos = 0
lraw = len(raw)
while lraw - pos >= self.hdr.size:
wd, mask, cookie, name_len = self.hdr.unpack_from(raw, pos)
pos += self.hdr.size
name = None
if get_name:
name = raw[pos:pos+name_len].rstrip(b'\0').decode(self.fenc)
pos += name_len
self.process_event(wd, mask, cookie, name)
def process_event(self, *args):
raise NotImplementedError()

View File

@ -0,0 +1,197 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:noet
from __future__ import (unicode_literals, absolute_import, print_function)
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import sys, os, errno
from time import sleep
from powerline.lib.monotonic import monotonic
from powerline.lib.inotify import INotify, INotifyError
class NoSuchDir(ValueError):
pass
class DirTooLarge(ValueError):
def __init__(self, bdir):
ValueError.__init__(self, 'The directory %s is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'%bdir)
class INotifyTreeWatcher(INotify):
is_dummy = False
def __init__(self, basedir):
super(INotifyTreeWatcher, self).__init__()
self.basedir = os.path.abspath(basedir)
self.watch_tree()
self.modified = True
def watch_tree(self):
self.watched_dirs = {}
self.watched_rmap = {}
try:
self.add_watches(self.basedir)
except OSError as e:
if e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
def add_watches(self, base, top_level=True):
''' Add watches for this directory and all its descendant directories,
recursively. '''
base = os.path.abspath(base)
try:
is_dir = self.add_watch(base)
except OSError as e:
if e.errno == errno.ENOENT:
# The entry could have been deleted between listdir() and
# add_watch().
if top_level:
raise NoSuchDir('The dir %s does not exist'%base)
return
if e.errno == errno.EACCES:
# We silently ignore entries for which we dont have permission,
# unless they are the top level dir
if top_level:
raise NoSuchDir('You do not have permission to monitor %s'%base)
return
raise
else:
if is_dir:
try:
files = os.listdir(base)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.ENOENT):
# The dir was deleted/replaced between the add_watch()
# and listdir()
if top_level:
raise NoSuchDir('The dir %s does not exist'%base)
return
raise
for x in files:
self.add_watches(os.path.join(base, x), top_level=False)
elif top_level:
# The top level dir is a file, not good.
raise NoSuchDir('The dir %s does not exist'%base)
def add_watch(self, path):
import ctypes
bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
wd = self._add_watch(self._inotify_fd, ctypes.c_char_p(bpath),
self.DONT_FOLLOW | self.ONLYDIR | # Ignore symlinks and watch only directories
self.MODIFY | self.CREATE | self.DELETE |
self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO |
self.ATTRIB | self.MOVE_SELF | self.DELETE_SELF)
if wd == -1:
eno = ctypes.get_errno()
if eno == errno.ENOTDIR:
return False
raise OSError(eno, 'Failed to add watch for: %s: %s'%(path, self.os.strerror(eno)))
self.watched_dirs[path] = wd
self.watched_rmap[wd] = path
return True
def process_event(self, wd, mask, cookie, name):
if wd == -1 and (mask & self.Q_OVERFLOW):
# We missed some INOTIFY events, so we dont
# know the state of any tracked dirs.
self.watch_tree()
self.modified = True
return
path = self.watched_rmap.get(wd, None)
if path is not None:
self.modified = True
if mask & self.CREATE:
# A new sub-directory might have been created, monitor it.
try:
self.add_watch(os.path.join(path, name))
except OSError as e:
if e.errno == errno.ENOENT:
# Deleted before add_watch()
pass
elif e.errno == errno.ENOSPC:
raise DirTooLarge(self.basedir)
else:
raise
def __call__(self):
self.read()
ret = self.modified
self.modified = False
return ret
class DummyTreeWatcher(object):
is_dummy = True
def __init__(self, basedir):
self.basedir = os.path.abspath(basedir)
def __call__(self):
return False
class TreeWatcher(object):
def __init__(self, expire_time=10):
self.watches = {}
self.last_query_times = {}
self.expire_time = expire_time * 60
def watch(self, path, logger=None):
path = os.path.abspath(path)
try:
w = INotifyTreeWatcher(path)
except (INotifyError, DirTooLarge) as e:
if logger is not None:
logger.warn('Failed to watch path: %s with error: %s'%(path, e))
w = DummyTreeWatcher(path)
self.watches[path] = w
return w
def is_actually_watched(self, path):
w = self.watches.get(path, None)
return not getattr(w, 'is_dummy', True)
def expire_old_queries(self):
pop = []
now = monotonic()
for path, lt in self.last_query_times.items():
if now - lt > self.expire_time:
pop.append(path)
for path in pop:
del self.last_query_times[path]
def __call__(self, path, logger=None):
path = os.path.abspath(path)
self.expire_old_queries()
self.last_query_times[path] = monotonic()
w = self.watches.get(path, None)
if w is None:
try:
self.watch(path)
except NoSuchDir:
pass
return True
try:
return w()
except DirTooLarge as e:
if logger is not None:
logger.warn(str(e))
self.watches[path] = DummyTreeWatcher(path)
return False
if __name__ == '__main__':
w = INotifyTreeWatcher(sys.argv[-1])
w()
print ('Monitoring', sys.argv[-1], 'press Ctrl-C to stop')
try:
while True:
if w():
print (sys.argv[-1], 'changed')
sleep(1)
except KeyboardInterrupt:
raise SystemExit(0)

View File

@ -5,6 +5,8 @@ from powerline.lib.vcs import guess
from subprocess import call, PIPE from subprocess import call, PIPE
import os import os
import sys import sys
from unittest.case import SkipTest
from functools import partial
from tests import TestCase from tests import TestCase
@ -36,6 +38,8 @@ class TestLib(TestCase):
self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB') self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB')
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB') self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
class TestFilesystemWatchers(TestCase):
def do_test_for_change(self, watcher, path): def do_test_for_change(self, watcher, path):
import time import time
st = time.time() st = time.time()
@ -49,9 +53,7 @@ class TestLib(TestCase):
from powerline.lib.file_watcher import create_file_watcher from powerline.lib.file_watcher import create_file_watcher
w = create_file_watcher(use_stat=False) w = create_file_watcher(use_stat=False)
if w.is_stat_based: if w.is_stat_based:
# The granularity of mtime (1 second) means that we cannot use the raise SkipTest('This test is not suitable for a stat based file watcher')
# same tests for inotify and StatWatch.
return
f1, f2 = os.path.join(INOTIFY_DIR, 'file1'), os.path.join(INOTIFY_DIR, 'file2') f1, f2 = os.path.join(INOTIFY_DIR, 'file1'), os.path.join(INOTIFY_DIR, 'file2')
with open(f1, 'wb'): with open(f1, 'wb'):
with open(f2, 'wb'): with open(f2, 'wb'):
@ -84,6 +86,41 @@ class TestLib(TestCase):
os.unlink(f1) os.unlink(f1)
self.do_test_for_change(w, f1) self.do_test_for_change(w, f1)
def test_tree_watcher(self):
from powerline.lib.tree_watcher import TreeWatcher
tw = TreeWatcher()
subdir = os.path.join(INOTIFY_DIR, 'subdir')
os.mkdir(subdir)
if tw.watch(INOTIFY_DIR).is_dummy:
raise SkipTest('No tree watcher available')
import shutil
self.assertTrue(tw(INOTIFY_DIR))
self.assertFalse(tw(INOTIFY_DIR))
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
changed()
open(os.path.join(subdir, 'tree1'), 'w').close()
changed()
os.unlink(os.path.join(subdir, 'tree1'))
changed()
os.rmdir(subdir)
changed()
os.mkdir(subdir)
changed()
os.rename(subdir, subdir+'1')
changed()
shutil.rmtree(subdir+'1')
changed()
os.mkdir(subdir)
f = os.path.join(subdir, 'f')
open(f, 'w').close()
changed()
with open(f, 'a') as s:
s.write(' ')
changed()
os.rename(f, f+'1')
changed()
use_mercurial = use_bzr = sys.version_info < (3, 0) use_mercurial = use_bzr = sys.version_info < (3, 0)