mirror of
https://github.com/powerline/powerline.git
synced 2025-07-26 23:35:04 +02:00
Move watcher tests into a separate file
This commit is contained in:
parent
2faa2a254f
commit
cb99c06027
@ -6,20 +6,16 @@ from powerline.lib.humanize_bytes import humanize_bytes
|
|||||||
from powerline.lib.vcs import guess, get_fallback_create_watcher
|
from powerline.lib.vcs import guess, get_fallback_create_watcher
|
||||||
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
|
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
|
||||||
from powerline.lib.monotonic import monotonic
|
from powerline.lib.monotonic import monotonic
|
||||||
from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError
|
|
||||||
from powerline.lib.vcs.git import git_directory
|
from powerline.lib.vcs.git import git_directory
|
||||||
from powerline import get_fallback_logger
|
|
||||||
import threading
|
import threading
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import re
|
import re
|
||||||
import platform
|
import platform
|
||||||
import shutil
|
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from subprocess import call, PIPE
|
from subprocess import call, PIPE
|
||||||
from functools import partial
|
|
||||||
from tests import TestCase, SkipTest
|
|
||||||
from tests.lib import Pl
|
from tests.lib import Pl
|
||||||
|
from tests import TestCase
|
||||||
|
|
||||||
|
|
||||||
def thread_number():
|
def thread_number():
|
||||||
@ -379,93 +375,6 @@ class TestLib(TestCase):
|
|||||||
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
|
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
|
||||||
|
|
||||||
|
|
||||||
class TestFilesystemWatchers(TestCase):
|
|
||||||
def do_test_for_change(self, watcher, path):
|
|
||||||
st = monotonic()
|
|
||||||
while monotonic() - st < 1:
|
|
||||||
if watcher(path):
|
|
||||||
return
|
|
||||||
sleep(0.1)
|
|
||||||
self.fail('The change to {0} was not detected'.format(path))
|
|
||||||
|
|
||||||
def test_file_watcher(self):
|
|
||||||
try:
|
|
||||||
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
|
|
||||||
except INotifyError:
|
|
||||||
raise SkipTest('This test is not suitable for a stat based file watcher')
|
|
||||||
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
|
|
||||||
with open(f1, 'wb'):
|
|
||||||
with open(f2, 'wb'):
|
|
||||||
with open(f3, 'wb'):
|
|
||||||
pass
|
|
||||||
ne = os.path.join(INOTIFY_DIR, 'notexists')
|
|
||||||
self.assertRaises(OSError, w, ne)
|
|
||||||
self.assertTrue(w(f1))
|
|
||||||
self.assertTrue(w(f2))
|
|
||||||
os.utime(f1, None), os.utime(f2, None)
|
|
||||||
self.do_test_for_change(w, f1)
|
|
||||||
self.do_test_for_change(w, f2)
|
|
||||||
# Repeat once
|
|
||||||
os.utime(f1, None), os.utime(f2, None)
|
|
||||||
self.do_test_for_change(w, f1)
|
|
||||||
self.do_test_for_change(w, f2)
|
|
||||||
# Check that no false changes are reported
|
|
||||||
self.assertFalse(w(f1), 'Spurious change detected')
|
|
||||||
self.assertFalse(w(f2), 'Spurious change detected')
|
|
||||||
# Check that open the file with 'w' triggers a change
|
|
||||||
with open(f1, 'wb'):
|
|
||||||
with open(f2, 'wb'):
|
|
||||||
pass
|
|
||||||
self.do_test_for_change(w, f1)
|
|
||||||
self.do_test_for_change(w, f2)
|
|
||||||
# Check that writing to a file with 'a' triggers a change
|
|
||||||
with open(f1, 'ab') as f:
|
|
||||||
f.write(b'1')
|
|
||||||
self.do_test_for_change(w, f1)
|
|
||||||
# Check that deleting a file registers as a change
|
|
||||||
os.unlink(f1)
|
|
||||||
self.do_test_for_change(w, f1)
|
|
||||||
# Test that changing the inode of a file does not cause it to stop
|
|
||||||
# being watched
|
|
||||||
os.rename(f3, f2)
|
|
||||||
self.do_test_for_change(w, f2)
|
|
||||||
self.assertFalse(w(f2), 'Spurious change detected')
|
|
||||||
os.utime(f2, None)
|
|
||||||
self.do_test_for_change(w, f2)
|
|
||||||
|
|
||||||
def test_tree_watcher(self):
|
|
||||||
tw = create_tree_watcher(get_fallback_logger())
|
|
||||||
subdir = os.path.join(INOTIFY_DIR, 'subdir')
|
|
||||||
os.mkdir(subdir)
|
|
||||||
if tw.watch(INOTIFY_DIR).is_dummy:
|
|
||||||
raise SkipTest('No tree watcher available')
|
|
||||||
self.assertTrue(tw(INOTIFY_DIR))
|
|
||||||
self.assertFalse(tw(INOTIFY_DIR))
|
|
||||||
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
|
|
||||||
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
|
|
||||||
changed()
|
|
||||||
open(os.path.join(subdir, 'tree1'), 'w').close()
|
|
||||||
changed()
|
|
||||||
os.unlink(os.path.join(subdir, 'tree1'))
|
|
||||||
changed()
|
|
||||||
os.rmdir(subdir)
|
|
||||||
changed()
|
|
||||||
os.mkdir(subdir)
|
|
||||||
changed()
|
|
||||||
os.rename(subdir, subdir + '1')
|
|
||||||
changed()
|
|
||||||
shutil.rmtree(subdir + '1')
|
|
||||||
changed()
|
|
||||||
os.mkdir(subdir)
|
|
||||||
f = os.path.join(subdir, 'f')
|
|
||||||
open(f, 'w').close()
|
|
||||||
changed()
|
|
||||||
with open(f, 'a') as s:
|
|
||||||
s.write(' ')
|
|
||||||
changed()
|
|
||||||
os.rename(f, f + '1')
|
|
||||||
changed()
|
|
||||||
|
|
||||||
use_mercurial = use_bzr = (sys.version_info < (3, 0)
|
use_mercurial = use_bzr = (sys.version_info < (3, 0)
|
||||||
and platform.python_implementation() == 'CPython')
|
and platform.python_implementation() == 'CPython')
|
||||||
|
|
||||||
@ -635,7 +544,6 @@ old_cwd = None
|
|||||||
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
|
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
|
||||||
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
|
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
|
||||||
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
|
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
|
||||||
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
|
|
||||||
|
|
||||||
|
|
||||||
def setUpModule():
|
def setUpModule():
|
||||||
@ -660,13 +568,12 @@ def setUpModule():
|
|||||||
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
|
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
|
||||||
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
|
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
|
||||||
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
|
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
|
||||||
os.mkdir(INOTIFY_DIR)
|
|
||||||
|
|
||||||
|
|
||||||
def tearDownModule():
|
def tearDownModule():
|
||||||
global old_cwd
|
global old_cwd
|
||||||
global old_HGRCPATH
|
global old_HGRCPATH
|
||||||
for repo_dir in [INOTIFY_DIR, GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
|
for repo_dir in [GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
|
||||||
for root, dirs, files in list(os.walk(repo_dir, topdown=False)):
|
for root, dirs, files in list(os.walk(repo_dir, topdown=False)):
|
||||||
for file in files:
|
for file in files:
|
||||||
os.remove(os.path.join(root, file))
|
os.remove(os.path.join(root, file))
|
||||||
|
130
tests/test_watcher.py
Normal file
130
tests/test_watcher.py
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
# vim:fileencoding=utf-8:noet
|
||||||
|
from __future__ import absolute_import, unicode_literals, print_function, division
|
||||||
|
|
||||||
|
from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError
|
||||||
|
from powerline import get_fallback_logger
|
||||||
|
from powerline.lib.monotonic import monotonic
|
||||||
|
|
||||||
|
import shutil
|
||||||
|
from time import sleep
|
||||||
|
from functools import partial
|
||||||
|
import os
|
||||||
|
|
||||||
|
from tests import TestCase, SkipTest
|
||||||
|
|
||||||
|
|
||||||
|
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
|
||||||
|
|
||||||
|
|
||||||
|
class TestFilesystemWatchers(TestCase):
|
||||||
|
def do_test_for_change(self, watcher, path):
|
||||||
|
st = monotonic()
|
||||||
|
while monotonic() - st < 1:
|
||||||
|
if watcher(path):
|
||||||
|
return
|
||||||
|
sleep(0.1)
|
||||||
|
self.fail('The change to {0} was not detected'.format(path))
|
||||||
|
|
||||||
|
def test_file_watcher(self):
|
||||||
|
try:
|
||||||
|
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
|
||||||
|
except INotifyError:
|
||||||
|
raise SkipTest('This test is not suitable for a stat based file watcher')
|
||||||
|
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
|
||||||
|
with open(f1, 'wb'):
|
||||||
|
with open(f2, 'wb'):
|
||||||
|
with open(f3, 'wb'):
|
||||||
|
pass
|
||||||
|
ne = os.path.join(INOTIFY_DIR, 'notexists')
|
||||||
|
self.assertRaises(OSError, w, ne)
|
||||||
|
self.assertTrue(w(f1))
|
||||||
|
self.assertTrue(w(f2))
|
||||||
|
os.utime(f1, None), os.utime(f2, None)
|
||||||
|
self.do_test_for_change(w, f1)
|
||||||
|
self.do_test_for_change(w, f2)
|
||||||
|
# Repeat once
|
||||||
|
os.utime(f1, None), os.utime(f2, None)
|
||||||
|
self.do_test_for_change(w, f1)
|
||||||
|
self.do_test_for_change(w, f2)
|
||||||
|
# Check that no false changes are reported
|
||||||
|
self.assertFalse(w(f1), 'Spurious change detected')
|
||||||
|
self.assertFalse(w(f2), 'Spurious change detected')
|
||||||
|
# Check that open the file with 'w' triggers a change
|
||||||
|
with open(f1, 'wb'):
|
||||||
|
with open(f2, 'wb'):
|
||||||
|
pass
|
||||||
|
self.do_test_for_change(w, f1)
|
||||||
|
self.do_test_for_change(w, f2)
|
||||||
|
# Check that writing to a file with 'a' triggers a change
|
||||||
|
with open(f1, 'ab') as f:
|
||||||
|
f.write(b'1')
|
||||||
|
self.do_test_for_change(w, f1)
|
||||||
|
# Check that deleting a file registers as a change
|
||||||
|
os.unlink(f1)
|
||||||
|
self.do_test_for_change(w, f1)
|
||||||
|
# Test that changing the inode of a file does not cause it to stop
|
||||||
|
# being watched
|
||||||
|
os.rename(f3, f2)
|
||||||
|
self.do_test_for_change(w, f2)
|
||||||
|
self.assertFalse(w(f2), 'Spurious change detected')
|
||||||
|
os.utime(f2, None)
|
||||||
|
self.do_test_for_change(w, f2)
|
||||||
|
|
||||||
|
def test_tree_watcher(self):
|
||||||
|
tw = create_tree_watcher(get_fallback_logger())
|
||||||
|
subdir = os.path.join(INOTIFY_DIR, 'subdir')
|
||||||
|
os.mkdir(subdir)
|
||||||
|
if tw.watch(INOTIFY_DIR).is_dummy:
|
||||||
|
raise SkipTest('No tree watcher available')
|
||||||
|
self.assertTrue(tw(INOTIFY_DIR))
|
||||||
|
self.assertFalse(tw(INOTIFY_DIR))
|
||||||
|
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
|
||||||
|
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
|
||||||
|
changed()
|
||||||
|
open(os.path.join(subdir, 'tree1'), 'w').close()
|
||||||
|
changed()
|
||||||
|
os.unlink(os.path.join(subdir, 'tree1'))
|
||||||
|
changed()
|
||||||
|
os.rmdir(subdir)
|
||||||
|
changed()
|
||||||
|
os.mkdir(subdir)
|
||||||
|
changed()
|
||||||
|
os.rename(subdir, subdir + '1')
|
||||||
|
changed()
|
||||||
|
shutil.rmtree(subdir + '1')
|
||||||
|
changed()
|
||||||
|
os.mkdir(subdir)
|
||||||
|
f = os.path.join(subdir, 'f')
|
||||||
|
open(f, 'w').close()
|
||||||
|
changed()
|
||||||
|
with open(f, 'a') as s:
|
||||||
|
s.write(' ')
|
||||||
|
changed()
|
||||||
|
os.rename(f, f + '1')
|
||||||
|
changed()
|
||||||
|
|
||||||
|
|
||||||
|
old_cwd = None
|
||||||
|
|
||||||
|
|
||||||
|
def setUpModule():
|
||||||
|
global old_cwd
|
||||||
|
old_cwd = os.getcwd()
|
||||||
|
os.chdir(os.path.dirname(__file__))
|
||||||
|
os.mkdir(INOTIFY_DIR)
|
||||||
|
|
||||||
|
|
||||||
|
def tearDownModule():
|
||||||
|
for d in [INOTIFY_DIR]:
|
||||||
|
for root, dirs, files in list(os.walk(d, topdown=False)):
|
||||||
|
for file in files:
|
||||||
|
os.remove(os.path.join(root, file))
|
||||||
|
for dir in dirs:
|
||||||
|
os.rmdir(os.path.join(root, dir))
|
||||||
|
os.rmdir(d)
|
||||||
|
os.chdir(old_cwd)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
from tests import main
|
||||||
|
main()
|
Loading…
x
Reference in New Issue
Block a user