From cdef2b5e3bc8cbffeea8d13d81eef1cc1f6d1a6e Mon Sep 17 00:00:00 2001
From: Aanand Prasad
Date: Fri, 8 Apr 2016 12:58:19 +0100
Subject: [PATCH] Refactor setup_queue()

- Stop sharing set objects across threads
- Use a second queue to signal when producer threads are done
- Use a single consumer thread to check dependencies and kick off new producers

Signed-off-by: Aanand Prasad
---
 compose/parallel.py | 64 ++++++++++++++++++++++++++++++---------------
 compose/service.py  |  3 +++
 2 files changed, 46 insertions(+), 21 deletions(-)

diff --git a/compose/parallel.py b/compose/parallel.py
index c629a1abf..79699236d 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 from __future__ import unicode_literals
 
+import logging
 import operator
 import sys
 from threading import Thread
@@ -14,6 +15,9 @@
 from compose.cli.signals import ShutdownException
 from compose.utils import get_output_stream
 
+log = logging.getLogger(__name__)
+
+
 def parallel_execute(objects, func, get_name, msg, get_deps=None):
     """Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
@@ -73,35 +77,53 @@ def setup_queue(objects, func, get_deps, get_name):
         get_deps = _no_deps
 
     results = Queue()
-    started = set()  # objects being processed
-    finished = set()  # objects which have been processed
+    output = Queue()
 
-    def do_op(obj):
+    def consumer():
+        started = set()  # objects being processed
+        finished = set()  # objects which have been processed
+
+        def ready(obj):
+            """
+            Returns true if obj is ready to be processed:
+            - all dependencies have been processed
+            - obj is not already being processed
+            """
+            return obj not in started and all(
+                dep not in objects or dep in finished
+                for dep in get_deps(obj)
+            )
+
+        while len(finished) < len(objects):
+            for obj in filter(ready, objects):
+                log.debug('Starting producer thread for {}'.format(obj))
+                t = Thread(target=producer, args=(obj,))
+                t.daemon = True
+                t.start()
+                started.add(obj)
+
+            try:
+                event = results.get(timeout=1)
+            except Empty:
+                continue
+
+            obj = event[0]
+            log.debug('Finished processing: {}'.format(obj))
+            finished.add(obj)
+            output.put(event)
+
+    def producer(obj):
         try:
             result = func(obj)
             results.put((obj, result, None))
         except Exception as e:
             results.put((obj, None, e))
 
-        finished.add(obj)
-        feed()
+    t = Thread(target=consumer)
+    t.daemon = True
+    t.start()
 
-    def ready(obj):
-        # Is object ready for performing operation
-        return obj not in started and all(
-            dep not in objects or dep in finished
-            for dep in get_deps(obj)
-        )
-
-    def feed():
-        for obj in filter(ready, objects):
-            started.add(obj)
-            t = Thread(target=do_op, args=(obj,))
-            t.daemon = True
-            t.start()
-
-    feed()
-    return results
+    return output
 
 
 class ParallelStreamWriter(object):
diff --git a/compose/service.py b/compose/service.py
index ed45f0781..05cfc7c61 100644
--- a/compose/service.py
+++ b/compose/service.py
@@ -135,6 +135,9 @@ class Service(object):
         self.networks = networks or {}
         self.options = options
 
+    def __repr__(self):
+        return '<Service: {}>'.format(self.name)
+
     def containers(self, stopped=False, one_off=False, filters={}):
         filters.update({'label': self.labels(one_off=one_off)})
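
Note (not part of the patch): below is a minimal standalone sketch of the two-queue
producer/consumer scheduling pattern this refactor introduces, for readers who want to
see it outside the Compose codebase. The dependency map, func(), and the three demo
objects are illustrative assumptions, not code from compose/parallel.py, and the sketch
uses the Python 3 queue module where the codebase uses six.moves.queue.

# Sketch only: a single consumer thread owns the started/finished sets,
# spawns one producer thread per object once its dependencies are finished,
# and forwards results to an output queue that the caller drains.
from threading import Thread
from queue import Empty, Queue

# Hypothetical demo data: "web" depends on "db" and "cache".
deps = {'db': [], 'cache': [], 'web': ['db', 'cache']}
objects = list(deps)

def func(obj):
    return 'started {}'.format(obj)  # stand-in for the real per-object operation

results = Queue()  # producers report completion here
output = Queue()   # consumer forwards events to the caller here

def producer(obj):
    try:
        results.put((obj, func(obj), None))
    except Exception as e:
        results.put((obj, None, e))

def consumer():
    started, finished = set(), set()

    def ready(obj):
        # Ready when not yet started and every dependency has finished.
        return obj not in started and all(d in finished for d in deps[obj])

    while len(finished) < len(objects):
        for obj in filter(ready, objects):
            t = Thread(target=producer, args=(obj,))
            t.daemon = True
            t.start()
            started.add(obj)
        try:
            event = results.get(timeout=1)
        except Empty:
            continue
        finished.add(event[0])
        output.put(event)

Thread(target=consumer, daemon=True).start()

# The caller drains the output queue until every object has been processed.
for _ in objects:
    obj, result, exception = output.get()
    print(obj, result, exception)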