Move queue logic out of parallel_execute()

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
This commit is contained in:
Aanand Prasad 2016-04-08 18:30:28 +01:00
parent 141b96bb31
commit af9526fb82
1 changed files with 14 additions and 14 deletions

View File

@ -32,22 +32,13 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
for obj in objects: for obj in objects:
writer.initialize(get_name(obj)) writer.initialize(get_name(obj))
q = setup_queue(objects, func, get_deps) events = parallel_execute_stream(objects, func, get_deps)
done = 0
errors = {} errors = {}
results = [] results = []
error_to_reraise = None error_to_reraise = None
while done < len(objects): for obj, result, exception in events:
try:
obj, result, exception = q.get(timeout=1)
except Empty:
continue
# See https://github.com/docker/compose/issues/189
except thread.error:
raise ShutdownException()
if exception is None: if exception is None:
writer.write(get_name(obj), 'done') writer.write(get_name(obj), 'done')
results.append(result) results.append(result)
@ -59,7 +50,6 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
else: else:
errors[get_name(obj)] = exception errors[get_name(obj)] = exception
error_to_reraise = exception error_to_reraise = exception
done += 1
for obj_name, error in errors.items(): for obj_name, error in errors.items():
stream.write("\nERROR: for {} {}\n".format(obj_name, error)) stream.write("\nERROR: for {} {}\n".format(obj_name, error))
@ -74,7 +64,7 @@ def _no_deps(x):
return [] return []
def setup_queue(objects, func, get_deps): def parallel_execute_stream(objects, func, get_deps):
if get_deps is None: if get_deps is None:
get_deps = _no_deps get_deps = _no_deps
@ -85,7 +75,17 @@ def setup_queue(objects, func, get_deps):
t.daemon = True t.daemon = True
t.start() t.start()
return output done = 0
while done < len(objects):
try:
yield output.get(timeout=1)
done += 1
except Empty:
continue
# See https://github.com/docker/compose/issues/189
except thread.error:
raise ShutdownException()
def queue_producer(obj, func, results): def queue_producer(obj, func, results):