From 57eb1c463f6f6f6be98c0f8371b69352e45d29c6 Mon Sep 17 00:00:00 2001 From: Guillermo Arribas Date: Tue, 10 Oct 2017 11:55:14 -0300 Subject: [PATCH] Progress markers are not shown correctly for docker-compose up (fixes #4801) Signed-off-by: Guillermo Arribas --- compose/parallel.py | 23 ++++++++++++++++------- compose/project.py | 25 ++++++++++++++++++++++++- compose/service.py | 23 ++++++++++++----------- tests/unit/service_test.py | 2 +- 4 files changed, 53 insertions(+), 20 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index d455711dd..f271561ff 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -26,7 +26,7 @@ log = logging.getLogger(__name__) STOP = object() -def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None): +def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None): """Runs func on objects in parallel while ensuring that func is ran on object only after it is ran on all its dependencies. @@ -37,9 +37,19 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None): stream = get_output_stream(sys.stderr) writer = ParallelStreamWriter(stream, msg) - for obj in objects: + + if parent_objects: + display_objects = list(parent_objects) + else: + display_objects = objects + + for obj in display_objects: writer.add_object(get_name(obj)) - writer.write_initial() + + # write data in a second loop to consider all objects for width alignment + # and avoid duplicates when parent_objects exists + for obj in objects: + writer.write_initial(get_name(obj)) events = parallel_execute_iter(objects, func, get_deps, limit) @@ -237,12 +247,11 @@ class ParallelStreamWriter(object): self.lines.append(obj_index) self.width = max(self.width, len(obj_index)) - def write_initial(self): + def write_initial(self, obj_index): if self.msg is None: return - for line in self.lines: - self.stream.write("{} {:<{width}} ... \r\n".format(self.msg, line, - width=self.width)) + self.stream.write("{} {:<{width}} ... \r\n".format( + self.msg, self.lines[self.lines.index(obj_index)], width=self.width)) self.stream.flush() def _write_ansi(self, obj_index, status): diff --git a/compose/project.py b/compose/project.py index c8b57edd2..f6bd30a88 100644 --- a/compose/project.py +++ b/compose/project.py @@ -29,6 +29,7 @@ from .service import ConvergenceStrategy from .service import NetworkMode from .service import PidMode from .service import Service +from .service import ServiceName from .service import ServiceNetworkMode from .service import ServicePidMode from .utils import microseconds_from_time_nano @@ -190,6 +191,25 @@ class Project(object): service.remove_duplicate_containers() return services + def get_scaled_services(self, services, scale_override): + """ + Returns a list of this project's services as scaled ServiceName objects. + + services: a list of Service objects + scale_override: a dict with the scale to apply to each service (k: service_name, v: scale) + """ + service_names = [] + for service in services: + if service.name in scale_override: + scale = scale_override[service.name] + else: + scale = service.scale_num + + for i in range(1, scale + 1): + service_names.append(ServiceName(self.name, service.name, i)) + + return service_names + def get_links(self, service_dict): links = [] if 'links' in service_dict: @@ -430,15 +450,18 @@ class Project(object): for svc in services: svc.ensure_image_exists(do_build=do_build) plans = self._get_convergence_plans(services, strategy) + scaled_services = self.get_scaled_services(services, scale_override) def do(service): + return service.execute_convergence_plan( plans[service.name], timeout=timeout, detached=detached, scale_override=scale_override.get(service.name), rescale=rescale, - start=start + start=start, + project_services=scaled_services ) def get_deps(service): diff --git a/compose/service.py b/compose/service.py index 1a18c6654..48d428cb8 100644 --- a/compose/service.py +++ b/compose/service.py @@ -378,11 +378,11 @@ class Service(object): return has_diverged - def _execute_convergence_create(self, scale, detached, start): + def _execute_convergence_create(self, scale, detached, start, project_services=None): i = self._next_container_number() def create_and_start(service, n): - container = service.create_container(number=n) + container = service.create_container(number=n, quiet=True) if not detached: container.attach_log_stream() if start: @@ -390,10 +390,11 @@ class Service(object): return container containers, errors = parallel_execute( - range(i, i + scale), - lambda n: create_and_start(self, n), - lambda n: self.get_container_name(n), + [ServiceName(self.project, self.name, index) for index in range(i, i + scale)], + lambda service_name: create_and_start(self, service_name.number), + lambda service_name: self.get_container_name(service_name.service, service_name.number), "Creating", + parent_objects=project_services ) for error in errors.values(): raise OperationFailedError(error) @@ -432,7 +433,7 @@ class Service(object): if start: _, errors = parallel_execute( containers, - lambda c: self.start_container_if_stopped(c, attach_logs=not detached), + lambda c: self.start_container_if_stopped(c, attach_logs=not detached, quiet=True), lambda c: c.name, "Starting", ) @@ -459,7 +460,7 @@ class Service(object): ) def execute_convergence_plan(self, plan, timeout=None, detached=False, - start=True, scale_override=None, rescale=True): + start=True, scale_override=None, rescale=True, project_services=None): (action, containers) = plan scale = scale_override if scale_override is not None else self.scale_num containers = sorted(containers, key=attrgetter('number')) @@ -468,7 +469,7 @@ class Service(object): if action == 'create': return self._execute_convergence_create( - scale, detached, start + scale, detached, start, project_services ) # The create action needs always needs an initial scale, but otherwise, @@ -741,7 +742,7 @@ class Service(object): container_options.update(override_options) if not container_options.get('name'): - container_options['name'] = self.get_container_name(number, one_off) + container_options['name'] = self.get_container_name(self.name, number, one_off) container_options.setdefault('detach', True) @@ -960,12 +961,12 @@ class Service(object): def custom_container_name(self): return self.options.get('container_name') - def get_container_name(self, number, one_off=False): + def get_container_name(self, service_name, number, one_off=False): if self.custom_container_name and not one_off: return self.custom_container_name container_name = build_container_name( - self.project, self.name, number, one_off, + self.project, service_name, number, one_off, ) ext_links_origins = [l.split(':')[0] for l in self.options.get('external_links', [])] if container_name in ext_links_origins: diff --git a/tests/unit/service_test.py b/tests/unit/service_test.py index 7d61807ba..0bf0280de 100644 --- a/tests/unit/service_test.py +++ b/tests/unit/service_test.py @@ -179,7 +179,7 @@ class ServiceTest(unittest.TestCase): external_links=['default_foo_1'] ) with self.assertRaises(DependencyError): - service.get_container_name(1) + service.get_container_name('foo', 1) def test_mem_reservation(self): self.mock_client.create_host_config.return_value = {}