From 720dc893e2ab8411dd1c242944dc128675113137 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Mon, 11 Apr 2016 12:49:04 +0100 Subject: [PATCH] Document parallel helper functions Signed-off-by: Aanand Prasad --- compose/parallel.py | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index ace1f029c..d9c24ab66 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -65,12 +65,19 @@ def _no_deps(x): class State(object): + """ + Holds the state of a partially-complete parallel operation. + + state.started: objects being processed + state.finished: objects which have been processed + state.failed: objects which either failed or whose dependencies failed + """ def __init__(self, objects): self.objects = objects - self.started = set() # objects being processed - self.finished = set() # objects which have been processed - self.failed = set() # objects which either failed or whose dependencies failed + self.started = set() + self.finished = set() + self.failed = set() def is_done(self): return len(self.finished) + len(self.failed) >= len(self.objects) @@ -80,6 +87,21 @@ class State(object): def parallel_execute_stream(objects, func, get_deps): + """ + Runs func on objects in parallel while ensuring that func is + ran on object only after it is ran on all its dependencies. + + Returns an iterator of tuples which look like: + + # if func returned normally when run on object + (object, result, None) + + # if func raised an exception when run on object + (object, None, exception) + + # if func raised an exception when run on one of object's dependencies + (object, None, UpstreamError()) + """ if get_deps is None: get_deps = _no_deps @@ -109,6 +131,10 @@ def parallel_execute_stream(objects, func, get_deps): def queue_producer(obj, func, results): + """ + The entry point for a producer thread which runs func on a single object. + Places a tuple on the results queue once func has either returned or raised. + """ try: result = func(obj) results.put((obj, result, None)) @@ -117,6 +143,13 @@ def queue_producer(obj, func, results): def feed_queue(objects, func, get_deps, results, state): + """ + Starts producer threads for any objects which are ready to be processed + (i.e. they have no dependencies which haven't been successfully processed). + + Shortcuts any objects whose dependencies have failed and places an + (object, None, UpstreamError()) tuple on the results queue. + """ pending = state.pending() log.debug('Pending: {}'.format(pending))