Also take tests from @kovidgoyal develop branch
This commit is contained in:
parent
47eef9860f
commit
63e2942929
|
@ -36,6 +36,53 @@ class TestLib(TestCase):
|
|||
self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB')
|
||||
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
|
||||
|
||||
def do_test_for_change(self, watcher, path):
|
||||
import time
|
||||
st = time.time()
|
||||
while time.time() - st < 1:
|
||||
if watcher(path):
|
||||
return
|
||||
time.sleep(0.1)
|
||||
self.fail('The change to %s was not detected'%path)
|
||||
|
||||
def test_file_watcher(self):
|
||||
from powerline.lib.file_watcher import create_file_watcher
|
||||
w = create_file_watcher(use_stat=False)
|
||||
if w.is_stat_based:
|
||||
# The granularity of mtime (1 second) means that we cannot use the
|
||||
# same tests for inotify and StatWatch.
|
||||
return
|
||||
f1, f2 = os.path.join(INOTIFY_DIR, 'file1'), os.path.join(INOTIFY_DIR, 'file2')
|
||||
with open(f1, 'wb'):
|
||||
with open(f2, 'wb'):
|
||||
pass
|
||||
ne = os.path.join(INOTIFY_DIR, 'notexists')
|
||||
self.assertRaises(OSError, w, ne)
|
||||
self.assertTrue(w(f1))
|
||||
self.assertTrue(w(f2))
|
||||
os.utime(f1, None), os.utime(f2, None)
|
||||
self.do_test_for_change(w, f1)
|
||||
self.do_test_for_change(w, f2)
|
||||
# Repeat once
|
||||
os.utime(f1, None), os.utime(f2, None)
|
||||
self.do_test_for_change(w, f1)
|
||||
self.do_test_for_change(w, f2)
|
||||
# Check that no false changes are reported
|
||||
self.assertFalse(w(f1), 'Spurious change detected')
|
||||
self.assertFalse(w(f2), 'Spurious change detected')
|
||||
# Check that open the file with 'w' triggers a change
|
||||
with open(f1, 'wb'):
|
||||
with open(f2, 'wb'):
|
||||
pass
|
||||
self.do_test_for_change(w, f1)
|
||||
self.do_test_for_change(w, f2)
|
||||
# Check that writing to a file with 'a' triggers a change
|
||||
with open(f1, 'ab') as f:
|
||||
f.write(b'1')
|
||||
self.do_test_for_change(w, f1)
|
||||
# Check that deleting a file registers as a change
|
||||
os.unlink(f1)
|
||||
self.do_test_for_change(w, f1)
|
||||
|
||||
use_mercurial = use_bzr = sys.version_info < (3, 0)
|
||||
|
||||
|
@ -106,7 +153,7 @@ old_cwd = None
|
|||
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
|
||||
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
|
||||
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
|
||||
|
||||
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
|
||||
|
||||
def setUpModule():
|
||||
global old_cwd
|
||||
|
@ -130,12 +177,14 @@ def setUpModule():
|
|||
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
|
||||
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
|
||||
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
|
||||
os.mkdir(INOTIFY_DIR)
|
||||
|
||||
|
||||
|
||||
def tearDownModule():
|
||||
global old_cwd
|
||||
global old_HGRCPATH
|
||||
for repo_dir in [GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
|
||||
for repo_dir in [INOTIFY_DIR, GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
|
||||
for root, dirs, files in list(os.walk(repo_dir, topdown=False)):
|
||||
for file in files:
|
||||
os.remove(os.path.join(root, file))
|
||||
|
|
Loading…
Reference in New Issue