diff --git a/compose/parallel.py b/compose/parallel.py index b3ca01530..f400b2235 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -64,18 +64,30 @@ def _no_deps(x): return [] +class State(object): + def __init__(self, objects): + self.objects = objects + + self.started = set() # objects being processed + self.finished = set() # objects which have been processed + self.failed = set() # objects which either failed or whose dependencies failed + + def is_done(self): + return len(self.finished) + len(self.failed) >= len(self.objects) + + def pending(self): + return set(self.objects) - self.started - self.finished - self.failed + + def parallel_execute_stream(objects, func, get_deps): if get_deps is None: get_deps = _no_deps results = Queue() + state = State(objects) - started = set() # objects being processed - finished = set() # objects which have been processed - failed = set() # objects which either failed or whose dependencies failed - - while len(finished) + len(failed) < len(objects): - for event in feed_queue(objects, func, get_deps, results, started, finished, failed): + while not state.is_done(): + for event in feed_queue(objects, func, get_deps, results, state): yield event try: @@ -89,10 +101,10 @@ def parallel_execute_stream(objects, func, get_deps): obj, _, exception = event if exception is None: log.debug('Finished processing: {}'.format(obj)) - finished.add(obj) + state.finished.add(obj) else: log.debug('Failed: {}'.format(obj)) - failed.add(obj) + state.failed.add(obj) yield event @@ -105,26 +117,26 @@ def queue_producer(obj, func, results): results.put((obj, None, e)) -def feed_queue(objects, func, get_deps, results, started, finished, failed): - pending = set(objects) - started - finished - failed +def feed_queue(objects, func, get_deps, results, state): + pending = state.pending() log.debug('Pending: {}'.format(pending)) for obj in pending: deps = get_deps(obj) - if any(dep in failed for dep in deps): + if any(dep in state.failed for dep in deps): log.debug('{} has upstream errors - not processing'.format(obj)) yield (obj, None, UpstreamError()) - failed.add(obj) + state.failed.add(obj) elif all( - dep not in objects or dep in finished + dep not in objects or dep in state.finished for dep in deps ): log.debug('Starting producer thread for {}'.format(obj)) t = Thread(target=queue_producer, args=(obj, func, results)) t.daemon = True t.start() - started.add(obj) + state.started.add(obj) class UpstreamError(Exception):