diff --git a/compose/parallel.py b/compose/parallel.py index 8172d8ead..b3ca01530 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -69,24 +69,33 @@ def parallel_execute_stream(objects, func, get_deps): get_deps = _no_deps results = Queue() - output = Queue() - t = Thread(target=queue_consumer, args=(objects, func, get_deps, results, output)) - t.daemon = True - t.start() + started = set() # objects being processed + finished = set() # objects which have been processed + failed = set() # objects which either failed or whose dependencies failed - done = 0 + while len(finished) + len(failed) < len(objects): + for event in feed_queue(objects, func, get_deps, results, started, finished, failed): + yield event - while done < len(objects): try: - yield output.get(timeout=1) - done += 1 + event = results.get(timeout=1) except Empty: continue # See https://github.com/docker/compose/issues/189 except thread.error: raise ShutdownException() + obj, _, exception = event + if exception is None: + log.debug('Finished processing: {}'.format(obj)) + finished.add(obj) + else: + log.debug('Failed: {}'.format(obj)) + failed.add(obj) + + yield event + def queue_producer(obj, func, results): try: @@ -96,46 +105,26 @@ def queue_producer(obj, func, results): results.put((obj, None, e)) -def queue_consumer(objects, func, get_deps, results, output): - started = set() # objects being processed - finished = set() # objects which have been processed - failed = set() # objects which either failed or whose dependencies failed +def feed_queue(objects, func, get_deps, results, started, finished, failed): + pending = set(objects) - started - finished - failed + log.debug('Pending: {}'.format(pending)) - while len(finished) + len(failed) < len(objects): - pending = set(objects) - started - finished - failed - log.debug('Pending: {}'.format(pending)) + for obj in pending: + deps = get_deps(obj) - for obj in pending: - deps = get_deps(obj) - - if any(dep in failed for dep in deps): - log.debug('{} has upstream errors - not processing'.format(obj)) - output.put((obj, None, UpstreamError())) - failed.add(obj) - elif all( - dep not in objects or dep in finished - for dep in deps - ): - log.debug('Starting producer thread for {}'.format(obj)) - t = Thread(target=queue_producer, args=(obj, func, results)) - t.daemon = True - t.start() - started.add(obj) - - try: - event = results.get(timeout=1) - except Empty: - continue - - obj, _, exception = event - if exception is None: - log.debug('Finished processing: {}'.format(obj)) - finished.add(obj) - else: - log.debug('Failed: {}'.format(obj)) + if any(dep in failed for dep in deps): + log.debug('{} has upstream errors - not processing'.format(obj)) + yield (obj, None, UpstreamError()) failed.add(obj) - - output.put(event) + elif all( + dep not in objects or dep in finished + for dep in deps + ): + log.debug('Starting producer thread for {}'.format(obj)) + t = Thread(target=queue_producer, args=(obj, func, results)) + t.daemon = True + t.start() + started.add(obj) class UpstreamError(Exception):