mirror of
https://github.com/powerline/powerline.git
synced 2025-09-04 08:38:39 +02:00
Now imports follow the following structure: 1. __future__ line: exactly one line allowed: from __future__ import (unicode_literals, division, absolute_import, print_function) (powerline.shell is the only exception due to problems with argparse). 2. Standard python library imports in a form `import X`. 3. Standard python library imports in a form `from X import Y`. 4. and 5. 2. and 3. for third-party (non-python and non-powerline imports). 6. 3. for powerline non-test imports. 7. and 8. 2. and 3. for powerline testing module imports. Each list entry is separated by exactly one newline from another import. If there is module docstring it goes between `# vim:` comment and `__future__` import. So the structure containing all items is the following: #!/usr/bin/env python # vim:fileencoding=utf-8:noet '''Powerline super module''' import sys from argparse import ArgumentParser import psutil from colormath.color_diff import delta_e_cie2000 from powerline.lib.unicode import u import tests.vim as vim_module from tests import TestCase .
597 lines
18 KiB
Python
597 lines
18 KiB
Python
# vim:fileencoding=utf-8:noet
|
|
from __future__ import (unicode_literals, division, absolute_import, print_function)
|
|
|
|
import threading
|
|
import os
|
|
import re
|
|
import shutil
|
|
|
|
from time import sleep
|
|
from subprocess import call, PIPE
|
|
|
|
from powerline.lib import mergedicts, add_divider_highlight_group, REMOVE_THIS_KEY
|
|
from powerline.lib.humanize_bytes import humanize_bytes
|
|
from powerline.lib.vcs import guess, get_fallback_create_watcher
|
|
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
|
|
from powerline.lib.monotonic import monotonic
|
|
from powerline.lib.vcs.git import git_directory
|
|
|
|
from tests.lib import Pl
|
|
from tests import TestCase, SkipTest
|
|
|
|
|
|
try:
|
|
__import__('bzrlib')
|
|
except ImportError:
|
|
use_bzr = False
|
|
else:
|
|
use_bzr = True
|
|
|
|
try:
|
|
__import__('mercurial')
|
|
except ImportError:
|
|
use_mercurial = False
|
|
else:
|
|
use_mercurial = True
|
|
|
|
|
|
GIT_REPO = 'git_repo' + os.environ.get('PYTHON', '')
|
|
HG_REPO = 'hg_repo' + os.environ.get('PYTHON', '')
|
|
BZR_REPO = 'bzr_repo' + os.environ.get('PYTHON', '')
|
|
|
|
|
|
def thread_number():
|
|
return len(threading.enumerate())
|
|
|
|
|
|
class TestThreaded(TestCase):
|
|
def test_threaded_segment(self):
|
|
log = []
|
|
pl = Pl()
|
|
updates = [(None,)]
|
|
lock = threading.Lock()
|
|
event = threading.Event()
|
|
block_event = threading.Event()
|
|
|
|
class TestSegment(ThreadedSegment):
|
|
interval = 10
|
|
|
|
def set_state(self, **kwargs):
|
|
event.clear()
|
|
log.append(('set_state', kwargs))
|
|
return super(TestSegment, self).set_state(**kwargs)
|
|
|
|
def update(self, update_value):
|
|
block_event.wait()
|
|
event.set()
|
|
# Make sleep first to prevent some race conditions
|
|
log.append(('update', update_value))
|
|
with lock:
|
|
ret = updates[0]
|
|
if isinstance(ret, Exception):
|
|
raise ret
|
|
else:
|
|
return ret[0]
|
|
|
|
def render(self, update, **kwargs):
|
|
log.append(('render', update, kwargs))
|
|
if isinstance(update, Exception):
|
|
raise update
|
|
else:
|
|
return update
|
|
|
|
# Non-threaded tests
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
updates[0] = (None,)
|
|
self.assertEqual(segment(pl=pl), None)
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('set_state', {}),
|
|
('update', None),
|
|
('render', None, {'pl': pl, 'update_first': True}),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
updates[0] = ('abc',)
|
|
self.assertEqual(segment(pl=pl), 'abc')
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('set_state', {}),
|
|
('update', None),
|
|
('render', 'abc', {'pl': pl, 'update_first': True}),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
updates[0] = ('abc',)
|
|
self.assertEqual(segment(pl=pl, update_first=False), 'abc')
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('set_state', {}),
|
|
('update', None),
|
|
('render', 'abc', {'pl': pl, 'update_first': False}),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
updates[0] = ValueError('abc')
|
|
self.assertEqual(segment(pl=pl), None)
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(len(pl.exceptions), 1)
|
|
self.assertEqual(log, [
|
|
('set_state', {}),
|
|
('update', None),
|
|
])
|
|
log[:] = ()
|
|
pl.exceptions[:] = ()
|
|
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
updates[0] = (TypeError('def'),)
|
|
self.assertRaises(TypeError, segment, pl=pl)
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('set_state', {}),
|
|
('update', None),
|
|
('render', updates[0][0], {'pl': pl, 'update_first': True}),
|
|
])
|
|
log[:] = ()
|
|
|
|
# Threaded tests
|
|
segment = TestSegment()
|
|
block_event.clear()
|
|
kwargs = {'pl': pl, 'update_first': False, 'other': 1}
|
|
with lock:
|
|
updates[0] = ('abc',)
|
|
segment.startup(**kwargs)
|
|
ret = segment(**kwargs)
|
|
self.assertEqual(thread_number(), 2)
|
|
block_event.set()
|
|
event.wait()
|
|
segment.shutdown_event.set()
|
|
segment.thread.join()
|
|
self.assertEqual(ret, None)
|
|
self.assertEqual(log, [
|
|
('set_state', {'update_first': False, 'other': 1}),
|
|
('render', None, {'pl': pl, 'update_first': False, 'other': 1}),
|
|
('update', None),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
kwargs = {'pl': pl, 'update_first': True, 'other': 1}
|
|
with lock:
|
|
updates[0] = ('def',)
|
|
segment.startup(**kwargs)
|
|
ret = segment(**kwargs)
|
|
self.assertEqual(thread_number(), 2)
|
|
segment.shutdown_event.set()
|
|
segment.thread.join()
|
|
self.assertEqual(ret, 'def')
|
|
self.assertEqual(log, [
|
|
('set_state', {'update_first': True, 'other': 1}),
|
|
('update', None),
|
|
('render', 'def', {'pl': pl, 'update_first': True, 'other': 1}),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
|
|
with lock:
|
|
updates[0] = ('abc',)
|
|
segment.startup(**kwargs)
|
|
start = monotonic()
|
|
ret1 = segment(**kwargs)
|
|
with lock:
|
|
updates[0] = ('def',)
|
|
self.assertEqual(thread_number(), 2)
|
|
sleep(0.5)
|
|
ret2 = segment(**kwargs)
|
|
segment.shutdown_event.set()
|
|
segment.thread.join()
|
|
end = monotonic()
|
|
duration = end - start
|
|
self.assertEqual(ret1, 'abc')
|
|
self.assertEqual(ret2, 'def')
|
|
self.assertEqual(log[:5], [
|
|
('set_state', {'update_first': True, 'interval': 0.2}),
|
|
('update', None),
|
|
('render', 'abc', {'pl': pl, 'update_first': True, 'interval': 0.2}),
|
|
('update', 'abc'),
|
|
('update', 'def'),
|
|
])
|
|
num_runs = len([e for e in log if e[0] == 'update'])
|
|
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
block_event.set()
|
|
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
|
|
with lock:
|
|
updates[0] = ('ghi',)
|
|
segment.startup(**kwargs)
|
|
start = monotonic()
|
|
ret1 = segment(**kwargs)
|
|
with lock:
|
|
updates[0] = TypeError('jkl')
|
|
self.assertEqual(thread_number(), 2)
|
|
sleep(0.5)
|
|
ret2 = segment(**kwargs)
|
|
segment.shutdown_event.set()
|
|
segment.thread.join()
|
|
end = monotonic()
|
|
duration = end - start
|
|
self.assertEqual(ret1, 'ghi')
|
|
self.assertEqual(ret2, None)
|
|
self.assertEqual(log[:5], [
|
|
('set_state', {'update_first': True, 'interval': 0.2}),
|
|
('update', None),
|
|
('render', 'ghi', {'pl': pl, 'update_first': True, 'interval': 0.2}),
|
|
('update', 'ghi'),
|
|
('update', 'ghi'),
|
|
])
|
|
num_runs = len([e for e in log if e[0] == 'update'])
|
|
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
|
|
self.assertEqual(num_runs - 1, len(pl.exceptions))
|
|
log[:] = ()
|
|
|
|
def test_kw_threaded_segment(self):
|
|
log = []
|
|
pl = Pl()
|
|
event = threading.Event()
|
|
|
|
class TestSegment(KwThreadedSegment):
|
|
interval = 10
|
|
|
|
@staticmethod
|
|
def key(_key=(None,), **kwargs):
|
|
log.append(('key', _key, kwargs))
|
|
return _key
|
|
|
|
def compute_state(self, key):
|
|
event.set()
|
|
sleep(0.1)
|
|
log.append(('compute_state', key))
|
|
ret = key
|
|
if isinstance(ret, Exception):
|
|
raise ret
|
|
else:
|
|
return ret[0]
|
|
|
|
def render_one(self, state, **kwargs):
|
|
log.append(('render_one', state, kwargs))
|
|
if isinstance(state, Exception):
|
|
raise state
|
|
else:
|
|
return state
|
|
|
|
# Non-threaded tests
|
|
segment = TestSegment()
|
|
event.clear()
|
|
self.assertEqual(segment(pl=pl), None)
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('key', (None,), {'pl': pl}),
|
|
('compute_state', (None,)),
|
|
('render_one', None, {'pl': pl}),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
kwargs = {'pl': pl, '_key': ('abc',), 'update_first': False}
|
|
event.clear()
|
|
self.assertEqual(segment(**kwargs), 'abc')
|
|
kwargs.update(_key=('def',))
|
|
self.assertEqual(segment(**kwargs), 'def')
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('key', ('abc',), {'pl': pl}),
|
|
('compute_state', ('abc',)),
|
|
('render_one', 'abc', {'pl': pl, '_key': ('abc',)}),
|
|
('key', ('def',), {'pl': pl}),
|
|
('compute_state', ('def',)),
|
|
('render_one', 'def', {'pl': pl, '_key': ('def',)}),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
kwargs = {'pl': pl, '_key': ValueError('xyz'), 'update_first': False}
|
|
event.clear()
|
|
self.assertEqual(segment(**kwargs), None)
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('key', kwargs['_key'], {'pl': pl}),
|
|
('compute_state', kwargs['_key']),
|
|
])
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
kwargs = {'pl': pl, '_key': (ValueError('abc'),), 'update_first': False}
|
|
event.clear()
|
|
self.assertRaises(ValueError, segment, **kwargs)
|
|
self.assertEqual(thread_number(), 1)
|
|
self.assertEqual(log, [
|
|
('key', kwargs['_key'], {'pl': pl}),
|
|
('compute_state', kwargs['_key']),
|
|
('render_one', kwargs['_key'][0], {'pl': pl, '_key': kwargs['_key']}),
|
|
])
|
|
log[:] = ()
|
|
|
|
# Threaded tests
|
|
segment = TestSegment()
|
|
kwargs = {'pl': pl, 'update_first': False, '_key': ('_abc',)}
|
|
event.clear()
|
|
segment.startup(**kwargs)
|
|
ret = segment(**kwargs)
|
|
self.assertEqual(thread_number(), 2)
|
|
segment.shutdown_event.set()
|
|
segment.thread.join()
|
|
self.assertEqual(ret, None)
|
|
self.assertEqual(log[:2], [
|
|
('key', kwargs['_key'], {'pl': pl}),
|
|
('render_one', None, {'pl': pl, '_key': kwargs['_key']}),
|
|
])
|
|
self.assertLessEqual(len(log), 3)
|
|
if len(log) > 2:
|
|
self.assertEqual(log[2], ('compute_state', kwargs['_key']))
|
|
log[:] = ()
|
|
|
|
segment = TestSegment()
|
|
kwargs = {'pl': pl, 'update_first': True, '_key': ('_abc',)}
|
|
event.clear()
|
|
segment.startup(**kwargs)
|
|
ret1 = segment(**kwargs)
|
|
kwargs.update(_key=('_def',))
|
|
ret2 = segment(**kwargs)
|
|
self.assertEqual(thread_number(), 2)
|
|
segment.shutdown_event.set()
|
|
segment.thread.join()
|
|
self.assertEqual(ret1, '_abc')
|
|
self.assertEqual(ret2, '_def')
|
|
self.assertEqual(log, [
|
|
('key', ('_abc',), {'pl': pl}),
|
|
('compute_state', ('_abc',)),
|
|
('render_one', '_abc', {'pl': pl, '_key': ('_abc',)}),
|
|
('key', ('_def',), {'pl': pl}),
|
|
('compute_state', ('_def',)),
|
|
('render_one', '_def', {'pl': pl, '_key': ('_def',)}),
|
|
])
|
|
log[:] = ()
|
|
|
|
|
|
class TestLib(TestCase):
|
|
def test_mergedicts(self):
|
|
d = {}
|
|
mergedicts(d, {'abc': {'def': 'ghi'}})
|
|
self.assertEqual(d, {'abc': {'def': 'ghi'}})
|
|
mergedicts(d, {'abc': {'def': {'ghi': 'jkl'}}})
|
|
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
|
|
mergedicts(d, {})
|
|
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}}})
|
|
mergedicts(d, {'abc': {'mno': 'pqr'}})
|
|
self.assertEqual(d, {'abc': {'def': {'ghi': 'jkl'}, 'mno': 'pqr'}})
|
|
mergedicts(d, {'abc': {'def': REMOVE_THIS_KEY}})
|
|
self.assertEqual(d, {'abc': {'mno': 'pqr'}})
|
|
|
|
def test_add_divider_highlight_group(self):
|
|
def decorated_function_name(**kwargs):
|
|
return str(kwargs)
|
|
func = add_divider_highlight_group('hl_group')(decorated_function_name)
|
|
self.assertEqual(func.__name__, 'decorated_function_name')
|
|
self.assertEqual(func(kw={}), [{'contents': repr({str('kw'): {}}), 'divider_highlight_group': 'hl_group'}])
|
|
|
|
def test_humanize_bytes(self):
|
|
self.assertEqual(humanize_bytes(0), '0 B')
|
|
self.assertEqual(humanize_bytes(1), '1 B')
|
|
self.assertEqual(humanize_bytes(1, suffix='bit'), '1 bit')
|
|
self.assertEqual(humanize_bytes(1000, si_prefix=True), '1 kB')
|
|
self.assertEqual(humanize_bytes(1024, si_prefix=True), '1 kB')
|
|
self.assertEqual(humanize_bytes(1000000000, si_prefix=True), '1.00 GB')
|
|
self.assertEqual(humanize_bytes(1000000000, si_prefix=False), '953.7 MiB')
|
|
|
|
|
|
class TestVCS(TestCase):
|
|
def do_branch_rename_test(self, repo, q):
|
|
st = monotonic()
|
|
while monotonic() - st < 1:
|
|
# Give inotify time to deliver events
|
|
ans = repo.branch()
|
|
if hasattr(q, '__call__'):
|
|
if q(ans):
|
|
break
|
|
else:
|
|
if ans == q:
|
|
break
|
|
sleep(0.01)
|
|
if hasattr(q, '__call__'):
|
|
self.assertTrue(q(ans))
|
|
else:
|
|
self.assertEqual(ans, q)
|
|
|
|
def test_git(self):
|
|
create_watcher = get_fallback_create_watcher()
|
|
repo = guess(path=GIT_REPO, create_watcher=create_watcher)
|
|
self.assertNotEqual(repo, None)
|
|
self.assertEqual(repo.branch(), 'master')
|
|
self.assertEqual(repo.status(), None)
|
|
self.assertEqual(repo.status('file'), None)
|
|
with open(os.path.join(GIT_REPO, 'file'), 'w') as f:
|
|
f.write('abc')
|
|
f.flush()
|
|
self.assertEqual(repo.status(), ' U')
|
|
self.assertEqual(repo.status('file'), '??')
|
|
call(['git', 'add', '.'], cwd=GIT_REPO)
|
|
self.assertEqual(repo.status(), ' I ')
|
|
self.assertEqual(repo.status('file'), 'A ')
|
|
f.write('def')
|
|
f.flush()
|
|
self.assertEqual(repo.status(), 'DI ')
|
|
self.assertEqual(repo.status('file'), 'AM')
|
|
os.remove(os.path.join(GIT_REPO, 'file'))
|
|
# Test changing branch
|
|
self.assertEqual(repo.branch(), 'master')
|
|
try:
|
|
call(['git', 'branch', 'branch1'], cwd=GIT_REPO)
|
|
call(['git', 'checkout', '-q', 'branch1'], cwd=GIT_REPO)
|
|
self.do_branch_rename_test(repo, 'branch1')
|
|
call(['git', 'branch', 'branch2'], cwd=GIT_REPO)
|
|
call(['git', 'checkout', '-q', 'branch2'], cwd=GIT_REPO)
|
|
self.do_branch_rename_test(repo, 'branch2')
|
|
call(['git', 'checkout', '-q', '--detach', 'branch1'], cwd=GIT_REPO)
|
|
self.do_branch_rename_test(repo, lambda b: re.match(br'^[a-f0-9]+$', b))
|
|
finally:
|
|
call(['git', 'checkout', '-q', 'master'], cwd=GIT_REPO)
|
|
|
|
def test_git_sym(self):
|
|
create_watcher = get_fallback_create_watcher()
|
|
dotgit = os.path.join(GIT_REPO, '.git')
|
|
spacegit = os.path.join(GIT_REPO, ' .git ')
|
|
os.rename(dotgit, spacegit)
|
|
try:
|
|
with open(dotgit, 'w') as F:
|
|
F.write('gitdir: .git \n')
|
|
gitdir = git_directory(GIT_REPO)
|
|
self.assertTrue(os.path.isdir(gitdir))
|
|
self.assertEqual(gitdir, os.path.abspath(spacegit))
|
|
repo = guess(path=GIT_REPO, create_watcher=create_watcher)
|
|
self.assertEqual(repo.branch(), 'master')
|
|
finally:
|
|
os.remove(dotgit)
|
|
os.rename(spacegit, dotgit)
|
|
|
|
def test_mercurial(self):
|
|
if not use_mercurial:
|
|
raise SkipTest('Mercurial is not available')
|
|
create_watcher = get_fallback_create_watcher()
|
|
repo = guess(path=HG_REPO, create_watcher=create_watcher)
|
|
self.assertNotEqual(repo, None)
|
|
self.assertEqual(repo.branch(), 'default')
|
|
self.assertEqual(repo.status(), None)
|
|
with open(os.path.join(HG_REPO, 'file'), 'w') as f:
|
|
f.write('abc')
|
|
f.flush()
|
|
self.assertEqual(repo.status(), ' U')
|
|
self.assertEqual(repo.status('file'), 'U')
|
|
call(['hg', 'add', '.'], cwd=HG_REPO, stdout=PIPE)
|
|
self.assertEqual(repo.status(), 'D ')
|
|
self.assertEqual(repo.status('file'), 'A')
|
|
os.remove(os.path.join(HG_REPO, 'file'))
|
|
|
|
def test_bzr(self):
|
|
if not use_bzr:
|
|
raise SkipTest('Bazaar is not available')
|
|
create_watcher = get_fallback_create_watcher()
|
|
repo = guess(path=BZR_REPO, create_watcher=create_watcher)
|
|
self.assertNotEqual(repo, None, 'No bzr repo found. Do you have bzr installed?')
|
|
self.assertEqual(repo.branch(), 'test_powerline')
|
|
self.assertEqual(repo.status(), None)
|
|
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
|
|
f.write('abc')
|
|
self.assertEqual(repo.status(), ' U')
|
|
self.assertEqual(repo.status('file'), '? ')
|
|
call(['bzr', 'add', '-q', '.'], cwd=BZR_REPO, stdout=PIPE)
|
|
self.assertEqual(repo.status(), 'D ')
|
|
self.assertEqual(repo.status('file'), '+N')
|
|
call(['bzr', 'commit', '-q', '-m', 'initial commit'], cwd=BZR_REPO)
|
|
self.assertEqual(repo.status(), None)
|
|
with open(os.path.join(BZR_REPO, 'file'), 'w') as f:
|
|
f.write('def')
|
|
self.assertEqual(repo.status(), 'D ')
|
|
self.assertEqual(repo.status('file'), ' M')
|
|
self.assertEqual(repo.status('notexist'), None)
|
|
with open(os.path.join(BZR_REPO, 'ignored'), 'w') as f:
|
|
f.write('abc')
|
|
self.assertEqual(repo.status('ignored'), '? ')
|
|
# Test changing the .bzrignore file should update status
|
|
with open(os.path.join(BZR_REPO, '.bzrignore'), 'w') as f:
|
|
f.write('ignored')
|
|
self.assertEqual(repo.status('ignored'), None)
|
|
# Test changing the dirstate file should invalidate the cache for
|
|
# all files in the repo
|
|
with open(os.path.join(BZR_REPO, 'file2'), 'w') as f:
|
|
f.write('abc')
|
|
call(['bzr', 'add', 'file2'], cwd=BZR_REPO, stdout=PIPE)
|
|
call(['bzr', 'commit', '-q', '-m', 'file2 added'], cwd=BZR_REPO)
|
|
with open(os.path.join(BZR_REPO, 'file'), 'a') as f:
|
|
f.write('hello')
|
|
with open(os.path.join(BZR_REPO, 'file2'), 'a') as f:
|
|
f.write('hello')
|
|
self.assertEqual(repo.status('file'), ' M')
|
|
self.assertEqual(repo.status('file2'), ' M')
|
|
call(['bzr', 'commit', '-q', '-m', 'multi'], cwd=BZR_REPO)
|
|
self.assertEqual(repo.status('file'), None)
|
|
self.assertEqual(repo.status('file2'), None)
|
|
|
|
# Test changing branch
|
|
call(['bzr', 'nick', 'branch1'], cwd=BZR_REPO, stdout=PIPE, stderr=PIPE)
|
|
self.do_branch_rename_test(repo, 'branch1')
|
|
|
|
# Test branch name/status changes when swapping repos
|
|
for x in ('b1', 'b2'):
|
|
d = os.path.join(BZR_REPO, x)
|
|
os.mkdir(d)
|
|
call(['bzr', 'init', '-q'], cwd=d)
|
|
call(['bzr', 'nick', '-q', x], cwd=d)
|
|
repo = guess(path=d, create_watcher=create_watcher)
|
|
self.assertEqual(repo.branch(), x)
|
|
self.assertFalse(repo.status())
|
|
if x == 'b1':
|
|
open(os.path.join(d, 'dirty'), 'w').close()
|
|
self.assertTrue(repo.status())
|
|
os.rename(os.path.join(BZR_REPO, 'b1'), os.path.join(BZR_REPO, 'b'))
|
|
os.rename(os.path.join(BZR_REPO, 'b2'), os.path.join(BZR_REPO, 'b1'))
|
|
os.rename(os.path.join(BZR_REPO, 'b'), os.path.join(BZR_REPO, 'b2'))
|
|
for x, y in (('b1', 'b2'), ('b2', 'b1')):
|
|
d = os.path.join(BZR_REPO, x)
|
|
repo = guess(path=d, create_watcher=create_watcher)
|
|
self.do_branch_rename_test(repo, y)
|
|
if x == 'b1':
|
|
self.assertFalse(repo.status())
|
|
else:
|
|
self.assertTrue(repo.status())
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.powerline_old_cwd = os.getcwd()
|
|
os.chdir(os.path.dirname(__file__))
|
|
call(['git', 'init', '--quiet', GIT_REPO])
|
|
assert os.path.isdir(GIT_REPO)
|
|
call(['git', 'config', '--local', 'user.name', 'Foo'], cwd=GIT_REPO)
|
|
call(['git', 'config', '--local', 'user.email', 'bar@example.org'], cwd=GIT_REPO)
|
|
call(['git', 'commit', '--allow-empty', '--message', 'Initial commit', '--quiet'], cwd=GIT_REPO)
|
|
if use_mercurial:
|
|
cls.powerline_old_HGRCPATH = os.environ.get('HGRCPATH')
|
|
os.environ['HGRCPATH'] = ''
|
|
call(['hg', 'init', HG_REPO])
|
|
with open(os.path.join(HG_REPO, '.hg', 'hgrc'), 'w') as hgrc:
|
|
hgrc.write('[ui]\n')
|
|
hgrc.write('username = Foo <bar@example.org>\n')
|
|
if use_bzr:
|
|
call(['bzr', 'init', '--quiet', BZR_REPO])
|
|
call(['bzr', 'config', 'email=Foo <bar@example.org>'], cwd=BZR_REPO)
|
|
call(['bzr', 'config', 'nickname=test_powerline'], cwd=BZR_REPO)
|
|
call(['bzr', 'config', 'create_signatures=0'], cwd=BZR_REPO)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
for repo_dir in [GIT_REPO] + ([HG_REPO] if use_mercurial else []) + ([BZR_REPO] if use_bzr else []):
|
|
shutil.rmtree(repo_dir)
|
|
if use_mercurial:
|
|
if cls.powerline_old_HGRCPATH is None:
|
|
os.environ.pop('HGRCPATH')
|
|
else:
|
|
os.environ['HGRCPATH'] = cls.powerline_old_HGRCPATH
|
|
os.chdir(cls.powerline_old_cwd)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from tests import main
|
|
main()
|