mirror of
https://github.com/powerline/powerline.git
synced 2025-10-16 21:29:47 +02:00
If you try to checkout the already current branch in git, git creates HEAD.lock and renames it to HEAD. This causes the inode of HEAD to change and so the inotify file watcher stops tracking HEAD. The fix is to re-create the inotify watch when the file attributes change. This is a bit of a performance penalty as most of the time the attribute changes are simple last modified time/size changes, but since inotify provides no way to know specifically when the inode has changed, this is the best we can do.
331 lines
11 KiB
Python
331 lines
11 KiB
Python
# vim:fileencoding=utf-8:noet
|
|
from powerline.lib import mergedicts, add_divider_highlight_group
|
|
from powerline.lib.humanize_bytes import humanize_bytes
|
|
from powerline.lib.vcs import guess
|
|
from subprocess import call, PIPE
|
|
import os
|
|
import sys
|
|
import re
|
|
from functools import partial
|
|
from tests import TestCase, SkipTest
|
|
|
|
|
|
class TestLib(TestCase):
|
|
def test_mergedicts(self):
|
|
d = {}
|
|
mergedicts(d, {'abc': {'def': 'ghi'}})
|
|
self.assertEqual(d, {'abc': {'def': 'ghi'}})
|
|
mergedicts(d, {'abc': {'def': {'ghi': 'jkl'}}})
|
|
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
|
|
mergedicts(d, {})
|
|
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
|
|
mergedicts(d, {'abc': {'mno': 'pqr'}})
|
|
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}, 'mno': 'pqr'}})
|
|
|
|
def test_add_divider_highlight_group(self):
|
|
def decorated_function_name(**kwargs):
|
|
return str(kwargs)
|
|
func = add_divider_highlight_group('hl_group')(decorated_function_name)
|
|
self.assertEqual(func.__name__, 'decorated_function_name')
|
|
self.assertEqual(func(kw={}), [{'contents': repr({'kw': {}}), 'divider_highlight_group': 'hl_group'}])
|
|
|
|
def test_humanize_bytes(self):
|
|
self.assertEqual(humanize_bytes(0), '0 B')
|
|
self.assertEqual(humanize_bytes(1), '1 B')
|
|
self.assertEqual(humanize_bytes(1, suffix='bit'), '1 bit')
|
|
self.assertEqual(humanize_bytes(1000, si_prefix=True), '1 kB')
|
|
self.assertEqual(humanize_bytes(1024, si_prefix=True), '1 kB')
|
|
self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB')
|
|
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
|
|
|
|
|
|
class TestFilesystemWatchers(TestCase):
|
|
def do_test_for_change(self, watcher, path):
|
|
import time
|
|
st = time.time()
|
|
while time.time() - st < 1:
|
|
if watcher(path):
|
|
return
|
|
time.sleep(0.1)
|
|
self.fail('The change to {0} was not detected'.format(path))
|
|
|
|
def test_file_watcher(self):
|
|
from powerline.lib.file_watcher import create_file_watcher
|
|
w = create_file_watcher(use_stat=False)
|
|
if w.is_stat_based:
|
|
raise SkipTest('This test is not suitable for a stat based file watcher')
|
|
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
|
|
with open(f1, 'wb'):
|
|
with open(f2, 'wb'):
|
|
with open(f3, 'wb'):
|
|
pass
|
|
ne = os.path.join(INOTIFY_DIR, 'notexists')
|
|
self.assertRaises(OSError, w, ne)
|
|
self.assertTrue(w(f1))
|
|
self.assertTrue(w(f2))
|
|
os.utime(f1, None), os.utime(f2, None)
|
|
self.do_test_for_change(w, f1)
|
|
self.do_test_for_change(w, f2)
|
|
# Repeat once
|
|
os.utime(f1, None), os.utime(f2, None)
|
|
self.do_test_for_change(w, f1)
|
|
self.do_test_for_change(w, f2)
|
|
# Check that no false changes are reported
|
|
self.assertFalse(w(f1), 'Spurious change detected')
|
|
self.assertFalse(w(f2), 'Spurious change detected')
|
|
# Check that open the file with 'w' triggers a change
|
|
with open(f1, 'wb'):
|
|
with open(f2, 'wb'):
|
|
pass
|
|
self.do_test_for_change(w, f1)
|
|
self.do_test_for_change(w, f2)
|
|
# Check that writing to a file with 'a' triggers a change
|
|
with open(f1, 'ab') as f:
|
|
f.write(b'1')
|
|
self.do_test_for_change(w, f1)
|
|
# Check that deleting a file registers as a change
|
|
os.unlink(f1)
|
|
self.do_test_for_change(w, f1)
|
|
# Test that changing the inode of a file does not cause it to stop
|
|
# being watched
|
|
os.rename(f3, f2)
|
|
self.do_test_for_change(w, f2)
|
|
self.assertFalse(w(f2), 'Spurious change detected')
|
|
os.utime(f2, None)
|
|
self.do_test_for_change(w, f2)
|
|
|
|
def test_tree_watcher(self):
|
|
from powerline.lib.tree_watcher import TreeWatcher
|
|
tw = TreeWatcher()
|
|
subdir = os.path.join(INOTIFY_DIR, 'subdir')
|
|
os.mkdir(subdir)
|
|
if tw.watch(INOTIFY_DIR).is_dummy:
|
|
raise SkipTest('No tree watcher available')
|
|
import shutil
|
|
self.assertTrue(tw(INOTIFY_DIR))
|
|
self.assertFalse(tw(INOTIFY_DIR))
|
|
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
|
|
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
|
|
changed()
|
|
open(os.path.join(subdir, 'tree1'), 'w').close()
|
|
changed()
|
|
os.unlink(os.path.join(subdir, 'tree1'))
|
|
changed()
|
|
os.rmdir(subdir)
|
|
changed()
|
|
os.mkdir(subdir)
|
|
changed()
|
|
os.rename(subdir, subdir + '1')
|
|
changed()
|
|
shutil.rmtree(subdir + '1')
|
|
changed()
|
|
os.mkdir(subdir)
|
|
f = os.path.join(subdir, 'f')
|
|
open(f, 'w').close()
|
|
changed()
|
|
with open(f, 'a') as s:
|
|
s.write(' ')
|
|
changed()
|
|
os.rename(f, f + '1')
|
|
changed()
|
|
|
|
use_mercurial = use_bzr = sys.version_info < (3, 0)
|
|
|
|
|
|
class TestVCS(TestCase):
|
|
def do_branch_rename_test(self, repo, q):
|
|
import time
|
|
st = time.time()
|
|
while time.time() - st < 1:
|
|
# Give inotify time to deliver events
|
|
ans = repo.branch()
|
|
if hasattr(q, '__call__'):
|
|
if q(ans):
|
|
break
|
|
else:
|
|
if ans == q:
|
|
break
|
|
time.sleep(0.01)
|
|
if hasattr(q, '__call__'):
|
|
self.assertTrue(q(ans))
|
|
else:
|
|
self.assertEqual(ans, q)
|
|
|
|
def test_git(self):
|
|
repo = guess(path=GIT_REPO)
|
|
self.assertNotEqual(repo, None)
|
|
self.assertEqual(repo.branch(), 'master')
|
|
self.assertEqual(repo.status(), None)
|
|
self.assertEqual(repo.status('file'), None)
|
|
with open(os.path.join(GIT_REPO, 'file'), 'w') as f:
|
|
f.write('abc')
|
|
f.flush()
|
|
self.assertEqual(repo.status(), ' U')
|
|
self.assertEqual(repo.status('file'), '??')
|
|
call(['git', 'add', '.'], cwd=GIT_REPO)
|
|
self.assertEqual(repo.status(), ' I ')
|
|
self.assertEqual(repo.status('file'), 'A ')
|
|
f.write('def')
|
|
f.flush()
|
|
self.assertEqual(repo.status(), 'DI ')
|
|
self.assertEqual(repo.status('file'), 'AM')
|
|
os.remove(os.path.join(GIT_REPO, 'file'))
|
|
# Test changing branch
|
|
self.assertEqual(repo.branch(), 'master')
|
|
call(['git', 'branch', 'branch1'], cwd=GIT_REPO)
|
|
call(['git', 'checkout', '-q', 'branch1'], cwd=GIT_REPO)
|
|
self.do_branch_rename_test(repo, 'branch1')
|
|
# For some reason the rest of this test fails on travis and only on
|
|
# travis, and I can't figure out why
|
|
if 'TRAVIS' in os.environ:
|
|
raise SkipTest('Part of this test fails on Travis for unknown reasons')
|
|
call(['git', 'branch', 'branch2'], cwd=GIT_REPO)
|
|
call(['git', 'checkout', '-q', 'branch2'], cwd=GIT_REPO)
|
|
self.do_branch_rename_test(repo, 'branch2')
|
|
call(['git', 'checkout', '-q', '--detach', 'branch1'], cwd=GIT_REPO)
|
|
self.do_branch_rename_test(repo, lambda b: re.match(r'^[a-f0-9]+$', b))
|
|
|
|
if use_mercurial:
|
|
def test_mercurial(self):
|
|
repo = guess(path=HG_REPO)
|
|
self.assertNotEqual(repo, None)
|
|
self.assertEqual(repo.branch(), 'default')
|
|
self.assertEqual(repo.status(), None)
|
|
with open(os.path.join(HG_REPO, 'file'), 'w') as f:
|
|
f.write('abc')
|
|
f.flush()
|
|
self.assertEqual(repo.status(), ' U')
|
|
self.assertEqual(repo.status('file'), 'U')
|
|
call(['hg', 'add', '.'], cwd=HG_REPO, stdout=PIPE)
|
|
self.assertEqual(repo.status(), 'D ')
|
|
self.assertEqual(repo.status('file'), 'A')
|
|
os.remove(os.path.join(HG_REPO, 'file'))
|
|
|
|
if use_bzr:
|
|
def test_bzr(self):
|
|
repo = guess(path=BZR_REPO)
|
|
self.assertNotEqual(repo, None, 'No bzr repo found. Do you have bzr installed?')
|
|
self.assertEqual(repo.branch(), 'test_powerline')
|
|
self.assertEqual(repo.status(), None)
|
|
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
|
|
f.write('abc')
|
|
self.assertEqual(repo.status(), ' U')
|
|
self.assertEqual(repo.status('file'), '? ')
|
|
call(['bzr', 'add', '-q', '.'], cwd=BZR_REPO, stdout=PIPE)
|
|
self.assertEqual(repo.status(), 'D ')
|
|
self.assertEqual(repo.status('file'), '+N')
|
|
call(['bzr', 'commit', '-q', '-m', 'initial commit'], cwd=BZR_REPO)
|
|
self.assertEqual(repo.status(), None)
|
|
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
|
|
f.write('def')
|
|
self.assertEqual(repo.status(), 'D ')
|
|
self.assertEqual(repo.status('file'), ' M')
|
|
self.assertEqual(repo.status('notexist'), None)
|
|
with open(os.path.join(BZR_REPO, 'ignored'), 'w') as f:
|
|
f.write('abc')
|
|
self.assertEqual(repo.status('ignored'), '? ')
|
|
# Test changing the .bzrignore file should update status
|
|
with open(os.path.join(BZR_REPO, '.bzrignore'), 'w') as f:
|
|
f.write('ignored')
|
|
self.assertEqual(repo.status('ignored'), None)
|
|
# Test changing the dirstate file should invalidate the cache for
|
|
# all files in the repo
|
|
with open(os.path.join(BZR_REPO, 'file2'), 'w') as f:
|
|
f.write('abc')
|
|
call(['bzr', 'add', 'file2'], cwd=BZR_REPO, stdout=PIPE)
|
|
call(['bzr', 'commit', '-q', '-m', 'file2 added'], cwd=BZR_REPO)
|
|
with open(os.path.join(BZR_REPO, 'file'), 'a') as f:
|
|
f.write('hello')
|
|
with open(os.path.join(BZR_REPO, 'file2'), 'a') as f:
|
|
f.write('hello')
|
|
self.assertEqual(repo.status('file'), ' M')
|
|
self.assertEqual(repo.status('file2'), ' M')
|
|
call(['bzr', 'commit', '-q', '-m', 'multi'], cwd=BZR_REPO)
|
|
self.assertEqual(repo.status('file'), None)
|
|
self.assertEqual(repo.status('file2'), None)
|
|
|
|
# Test changing branch
|
|
call(['bzr', 'nick', 'branch1'], cwd=BZR_REPO, stdout=PIPE, stderr=PIPE)
|
|
self.do_branch_rename_test(repo, 'branch1')
|
|
|
|
# Test branch name/status changes when swapping repos
|
|
for x in ('b1', 'b2'):
|
|
d = os.path.join(BZR_REPO, x)
|
|
os.mkdir(d)
|
|
call(['bzr', 'init', '-q'], cwd=d)
|
|
call(['bzr', 'nick', '-q', x], cwd=d)
|
|
repo = guess(path=d)
|
|
self.assertEqual(repo.branch(), x)
|
|
self.assertFalse(repo.status())
|
|
if x == 'b1':
|
|
open(os.path.join(d, 'dirty'), 'w').close()
|
|
self.assertTrue(repo.status())
|
|
os.rename(os.path.join(BZR_REPO, 'b1'), os.path.join(BZR_REPO, 'b'))
|
|
os.rename(os.path.join(BZR_REPO, 'b2'), os.path.join(BZR_REPO, 'b1'))
|
|
os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2'))
|
|
for x, y in (('b1', 'b2'), ('b2', 'b1')):
|
|
d = os.path.join(BZR_REPO, x)
|
|
repo = guess(path=d)
|
|
self.do_branch_rename_test(repo, y)
|
|
if x == 'b1':
|
|
self.assertFalse(repo.status())
|
|
else:
|
|
self.assertTrue(repo.status())
|
|
|
|
old_HGRCPATH = None
|
|
old_cwd = None
|
|
|
|
|
|
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
|
|
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
|
|
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
|
|
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
|
|
|
|
|
|
def setUpModule():
|
|
global old_cwd
|
|
global old_HGRCPATH
|
|
old_cwd = os.getcwd()
|
|
os.chdir(os.path.dirname(__file__))
|
|
call(['git', 'init', '--quiet', GIT_REPO])
|
|
assert os.path.isdir(GIT_REPO)
|
|
call(['git', 'config', '--local', 'user.name', 'Foo'], cwd=GIT_REPO)
|
|
call(['git', 'config', '--local', 'user.email', 'bar@example.org'], cwd=GIT_REPO)
|
|
call(['git', 'commit', '--allow-empty', '--message', 'Initial commit', '--quiet'], cwd=GIT_REPO)
|
|
if use_mercurial:
|
|
old_HGRCPATH = os.environ.get('HGRCPATH')
|
|
os.environ['HGRCPATH'] = ''
|
|
call(['hg', 'init', HG_REPO])
|
|
with open(os.path.join(HG_REPO, '.hg', 'hgrc'), 'w') as hgrc:
|
|
hgrc.write('[ui]\n')
|
|
hgrc.write('username = Foo <bar@example.org>\n')
|
|
if use_bzr:
|
|
call(['bzr', 'init', '--quiet', BZR_REPO])
|
|
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
|
|
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
|
|
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
|
|
os.mkdir(INOTIFY_DIR)
|
|
|
|
|
|
def tearDownModule():
|
|
global old_cwd
|
|
global old_HGRCPATH
|
|
for repo_dir in [INOTIFY_DIR, GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
|
|
for root, dirs, files in list(os.walk(repo_dir, topdown=False)):
|
|
for file in files:
|
|
os.remove(os.path.join(root, file))
|
|
for dir in dirs:
|
|
os.rmdir(os.path.join(root, dir))
|
|
os.rmdir(repo_dir)
|
|
if use_mercurial:
|
|
if old_HGRCPATH is None:
|
|
os.environ.pop('HGRCPATH')
|
|
else:
|
|
os.environ['HGRCPATH'] = old_HGRCPATH
|
|
os.chdir(old_cwd)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from tests import main
|
|
main()
|