Refactor KwThreadedSegment

Fixes #813
This commit is contained in:
ZyX 2014-02-22 17:24:21 +04:00
parent 84765838d5
commit 725ff69be0
1 changed files with 26 additions and 17 deletions

View File

@ -144,7 +144,7 @@ class KwThreadedSegment(ThreadedSegment):
self.updated = True
self.update_value = ({}, set())
self.write_lock = Lock()
self.new_queries = {}
self.new_queries = []
@staticmethod
def key(**kwargs):
@ -159,34 +159,43 @@ class KwThreadedSegment(ThreadedSegment):
try:
update_state = queries[key][1]
except KeyError:
# Allow only to forbid to compute missing values: in either user
# configuration or in subclasses.
update_state = self.compute_state(key) if ((update_first and self.update_first) or self.run_once) else None
with self.write_lock:
self.new_queries.append(key)
if update_first and self.update_first:
return self.render(update_value=self.get_update_value(True), update_first=False, **kwargs)
else:
update_state = None
with self.write_lock:
self.new_queries[key] = (monotonic(), update_state)
return self.render_one(update_state, **kwargs)
def update_one(self, crashed, updates, key):
try:
updates[key] = (monotonic(), self.compute_state(key))
except Exception as e:
self.exception('Exception while computing state for {0!r}: {1}', key, str(e))
crashed.add(key)
except KeyboardInterrupt:
self.warn('Interrupt while computing state for {0!r}', key)
crashed.add(key)
def update(self, old_update_value):
updates = {}
crashed = set()
update_value = (updates, crashed)
queries = old_update_value[0]
new_queries = self.new_queries
with self.write_lock:
if self.new_queries:
queries.update(self.new_queries)
self.new_queries.clear()
self.new_queries = []
for key, (last_query_time, state) in queries.items():
if last_query_time < monotonic() < last_query_time + self.drop_interval:
try:
updates[key] = (last_query_time, self.compute_state(key))
except Exception as e:
self.exception('Exception while computing state for {0!r}: {1}', key, str(e))
crashed.add(key)
except KeyboardInterrupt:
self.warn('Interrupt while computing state for {0!r}', key)
crashed.add(key)
updates[key] = (last_query_time, state)
else:
self.update_one(crashed, updates, key)
for key in new_queries:
self.update_one(crashed, updates, key)
return update_value