Merge pull request #822 from ZyX-I/threaded-fixes

Some fixes for powerline.lib.threaded

Fixes #813
This commit is contained in:
ZyX-I 2014-02-26 07:28:45 +03:00
commit 70a94ee7d3
3 changed files with 416 additions and 53 deletions

View File

@ -37,7 +37,7 @@ class ThreadedSegment(MultiRunnedThread):
def __init__(self): def __init__(self):
super(ThreadedSegment, self).__init__() super(ThreadedSegment, self).__init__()
self.run_once = True self.run_once = True
self.skip = False self.crashed = False
self.crashed_value = None self.crashed_value = None
self.update_value = None self.update_value = None
self.updated = False self.updated = False
@ -53,38 +53,48 @@ class ThreadedSegment(MultiRunnedThread):
# cursor”. # cursor”.
# #
# If running once .update() is called in __call__. # If running once .update() is called in __call__.
update_value = self.get_update_value(update_first and self.update_first)
self.start() self.start()
elif not self.updated: update_value = self.get_update_value(self.do_update_first)
update_value = self.get_update_value(True)
self.updated = True
else: else:
update_value = self.update_value update_value = self.get_update_value(not self.updated)
if self.skip: if self.crashed:
return self.crashed_value return self.crashed_value
return self.render(update_value, update_first=update_first, pl=pl, **kwargs) return self.render(update_value, update_first=update_first, pl=pl, **kwargs)
def set_update_value(self):
try:
self.update_value = self.update(self.update_value)
except Exception as e:
self.exception('Exception while updating: {0}', str(e))
self.crashed = True
except KeyboardInterrupt:
self.warn('Caught keyboard interrupt while updating')
self.crashed = True
else:
self.crashed = False
self.updated = True
def get_update_value(self, update=False): def get_update_value(self, update=False):
if update: if update:
self.update_value = self.update(self.update_value) self.set_update_value()
return self.update_value return self.update_value
def run(self): def run(self):
while not self.shutdown_event.is_set(): if self.do_update_first:
start_time = monotonic() start_time = monotonic()
try: while True:
self.update_value = self.update(self.update_value) self.shutdown_event.wait(max(self.interval - (monotonic() - start_time), self.min_sleep_time))
except Exception as e: if self.shutdown_event.is_set():
self.exception('Exception while updating: {0}', str(e)) break
self.skip = True start_time = monotonic()
except KeyboardInterrupt: self.set_update_value()
self.warn('Caught keyboard interrupt while updating') else:
self.skip = True while not self.shutdown_event.is_set():
else: start_time = monotonic()
self.skip = False self.set_update_value()
self.shutdown_event.wait(max(self.interval - (monotonic() - start_time), self.min_sleep_time)) self.shutdown_event.wait(max(self.interval - (monotonic() - start_time), self.min_sleep_time))
def shutdown(self): def shutdown(self):
self.shutdown_event.set() self.shutdown_event.set()
@ -104,7 +114,8 @@ class ThreadedSegment(MultiRunnedThread):
def set_state(self, interval=None, update_first=True, shutdown_event=None, **kwargs): def set_state(self, interval=None, update_first=True, shutdown_event=None, **kwargs):
self.set_interval(interval) self.set_interval(interval)
self.shutdown_event = shutdown_event or Event() self.shutdown_event = shutdown_event or Event()
self.updated = self.updated or (not (update_first and self.update_first)) self.do_update_first = update_first and self.update_first
self.updated = self.updated or (not self.do_update_first)
def startup(self, pl, **kwargs): def startup(self, pl, **kwargs):
self.run_once = False self.run_once = False
@ -136,7 +147,6 @@ class ThreadedSegment(MultiRunnedThread):
class KwThreadedSegment(ThreadedSegment): class KwThreadedSegment(ThreadedSegment):
drop_interval = 10 * 60
update_first = True update_first = True
def __init__(self): def __init__(self):
@ -144,54 +154,75 @@ class KwThreadedSegment(ThreadedSegment):
self.updated = True self.updated = True
self.update_value = ({}, set()) self.update_value = ({}, set())
self.write_lock = Lock() self.write_lock = Lock()
self.new_queries = {} self.new_queries = []
@staticmethod @staticmethod
def key(**kwargs): def key(**kwargs):
return frozenset(kwargs.items()) return frozenset(kwargs.items())
def render(self, update_value, update_first, **kwargs): def render(self, update_value, update_first, key=None, after_update=False, **kwargs):
queries, crashed = update_value queries, crashed = update_value
key = self.key(**kwargs) if key is None:
key = self.key(**kwargs)
if key in crashed: if key in crashed:
return self.crashed_value return self.crashed_value
try: try:
update_state = queries[key][1] update_state = queries[key][1]
except KeyError: except KeyError:
# Allow only to forbid to compute missing values: in either user with self.write_lock:
# configuration or in subclasses. self.new_queries.append(key)
update_state = self.compute_state(key) if ((update_first and self.update_first) or self.run_once) else None if self.do_update_first or self.run_once:
if after_update:
self.error('internal error: value was not computed even though update_first was set')
update_state = None
else:
return self.render(
update_value=self.get_update_value(True),
update_first=False,
key=key,
after_update=True,
**kwargs
)
else:
update_state = None
with self.write_lock:
self.new_queries[key] = (monotonic(), update_state)
return self.render_one(update_state, **kwargs) return self.render_one(update_state, **kwargs)
def update_one(self, crashed, updates, key):
try:
updates[key] = (monotonic(), self.compute_state(key))
except Exception as e:
self.exception('Exception while computing state for {0!r}: {1}', key, str(e))
crashed.add(key)
except KeyboardInterrupt:
self.warn('Interrupt while computing state for {0!r}', key)
crashed.add(key)
def update(self, old_update_value): def update(self, old_update_value):
updates = {} updates = {}
crashed = set() crashed = set()
update_value = (updates, crashed) update_value = (updates, crashed)
queries = old_update_value[0] queries = old_update_value[0]
new_queries = self.new_queries
with self.write_lock: with self.write_lock:
if self.new_queries: self.new_queries = []
queries.update(self.new_queries)
self.new_queries.clear()
for key, (last_query_time, state) in queries.items(): for key, (last_query_time, state) in queries.items():
if last_query_time < monotonic() < last_query_time + self.drop_interval: if last_query_time < monotonic() < last_query_time + self.interval:
try: updates[key] = (last_query_time, state)
updates[key] = (last_query_time, self.compute_state(key)) else:
except Exception as e: self.update_one(crashed, updates, key)
self.exception('Exception while computing state for {0!r}: {1}', key, str(e))
crashed.add(key) for key in new_queries:
except KeyboardInterrupt: self.update_one(crashed, updates, key)
self.warn('Interrupt while computing state for {0!r}', key)
crashed.add(key)
return update_value return update_value
def set_state(self, interval=None, shutdown_event=None, **kwargs): def set_state(self, interval=None, update_first=True, shutdown_event=None, **kwargs):
self.set_interval(interval) self.set_interval(interval)
self.do_update_first = update_first and self.update_first
self.shutdown_event = shutdown_event or Event() self.shutdown_event = shutdown_event or Event()
@staticmethod @staticmethod

View File

@ -5,13 +5,14 @@ import sys
class Pl(object): class Pl(object):
def __init__(self): def __init__(self):
self.exceptions = []
self.errors = [] self.errors = []
self.warns = [] self.warns = []
self.debugs = [] self.debugs = []
self.prefix = None self.prefix = None
self.use_daemon_threads = True self.use_daemon_threads = True
for meth in ('error', 'warn', 'debug'): for meth in ('error', 'warn', 'debug', 'exception'):
exec (('def {0}(self, msg, *args, **kwargs):\n' exec (('def {0}(self, msg, *args, **kwargs):\n'
' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n').format(meth)) ' self.{0}s.append((kwargs.get("prefix") or self.prefix, msg, args, kwargs))\n').format(meth))

View File

@ -1,13 +1,346 @@
# vim:fileencoding=utf-8:noet # vim:fileencoding=utf-8:noet
from __future__ import division
from powerline.lib import mergedicts, add_divider_highlight_group from powerline.lib import mergedicts, add_divider_highlight_group
from powerline.lib.humanize_bytes import humanize_bytes from powerline.lib.humanize_bytes import humanize_bytes
from powerline.lib.vcs import guess from powerline.lib.vcs import guess
from subprocess import call, PIPE from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic
import threading
import os import os
import sys import sys
import re import re
from time import sleep
from subprocess import call, PIPE
from functools import partial from functools import partial
from tests import TestCase, SkipTest from tests import TestCase, SkipTest
from tests.lib import Pl
def thread_number():
return len(threading.enumerate())
class TestThreaded(TestCase):
def test_threaded_segment(self):
log = []
pl = Pl()
updates = [(None,)]
lock = threading.Lock()
event = threading.Event()
block_event = threading.Event()
class TestSegment(ThreadedSegment):
interval = 10
def set_state(self, **kwargs):
event.clear()
log.append(('set_state', kwargs))
return super(TestSegment, self).set_state(**kwargs)
def update(self, update_value):
block_event.wait()
event.set()
# Make sleep first to prevent some race conditions
log.append(('update', update_value))
with lock:
ret = updates[0]
if isinstance(ret, Exception):
raise ret
else:
return ret[0]
def render(self, update, **kwargs):
log.append(('render', update, kwargs))
if isinstance(update, Exception):
raise update
else:
return update
# Non-threaded tests
segment = TestSegment()
block_event.set()
updates[0] = (None,)
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', None, {'pl': pl, 'update_first': True}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ('abc',)
self.assertEqual(segment(pl=pl), 'abc')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': True}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ('abc',)
self.assertEqual(segment(pl=pl, update_first=False), 'abc')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': False}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = ValueError('abc')
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(len(pl.exceptions), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
])
log[:] = ()
pl.exceptions[:] = ()
segment = TestSegment()
block_event.set()
updates[0] = (TypeError('def'),)
self.assertRaises(TypeError, segment, pl=pl)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('set_state', {}),
('update', None),
('render', updates[0][0], {'pl': pl, 'update_first': True}),
])
log[:] = ()
# Threaded tests
segment = TestSegment()
block_event.clear()
kwargs = {'pl': pl, 'update_first': False, 'other': 1}
with lock:
updates[0] = ('abc',)
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
block_event.set()
event.wait()
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, None)
self.assertEqual(log, [
('set_state', {'update_first': False, 'other': 1}),
('render', None, {'pl': pl, 'update_first': False, 'other': 1}),
('update', None),
])
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'other': 1}
with lock:
updates[0] = ('def',)
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, 'def')
self.assertEqual(log, [
('set_state', {'update_first': True, 'other': 1}),
('update', None),
('render', 'def', {'pl': pl, 'update_first': True, 'other': 1}),
])
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
with lock:
updates[0] = ('abc',)
segment.startup(**kwargs)
start = monotonic()
ret1 = segment(**kwargs)
with lock:
updates[0] = ('def',)
self.assertEqual(thread_number(), 2)
sleep(0.5)
ret2 = segment(**kwargs)
segment.shutdown_event.set()
segment.thread.join()
end = monotonic()
duration = end - start
self.assertEqual(ret1, 'abc')
self.assertEqual(ret2, 'def')
self.assertEqual(log[:5], [
('set_state', {'update_first': True, 'interval': 0.2}),
('update', None),
('render', 'abc', {'pl': pl, 'update_first': True, 'interval': 0.2}),
('update', 'abc'),
('update', 'def'),
])
num_runs = len([e for e in log if e[0] == 'update'])
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
log[:] = ()
segment = TestSegment()
block_event.set()
kwargs = {'pl': pl, 'update_first': True, 'interval': 0.2}
with lock:
updates[0] = ('ghi',)
segment.startup(**kwargs)
start = monotonic()
ret1 = segment(**kwargs)
with lock:
updates[0] = TypeError('jkl')
self.assertEqual(thread_number(), 2)
sleep(0.5)
ret2 = segment(**kwargs)
segment.shutdown_event.set()
segment.thread.join()
end = monotonic()
duration = end - start
self.assertEqual(ret1, 'ghi')
self.assertEqual(ret2, None)
self.assertEqual(log[:5], [
('set_state', {'update_first': True, 'interval': 0.2}),
('update', None),
('render', 'ghi', {'pl': pl, 'update_first': True, 'interval': 0.2}),
('update', 'ghi'),
('update', 'ghi'),
])
num_runs = len([e for e in log if e[0] == 'update'])
self.assertAlmostEqual(duration / 0.2, num_runs, delta=1)
self.assertEqual(num_runs - 1, len(pl.exceptions))
log[:] = ()
def test_kw_threaded_segment(self):
log = []
pl = Pl()
event = threading.Event()
class TestSegment(KwThreadedSegment):
interval = 10
@staticmethod
def key(_key=(None,), **kwargs):
log.append(('key', _key, kwargs))
return _key
def compute_state(self, key):
event.set()
sleep(0.1)
log.append(('compute_state', key))
ret = key
if isinstance(ret, Exception):
raise ret
else:
return ret[0]
def render_one(self, state, **kwargs):
log.append(('render_one', state, kwargs))
if isinstance(state, Exception):
raise state
else:
return state
# Non-threaded tests
segment = TestSegment()
event.clear()
self.assertEqual(segment(pl=pl), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', (None,), {'pl': pl}),
('compute_state', (None,)),
('render_one', None, {'pl': pl}),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': ('abc',), 'update_first': False}
event.clear()
self.assertEqual(segment(**kwargs), 'abc')
kwargs.update(_key=('def',))
self.assertEqual(segment(**kwargs), 'def')
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', ('abc',), {'pl': pl}),
('compute_state', ('abc',)),
('render_one', 'abc', {'pl': pl, '_key': ('abc',)}),
('key', ('def',), {'pl': pl}),
('compute_state', ('def',)),
('render_one', 'def', {'pl': pl, '_key': ('def',)}),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': ValueError('xyz'), 'update_first': False}
event.clear()
self.assertEqual(segment(**kwargs), None)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', kwargs['_key'], {'pl': pl}),
('compute_state', kwargs['_key']),
])
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, '_key': (ValueError('abc'),), 'update_first': False}
event.clear()
self.assertRaises(ValueError, segment, **kwargs)
self.assertEqual(thread_number(), 1)
self.assertEqual(log, [
('key', kwargs['_key'], {'pl': pl}),
('compute_state', kwargs['_key']),
('render_one', kwargs['_key'][0], {'pl': pl, '_key': kwargs['_key']}),
])
log[:] = ()
# Threaded tests
segment = TestSegment()
kwargs = {'pl': pl, 'update_first': False, '_key': ('_abc',)}
event.clear()
segment.startup(**kwargs)
ret = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret, None)
self.assertEqual(log[:2], [
('key', kwargs['_key'], {'pl': pl}),
('render_one', None, {'pl': pl, '_key': kwargs['_key']}),
])
self.assertLessEqual(len(log), 3)
if len(log) > 2:
self.assertEqual(log[2], ('compute_state', kwargs['_key']))
log[:] = ()
segment = TestSegment()
kwargs = {'pl': pl, 'update_first': True, '_key': ('_abc',)}
event.clear()
segment.startup(**kwargs)
ret1 = segment(**kwargs)
kwargs.update(_key=('_def',))
ret2 = segment(**kwargs)
self.assertEqual(thread_number(), 2)
segment.shutdown_event.set()
segment.thread.join()
self.assertEqual(ret1, '_abc')
self.assertEqual(ret2, '_def')
self.assertEqual(log, [
('key', ('_abc',), {'pl': pl}),
('compute_state', ('_abc',)),
('render_one', '_abc', {'pl': pl, '_key': ('_abc',)}),
('key', ('_def',), {'pl': pl}),
('compute_state', ('_def',)),
('render_one', '_def', {'pl': pl, '_key': ('_def',)}),
])
log[:] = ()
class TestLib(TestCase): class TestLib(TestCase):
@ -41,12 +374,11 @@ class TestLib(TestCase):
class TestFilesystemWatchers(TestCase): class TestFilesystemWatchers(TestCase):
def do_test_for_change(self, watcher, path): def do_test_for_change(self, watcher, path):
import time st = monotonic()
st = time.time() while monotonic() - st < 1:
while time.time() - st < 1:
if watcher(path): if watcher(path):
return return
time.sleep(0.1) sleep(0.1)
self.fail('The change to {0} was not detected'.format(path)) self.fail('The change to {0} was not detected'.format(path))
def test_file_watcher(self): def test_file_watcher(self):
@ -134,9 +466,8 @@ use_mercurial = use_bzr = sys.version_info < (3, 0)
class TestVCS(TestCase): class TestVCS(TestCase):
def do_branch_rename_test(self, repo, q): def do_branch_rename_test(self, repo, q):
import time st = monotonic()
st = time.time() while monotonic() - st < 1:
while time.time() - st < 1:
# Give inotify time to deliver events # Give inotify time to deliver events
ans = repo.branch() ans = repo.branch()
if hasattr(q, '__call__'): if hasattr(q, '__call__'):
@ -145,7 +476,7 @@ class TestVCS(TestCase):
else: else:
if ans == q: if ans == q:
break break
time.sleep(0.01) sleep(0.01)
if hasattr(q, '__call__'): if hasattr(q, '__call__'):
self.assertTrue(q(ans)) self.assertTrue(q(ans))
else: else: