powerline/tests/test_lib.py

323 lines
11 KiB
Python

# vim:fileencoding=utf-8:noet
from powerline.lib import mergedicts, add_divider_highlight_group
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.vcs import guess
from subprocess import call, PIPE
import os
import sys
import re
from functools import partial
from tests import TestCase, SkipTest
class TestLib(TestCase):
def test_mergedicts(self):
d = {}
mergedicts(d, {'abc': {'def': 'ghi'}})
self.assertEqual(d, {'abc': {'def': 'ghi'}})
mergedicts(d, {'abc': {'def': {'ghi': 'jkl'}}})
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
mergedicts(d, {})
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
mergedicts(d, {'abc': {'mno': 'pqr'}})
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}, 'mno': 'pqr'}})
def test_add_divider_highlight_group(self):
def decorated_function_name(**kwargs):
return str(kwargs)
func = add_divider_highlight_group('hl_group')(decorated_function_name)
self.assertEqual(func.__name__, 'decorated_function_name')
self.assertEqual(func(kw={}), [{'contents': repr({'kw': {}}), 'divider_highlight_group': 'hl_group'}])
def test_humanize_bytes(self):
self.assertEqual(humanize_bytes(0), '0 B')
self.assertEqual(humanize_bytes(1), '1 B')
self.assertEqual(humanize_bytes(1, suffix='bit'), '1 bit')
self.assertEqual(humanize_bytes(1000, si_prefix=True), '1 kB')
self.assertEqual(humanize_bytes(1024, si_prefix=True), '1 kB')
self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB')
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
class TestFilesystemWatchers(TestCase):
def do_test_for_change(self, watcher, path):
import time
st = time.time()
while time.time() - st < 1:
if watcher(path):
return
time.sleep(0.1)
self.fail('The change to {0} was not detected'.format(path))
def test_file_watcher(self):
from powerline.lib.file_watcher import create_file_watcher
w = create_file_watcher(use_stat=False)
if w.is_stat_based:
raise SkipTest('This test is not suitable for a stat based file watcher')
f1, f2 = os.path.join(INOTIFY_DIR, 'file1'), os.path.join(INOTIFY_DIR, 'file2')
with open(f1, 'wb'):
with open(f2, 'wb'):
pass
ne = os.path.join(INOTIFY_DIR, 'notexists')
self.assertRaises(OSError, w, ne)
self.assertTrue(w(f1))
self.assertTrue(w(f2))
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Repeat once
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that no false changes are reported
self.assertFalse(w(f1), 'Spurious change detected')
self.assertFalse(w(f2), 'Spurious change detected')
# Check that open the file with 'w' triggers a change
with open(f1, 'wb'):
with open(f2, 'wb'):
pass
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that writing to a file with 'a' triggers a change
with open(f1, 'ab') as f:
f.write(b'1')
self.do_test_for_change(w, f1)
# Check that deleting a file registers as a change
os.unlink(f1)
self.do_test_for_change(w, f1)
def test_tree_watcher(self):
from powerline.lib.tree_watcher import TreeWatcher
tw = TreeWatcher()
subdir = os.path.join(INOTIFY_DIR, 'subdir')
os.mkdir(subdir)
if tw.watch(INOTIFY_DIR).is_dummy:
raise SkipTest('No tree watcher available')
import shutil
self.assertTrue(tw(INOTIFY_DIR))
self.assertFalse(tw(INOTIFY_DIR))
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
changed()
open(os.path.join(subdir, 'tree1'), 'w').close()
changed()
os.unlink(os.path.join(subdir, 'tree1'))
changed()
os.rmdir(subdir)
changed()
os.mkdir(subdir)
changed()
os.rename(subdir, subdir + '1')
changed()
shutil.rmtree(subdir + '1')
changed()
os.mkdir(subdir)
f = os.path.join(subdir, 'f')
open(f, 'w').close()
changed()
with open(f, 'a') as s:
s.write(' ')
changed()
os.rename(f, f + '1')
changed()
use_mercurial = use_bzr = sys.version_info < (3, 0)
class TestVCS(TestCase):
def do_branch_rename_test(self, repo, q):
import time
st = time.time()
while time.time() - st < 1:
# Give inotify time to deliver events
ans = repo.branch()
if hasattr(q, '__call__'):
if q(ans):
break
else:
if ans == q:
break
time.sleep(0.01)
if hasattr(q, '__call__'):
self.assertTrue(q(ans))
else:
self.assertEqual(ans, q)
def test_git(self):
repo = guess(path=GIT_REPO)
self.assertNotEqual(repo, None)
self.assertEqual(repo.branch(), 'master')
self.assertEqual(repo.status(), None)
self.assertEqual(repo.status('file'), None)
with open(os.path.join(GIT_REPO, 'file'), 'w') as f:
f.write('abc')
f.flush()
self.assertEqual(repo.status(), ' U')
self.assertEqual(repo.status('file'), '??')
call(['git', 'add', '.'], cwd=GIT_REPO)
self.assertEqual(repo.status(), ' I ')
self.assertEqual(repo.status('file'), 'A ')
f.write('def')
f.flush()
self.assertEqual(repo.status(), 'DI ')
self.assertEqual(repo.status('file'), 'AM')
os.remove(os.path.join(GIT_REPO, 'file'))
# Test changing branch
self.assertEqual(repo.branch(), 'master')
call(['git', 'branch', 'branch1'], cwd=GIT_REPO)
call(['git', 'checkout', '-q', 'branch1'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, 'branch1')
# For some reason the rest of this test fails on travis and only on
# travis, and I can't figure out why
if 'TRAVIS' in os.environ:
raise SkipTest('Part of this test fails on Travis for unknown reasons')
call(['git', 'branch', 'branch2'], cwd=GIT_REPO)
call(['git', 'checkout', '-q', 'branch2'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, 'branch2')
call(['git', 'checkout', '-q', '--detach', 'branch1'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, lambda b: re.match(r'^[a-f0-9]+$', b))
if use_mercurial:
def test_mercurial(self):
repo = guess(path=HG_REPO)
self.assertNotEqual(repo, None)
self.assertEqual(repo.branch(), 'default')
self.assertEqual(repo.status(), None)
with open(os.path.join(HG_REPO, 'file'), 'w') as f:
f.write('abc')
f.flush()
self.assertEqual(repo.status(), ' U')
self.assertEqual(repo.status('file'), 'U')
call(['hg', 'add', '.'], cwd=HG_REPO, stdout=PIPE)
self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), 'A')
os.remove(os.path.join(HG_REPO, 'file'))
if use_bzr:
def test_bzr(self):
repo = guess(path=BZR_REPO)
self.assertNotEqual(repo, None, 'No bzr repo found. Do you have bzr installed?')
self.assertEqual(repo.branch(), 'test_powerline')
self.assertEqual(repo.status(), None)
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
f.write('abc')
self.assertEqual(repo.status(), ' U')
self.assertEqual(repo.status('file'), '? ')
call(['bzr', 'add', '-q', '.'], cwd=BZR_REPO, stdout=PIPE)
self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), '+N')
call(['bzr', 'commit', '-q', '-m', 'initial commit'], cwd=BZR_REPO)
self.assertEqual(repo.status(), None)
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
f.write('def')
self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), ' M')
self.assertEqual(repo.status('notexist'), None)
with open(os.path.join(BZR_REPO, 'ignored'), 'w') as f:
f.write('abc')
self.assertEqual(repo.status('ignored'), '? ')
# Test changing the .bzrignore file should update status
with open(os.path.join(BZR_REPO, '.bzrignore'), 'w') as f:
f.write('ignored')
self.assertEqual(repo.status('ignored'), None)
# Test changing the dirstate file should invalidate the cache for
# all files in the repo
with open(os.path.join(BZR_REPO, 'file2'), 'w') as f:
f.write('abc')
call(['bzr', 'add', 'file2'], cwd=BZR_REPO, stdout=PIPE)
call(['bzr', 'commit', '-q', '-m', 'file2 added'], cwd=BZR_REPO)
with open(os.path.join(BZR_REPO, 'file'), 'a') as f:
f.write('hello')
with open(os.path.join(BZR_REPO, 'file2'), 'a') as f:
f.write('hello')
self.assertEqual(repo.status('file'), ' M')
self.assertEqual(repo.status('file2'), ' M')
call(['bzr', 'commit', '-q', '-m', 'multi'], cwd=BZR_REPO)
self.assertEqual(repo.status('file'), None)
self.assertEqual(repo.status('file2'), None)
# Test changing branch
call(['bzr', 'nick', 'branch1'], cwd=BZR_REPO, stdout=PIPE, stderr=PIPE)
self.do_branch_rename_test(repo, 'branch1')
# Test branch name/status changes when swapping repos
for x in ('b1', 'b2'):
d = os.path.join(BZR_REPO, x)
os.mkdir(d)
call(['bzr', 'init', '-q'], cwd=d)
call(['bzr', 'nick', '-q', x], cwd=d)
repo = guess(path=d)
self.assertEqual(repo.branch(), x)
self.assertFalse(repo.status())
if x == 'b1':
open(os.path.join(d, 'dirty'), 'w').close()
self.assertTrue(repo.status())
os.rename(os.path.join(BZR_REPO, 'b1'), os.path.join(BZR_REPO, 'b'))
os.rename(os.path.join(BZR_REPO, 'b2'), os.path.join(BZR_REPO, 'b1'))
os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2'))
for x, y in (('b1', 'b2'), ('b2', 'b1')):
d = os.path.join(BZR_REPO, x)
repo = guess(path=d)
self.do_branch_rename_test(repo, y)
if x == 'b1':
self.assertFalse(repo.status())
else:
self.assertTrue(repo.status())
old_HGRCPATH = None
old_cwd = None
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
def setUpModule():
global old_cwd
global old_HGRCPATH
old_cwd = os.getcwd()
os.chdir(os.path.dirname(__file__))
call(['git', 'init', '--quiet', GIT_REPO])
assert os.path.isdir(GIT_REPO)
call(['git', 'config', '--local', 'user.name', 'Foo'], cwd=GIT_REPO)
call(['git', 'config', '--local', 'user.email', 'bar@example.org'], cwd=GIT_REPO)
call(['git', 'commit', '--allow-empty', '--message', 'Initial commit', '--quiet'], cwd=GIT_REPO)
if use_mercurial:
old_HGRCPATH = os.environ.get('HGRCPATH')
os.environ['HGRCPATH'] = ''
call(['hg', 'init', HG_REPO])
with open(os.path.join(HG_REPO, '.hg', 'hgrc'), 'w') as hgrc:
hgrc.write('[ui]\n')
hgrc.write('username = Foo <bar@example.org>\n')
if use_bzr:
call(['bzr', 'init', '--quiet', BZR_REPO])
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
os.mkdir(INOTIFY_DIR)
def tearDownModule():
global old_cwd
global old_HGRCPATH
for repo_dir in [INOTIFY_DIR, GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
for root, dirs, files in list(os.walk(repo_dir, topdown=False)):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
os.rmdir(os.path.join(root, dir))
os.rmdir(repo_dir)
if use_mercurial:
if old_HGRCPATH is None:
os.environ.pop('HGRCPATH')
else:
os.environ['HGRCPATH'] = old_HGRCPATH
os.chdir(old_cwd)
if __name__ == '__main__':
from tests import main
main()