Add powerline.lib.threaded tests

This commit is contained in:
ZyX 2014-02-23 14:36:08 +04:00
parent 68a6fd056c
commit ba41cecb72
2 changed files with 333 additions and 1 deletions

View File

@ -5,13 +5,14 @@ import sys
class Pl(object): class Pl(object):
def __init__(self): def __init__(self):
self.exceptions = []
self.errors = [] self.errors = []
self.warns = [] self.warns = []
self.debugs = [] self.debugs = []
self.prefix = None self.prefix = None
self.use_daemon_threads = True self.use_daemon_threads = True
for meth in ('error', 'warn', 'debug'): for meth in ('error', 'warn', 'debug', 'exception'):
exec (('def {0}(self, msg, *args, **kwargs):\n' exec (('def {0}(self, msg, *args, **kwargs):\n'
' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n').format(meth)) ' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n').format(meth))

View File

@ -4,7 +4,9 @@ from __future__ import division
from powerline.lib import mergedicts, add_divider_highlight_group from powerline.lib import mergedicts, add_divider_highlight_group
from powerline.lib.humanize_bytes import humanize_bytes from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.vcs import guess from powerline.lib.vcs import guess
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic from powerline.lib.monotonic import monotonic
import threading
import os import os
import sys import sys
import re import re
@ -12,6 +14,335 @@ from time import sleep
from subprocess import call, PIPE from subprocess import call, PIPE
from functools import partial from functools import partial
from tests import TestCase, SkipTest from tests import TestCase, SkipTest
from tests.lib import Pl
def thread_number():
return len(threading.enumerate())
class TestThreaded(TestCase):
def test_threaded_segment(self):
log = []
pl = Pl()
updates = [(None,)]
lock = threading.Lock()
event = threading.Event()
block_event = threading.Event()
class TestSegment(ThreadedSegment):
interval = 10
def set_state(self, **kwargs):
event.clear()
log.append(('set_state', kwargs))
return super(TestSegment, self).set_state(**kwargs)
def update(self, update_value):
block_event.wait()
event.set()
# Make sleep first to prevent some race conditions
log.append(('update', update_value))
with lock:
ret = updates[0]
if isinstance(ret, Exception):
raise ret
else:
return ret[0]
def render(self, update, **kwargs):
log.append(('render', update, kwargs))
if isinstance(update, Exception):
raise update
else:
return update
# Non-threaded tests
segment = TestSegment()
block_event.set()
updates[0] = (None,)
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', None, {'pl': pl, 'update_first': True}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ('abc',)
self.assertEqual(segment(pl=pl), 'abc')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': True}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ('abc',)
self.assertEqual(segment(pl=pl, update_first=False), 'abc')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': False}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ValueError('abc')
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(len(pl.exceptions), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
])
log[:] = ()
pl.exceptions[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = (TypeError('def'),)
self.assertRaises(TypeError, segment, pl=pl)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', updates[0][0], {'pl': pl, 'update_first': True}),
])
log[:] = ()
# Threaded tests
segment = TestSegment()
block_event.clear()
kwargs = {'pl': pl, 'update_first': False, 'other': 1}
with lock:
updates[0] = ('abc',)
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
block_event.set()
event.wait()
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, None)
self.assertEqual(log, [
('set_state', {'update_first': False, 'other': 1}),
('render', None, {'pl': pl, 'update_first': False, 'other': 1}),
('update', None),
])
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'other': 1}
with lock:
updates[0] = ('def',)
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, 'def')
self.assertEqual(log, [
('set_state', {'update_first': True, 'other': 1}),
('update', None),
('render', 'def', {'pl': pl, 'update_first': True, 'other': 1}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
with lock:
updates[0] = ('abc',)
segment.startup(**kwargs)
start = monotonic()
ret1 = segment(**kwargs)
with lock:
updates[0] = ('def',)
self.assertEqual(thread_number(), 2)
sleep(0.5)
ret2 = segment(**kwargs)
segment.shutdown_event.set()
segment.thread.join()
end = monotonic()
duration = end - start
self.assertEqual(ret1, 'abc')
self.assertEqual(ret2, 'def')
self.assertEqual(log[:5], [
('set_state', {'update_first': True, 'interval': 0.2}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': True, 'interval': 0.2}),
('update', 'abc'),
('update', 'def'),
])
num_runs = len([e for e in log if e[0] == 'update'])
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
with lock:
updates[0] = ('ghi',)
segment.startup(**kwargs)
start = monotonic()
ret1 = segment(**kwargs)
with lock:
updates[0] = TypeError('jkl')
self.assertEqual(thread_number(), 2)
sleep(0.5)
ret2 = segment(**kwargs)
segment.shutdown_event.set()
segment.thread.join()
end = monotonic()
duration = end - start
self.assertEqual(ret1, 'ghi')
self.assertEqual(ret2, None)
self.assertEqual(log[:5], [
('set_state', {'update_first': True, 'interval': 0.2}),
('update', None),
('render', 'ghi', {'pl': pl, 'update_first': True, 'interval': 0.2}),
('update', 'ghi'),
('update', 'ghi'),
])
num_runs = len([e for e in log if e[0] == 'update'])
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
self.assertEqual(num_runs - 1, len(pl.exceptions))
log[:] = ()
def test_kw_threaded_segment(self):
log = []
pl = Pl()
lock = threading.Lock()
event = threading.Event()
class TestSegment(KwThreadedSegment):
interval = 10
@staticmethod
def key(_key=(None,), **kwargs):
log.append(('key', _key, kwargs))
return _key
def compute_state(self, key):
event.set()
sleep(0.1)
log.append(('compute_state', key))
with lock:
ret = key
if isinstance(ret, Exception):
raise ret
else:
return ret[0]
def render_one(self, state, **kwargs):
log.append(('render_one', state, kwargs))
if isinstance(state, Exception):
raise state
else:
return state
# Non-threaded tests
segment = TestSegment()
event.clear()
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', (None,), {'pl': pl}),
('compute_state', (None,)),
('render_one', None, {'pl': pl}),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': ('abc',), 'update_first': False}
event.clear()
self.assertEqual(segment(**kwargs), 'abc')
kwargs.update(_key=('def',))
self.assertEqual(segment(**kwargs), 'def')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', ('abc',), {'pl': pl}),
('compute_state', ('abc',)),
('render_one', 'abc', {'pl': pl, '_key': ('abc',)}),
('key', ('def',), {'pl': pl}),
('compute_state', ('def',)),
('render_one', 'def', {'pl': pl, '_key': ('def',)}),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': ValueError('xyz'), 'update_first': False}
event.clear()
self.assertEqual(segment(**kwargs), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', kwargs['_key'], {'pl': pl}),
('compute_state', kwargs['_key']),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': (ValueError('abc'),), 'update_first': False}
event.clear()
self.assertRaises(ValueError, segment, **kwargs)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', kwargs['_key'], {'pl': pl}),
('compute_state', kwargs['_key']),
('render_one', kwargs['_key'][0], {'pl': pl, '_key': kwargs['_key']}),
])
log[:] = ()
# Threaded tests
segment = TestSegment()
kwargs = {'pl': pl, 'update_first': False, '_key': ('_abc',)}
event.clear()
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, None)
self.assertEqual(log[:2], [
('key', kwargs['_key'], {'pl': pl}),
('render_one', None, {'pl': pl, '_key': kwargs['_key']}),
])
self.assertLessEqual(len(log), 3)
if len(log) > 2:
self.assertEqual(log[2], ('compute_state', kwargs['_key']))
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, 'update_first': True, '_key': ('_abc',)}
event.clear()
segment.startup(**kwargs)
ret1 = segment(**kwargs)
kwargs.update(_key=('_def',))
ret2 = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret1, '_abc')
self.assertEqual(ret2, '_def')
self.assertEqual(log, [
('key', ('_abc',), {'pl': pl}),
('compute_state', ('_abc',)),
('render_one', '_abc', {'pl': pl, '_key': ('_abc',)}),
('key', ('_def',), {'pl': pl}),
('compute_state', ('_def',)),
('render_one', '_def', {'pl': pl, '_key': ('_def',)}),
])
log[:] = ()
class TestLib(TestCase): class TestLib(TestCase):