Merge pull request #2679 from IlyaSkriblovsky/dependency-ordered-start-stop

Parallel up/start/stop
This commit is contained in:
Aanand Prasad 2016-03-15 11:51:44 +00:00
commit 94b1862579
5 changed files with 129 additions and 53 deletions

View File

@ -14,68 +14,98 @@ from compose.cli.signals import ShutdownException
from compose.utils import get_output_stream from compose.utils import get_output_stream
def perform_operation(func, arg, callback, index): def parallel_execute(objects, func, get_name, msg, get_deps=None):
try: """Runs func on objects in parallel while ensuring that func is
callback((index, func(arg))) ran on object only after it is ran on all its dependencies.
except Exception as e:
callback((index, e))
get_deps called on object must return a collection with its dependencies.
def parallel_execute(objects, func, index_func, msg): get_name called on object must return its name.
"""For a given list of objects, call the callable passing in the first
object we give it.
""" """
objects = list(objects) objects = list(objects)
stream = get_output_stream(sys.stderr) stream = get_output_stream(sys.stderr)
writer = ParallelStreamWriter(stream, msg) writer = ParallelStreamWriter(stream, msg)
q = setup_queue(writer, objects, func, index_func) for obj in objects:
writer.initialize(get_name(obj))
q = setup_queue(objects, func, get_deps, get_name)
done = 0 done = 0
errors = {} errors = {}
error_to_reraise = None
returned = [None] * len(objects)
while done < len(objects): while done < len(objects):
try: try:
msg_index, result = q.get(timeout=1) obj, result, exception = q.get(timeout=1)
except Empty: except Empty:
continue continue
# See https://github.com/docker/compose/issues/189 # See https://github.com/docker/compose/issues/189
except thread.error: except thread.error:
raise ShutdownException() raise ShutdownException()
if isinstance(result, APIError): if exception is None:
errors[msg_index] = "error", result.explanation writer.write(get_name(obj), 'done')
writer.write(msg_index, 'error') returned[objects.index(obj)] = result
elif isinstance(result, Exception): elif isinstance(exception, APIError):
errors[msg_index] = "unexpected_exception", result errors[get_name(obj)] = exception.explanation
writer.write(get_name(obj), 'error')
else: else:
writer.write(msg_index, 'done') errors[get_name(obj)] = exception
error_to_reraise = exception
done += 1 done += 1
if not errors: for obj_name, error in errors.items():
return stream.write("\nERROR: for {} {}\n".format(obj_name, error))
stream.write("\n") if error_to_reraise:
for msg_index, (result, error) in errors.items(): raise error_to_reraise
stream.write("ERROR: for {} {} \n".format(msg_index, error))
if result == 'unexpected_exception': return returned
raise error
def setup_queue(writer, objects, func, index_func): def _no_deps(x):
for obj in objects: return []
writer.initialize(index_func(obj))
q = Queue()
# TODO: limit the number of threads #1828 def setup_queue(objects, func, get_deps, get_name):
for obj in objects: if get_deps is None:
t = Thread( get_deps = _no_deps
target=perform_operation,
args=(func, obj, q.put, index_func(obj)))
t.daemon = True
t.start()
return q results = Queue()
started = set() # objects, threads were started for
finished = set() # already finished objects
def do_op(obj):
try:
result = func(obj)
results.put((obj, result, None))
except Exception as e:
results.put((obj, None, e))
finished.add(obj)
feed()
def ready(obj):
# Is object ready for performing operation
return obj not in started and all(
dep not in objects or dep in finished
for dep in get_deps(obj)
)
def feed():
ready_objects = [o for o in objects if ready(o)]
for obj in ready_objects:
started.add(obj)
t = Thread(target=do_op,
args=(obj,))
t.daemon = True
t.start()
feed()
return results
class ParallelStreamWriter(object): class ParallelStreamWriter(object):
@ -91,11 +121,15 @@ class ParallelStreamWriter(object):
self.lines = [] self.lines = []
def initialize(self, obj_index): def initialize(self, obj_index):
if self.msg is None:
return
self.lines.append(obj_index) self.lines.append(obj_index)
self.stream.write("{} {} ... \r\n".format(self.msg, obj_index)) self.stream.write("{} {} ... \r\n".format(self.msg, obj_index))
self.stream.flush() self.stream.flush()
def write(self, obj_index, status): def write(self, obj_index, status):
if self.msg is None:
return
position = self.lines.index(obj_index) position = self.lines.index(obj_index)
diff = len(self.lines) - position diff = len(self.lines) - position
# move up # move up
@ -121,10 +155,6 @@ def parallel_remove(containers, options):
parallel_operation(stopped_containers, 'remove', options, 'Removing') parallel_operation(stopped_containers, 'remove', options, 'Removing')
def parallel_stop(containers, options):
parallel_operation(containers, 'stop', options, 'Stopping')
def parallel_start(containers, options): def parallel_start(containers, options):
parallel_operation(containers, 'start', options, 'Starting') parallel_operation(containers, 'start', options, 'Starting')

View File

@ -3,6 +3,7 @@ from __future__ import unicode_literals
import datetime import datetime
import logging import logging
import operator
from functools import reduce from functools import reduce
from docker.errors import APIError from docker.errors import APIError
@ -200,13 +201,40 @@ class Project(object):
def start(self, service_names=None, **options): def start(self, service_names=None, **options):
containers = [] containers = []
for service in self.get_services(service_names):
service_containers = service.start(**options) def start_service(service):
service_containers = service.start(quiet=True, **options)
containers.extend(service_containers) containers.extend(service_containers)
services = self.get_services(service_names)
def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()}
parallel.parallel_execute(
services,
start_service,
operator.attrgetter('name'),
'Starting',
get_deps)
return containers return containers
def stop(self, service_names=None, **options): def stop(self, service_names=None, **options):
parallel.parallel_stop(self.containers(service_names), options) containers = self.containers(service_names)
def get_deps(container):
# actually returning inversed dependencies
return {other for other in containers
if container.service in
self.get_service(other.service).get_dependency_names()}
parallel.parallel_execute(
containers,
operator.methodcaller('stop', **options),
operator.attrgetter('name'),
'Stopping',
get_deps)
def pause(self, service_names=None, **options): def pause(self, service_names=None, **options):
containers = self.containers(service_names) containers = self.containers(service_names)
@ -314,15 +342,33 @@ class Project(object):
include_deps=start_deps) include_deps=start_deps)
plans = self._get_convergence_plans(services, strategy) plans = self._get_convergence_plans(services, strategy)
return [
container for svc in services:
for service in services svc.ensure_image_exists(do_build=do_build)
for container in service.execute_convergence_plan(
def do(service):
return service.execute_convergence_plan(
plans[service.name], plans[service.name],
do_build=do_build, do_build=do_build,
timeout=timeout, timeout=timeout,
detached=detached detached=detached
) )
def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()}
results = parallel.parallel_execute(
services,
do,
operator.attrgetter('name'),
None,
get_deps
)
return [
container
for svc_containers in results
if svc_containers is not None
for container in svc_containers
] ]
def initialize(self): def initialize(self):

View File

@ -436,9 +436,10 @@ class Service(object):
container.remove() container.remove()
return new_container return new_container
def start_container_if_stopped(self, container, attach_logs=False): def start_container_if_stopped(self, container, attach_logs=False, quiet=False):
if not container.is_running: if not container.is_running:
log.info("Starting %s" % container.name) if not quiet:
log.info("Starting %s" % container.name)
if attach_logs: if attach_logs:
container.attach_log_stream() container.attach_log_stream()
return self.start_container(container) return self.start_container(container)

View File

@ -8,6 +8,7 @@ import shlex
import signal import signal
import subprocess import subprocess
import time import time
from collections import Counter
from collections import namedtuple from collections import namedtuple
from operator import attrgetter from operator import attrgetter
@ -1346,7 +1347,7 @@ class CLITestCase(DockerClientTestCase):
os.kill(events_proc.pid, signal.SIGINT) os.kill(events_proc.pid, signal.SIGINT)
result = wait_on_process(events_proc, returncode=1) result = wait_on_process(events_proc, returncode=1)
lines = [json.loads(line) for line in result.stdout.rstrip().split('\n')] lines = [json.loads(line) for line in result.stdout.rstrip().split('\n')]
assert [e['action'] for e in lines] == ['create', 'start', 'create', 'start'] assert Counter(e['action'] for e in lines) == {'create': 2, 'start': 2}
def test_events_human_readable(self): def test_events_human_readable(self):
events_proc = start_process(self.base_dir, ['events']) events_proc = start_process(self.base_dir, ['events'])

View File

@ -5,7 +5,6 @@ import random
import py import py
import pytest import pytest
from docker.errors import APIError
from docker.errors import NotFound from docker.errors import NotFound
from ..helpers import build_config from ..helpers import build_config
@ -738,8 +737,7 @@ class ProjectTest(DockerClientTestCase):
config_data=config_data, config_data=config_data,
) )
with self.assertRaises(APIError): assert len(project.up()) == 0
project.up()
@v2_only() @v2_only()
def test_project_up_volumes(self): def test_project_up_volumes(self):