From af9526fb820f40a8b7eafb16d29f990b1696f4fe Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Fri, 8 Apr 2016 18:30:28 +0100 Subject: [PATCH] Move queue logic out of parallel_execute() Signed-off-by: Aanand Prasad --- compose/parallel.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index 745d46351..8172d8ead 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -32,22 +32,13 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): for obj in objects: writer.initialize(get_name(obj)) - q = setup_queue(objects, func, get_deps) + events = parallel_execute_stream(objects, func, get_deps) - done = 0 errors = {} results = [] error_to_reraise = None - while done < len(objects): - try: - obj, result, exception = q.get(timeout=1) - except Empty: - continue - # See https://github.com/docker/compose/issues/189 - except thread.error: - raise ShutdownException() - + for obj, result, exception in events: if exception is None: writer.write(get_name(obj), 'done') results.append(result) @@ -59,7 +50,6 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None): else: errors[get_name(obj)] = exception error_to_reraise = exception - done += 1 for obj_name, error in errors.items(): stream.write("\nERROR: for {} {}\n".format(obj_name, error)) @@ -74,7 +64,7 @@ def _no_deps(x): return [] -def setup_queue(objects, func, get_deps): +def parallel_execute_stream(objects, func, get_deps): if get_deps is None: get_deps = _no_deps @@ -85,7 +75,17 @@ def setup_queue(objects, func, get_deps): t.daemon = True t.start() - return output + done = 0 + + while done < len(objects): + try: + yield output.get(timeout=1) + done += 1 + except Empty: + continue + # See https://github.com/docker/compose/issues/189 + except thread.error: + raise ShutdownException() def queue_producer(obj, func, results):