powerline/tests/test_lib.py

686 lines
21 KiB
Python

# vim:fileencoding=utf-8:noet
from __future__ import division
from powerline.lib import mergedicts, add_divider_highlight_group, REMOVE_THIS_KEY
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.vcs import guess, get_fallback_create_watcher
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic
from powerline.lib.file_watcher import create_file_watcher, INotifyError
from powerline.lib.vcs.git import git_directory
from powerline import get_fallback_logger
import threading
import os
import sys
import re
from time import sleep
from subprocess import call, PIPE
from functools import partial
from tests import TestCase, SkipTest
from tests.lib import Pl
def thread_number():
return len(threading.enumerate())
class TestThreaded(TestCase):
def test_threaded_segment(self):
log = []
pl = Pl()
updates = [(None,)]
lock = threading.Lock()
event = threading.Event()
block_event = threading.Event()
class TestSegment(ThreadedSegment):
interval = 10
def set_state(self, **kwargs):
event.clear()
log.append(('set_state', kwargs))
return super(TestSegment, self).set_state(**kwargs)
def update(self, update_value):
block_event.wait()
event.set()
# Make sleep first to prevent some race conditions
log.append(('update', update_value))
with lock:
ret = updates[0]
if isinstance(ret, Exception):
raise ret
else:
return ret[0]
def render(self, update, **kwargs):
log.append(('render', update, kwargs))
if isinstance(update, Exception):
raise update
else:
return update
# Non-threaded tests
segment = TestSegment()
block_event.set()
updates[0] = (None,)
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', None, {'pl': pl, 'update_first': True}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ('abc',)
self.assertEqual(segment(pl=pl), 'abc')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': True}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ('abc',)
self.assertEqual(segment(pl=pl, update_first=False), 'abc')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': False}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ValueError('abc')
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(len(pl.exceptions), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
])
log[:] = ()
pl.exceptions[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = (TypeError('def'),)
self.assertRaises(TypeError, segment, pl=pl)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', updates[0][0], {'pl': pl, 'update_first': True}),
])
log[:] = ()
# Threaded tests
segment = TestSegment()
block_event.clear()
kwargs = {'pl': pl, 'update_first': False, 'other': 1}
with lock:
updates[0] = ('abc',)
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
block_event.set()
event.wait()
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, None)
self.assertEqual(log, [
('set_state', {'update_first': False, 'other': 1}),
('render', None, {'pl': pl, 'update_first': False, 'other': 1}),
('update', None),
])
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'other': 1}
with lock:
updates[0] = ('def',)
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, 'def')
self.assertEqual(log, [
('set_state', {'update_first': True, 'other': 1}),
('update', None),
('render', 'def', {'pl': pl, 'update_first': True, 'other': 1}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
with lock:
updates[0] = ('abc',)
segment.startup(**kwargs)
start = monotonic()
ret1 = segment(**kwargs)
with lock:
updates[0] = ('def',)
self.assertEqual(thread_number(), 2)
sleep(0.5)
ret2 = segment(**kwargs)
segment.shutdown_event.set()
segment.thread.join()
end = monotonic()
duration = end - start
self.assertEqual(ret1, 'abc')
self.assertEqual(ret2, 'def')
self.assertEqual(log[:5], [
('set_state', {'update_first': True, 'interval': 0.2}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': True, 'interval': 0.2}),
('update', 'abc'),
('update', 'def'),
])
num_runs = len([e for e in log if e[0] == 'update'])
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
with lock:
updates[0] = ('ghi',)
segment.startup(**kwargs)
start = monotonic()
ret1 = segment(**kwargs)
with lock:
updates[0] = TypeError('jkl')
self.assertEqual(thread_number(), 2)
sleep(0.5)
ret2 = segment(**kwargs)
segment.shutdown_event.set()
segment.thread.join()
end = monotonic()
duration = end - start
self.assertEqual(ret1, 'ghi')
self.assertEqual(ret2, None)
self.assertEqual(log[:5], [
('set_state', {'update_first': True, 'interval': 0.2}),
('update', None),
('render', 'ghi', {'pl': pl, 'update_first': True, 'interval': 0.2}),
('update', 'ghi'),
('update', 'ghi'),
])
num_runs = len([e for e in log if e[0] == 'update'])
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
self.assertEqual(num_runs - 1, len(pl.exceptions))
log[:] = ()
def test_kw_threaded_segment(self):
log = []
pl = Pl()
event = threading.Event()
class TestSegment(KwThreadedSegment):
interval = 10
@staticmethod
def key(_key=(None,), **kwargs):
log.append(('key', _key, kwargs))
return _key
def compute_state(self, key):
event.set()
sleep(0.1)
log.append(('compute_state', key))
ret = key
if isinstance(ret, Exception):
raise ret
else:
return ret[0]
def render_one(self, state, **kwargs):
log.append(('render_one', state, kwargs))
if isinstance(state, Exception):
raise state
else:
return state
# Non-threaded tests
segment = TestSegment()
event.clear()
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', (None,), {'pl': pl}),
('compute_state', (None,)),
('render_one', None, {'pl': pl}),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': ('abc',), 'update_first': False}
event.clear()
self.assertEqual(segment(**kwargs), 'abc')
kwargs.update(_key=('def',))
self.assertEqual(segment(**kwargs), 'def')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', ('abc',), {'pl': pl}),
('compute_state', ('abc',)),
('render_one', 'abc', {'pl': pl, '_key': ('abc',)}),
('key', ('def',), {'pl': pl}),
('compute_state', ('def',)),
('render_one', 'def', {'pl': pl, '_key': ('def',)}),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': ValueError('xyz'), 'update_first': False}
event.clear()
self.assertEqual(segment(**kwargs), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', kwargs['_key'], {'pl': pl}),
('compute_state', kwargs['_key']),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': (ValueError('abc'),), 'update_first': False}
event.clear()
self.assertRaises(ValueError, segment, **kwargs)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', kwargs['_key'], {'pl': pl}),
('compute_state', kwargs['_key']),
('render_one', kwargs['_key'][0], {'pl': pl, '_key': kwargs['_key']}),
])
log[:] = ()
# Threaded tests
segment = TestSegment()
kwargs = {'pl': pl, 'update_first': False, '_key': ('_abc',)}
event.clear()
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, None)
self.assertEqual(log[:2], [
('key', kwargs['_key'], {'pl': pl}),
('render_one', None, {'pl': pl, '_key': kwargs['_key']}),
])
self.assertLessEqual(len(log), 3)
if len(log) > 2:
self.assertEqual(log[2], ('compute_state', kwargs['_key']))
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, 'update_first': True, '_key': ('_abc',)}
event.clear()
segment.startup(**kwargs)
ret1 = segment(**kwargs)
kwargs.update(_key=('_def',))
ret2 = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret1, '_abc')
self.assertEqual(ret2, '_def')
self.assertEqual(log, [
('key', ('_abc',), {'pl': pl}),
('compute_state', ('_abc',)),
('render_one', '_abc', {'pl': pl, '_key': ('_abc',)}),
('key', ('_def',), {'pl': pl}),
('compute_state', ('_def',)),
('render_one', '_def', {'pl': pl, '_key': ('_def',)}),
])
log[:] = ()
class TestLib(TestCase):
def test_mergedicts(self):
d = {}
mergedicts(d, {'abc': {'def': 'ghi'}})
self.assertEqual(d, {'abc': {'def': 'ghi'}})
mergedicts(d, {'abc': {'def': {'ghi': 'jkl'}}})
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
mergedicts(d, {})
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
mergedicts(d, {'abc': {'mno': 'pqr'}})
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}, 'mno': 'pqr'}})
mergedicts(d, {'abc': {'def': REMOVE_THIS_KEY}})
self.assertEqual(d, {'abc': {'mno': 'pqr'}})
def test_add_divider_highlight_group(self):
def decorated_function_name(**kwargs):
return str(kwargs)
func = add_divider_highlight_group('hl_group')(decorated_function_name)
self.assertEqual(func.__name__, 'decorated_function_name')
self.assertEqual(func(kw={}), [{'contents': repr({'kw': {}}), 'divider_highlight_group': 'hl_group'}])
def test_humanize_bytes(self):
self.assertEqual(humanize_bytes(0), '0 B')
self.assertEqual(humanize_bytes(1), '1 B')
self.assertEqual(humanize_bytes(1, suffix='bit'), '1 bit')
self.assertEqual(humanize_bytes(1000, si_prefix=True), '1 kB')
self.assertEqual(humanize_bytes(1024, si_prefix=True), '1 kB')
self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB')
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
class TestFilesystemWatchers(TestCase):
def do_test_for_change(self, watcher, path):
st = monotonic()
while monotonic() - st < 1:
if watcher(path):
return
sleep(0.1)
self.fail('The change to {0} was not detected'.format(path))
def test_file_watcher(self):
try:
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
except INotifyError:
raise SkipTest('This test is not suitable for a stat based file watcher')
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
with open(f1, 'wb'):
with open(f2, 'wb'):
with open(f3, 'wb'):
pass
ne = os.path.join(INOTIFY_DIR, 'notexists')
self.assertRaises(OSError, w, ne)
self.assertTrue(w(f1))
self.assertTrue(w(f2))
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Repeat once
os.utime(f1, None), os.utime(f2, None)
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that no false changes are reported
self.assertFalse(w(f1), 'Spurious change detected')
self.assertFalse(w(f2), 'Spurious change detected')
# Check that open the file with 'w' triggers a change
with open(f1, 'wb'):
with open(f2, 'wb'):
pass
self.do_test_for_change(w, f1)
self.do_test_for_change(w, f2)
# Check that writing to a file with 'a' triggers a change
with open(f1, 'ab') as f:
f.write(b'1')
self.do_test_for_change(w, f1)
# Check that deleting a file registers as a change
os.unlink(f1)
self.do_test_for_change(w, f1)
# Test that changing the inode of a file does not cause it to stop
# being watched
os.rename(f3, f2)
self.do_test_for_change(w, f2)
self.assertFalse(w(f2), 'Spurious change detected')
os.utime(f2, None)
self.do_test_for_change(w, f2)
def test_tree_watcher(self):
from powerline.lib.tree_watcher import TreeWatcher
tw = TreeWatcher()
subdir = os.path.join(INOTIFY_DIR, 'subdir')
os.mkdir(subdir)
if tw.watch(INOTIFY_DIR).is_dummy:
raise SkipTest('No tree watcher available')
import shutil
self.assertTrue(tw(INOTIFY_DIR))
self.assertFalse(tw(INOTIFY_DIR))
changed = partial(self.do_test_for_change, tw, INOTIFY_DIR)
open(os.path.join(INOTIFY_DIR, 'tree1'), 'w').close()
changed()
open(os.path.join(subdir, 'tree1'), 'w').close()
changed()
os.unlink(os.path.join(subdir, 'tree1'))
changed()
os.rmdir(subdir)
changed()
os.mkdir(subdir)
changed()
os.rename(subdir, subdir + '1')
changed()
shutil.rmtree(subdir + '1')
changed()
os.mkdir(subdir)
f = os.path.join(subdir, 'f')
open(f, 'w').close()
changed()
with open(f, 'a') as s:
s.write(' ')
changed()
os.rename(f, f + '1')
changed()
use_mercurial = use_bzr = sys.version_info < (3, 0)
class TestVCS(TestCase):
def do_branch_rename_test(self, repo, q):
st = monotonic()
while monotonic() - st < 1:
# Give inotify time to deliver events
ans = repo.branch()
if hasattr(q, '__call__'):
if q(ans):
break
else:
if ans == q:
break
sleep(0.01)
if hasattr(q, '__call__'):
self.assertTrue(q(ans))
else:
self.assertEqual(ans, q)
def test_git(self):
create_watcher = get_fallback_create_watcher()
repo = guess(path=GIT_REPO, create_watcher=create_watcher)
self.assertNotEqual(repo, None)
self.assertEqual(repo.branch(), 'master')
self.assertEqual(repo.status(), None)
self.assertEqual(repo.status('file'), None)
with open(os.path.join(GIT_REPO, 'file'), 'w') as f:
f.write('abc')
f.flush()
self.assertEqual(repo.status(), ' U')
self.assertEqual(repo.status('file'), '??')
call(['git', 'add', '.'], cwd=GIT_REPO)
self.assertEqual(repo.status(), ' I ')
self.assertEqual(repo.status('file'), 'A ')
f.write('def')
f.flush()
self.assertEqual(repo.status(), 'DI ')
self.assertEqual(repo.status('file'), 'AM')
os.remove(os.path.join(GIT_REPO, 'file'))
# Test changing branch
self.assertEqual(repo.branch(), 'master')
try:
call(['git', 'branch', 'branch1'], cwd=GIT_REPO)
call(['git', 'checkout', '-q', 'branch1'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, 'branch1')
call(['git', 'branch', 'branch2'], cwd=GIT_REPO)
call(['git', 'checkout', '-q', 'branch2'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, 'branch2')
call(['git', 'checkout', '-q', '--detach', 'branch1'], cwd=GIT_REPO)
self.do_branch_rename_test(repo, lambda b: re.match(br'^[a-f0-9]+$', b))
finally:
call(['git', 'checkout', '-q', 'master'], cwd=GIT_REPO)
def test_git_sym(self):
create_watcher = get_fallback_create_watcher()
dotgit = os.path.join(GIT_REPO, '.git')
spacegit = os.path.join(GIT_REPO, ' .git ')
os.rename(dotgit, spacegit)
try:
with open(dotgit, 'w') as F:
F.write('gitdir: .git \n')
gitdir = git_directory(GIT_REPO)
self.assertTrue(os.path.isdir(gitdir))
self.assertEqual(gitdir, os.path.abspath(spacegit))
repo = guess(path=GIT_REPO, create_watcher=create_watcher)
self.assertEqual(repo.branch(), 'master')
finally:
os.remove(dotgit)
os.rename(spacegit, dotgit)
if use_mercurial:
def test_mercurial(self):
create_watcher = get_fallback_create_watcher()
repo = guess(path=HG_REPO, create_watcher=create_watcher)
self.assertNotEqual(repo, None)
self.assertEqual(repo.branch(), 'default')
self.assertEqual(repo.status(), None)
with open(os.path.join(HG_REPO, 'file'), 'w') as f:
f.write('abc')
f.flush()
self.assertEqual(repo.status(), ' U')
self.assertEqual(repo.status('file'), 'U')
call(['hg', 'add', '.'], cwd=HG_REPO, stdout=PIPE)
self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), 'A')
os.remove(os.path.join(HG_REPO, 'file'))
if use_bzr:
def test_bzr(self):
create_watcher = get_fallback_create_watcher()
repo = guess(path=BZR_REPO, create_watcher=create_watcher)
self.assertNotEqual(repo, None, 'No bzr repo found. Do you have bzr installed?')
self.assertEqual(repo.branch(), 'test_powerline')
self.assertEqual(repo.status(), None)
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
f.write('abc')
self.assertEqual(repo.status(), ' U')
self.assertEqual(repo.status('file'), '? ')
call(['bzr', 'add', '-q', '.'], cwd=BZR_REPO, stdout=PIPE)
self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), '+N')
call(['bzr', 'commit', '-q', '-m', 'initial commit'], cwd=BZR_REPO)
self.assertEqual(repo.status(), None)
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
f.write('def')
self.assertEqual(repo.status(), 'D ')
self.assertEqual(repo.status('file'), ' M')
self.assertEqual(repo.status('notexist'), None)
with open(os.path.join(BZR_REPO, 'ignored'), 'w') as f:
f.write('abc')
self.assertEqual(repo.status('ignored'), '? ')
# Test changing the .bzrignore file should update status
with open(os.path.join(BZR_REPO, '.bzrignore'), 'w') as f:
f.write('ignored')
self.assertEqual(repo.status('ignored'), None)
# Test changing the dirstate file should invalidate the cache for
# all files in the repo
with open(os.path.join(BZR_REPO, 'file2'), 'w') as f:
f.write('abc')
call(['bzr', 'add', 'file2'], cwd=BZR_REPO, stdout=PIPE)
call(['bzr', 'commit', '-q', '-m', 'file2 added'], cwd=BZR_REPO)
with open(os.path.join(BZR_REPO, 'file'), 'a') as f:
f.write('hello')
with open(os.path.join(BZR_REPO, 'file2'), 'a') as f:
f.write('hello')
self.assertEqual(repo.status('file'), ' M')
self.assertEqual(repo.status('file2'), ' M')
call(['bzr', 'commit', '-q', '-m', 'multi'], cwd=BZR_REPO)
self.assertEqual(repo.status('file'), None)
self.assertEqual(repo.status('file2'), None)
# Test changing branch
call(['bzr', 'nick', 'branch1'], cwd=BZR_REPO, stdout=PIPE, stderr=PIPE)
self.do_branch_rename_test(repo, 'branch1')
# Test branch name/status changes when swapping repos
for x in ('b1', 'b2'):
d = os.path.join(BZR_REPO, x)
os.mkdir(d)
call(['bzr', 'init', '-q'], cwd=d)
call(['bzr', 'nick', '-q', x], cwd=d)
repo = guess(path=d, create_watcher=create_watcher)
self.assertEqual(repo.branch(), x)
self.assertFalse(repo.status())
if x == 'b1':
open(os.path.join(d, 'dirty'), 'w').close()
self.assertTrue(repo.status())
os.rename(os.path.join(BZR_REPO, 'b1'), os.path.join(BZR_REPO, 'b'))
os.rename(os.path.join(BZR_REPO, 'b2'), os.path.join(BZR_REPO, 'b1'))
os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2'))
for x, y in (('b1', 'b2'), ('b2', 'b1')):
d = os.path.join(BZR_REPO, x)
repo = guess(path=d, create_watcher=create_watcher)
self.do_branch_rename_test(repo, y)
if x == 'b1':
self.assertFalse(repo.status())
else:
self.assertTrue(repo.status())
old_HGRCPATH = None
old_cwd = None
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
INOTIFY_DIR = 'inotify' + os.environ.get('PYTHON', '')
def setUpModule():
global old_cwd
global old_HGRCPATH
old_cwd = os.getcwd()
os.chdir(os.path.dirname(__file__))
call(['git', 'init', '--quiet', GIT_REPO])
assert os.path.isdir(GIT_REPO)
call(['git', 'config', '--local', 'user.name', 'Foo'], cwd=GIT_REPO)
call(['git', 'config', '--local', 'user.email', 'bar@example.org'], cwd=GIT_REPO)
call(['git', 'commit', '--allow-empty', '--message', 'Initial commit', '--quiet'], cwd=GIT_REPO)
if use_mercurial:
old_HGRCPATH = os.environ.get('HGRCPATH')
os.environ['HGRCPATH'] = ''
call(['hg', 'init', HG_REPO])
with open(os.path.join(HG_REPO, '.hg', 'hgrc'), 'w') as hgrc:
hgrc.write('[ui]\n')
hgrc.write('username = Foo <bar@example.org>\n')
if use_bzr:
call(['bzr', 'init', '--quiet', BZR_REPO])
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
os.mkdir(INOTIFY_DIR)
def tearDownModule():
global old_cwd
global old_HGRCPATH
for repo_dir in [INOTIFY_DIR, GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
for root, dirs, files in list(os.walk(repo_dir, topdown=False)):
for file in files:
os.remove(os.path.join(root, file))
for dir in dirs:
os.rmdir(os.path.join(root, dir))
os.rmdir(repo_dir)
if use_mercurial:
if old_HGRCPATH is None:
os.environ.pop('HGRCPATH')
else:
os.environ['HGRCPATH'] = old_HGRCPATH
os.chdir(old_cwd)
if __name__ == '__main__':
from tests import main
main()