From a68ca199a2f1258d96b09cf2ea62a819aecfcbdb Mon Sep 17 00:00:00 2001
From: Mazz Mosley
Date: Mon, 13 Jul 2015 14:03:44 +0100
Subject: [PATCH] Execute container commands in parallel

Commands able to use this parallelisation are `stop`, `kill` and `rm`.

We're using the `futures` module, backported from Python 3, so we can
make the most of a pool of threads without having to write the
low-level code for managing them ourselves.

The default number of worker threads is low enough that it shouldn't
cause performance problems, but anyone who knows the capability of
their system and wants to increase it can do so via the MAX_WORKERS
environment variable.

Signed-off-by: Mazz Mosley
---
 compose/const.py   |  1 +
 compose/project.py | 22 +++++++++++-----------
 compose/service.py |  3 +++
 compose/utils.py   | 36 +++++++++++++++++++++++++++++++++++-
 requirements.txt   |  1 +
 setup.py           |  1 +
 6 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/compose/const.py b/compose/const.py
index 709c3a10d..9c39d5f89 100644
--- a/compose/const.py
+++ b/compose/const.py
@@ -1,4 +1,5 @@
 
+DEFAULT_MAX_WORKERS = 5
 DEFAULT_TIMEOUT = 10
 LABEL_CONTAINER_NUMBER = 'com.docker.compose.container-number'
 LABEL_ONE_OFF = 'com.docker.compose.oneoff'
diff --git a/compose/project.py b/compose/project.py
index 11c1e1ce9..7928316a6 100644
--- a/compose/project.py
+++ b/compose/project.py
@@ -1,15 +1,16 @@
 from __future__ import unicode_literals
 from __future__ import absolute_import
-import logging
 from functools import reduce
+import logging
 
 from docker.errors import APIError
 
 from .config import get_service_name_from_net, ConfigurationError
-from .const import LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF, DEFAULT_TIMEOUT
-from .service import Service
+from .const import DEFAULT_TIMEOUT, LABEL_PROJECT, LABEL_SERVICE, LABEL_ONE_OFF
 from .container import Container
 from .legacy import check_for_legacy_containers
+from .service import Service
+from .utils import parallel_execute
 
 log = logging.getLogger(__name__)
@@ -197,12 +198,15 @@ class Project(object):
             service.start(**options)
 
     def stop(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.stop(**options)
+        parallel_execute("stop", self.containers(service_names), "Stopping", "Stopped", **options)
 
     def kill(self, service_names=None, **options):
-        for service in reversed(self.get_services(service_names)):
-            service.kill(**options)
+        parallel_execute("kill", self.containers(service_names), "Killing", "Killed", **options)
+
+    def remove_stopped(self, service_names=None, **options):
+        all_containers = self.containers(service_names, stopped=True)
+        stopped_containers = [c for c in all_containers if not c.is_running]
+        parallel_execute("remove", stopped_containers, "Removing", "Removed", **options)
 
     def restart(self, service_names=None, **options):
         for service in self.get_services(service_names):
@@ -284,10 +288,6 @@ class Project(object):
         for service in self.get_services(service_names, include_deps=True):
             service.pull(insecure_registry=insecure_registry)
 
-    def remove_stopped(self, service_names=None, **options):
-        for service in self.get_services(service_names):
-            service.remove_stopped(**options)
-
     def containers(self, service_names=None, stopped=False, one_off=False):
         if service_names:
             self.validate_service_names(service_names)
diff --git a/compose/service.py b/compose/service.py
index 9a03192e6..213f54fad 100644
--- a/compose/service.py
+++ b/compose/service.py
@@ -129,6 +129,7 @@ class Service(object):
         for c in self.containers(stopped=True):
             self.start_container_if_stopped(c, **options)
 
+    # TODO: remove these functions, project takes care of starting/stopping,
     def stop(self, **options):
         for c in self.containers():
             log.info("Stopping %s..." % c.name)
             c.stop(**options)
@@ -144,6 +145,8 @@ class Service(object):
             log.info("Restarting %s..." % c.name)
             c.restart(**options)
 
+    # end TODO
+
     def scale(self, desired_num):
         """
         Adjusts the number of containers to the specified number and ensures
diff --git a/compose/utils.py b/compose/utils.py
index 76a4c6b93..cc7bd5dd0 100644
--- a/compose/utils.py
+++ b/compose/utils.py
@@ -1,5 +1,39 @@
-import json
 import hashlib
+import json
+import logging
+import os
+
+import concurrent.futures
+
+from .const import DEFAULT_MAX_WORKERS
+
+
+log = logging.getLogger(__name__)
+
+
+def parallel_execute(command, containers, doing_msg, done_msg, **options):
+    """
+    Execute a given command upon a list of containers in parallel.
+    """
+    max_workers = os.environ.get('MAX_WORKERS', DEFAULT_MAX_WORKERS)
+
+    def container_command_execute(container, command, **options):
+        log.info("{} {}...".format(doing_msg, container.name))
+        return getattr(container, command)(**options)
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        future_container = {
+            executor.submit(
+                container_command_execute,
+                container,
+                command,
+                **options
+            ): container for container in containers
+        }
+
+        for future in concurrent.futures.as_completed(future_container):
+            container = future_container[future]
+            log.info("{} {}".format(done_msg, container.name))
 
 
 def json_hash(obj):
diff --git a/requirements.txt b/requirements.txt
index 69bd4c5f9..4a0c5be53 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@ PyYAML==3.10
 docker-py==1.2.3
 dockerpty==0.3.4
 docopt==0.6.1
+futures==3.0.3
 requests==2.6.1
 six==1.7.3
 texttable==0.8.2
diff --git a/setup.py b/setup.py
index d2e81e175..ebd531192 100644
--- a/setup.py
+++ b/setup.py
@@ -33,6 +33,7 @@ install_requires = [
    'docker-py >= 1.2.3, < 1.3',
    'dockerpty >= 0.3.4, < 0.4',
    'six >= 1.3.0, < 2',
+    'futures >= 3.0.3',
 ]
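
Note: the sketch below is not part of the patch; it is a minimal standalone illustration of the
thread-pool pattern that parallel_execute relies on, with a hypothetical FakeContainer class
standing in for compose.container.Container. Unlike the patch, it casts the MAX_WORKERS
environment variable to int, since os.environ.get always returns a string.

import concurrent.futures
import logging
import os
import time

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger(__name__)

DEFAULT_MAX_WORKERS = 5  # mirrors compose/const.py in the patch above


class FakeContainer(object):
    """Hypothetical stand-in for compose.container.Container."""

    def __init__(self, name):
        self.name = name

    def stop(self, **options):
        # Pretend to talk to the Docker daemon.
        time.sleep(0.1)


def parallel_execute(command, containers, doing_msg, done_msg, **options):
    # The int() cast is an addition in this sketch: environment variables
    # are strings, and ThreadPoolExecutor expects an integer.
    max_workers = int(os.environ.get('MAX_WORKERS', DEFAULT_MAX_WORKERS))

    def container_command_execute(container):
        log.info("{} {}...".format(doing_msg, container.name))
        return getattr(container, command)(**options)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its container so it can be named later.
        future_container = {
            executor.submit(container_command_execute, c): c
            for c in containers
        }
        # as_completed() yields futures as each command finishes, so the
        # "done" message is logged per container, not in submission order.
        for future in concurrent.futures.as_completed(future_container):
            log.info("{} {}".format(done_msg, future_container[future].name))


if __name__ == '__main__':
    containers = [FakeContainer("web_%d" % i) for i in range(10)]
    parallel_execute("stop", containers, "Stopping", "Stopped", timeout=10)

One property of this pattern worth noting, in the sketch as in the patch: future.result() is never
called, so an exception raised while handling one container is captured by its future and never
re-raised. Callers that need failures surfaced would have to inspect each future's exception().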