diff --git a/compose/parallel.py b/compose/parallel.py index 4810a1064..439f0f44b 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -14,68 +14,98 @@ from compose.cli.signals import ShutdownException from compose.utils import get_output_stream -def perform_operation(func, arg, callback, index): - try: - callback((index, func(arg))) - except Exception as e: - callback((index, e)) +def parallel_execute(objects, func, get_name, msg, get_deps=None): + """Runs func on objects in parallel while ensuring that func is + ran on object only after it is ran on all its dependencies. - -def parallel_execute(objects, func, index_func, msg): - """For a given list of objects, call the callable passing in the first - object we give it. + get_deps called on object must return a collection with its dependencies. + get_name called on object must return its name. """ objects = list(objects) stream = get_output_stream(sys.stderr) + writer = ParallelStreamWriter(stream, msg) - q = setup_queue(writer, objects, func, index_func) + for obj in objects: + writer.initialize(get_name(obj)) + + q = setup_queue(objects, func, get_deps, get_name) done = 0 errors = {} + error_to_reraise = None + returned = [None] * len(objects) while done < len(objects): try: - msg_index, result = q.get(timeout=1) + obj, result, exception = q.get(timeout=1) except Empty: continue # See https://github.com/docker/compose/issues/189 except thread.error: raise ShutdownException() - if isinstance(result, APIError): - errors[msg_index] = "error", result.explanation - writer.write(msg_index, 'error') - elif isinstance(result, Exception): - errors[msg_index] = "unexpected_exception", result + if exception is None: + writer.write(get_name(obj), 'done') + returned[objects.index(obj)] = result + elif isinstance(exception, APIError): + errors[get_name(obj)] = exception.explanation + writer.write(get_name(obj), 'error') else: - writer.write(msg_index, 'done') + errors[get_name(obj)] = exception + error_to_reraise = exception + done += 1 - if not errors: - return + for obj_name, error in errors.items(): + stream.write("\nERROR: for {} {}\n".format(obj_name, error)) - stream.write("\n") - for msg_index, (result, error) in errors.items(): - stream.write("ERROR: for {} {} \n".format(msg_index, error)) - if result == 'unexpected_exception': - raise error + if error_to_reraise: + raise error_to_reraise + + return returned -def setup_queue(writer, objects, func, index_func): - for obj in objects: - writer.initialize(index_func(obj)) +def _no_deps(x): + return [] - q = Queue() - # TODO: limit the number of threads #1828 - for obj in objects: - t = Thread( - target=perform_operation, - args=(func, obj, q.put, index_func(obj))) - t.daemon = True - t.start() +def setup_queue(objects, func, get_deps, get_name): + if get_deps is None: + get_deps = _no_deps - return q + results = Queue() + + started = set() # objects, threads were started for + finished = set() # already finished objects + + def do_op(obj): + try: + result = func(obj) + results.put((obj, result, None)) + except Exception as e: + results.put((obj, None, e)) + + finished.add(obj) + feed() + + def ready(obj): + # Is object ready for performing operation + return obj not in started and all( + dep not in objects or dep in finished + for dep in get_deps(obj) + ) + + def feed(): + ready_objects = [o for o in objects if ready(o)] + for obj in ready_objects: + started.add(obj) + t = Thread(target=do_op, + args=(obj,)) + t.daemon = True + t.start() + + feed() + return results class ParallelStreamWriter(object): @@ -91,11 +121,15 @@ class ParallelStreamWriter(object): self.lines = [] def initialize(self, obj_index): + if self.msg is None: + return self.lines.append(obj_index) self.stream.write("{} {} ... \r\n".format(self.msg, obj_index)) self.stream.flush() def write(self, obj_index, status): + if self.msg is None: + return position = self.lines.index(obj_index) diff = len(self.lines) - position # move up diff --git a/compose/project.py b/compose/project.py index c964417ff..3de68b2c6 100644 --- a/compose/project.py +++ b/compose/project.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import datetime import logging +import operator from functools import reduce from docker.errors import APIError @@ -200,13 +201,40 @@ class Project(object): def start(self, service_names=None, **options): containers = [] - for service in self.get_services(service_names): - service_containers = service.start(**options) + + def start_service(service): + service_containers = service.start(quiet=True, **options) containers.extend(service_containers) + + services = self.get_services(service_names) + + def get_deps(service): + return {self.get_service(dep) for dep in service.get_dependency_names()} + + parallel.parallel_execute( + services, + start_service, + operator.attrgetter('name'), + 'Starting', + get_deps) + return containers def stop(self, service_names=None, **options): - parallel.parallel_stop(self.containers(service_names), options) + containers = self.containers(service_names) + + def get_deps(container): + # actually returning inversed dependencies + return {other for other in containers + if container.service in + self.get_service(other.service).get_dependency_names()} + + parallel.parallel_execute( + containers, + operator.methodcaller('stop', **options), + operator.attrgetter('name'), + 'Stopping', + get_deps) def pause(self, service_names=None, **options): containers = self.containers(service_names) @@ -314,15 +342,33 @@ class Project(object): include_deps=start_deps) plans = self._get_convergence_plans(services, strategy) - return [ - container - for service in services - for container in service.execute_convergence_plan( + + for svc in services: + svc.ensure_image_exists(do_build=do_build) + + def do(service): + return service.execute_convergence_plan( plans[service.name], do_build=do_build, timeout=timeout, detached=detached ) + + def get_deps(service): + return {self.get_service(dep) for dep in service.get_dependency_names()} + + results = parallel.parallel_execute( + services, + do, + operator.attrgetter('name'), + None, + get_deps + ) + return [ + container + for svc_containers in results + if svc_containers is not None + for container in svc_containers ] def initialize(self): diff --git a/compose/service.py b/compose/service.py index fad1c4d93..30d28e4c6 100644 --- a/compose/service.py +++ b/compose/service.py @@ -436,9 +436,10 @@ class Service(object): container.remove() return new_container - def start_container_if_stopped(self, container, attach_logs=False): + def start_container_if_stopped(self, container, attach_logs=False, quiet=False): if not container.is_running: - log.info("Starting %s" % container.name) + if not quiet: + log.info("Starting %s" % container.name) if attach_logs: container.attach_log_stream() return self.start_container(container) diff --git a/tests/acceptance/cli_test.py b/tests/acceptance/cli_test.py index c94578a1b..825b97bed 100644 --- a/tests/acceptance/cli_test.py +++ b/tests/acceptance/cli_test.py @@ -8,6 +8,7 @@ import shlex import signal import subprocess import time +from collections import Counter from collections import namedtuple from operator import attrgetter @@ -1346,7 +1347,7 @@ class CLITestCase(DockerClientTestCase): os.kill(events_proc.pid, signal.SIGINT) result = wait_on_process(events_proc, returncode=1) lines = [json.loads(line) for line in result.stdout.rstrip().split('\n')] - assert [e['action'] for e in lines] == ['create', 'start', 'create', 'start'] + assert Counter(e['action'] for e in lines) == {'create': 2, 'start': 2} def test_events_human_readable(self): events_proc = start_process(self.base_dir, ['events'])