Hold state in an object

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
This commit is contained in:
Aanand Prasad 2016-04-08 19:06:07 +01:00 committed by Daniel Nephin
parent 79edda6804
commit de6496c6c9
1 changed files with 26 additions and 14 deletions

View File

@ -64,18 +64,30 @@ def _no_deps(x):
return []
class State(object):
def __init__(self, objects):
self.objects = objects
self.started = set() # objects being processed
self.finished = set() # objects which have been processed
self.failed = set() # objects which either failed or whose dependencies failed
def is_done(self):
return len(self.finished) + len(self.failed) >= len(self.objects)
def pending(self):
return set(self.objects) - self.started - self.finished - self.failed
def parallel_execute_stream(objects, func, get_deps):
if get_deps is None:
get_deps = _no_deps
results = Queue()
state = State(objects)
started = set() # objects being processed
finished = set() # objects which have been processed
failed = set() # objects which either failed or whose dependencies failed
while len(finished) + len(failed) < len(objects):
for event in feed_queue(objects, func, get_deps, results, started, finished, failed):
while not state.is_done():
for event in feed_queue(objects, func, get_deps, results, state):
yield event
try:
@ -89,10 +101,10 @@ def parallel_execute_stream(objects, func, get_deps):
obj, _, exception = event
if exception is None:
log.debug('Finished processing: {}'.format(obj))
finished.add(obj)
state.finished.add(obj)
else:
log.debug('Failed: {}'.format(obj))
failed.add(obj)
state.failed.add(obj)
yield event
@ -105,26 +117,26 @@ def queue_producer(obj, func, results):
results.put((obj, None, e))
def feed_queue(objects, func, get_deps, results, started, finished, failed):
pending = set(objects) - started - finished - failed
def feed_queue(objects, func, get_deps, results, state):
pending = state.pending()
log.debug('Pending: {}'.format(pending))
for obj in pending:
deps = get_deps(obj)
if any(dep in failed for dep in deps):
if any(dep in state.failed for dep in deps):
log.debug('{} has upstream errors - not processing'.format(obj))
yield (obj, None, UpstreamError())
failed.add(obj)
state.failed.add(obj)
elif all(
dep not in objects or dep in finished
dep not in objects or dep in state.finished
for dep in deps
):
log.debug('Starting producer thread for {}'.format(obj))
t = Thread(target=queue_producer, args=(obj, func, results))
t.daemon = True
t.start()
started.add(obj)
state.started.add(obj)
class UpstreamError(Exception):