diff --git a/tests/test_watcher.py b/tests/test_watcher.py index 356790b6..64681d96 100644 --- a/tests/test_watcher.py +++ b/tests/test_watcher.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals, print_function, division from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError +from powerline.lib.watcher.uv import UvNotFound from powerline import get_fallback_logger from powerline.lib.monotonic import monotonic @@ -16,6 +17,14 @@ from tests import TestCase, SkipTest INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '') +def clear_dir(dir): + for root, dirs, files in list(os.walk(dir, topdown=False)): + for f in files: + os.remove(os.path.join(root, f)) + for d in dirs: + os.rmdir(os.path.join(root, d)) + + class TestFilesystemWatchers(TestCase): def do_test_for_change(self, watcher, path): st = monotonic() @@ -30,78 +39,104 @@ class TestFilesystemWatchers(TestCase): w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify') except INotifyError: raise SkipTest('This test is not suitable for a stat based file watcher') - f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3)) - with open(f1, 'wb'): - with open(f2, 'wb'): - with open(f3, 'wb'): + return self.do_test_file_watcher(w) + + def do_test_file_watcher(self, w): + try: + f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3)) + with open(f1, 'wb'): + with open(f2, 'wb'): + with open(f3, 'wb'): + pass + ne = os.path.join(INOTIFY_DIR, 'notexists') + self.assertRaises(OSError, w, ne) + self.assertTrue(w(f1)) + self.assertTrue(w(f2)) + os.utime(f1, None), os.utime(f2, None) + self.do_test_for_change(w, f1) + self.do_test_for_change(w, f2) + # Repeat once + os.utime(f1, None), os.utime(f2, None) + self.do_test_for_change(w, f1) + self.do_test_for_change(w, f2) + # Check that no false changes are reported + self.assertFalse(w(f1), 'Spurious change detected') + self.assertFalse(w(f2), 'Spurious change detected') + # Check that open the file with 'w' triggers a change + with open(f1, 'wb'): + with open(f2, 'wb'): pass - ne = os.path.join(INOTIFY_DIR, 'notexists') - self.assertRaises(OSError, w, ne) - self.assertTrue(w(f1)) - self.assertTrue(w(f2)) - os.utime(f1, None), os.utime(f2, None) - self.do_test_for_change(w, f1) - self.do_test_for_change(w, f2) - # Repeat once - os.utime(f1, None), os.utime(f2, None) - self.do_test_for_change(w, f1) - self.do_test_for_change(w, f2) - # Check that no false changes are reported - self.assertFalse(w(f1), 'Spurious change detected') - self.assertFalse(w(f2), 'Spurious change detected') - # Check that open the file with 'w' triggers a change - with open(f1, 'wb'): - with open(f2, 'wb'): - pass - self.do_test_for_change(w, f1) - self.do_test_for_change(w, f2) - # Check that writing to a file with 'a' triggers a change - with open(f1, 'ab') as f: - f.write(b'1') - self.do_test_for_change(w, f1) - # Check that deleting a file registers as a change - os.unlink(f1) - self.do_test_for_change(w, f1) - # Test that changing the inode of a file does not cause it to stop - # being watched - os.rename(f3, f2) - self.do_test_for_change(w, f2) - self.assertFalse(w(f2), 'Spurious change detected') - os.utime(f2, None) - self.do_test_for_change(w, f2) + self.do_test_for_change(w, f1) + self.do_test_for_change(w, f2) + # Check that writing to a file with 'a' triggers a change + with open(f1, 'ab') as f: + f.write(b'1') + self.do_test_for_change(w, f1) + # Check that deleting a file registers as a change + os.unlink(f1) + self.do_test_for_change(w, f1) + # Test that changing the inode of a file does not cause it to stop + # being watched + os.rename(f3, f2) + self.do_test_for_change(w, f2) + self.assertFalse(w(f2), 'Spurious change detected') + os.utime(f2, None) + self.do_test_for_change(w, f2) + finally: + clear_dir(INOTIFY_DIR) + + def test_uv_file_watcher(self): + try: + w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv') + except UvNotFound: + raise SkipTest('Pyuv is not available') + return self.do_test_file_watcher(w) def test_tree_watcher(self): tw = create_tree_watcher(get_fallback_logger()) - subdir = os.path.join(INOTIFY_DIR, 'subdir') - os.mkdir(subdir) - if tw.watch(INOTIFY_DIR).is_dummy: - raise SkipTest('No tree watcher available') - self.assertTrue(tw(INOTIFY_DIR)) - self.assertFalse(tw(INOTIFY_DIR)) - changed = partial(self.do_test_for_change, tw, INOTIFY_DIR) - open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close() - changed() - open(os.path.join(subdir, 'tree1'), 'w').close() - changed() - os.unlink(os.path.join(subdir, 'tree1')) - changed() - os.rmdir(subdir) - changed() - os.mkdir(subdir) - changed() - os.rename(subdir, subdir + '1') - changed() - shutil.rmtree(subdir + '1') - changed() - os.mkdir(subdir) - f = os.path.join(subdir, 'f') - open(f, 'w').close() - changed() - with open(f, 'a') as s: - s.write(' ') - changed() - os.rename(f, f + '1') - changed() + return self.do_test_tree_watcher(tw) + + def do_test_tree_watcher(self, tw): + try: + subdir = os.path.join(INOTIFY_DIR, 'subdir') + os.mkdir(subdir) + try: + if tw.watch(INOTIFY_DIR).is_dummy: + raise SkipTest('No tree watcher available') + except UvNotFound: + raise SkipTest('Pyuv is not available') + self.assertTrue(tw(INOTIFY_DIR)) + self.assertFalse(tw(INOTIFY_DIR)) + changed = partial(self.do_test_for_change, tw, INOTIFY_DIR) + open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close() + changed() + open(os.path.join(subdir, 'tree1'), 'w').close() + changed() + os.unlink(os.path.join(subdir, 'tree1')) + changed() + os.rmdir(subdir) + changed() + os.mkdir(subdir) + changed() + os.rename(subdir, subdir + '1') + changed() + shutil.rmtree(subdir + '1') + changed() + os.mkdir(subdir) + f = os.path.join(subdir, 'f') + open(f, 'w').close() + changed() + with open(f, 'a') as s: + s.write(' ') + changed() + os.rename(f, f + '1') + changed() + finally: + clear_dir(INOTIFY_DIR) + + def test_uv_tree_watcher(self): + tw = create_tree_watcher(get_fallback_logger(), 'uv') + return self.do_test_tree_watcher(tw) old_cwd = None @@ -116,11 +151,7 @@ def setUpModule(): def tearDownModule(): for d in [INOTIFY_DIR]: - for root, dirs, files in list(os.walk(d, topdown=False)): - for file in files: - os.remove(os.path.join(root, file)) - for dir in dirs: - os.rmdir(os.path.join(root, dir)) + clear_dir(d) os.rmdir(d) os.chdir(old_cwd)