From 0e3db185cf79e6638c2660be8e052af113ed7337 Mon Sep 17 00:00:00 2001
From: Aanand Prasad
Date: Mon, 11 Apr 2016 12:37:00 +0100
Subject: [PATCH 1/4] Small refactor to feed_queue()

Put the event tuple into the results queue rather than yielding it
from the function.

Signed-off-by: Aanand Prasad
---
 compose/parallel.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/compose/parallel.py b/compose/parallel.py
index e360ca357..ace1f029c 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -87,8 +87,7 @@ def parallel_execute_stream(objects, func, get_deps):
     state = State(objects)
 
     while not state.is_done():
-        for event in feed_queue(objects, func, get_deps, results, state):
-            yield event
+        feed_queue(objects, func, get_deps, results, state)
 
         try:
             event = results.get(timeout=0.1)
@@ -126,7 +125,7 @@ def feed_queue(objects, func, get_deps, results, state):
 
         if any(dep in state.failed for dep in deps):
             log.debug('{} has upstream errors - not processing'.format(obj))
-            yield (obj, None, UpstreamError())
+            results.put((obj, None, UpstreamError()))
             state.failed.add(obj)
         elif all(
             dep not in objects or dep in state.finished

From 0671b8b8c3ce1873db87c4233f88e64876d43c6a Mon Sep 17 00:00:00 2001
From: Aanand Prasad
Date: Mon, 11 Apr 2016 12:49:04 +0100
Subject: [PATCH 2/4] Document parallel helper functions

Signed-off-by: Aanand Prasad
---
 compose/parallel.py | 39 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 36 insertions(+), 3 deletions(-)

diff --git a/compose/parallel.py b/compose/parallel.py
index ace1f029c..d9c24ab66 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -65,12 +65,19 @@ def _no_deps(x):
 
 
 class State(object):
+    """
+    Holds the state of a partially-complete parallel operation.
+
+    state.started: objects being processed
+    state.finished: objects which have been processed
+    state.failed: objects which either failed or whose dependencies failed
+    """
     def __init__(self, objects):
         self.objects = objects
 
-        self.started = set()  # objects being processed
-        self.finished = set()  # objects which have been processed
-        self.failed = set()  # objects which either failed or whose dependencies failed
+        self.started = set()
+        self.finished = set()
+        self.failed = set()
 
     def is_done(self):
         return len(self.finished) + len(self.failed) >= len(self.objects)
@@ -80,6 +87,21 @@ class State(object):
 
 
 def parallel_execute_stream(objects, func, get_deps):
+    """
+    Runs func on objects in parallel while ensuring that func is
+    run on an object only after it has been run on all its dependencies.
+
+    Returns an iterator of tuples which look like:
+
+    # if func returned normally when run on object
+    (object, result, None)
+
+    # if func raised an exception when run on object
+    (object, None, exception)
+
+    # if func raised an exception when run on one of object's dependencies
+    (object, None, UpstreamError())
+    """
     if get_deps is None:
         get_deps = _no_deps
 
@@ -109,6 +131,10 @@
 
 
 def queue_producer(obj, func, results):
+    """
+    The entry point for a producer thread which runs func on a single object.
+    Places a tuple on the results queue once func has either returned or raised.
+    """
     try:
         result = func(obj)
         results.put((obj, result, None))
@@ -117,6 +143,13 @@
 
 
 def feed_queue(objects, func, get_deps, results, state):
+    """
+    Starts producer threads for any objects which are ready to be processed
+    (i.e. all of their dependencies have already been processed successfully).
+
+    Short-circuits any objects whose dependencies have failed and places an
+    (object, None, UpstreamError()) tuple on the results queue.
+    """
     pending = state.pending()
     log.debug('Pending: {}'.format(pending))
 

From 15c5bc2e6c79cdb2edac4f8cab10d7bcbfc175d1 Mon Sep 17 00:00:00 2001
From: Aanand Prasad
Date: Mon, 11 Apr 2016 13:03:35 +0100
Subject: [PATCH 3/4] Rename a couple of functions in parallel.py

Signed-off-by: Aanand Prasad
---
 compose/parallel.py         | 8 ++++----
 tests/unit/parallel_test.py | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/compose/parallel.py b/compose/parallel.py
index d9c24ab66..ee3d5777b 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -32,7 +32,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
     for obj in objects:
         writer.initialize(get_name(obj))
 
-    events = parallel_execute_stream(objects, func, get_deps)
+    events = parallel_execute_iter(objects, func, get_deps)
 
     errors = {}
     results = []
@@ -86,7 +86,7 @@ class State(object):
         return set(self.objects) - self.started - self.finished - self.failed
 
 
-def parallel_execute_stream(objects, func, get_deps):
+def parallel_execute_iter(objects, func, get_deps):
     """
     Runs func on objects in parallel while ensuring that func is
     run on an object only after it has been run on all its dependencies.
@@ -130,7 +130,7 @@ def parallel_execute_iter(objects, func, get_deps):
         yield event
 
 
-def queue_producer(obj, func, results):
+def producer(obj, func, results):
     """
     The entry point for a producer thread which runs func on a single object.
     Places a tuple on the results queue once func has either returned or raised.
@@ -165,7 +165,7 @@ def feed_queue(objects, func, get_deps, results, state):
             for dep in deps
         ):
             log.debug('Starting producer thread for {}'.format(obj))
-            t = Thread(target=queue_producer, args=(obj, func, results))
+            t = Thread(target=producer, args=(obj, func, results))
             t.daemon = True
             t.start()
             state.started.add(obj)
diff --git a/tests/unit/parallel_test.py b/tests/unit/parallel_test.py
index 9ed1b3623..45b0db1db 100644
--- a/tests/unit/parallel_test.py
+++ b/tests/unit/parallel_test.py
@@ -5,7 +5,7 @@
 import six
 from docker.errors import APIError
 
 from compose.parallel import parallel_execute
-from compose.parallel import parallel_execute_stream
+from compose.parallel import parallel_execute_iter
 from compose.parallel import UpstreamError
 
@@ -81,7 +81,7 @@ def test_parallel_execute_with_upstream_errors():
     events = [
         (obj, result, type(exception))
         for obj, result, exception
-        in parallel_execute_stream(objects, process, get_deps)
+        in parallel_execute_iter(objects, process, get_deps)
     ]
 
     assert (cache, None, type(None)) in events

From 7cfb5e7bc9fb93549de0915f378d6cd831835d52 Mon Sep 17 00:00:00 2001
From: Aanand Prasad
Date: Mon, 11 Apr 2016 17:05:52 +0100
Subject: [PATCH 4/4] Fix race condition

If processing of all objects finishes before the queue is drained,
parallel_execute_iter() returns prematurely.

Signed-off-by: Aanand Prasad
---
 compose/parallel.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/compose/parallel.py b/compose/parallel.py
index ee3d5777b..63417dcb0 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -17,6 +17,8 @@ from compose.utils import get_output_stream
 
 log = logging.getLogger(__name__)
 
+STOP = object()
+
 
 def parallel_execute(objects, func, get_name, msg, get_deps=None):
     """Runs func on objects in parallel while ensuring that func is
@@ -108,7 +110,7 @@ def parallel_execute_iter(objects, func, get_deps):
     results = Queue()
     state = State(objects)
 
-    while not state.is_done():
+    while True:
         feed_queue(objects, func, get_deps, results, state)
 
         try:
             event = results.get(timeout=0.1)
@@ -119,6 +121,9 @@ def parallel_execute_iter(objects, func, get_deps):
         except thread.error:
             raise ShutdownException()
 
+        if event is STOP:
+            break
+
         obj, _, exception = event
         if exception is None:
             log.debug('Finished processing: {}'.format(obj))
@@ -170,6 +175,9 @@ def feed_queue(objects, func, get_deps, results, state):
             t.start()
             state.started.add(obj)
 
+    if state.is_done():
+        results.put(STOP)
+
 
 class UpstreamError(Exception):
     pass
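
Note on PATCH 4: feed_queue() both updates state and enqueues events. When
the last pending objects are short-circuited with UpstreamError,
state.is_done() becomes true while their events are still sitting in the
results queue, so the old "while not state.is_done()" loop could exit before
yielding them. The sketch below isolates the sentinel pattern the patch
adopts. It is not compose code: run_all() and its internals are invented for
illustration, the stdlib queue module stands in for six.moves.queue, and the
producer's error handling is omitted. Because the queue is FIFO and STOP is
enqueued only after the last real event, breaking on STOP guarantees that
everything enqueued before it has already been drained.

    from queue import Empty, Queue
    from threading import Thread

    STOP = object()  # unique sentinel: an `is` check can only match this object


    def run_all(objects, func):
        """Run func on every object in its own thread; yield (obj, result)."""
        results = Queue()

        for obj in objects:
            t = Thread(target=lambda o=obj: results.put((o, func(o))))
            t.daemon = True
            t.start()

        seen = 0
        while True:
            try:
                event = results.get(timeout=0.1)
            except Empty:
                continue

            if event is STOP:
                # The queue is FIFO, so every event enqueued before STOP
                # has already been read and yielded at this point.
                break

            yield event
            seen += 1
            if seen >= len(objects):
                # All real events are accounted for: tell ourselves to stop.
                # Testing `seen >= len(objects)` at the top of the loop
                # instead (the pre-PATCH-4 shape) could exit while events
                # were still sitting in the queue.
                results.put(STOP)

For example, list(run_all([1, 2, 3], lambda x: x * x)) yields all three
(obj, result) tuples, in completion order, before the loop stops.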