diff --git a/compose/service.py b/compose/service.py index 006696c2d..cda68b7f5 100644 --- a/compose/service.py +++ b/compose/service.py @@ -24,7 +24,7 @@ from .const import ( from .container import Container from .legacy import check_for_legacy_containers from .progress_stream import stream_output, StreamOutputError -from .utils import json_hash +from .utils import json_hash, parallel_create_execute, parallel_execute log = logging.getLogger(__name__) @@ -162,36 +162,43 @@ class Service(object): 'for this service are created on a single host, the port will clash.' % self.name) - # Create enough containers - containers = self.containers(stopped=True) - while len(containers) < desired_num: - containers.append(self.create_container()) + def create_and_start(number): + container = self.create_container(number=number, quiet=True) + container.start() + return container - running_containers = [] - stopped_containers = [] - for c in containers: - if c.is_running: - running_containers.append(c) - else: - stopped_containers.append(c) - running_containers.sort(key=lambda c: c.number) - stopped_containers.sort(key=lambda c: c.number) + msgs = {'doing': 'Creating', 'done': 'Started'} - # Stop containers - while len(running_containers) > desired_num: - c = running_containers.pop() - log.info("Stopping %s..." % c.name) - c.stop(timeout=timeout) - stopped_containers.append(c) + running_containers = self.containers(stopped=False) + num_running = len(running_containers) - # Start containers - while len(running_containers) < desired_num: - c = stopped_containers.pop(0) - log.info("Starting %s..." % c.name) - self.start_container(c) - running_containers.append(c) + if desired_num == num_running: + # do nothing as we already have the desired number + log.info('Desired container number already achieved') + return - self.remove_stopped() + if desired_num > num_running: + num_to_create = desired_num - num_running + next_number = self._next_container_number() + container_numbers = [ + number for number in range( + next_number, next_number + num_to_create + ) + ] + parallel_create_execute(create_and_start, container_numbers, msgs) + + if desired_num < num_running: + sorted_running_containers = sorted(running_containers, key=attrgetter('number')) + + if desired_num < num_running: + # count number of running containers. + num_to_stop = num_running - desired_num + + containers_to_stop = sorted_running_containers[-num_to_stop:] + # TODO: refactor these out? + parallel_execute("stop", containers_to_stop, "Stopping", "Stopped") + parallel_execute("remove", containers_to_stop, "Removing", "Removed") + # self.remove_stopped() def remove_stopped(self, **options): for c in self.containers(stopped=True): diff --git a/compose/utils.py b/compose/utils.py index b6ee63d03..af6aa902a 100644 --- a/compose/utils.py +++ b/compose/utils.py @@ -12,6 +12,51 @@ from threading import Thread log = logging.getLogger(__name__) +def parallel_create_execute(create_function, container_numbers, msgs={}, **options): + """ + Parallel container creation by calling the create_function for each new container + number passed in. + """ + stream = codecs.getwriter('utf-8')(sys.stdout) + lines = [] + errors = {} + + for number in container_numbers: + write_out_msg(stream, lines, number, msgs['doing']) + + q = Queue() + + def inner_call_function(create_function, number): + try: + container = create_function(number) + except APIError as e: + errors[number] = e.explanation + q.put(container) + + for number in container_numbers: + t = Thread( + target=inner_call_function, + args=(create_function, number), + kwargs=options, + ) + t.daemon = True + t.start() + + done = 0 + total_to_create = len(container_numbers) + while done < total_to_create: + try: + container = q.get(timeout=1) + write_out_msg(stream, lines, container.name, msgs['done']) + done += 1 + except Empty: + pass + + if errors: + for number in errors: + stream.write("ERROR: for {} {} \n".format(number, errors[number])) + + def parallel_execute(command, containers, doing_msg, done_msg, **options): """ Execute a given command upon a list of containers in parallel.