Merge pull request #5250 from garribas/4801-progress-markers

Progress markers are not shown correctly for docker-compose up (fixes #4801)
This commit is contained in:
Joffrey F 2017-11-09 14:29:34 -08:00 committed by GitHub
commit 0079ac52c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 20 deletions

View File

@@ -26,7 +26,7 @@ log = logging.getLogger(__name__)
STOP = object() STOP = object()
def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None): def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None):
"""Runs func on objects in parallel while ensuring that func is """Runs func on objects in parallel while ensuring that func is
ran on object only after it is ran on all its dependencies. ran on object only after it is ran on all its dependencies.
@@ -37,9 +37,19 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
stream = get_output_stream(sys.stderr) stream = get_output_stream(sys.stderr)
writer = ParallelStreamWriter(stream, msg) writer = ParallelStreamWriter(stream, msg)
for obj in objects:
if parent_objects:
display_objects = list(parent_objects)
else:
display_objects = objects
for obj in display_objects:
writer.add_object(get_name(obj)) writer.add_object(get_name(obj))
writer.write_initial()
# write data in a second loop to consider all objects for width alignment
# and avoid duplicates when parent_objects exists
for obj in objects:
writer.write_initial(get_name(obj))
events = parallel_execute_iter(objects, func, get_deps, limit) events = parallel_execute_iter(objects, func, get_deps, limit)
@@ -237,12 +247,11 @@ class ParallelStreamWriter(object):
self.lines.append(obj_index) self.lines.append(obj_index)
self.width = max(self.width, len(obj_index)) self.width = max(self.width, len(obj_index))
def write_initial(self): def write_initial(self, obj_index):
if self.msg is None: if self.msg is None:
return return
for line in self.lines: self.stream.write("{} {:<{width}} ... \r\n".format(
self.stream.write("{} {:<{width}} ... \r\n".format(self.msg, line, self.msg, self.lines[self.lines.index(obj_index)], width=self.width))
width=self.width))
self.stream.flush() self.stream.flush()
def _write_ansi(self, obj_index, status): def _write_ansi(self, obj_index, status):

View File

@@ -29,6 +29,7 @@ from .service import ConvergenceStrategy
from .service import NetworkMode from .service import NetworkMode
from .service import PidMode from .service import PidMode
from .service import Service from .service import Service
from .service import ServiceName
from .service import ServiceNetworkMode from .service import ServiceNetworkMode
from .service import ServicePidMode from .service import ServicePidMode
from .utils import microseconds_from_time_nano from .utils import microseconds_from_time_nano
@@ -190,6 +191,25 @@ class Project(object):
service.remove_duplicate_containers() service.remove_duplicate_containers()
return services return services
def get_scaled_services(self, services, scale_override):
"""
Returns a list of this project's services as scaled ServiceName objects.
services: a list of Service objects
scale_override: a dict with the scale to apply to each service (k: service_name, v: scale)
"""
service_names = []
for service in services:
if service.name in scale_override:
scale = scale_override[service.name]
else:
scale = service.scale_num
for i in range(1, scale + 1):
service_names.append(ServiceName(self.name, service.name, i))
return service_names
def get_links(self, service_dict): def get_links(self, service_dict):
links = [] links = []
if 'links' in service_dict: if 'links' in service_dict:
@@ -430,15 +450,18 @@ class Project(object):
for svc in services: for svc in services:
svc.ensure_image_exists(do_build=do_build) svc.ensure_image_exists(do_build=do_build)
plans = self._get_convergence_plans(services, strategy) plans = self._get_convergence_plans(services, strategy)
scaled_services = self.get_scaled_services(services, scale_override)
def do(service): def do(service):
return service.execute_convergence_plan( return service.execute_convergence_plan(
plans[service.name], plans[service.name],
timeout=timeout, timeout=timeout,
detached=detached, detached=detached,
scale_override=scale_override.get(service.name), scale_override=scale_override.get(service.name),
rescale=rescale, rescale=rescale,
start=start start=start,
project_services=scaled_services
) )
def get_deps(service): def get_deps(service):

View File

@@ -381,11 +381,11 @@ class Service(object):
return has_diverged return has_diverged
def _execute_convergence_create(self, scale, detached, start): def _execute_convergence_create(self, scale, detached, start, project_services=None):
i = self._next_container_number() i = self._next_container_number()
def create_and_start(service, n): def create_and_start(service, n):
container = service.create_container(number=n) container = service.create_container(number=n, quiet=True)
if not detached: if not detached:
container.attach_log_stream() container.attach_log_stream()
if start: if start:
@@ -393,10 +393,11 @@
return container return container
containers, errors = parallel_execute( containers, errors = parallel_execute(
range(i, i + scale), [ServiceName(self.project, self.name, index) for index in range(i, i + scale)],
lambda n: create_and_start(self, n), lambda service_name: create_and_start(self, service_name.number),
lambda n: self.get_container_name(n), lambda service_name: self.get_container_name(service_name.service, service_name.number),
"Creating", "Creating",
parent_objects=project_services
) )
for error in errors.values(): for error in errors.values():
raise OperationFailedError(error) raise OperationFailedError(error)
@@ -435,7 +436,7 @@
if start: if start:
_, errors = parallel_execute( _, errors = parallel_execute(
containers, containers,
lambda c: self.start_container_if_stopped(c, attach_logs=not detached), lambda c: self.start_container_if_stopped(c, attach_logs=not detached, quiet=True),
lambda c: c.name, lambda c: c.name,
"Starting", "Starting",
) )
@@ -462,7 +463,7 @@
) )
def execute_convergence_plan(self, plan, timeout=None, detached=False, def execute_convergence_plan(self, plan, timeout=None, detached=False,
start=True, scale_override=None, rescale=True): start=True, scale_override=None, rescale=True, project_services=None):
(action, containers) = plan (action, containers) = plan
scale = scale_override if scale_override is not None else self.scale_num scale = scale_override if scale_override is not None else self.scale_num
containers = sorted(containers, key=attrgetter('number')) containers = sorted(containers, key=attrgetter('number'))
@@ -471,7 +472,7 @@
if action == 'create': if action == 'create':
return self._execute_convergence_create( return self._execute_convergence_create(
scale, detached, start scale, detached, start, project_services
) )
# The create action needs always needs an initial scale, but otherwise, # The create action needs always needs an initial scale, but otherwise,
@@ -744,7 +745,7 @@
container_options.update(override_options) container_options.update(override_options)
if not container_options.get('name'): if not container_options.get('name'):
container_options['name'] = self.get_container_name(number, one_off) container_options['name'] = self.get_container_name(self.name, number, one_off)
container_options.setdefault('detach', True) container_options.setdefault('detach', True)
@@ -969,12 +970,12 @@
def custom_container_name(self): def custom_container_name(self):
return self.options.get('container_name') return self.options.get('container_name')
def get_container_name(self, number, one_off=False): def get_container_name(self, service_name, number, one_off=False):
if self.custom_container_name and not one_off: if self.custom_container_name and not one_off:
return self.custom_container_name return self.custom_container_name
container_name = build_container_name( container_name = build_container_name(
self.project, self.name, number, one_off, self.project, service_name, number, one_off,
) )
ext_links_origins = [l.split(':')[0] for l in self.options.get('external_links', [])] ext_links_origins = [l.split(':')[0] for l in self.options.get('external_links', [])]
if container_name in ext_links_origins: if container_name in ext_links_origins:

View File

@@ -175,7 +175,7 @@ class ServiceTest(unittest.TestCase):
external_links=['default_foo_1'] external_links=['default_foo_1']
) )
with self.assertRaises(DependencyError): with self.assertRaises(DependencyError):
service.get_container_name(1) service.get_container_name('foo', 1)
def test_mem_reservation(self): def test_mem_reservation(self):
self.mock_client.create_host_config.return_value = {} self.mock_client.create_host_config.return_value = {}