diff --git a/tests/lib/__init__.py b/tests/lib/__init__.py index c6ac15d3..8447ab2f 100644 --- a/tests/lib/__init__.py +++ b/tests/lib/__init__.py @@ -5,13 +5,14 @@ import sys class Pl(object): def __init__(self): + self.exceptions = [] self.errors = [] self.warns = [] self.debugs = [] self.prefix = None self.use_daemon_threads = True - for meth in ('error', 'warn', 'debug'): + for meth in ('error', 'warn', 'debug', 'exception'): exec (('def {0}(self, msg, *args, **kwargs):\n' ' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n').format(meth)) diff --git a/tests/test_lib.py b/tests/test_lib.py index 0fb31430..d1735b65 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -4,7 +4,9 @@ from __future__ import division from powerline.lib import mergedicts, add_divider_highlight_group from powerline.lib.humanize_bytes import humanize_bytes from powerline.lib.vcs import guess +from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment from powerline.lib.monotonic import monotonic +import threading import os import sys import re @@ -12,6 +14,335 @@ from time import sleep from subprocess import call, PIPE from functools import partial from tests import TestCase, SkipTest +from tests.lib import Pl + + +def thread_number(): + return len(threading.enumerate()) + + +class TestThreaded(TestCase): + def test_threaded_segment(self): + log = [] + pl = Pl() + updates = [(None,)] + lock = threading.Lock() + event = threading.Event() + block_event = threading.Event() + + class TestSegment(ThreadedSegment): + interval = 10 + + def set_state(self, **kwargs): + event.clear() + log.append(('set_state', kwargs)) + return super(TestSegment, self).set_state(**kwargs) + + def update(self, update_value): + block_event.wait() + event.set() + # Make sleep first to prevent some race conditions + log.append(('update', update_value)) + with lock: + ret = updates[0] + if isinstance(ret, Exception): + raise ret + else: + return ret[0] + + def render(self, update, **kwargs): + log.append(('render', update, kwargs)) + if isinstance(update, Exception): + raise update + else: + return update + + # Non-threaded tests + segment = TestSegment() + block_event.set() + updates[0] = (None,) + self.assertEqual(segment(pl=pl), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', None, {'pl': pl, 'update_first': True}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = ('abc',) + self.assertEqual(segment(pl=pl), 'abc') + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', 'abc', {'pl': pl, 'update_first': True}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = ('abc',) + self.assertEqual(segment(pl=pl, update_first=False), 'abc') + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', 'abc', {'pl': pl, 'update_first': False}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = ValueError('abc') + self.assertEqual(segment(pl=pl), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(len(pl.exceptions), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ]) + log[:] = () + pl.exceptions[:] = () + + segment = TestSegment() + block_event.set() + updates[0] = (TypeError('def'),) + self.assertRaises(TypeError, segment, pl=pl) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('set_state', {}), + ('update', None), + ('render', updates[0][0], {'pl': pl, 'update_first': True}), + ]) + log[:] = () + + # Threaded tests + segment = TestSegment() + block_event.clear() + kwargs = {'pl': pl, 'update_first': False, 'other': 1} + with lock: + updates[0] = ('abc',) + segment.startup(**kwargs) + ret = segment(**kwargs) + self.assertEqual(thread_number(), 2) + block_event.set() + event.wait() + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret, None) + self.assertEqual(log, [ + ('set_state', {'update_first': False, 'other': 1}), + ('render', None, {'pl': pl, 'update_first': False, 'other': 1}), + ('update', None), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + kwargs = {'pl': pl, 'update_first': True, 'other': 1} + with lock: + updates[0] = ('def',) + segment.startup(**kwargs) + ret = segment(**kwargs) + self.assertEqual(thread_number(), 2) + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret, 'def') + self.assertEqual(log, [ + ('set_state', {'update_first': True, 'other': 1}), + ('update', None), + ('render', 'def', {'pl': pl, 'update_first': True, 'other': 1}), + ]) + log[:] = () + + segment = TestSegment() + block_event.set() + kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2} + with lock: + updates[0] = ('abc',) + segment.startup(**kwargs) + start = monotonic() + ret1 = segment(**kwargs) + with lock: + updates[0] = ('def',) + self.assertEqual(thread_number(), 2) + sleep(0.5) + ret2 = segment(**kwargs) + segment.shutdown_event.set() + segment.thread.join() + end = monotonic() + duration = end - start + self.assertEqual(ret1, 'abc') + self.assertEqual(ret2, 'def') + self.assertEqual(log[:5], [ + ('set_state', {'update_first': True, 'interval': 0.2}), + ('update', None), + ('render', 'abc', {'pl': pl, 'update_first': True, 'interval': 0.2}), + ('update', 'abc'), + ('update', 'def'), + ]) + num_runs = len([e for e in log if e[0] == 'update']) + self.assertAlmostEqual(duration / 0.2, num_runs, delta=1) + log[:] = () + + segment = TestSegment() + block_event.set() + kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2} + with lock: + updates[0] = ('ghi',) + segment.startup(**kwargs) + start = monotonic() + ret1 = segment(**kwargs) + with lock: + updates[0] = TypeError('jkl') + self.assertEqual(thread_number(), 2) + sleep(0.5) + ret2 = segment(**kwargs) + segment.shutdown_event.set() + segment.thread.join() + end = monotonic() + duration = end - start + self.assertEqual(ret1, 'ghi') + self.assertEqual(ret2, None) + self.assertEqual(log[:5], [ + ('set_state', {'update_first': True, 'interval': 0.2}), + ('update', None), + ('render', 'ghi', {'pl': pl, 'update_first': True, 'interval': 0.2}), + ('update', 'ghi'), + ('update', 'ghi'), + ]) + num_runs = len([e for e in log if e[0] == 'update']) + self.assertAlmostEqual(duration / 0.2, num_runs, delta=1) + self.assertEqual(num_runs - 1, len(pl.exceptions)) + log[:] = () + + def test_kw_threaded_segment(self): + log = [] + pl = Pl() + lock = threading.Lock() + event = threading.Event() + + class TestSegment(KwThreadedSegment): + interval = 10 + + @staticmethod + def key(_key=(None,), **kwargs): + log.append(('key', _key, kwargs)) + return _key + + def compute_state(self, key): + event.set() + sleep(0.1) + log.append(('compute_state', key)) + with lock: + ret = key + if isinstance(ret, Exception): + raise ret + else: + return ret[0] + + def render_one(self, state, **kwargs): + log.append(('render_one', state, kwargs)) + if isinstance(state, Exception): + raise state + else: + return state + + # Non-threaded tests + segment = TestSegment() + event.clear() + self.assertEqual(segment(pl=pl), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', (None,), {'pl': pl}), + ('compute_state', (None,)), + ('render_one', None, {'pl': pl}), + ]) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, '_key': ('abc',), 'update_first': False} + event.clear() + self.assertEqual(segment(**kwargs), 'abc') + kwargs.update(_key=('def',)) + self.assertEqual(segment(**kwargs), 'def') + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', ('abc',), {'pl': pl}), + ('compute_state', ('abc',)), + ('render_one', 'abc', {'pl': pl, '_key': ('abc',)}), + ('key', ('def',), {'pl': pl}), + ('compute_state', ('def',)), + ('render_one', 'def', {'pl': pl, '_key': ('def',)}), + ]) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, '_key': ValueError('xyz'), 'update_first': False} + event.clear() + self.assertEqual(segment(**kwargs), None) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', kwargs['_key'], {'pl': pl}), + ('compute_state', kwargs['_key']), + ]) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, '_key': (ValueError('abc'),), 'update_first': False} + event.clear() + self.assertRaises(ValueError, segment, **kwargs) + self.assertEqual(thread_number(), 1) + self.assertEqual(log, [ + ('key', kwargs['_key'], {'pl': pl}), + ('compute_state', kwargs['_key']), + ('render_one', kwargs['_key'][0], {'pl': pl, '_key': kwargs['_key']}), + ]) + log[:] = () + + # Threaded tests + segment = TestSegment() + kwargs = {'pl': pl, 'update_first': False, '_key': ('_abc',)} + event.clear() + segment.startup(**kwargs) + ret = segment(**kwargs) + self.assertEqual(thread_number(), 2) + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret, None) + self.assertEqual(log[:2], [ + ('key', kwargs['_key'], {'pl': pl}), + ('render_one', None, {'pl': pl, '_key': kwargs['_key']}), + ]) + self.assertLessEqual(len(log), 3) + if len(log) > 2: + self.assertEqual(log[2], ('compute_state', kwargs['_key'])) + log[:] = () + + segment = TestSegment() + kwargs = {'pl': pl, 'update_first': True, '_key': ('_abc',)} + event.clear() + segment.startup(**kwargs) + ret1 = segment(**kwargs) + kwargs.update(_key=('_def',)) + ret2 = segment(**kwargs) + self.assertEqual(thread_number(), 2) + segment.shutdown_event.set() + segment.thread.join() + self.assertEqual(ret1, '_abc') + self.assertEqual(ret2, '_def') + self.assertEqual(log, [ + ('key', ('_abc',), {'pl': pl}), + ('compute_state', ('_abc',)), + ('render_one', '_abc', {'pl': pl, '_key': ('_abc',)}), + ('key', ('_def',), {'pl': pl}), + ('compute_state', ('_def',)), + ('render_one', '_def', {'pl': pl, '_key': ('_def',)}), + ]) + log[:] = () class TestLib(TestCase):