Merge pull request #1731 from aanand/fix-ctrl-c

Make parallel tasks interruptible with Ctrl-C
This commit is contained in:
Mazz Mosley 2015-07-20 16:05:26 +01:00
commit 85c90daa18
5 changed files with 21 additions and 25 deletions

View File

@ -1,5 +1,4 @@
DEFAULT_MAX_WORKERS = 20
DEFAULT_TIMEOUT = 10 DEFAULT_TIMEOUT = 10
LABEL_CONTAINER_NUMBER = 'com.docker.compose.container-number' LABEL_CONTAINER_NUMBER = 'com.docker.compose.container-number'
LABEL_ONE_OFF = 'com.docker.compose.oneoff' LABEL_ONE_OFF = 'com.docker.compose.oneoff'

View File

@ -2,13 +2,11 @@ import codecs
import hashlib import hashlib
import json import json
import logging import logging
import os
import sys import sys
from docker.errors import APIError from docker.errors import APIError
import concurrent.futures from Queue import Queue, Empty
from threading import Thread
from .const import DEFAULT_MAX_WORKERS
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -18,7 +16,6 @@ def parallel_execute(command, containers, doing_msg, done_msg, **options):
""" """
Execute a given command upon a list of containers in parallel. Execute a given command upon a list of containers in parallel.
""" """
max_workers = os.environ.get('COMPOSE_MAX_WORKERS', DEFAULT_MAX_WORKERS)
stream = codecs.getwriter('utf-8')(sys.stdout) stream = codecs.getwriter('utf-8')(sys.stdout)
lines = [] lines = []
errors = {} errors = {}
@ -26,25 +23,33 @@ def parallel_execute(command, containers, doing_msg, done_msg, **options):
for container in containers: for container in containers:
write_out_msg(stream, lines, container.name, doing_msg) write_out_msg(stream, lines, container.name, doing_msg)
q = Queue()
def container_command_execute(container, command, **options): def container_command_execute(container, command, **options):
try: try:
getattr(container, command)(**options) getattr(container, command)(**options)
except APIError as e: except APIError as e:
errors[container.name] = e.explanation errors[container.name] = e.explanation
q.put(container)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: for container in containers:
future_container = { t = Thread(
executor.submit( target=container_command_execute,
container_command_execute, args=(container, command),
container, kwargs=options,
command, )
**options t.daemon = True
): container for container in containers t.start()
}
for future in concurrent.futures.as_completed(future_container): done = 0
container = future_container[future]
while done < len(containers):
try:
container = q.get(timeout=1)
write_out_msg(stream, lines, container.name, done_msg) write_out_msg(stream, lines, container.name, done_msg)
done += 1
except Empty:
pass
if errors: if errors:
for container in errors: for container in errors:

View File

@ -44,12 +44,6 @@ the `docker` daemon.
Configures the path to the `ca.pem`, `cert.pem`, and `key.pem` files used for TLS verification. Defaults to `~/.docker`. Configures the path to the `ca.pem`, `cert.pem`, and `key.pem` files used for TLS verification. Defaults to `~/.docker`.
### COMPOSE\_MAX\_WORKERS
Configures the maximum number of worker threads to be used when executing
commands in parallel. Only a subset of commands execute in parallel, `stop`,
`kill` and `rm`.

View File

@ -2,7 +2,6 @@ PyYAML==3.10
docker-py==1.3.0 docker-py==1.3.0
dockerpty==0.3.4 dockerpty==0.3.4
docopt==0.6.1 docopt==0.6.1
futures==3.0.3
requests==2.6.1 requests==2.6.1
six==1.7.3 six==1.7.3
texttable==0.8.2 texttable==0.8.2

View File

@ -33,7 +33,6 @@ install_requires = [
'docker-py >= 1.3.0, < 1.4', 'docker-py >= 1.3.0, < 1.4',
'dockerpty >= 0.3.4, < 0.4', 'dockerpty >= 0.3.4, < 0.4',
'six >= 1.3.0, < 2', 'six >= 1.3.0, < 2',
'futures >= 3.0.3',
] ]