From 2160c787e3fde64af1c90ce32dcc92b63eaf3c73 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Mon, 11 Apr 2016 17:05:52 +0100 Subject: [PATCH] Fix race condition If processing of all objects finishes before the queue is drained, parallel_execute_iter() returns prematurely. Signed-off-by: Aanand Prasad --- compose/parallel.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/compose/parallel.py b/compose/parallel.py index ee3d5777b..63417dcb0 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -17,6 +17,8 @@ from compose.utils import get_output_stream log = logging.getLogger(__name__) +STOP = object() + def parallel_execute(objects, func, get_name, msg, get_deps=None): """Runs func on objects in parallel while ensuring that func is @@ -108,7 +110,7 @@ def parallel_execute_iter(objects, func, get_deps): results = Queue() state = State(objects) - while not state.is_done(): + while True: feed_queue(objects, func, get_deps, results, state) try: @@ -119,6 +121,9 @@ def parallel_execute_iter(objects, func, get_deps): except thread.error: raise ShutdownException() + if event is STOP: + break + obj, _, exception = event if exception is None: log.debug('Finished processing: {}'.format(obj)) @@ -170,6 +175,9 @@ def feed_queue(objects, func, get_deps, results, state): t.start() state.started.add(obj) + if state.is_done(): + results.put(STOP) + class UpstreamError(Exception): pass