From 31dcfcff2ad9124d028e642e5dd61530714b15c7 Mon Sep 17 00:00:00 2001 From: Matthieu Nottale Date: Mon, 5 Mar 2018 14:28:46 +0100 Subject: [PATCH] Revamp ParallelStreamWriter to fix display issues. Signed-off-by: Matthieu Nottale --- compose/parallel.py | 112 ++++++++++++++++-------------- compose/service.py | 3 +- tests/integration/service_test.py | 3 + tests/unit/parallel_test.py | 3 + tests/unit/service_test.py | 2 + 5 files changed, 69 insertions(+), 54 deletions(-) diff --git a/compose/parallel.py b/compose/parallel.py index dd83c70cd..5d4791f97 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -43,7 +43,36 @@ class GlobalLimit(object): cls.global_limiter = Semaphore(value) -def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None): +def parallel_execute_watch(events, writer, errors, results, msg, get_name): + """ Watch events from a parallel execution, update status and fill errors and results. + Returns exception to re-raise. + """ + error_to_reraise = None + for obj, result, exception in events: + if exception is None: + writer.write(msg, get_name(obj), 'done', green) + results.append(result) + elif isinstance(exception, ImageNotFound): + # This is to bubble up ImageNotFound exceptions to the client so we + # can prompt the user if they want to rebuild. + errors[get_name(obj)] = exception.explanation + writer.write(msg, get_name(obj), 'error', red) + error_to_reraise = exception + elif isinstance(exception, APIError): + errors[get_name(obj)] = exception.explanation + writer.write(msg, get_name(obj), 'error', red) + elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)): + errors[get_name(obj)] = exception.msg + writer.write(msg, get_name(obj), 'error', red) + elif isinstance(exception, UpstreamError): + writer.write(msg, get_name(obj), 'error', red) + else: + errors[get_name(obj)] = exception + error_to_reraise = exception + return error_to_reraise + + +def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None): """Runs func on objects in parallel while ensuring that func is ran on object only after it is ran on all its dependencies. @@ -53,45 +82,21 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, pa objects = list(objects) stream = get_output_stream(sys.stderr) - writer = ParallelStreamWriter(stream, msg) + if ParallelStreamWriter.instance: + writer = ParallelStreamWriter.instance + else: + writer = ParallelStreamWriter(stream) - display_objects = list(parent_objects) if parent_objects else objects - - for obj in display_objects: - writer.add_object(get_name(obj)) - - # write data in a second loop to consider all objects for width alignment - # and avoid duplicates when parent_objects exists for obj in objects: - writer.write_initial(get_name(obj)) + writer.add_object(msg, get_name(obj)) + for obj in objects: + writer.write_initial(msg, get_name(obj)) events = parallel_execute_iter(objects, func, get_deps, limit) errors = {} results = [] - error_to_reraise = None - - for obj, result, exception in events: - if exception is None: - writer.write(get_name(obj), 'done', green) - results.append(result) - elif isinstance(exception, ImageNotFound): - # This is to bubble up ImageNotFound exceptions to the client so we - # can prompt the user if they want to rebuild. - errors[get_name(obj)] = exception.explanation - writer.write(get_name(obj), 'error', red) - error_to_reraise = exception - elif isinstance(exception, APIError): - errors[get_name(obj)] = exception.explanation - writer.write(get_name(obj), 'error', red) - elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)): - errors[get_name(obj)] = exception.msg - writer.write(get_name(obj), 'error', red) - elif isinstance(exception, UpstreamError): - writer.write(get_name(obj), 'error', red) - else: - errors[get_name(obj)] = exception - error_to_reraise = exception + error_to_reraise = parallel_execute_watch(events, writer, errors, results, msg, get_name) for obj_name, error in errors.items(): stream.write("\nERROR: for {} {}\n".format(obj_name, error)) @@ -253,55 +258,58 @@ class ParallelStreamWriter(object): noansi = False lock = Lock() + instance = None @classmethod def set_noansi(cls, value=True): cls.noansi = value - def __init__(self, stream, msg): + def __init__(self, stream): self.stream = stream - self.msg = msg self.lines = [] self.width = 0 + ParallelStreamWriter.instance = self - def add_object(self, obj_index): - self.lines.append(obj_index) - self.width = max(self.width, len(obj_index)) - - def write_initial(self, obj_index): - if self.msg is None: + def add_object(self, msg, obj_index): + if msg is None: return - self.stream.write("{} {:<{width}} ... \r\n".format( - self.msg, self.lines[self.lines.index(obj_index)], width=self.width)) + self.lines.append(msg + obj_index) + self.width = max(self.width, len(msg + ' ' + obj_index)) + + def write_initial(self, msg, obj_index): + if msg is None: + return + self.stream.write("{:<{width}} ... \r\n".format( + msg + ' ' + obj_index, width=self.width)) self.stream.flush() - def _write_ansi(self, obj_index, status): + def _write_ansi(self, msg, obj_index, status): self.lock.acquire() - position = self.lines.index(obj_index) + position = self.lines.index(msg + obj_index) diff = len(self.lines) - position # move up self.stream.write("%c[%dA" % (27, diff)) # erase self.stream.write("%c[2K\r" % 27) - self.stream.write("{} {:<{width}} ... {}\r".format(self.msg, obj_index, + self.stream.write("{:<{width}} ... {}\r".format(msg + ' ' + obj_index, status, width=self.width)) # move back down self.stream.write("%c[%dB" % (27, diff)) self.stream.flush() self.lock.release() - def _write_noansi(self, obj_index, status): - self.stream.write("{} {:<{width}} ... {}\r\n".format(self.msg, obj_index, + def _write_noansi(self, msg, obj_index, status): + self.stream.write("{:<{width}} ... {}\r\n".format(msg + ' ' + obj_index, status, width=self.width)) self.stream.flush() - def write(self, obj_index, status, color_func): - if self.msg is None: + def write(self, msg, obj_index, status, color_func): + if msg is None: return if self.noansi: - self._write_noansi(obj_index, status) + self._write_noansi(msg, obj_index, status) else: - self._write_ansi(obj_index, color_func(status)) + self._write_ansi(msg, obj_index, color_func(status)) def parallel_operation(containers, operation, options, message): diff --git a/compose/service.py b/compose/service.py index b9f9af2cd..652080478 100644 --- a/compose/service.py +++ b/compose/service.py @@ -402,8 +402,7 @@ class Service(object): [ServiceName(self.project, self.name, index) for index in range(i, i + scale)], lambda service_name: create_and_start(self, service_name.number), lambda service_name: self.get_container_name(service_name.service, service_name.number), - "Creating", - parent_objects=project_services + "Creating" ) for error in errors.values(): raise OperationFailedError(error) diff --git a/tests/integration/service_test.py b/tests/integration/service_test.py index 2b6b7711e..6e86a02d4 100644 --- a/tests/integration/service_test.py +++ b/tests/integration/service_test.py @@ -35,6 +35,7 @@ from compose.const import LABEL_SERVICE from compose.const import LABEL_VERSION from compose.container import Container from compose.errors import OperationFailedError +from compose.parallel import ParallelStreamWriter from compose.project import OneOffFilter from compose.service import ConvergencePlan from compose.service import ConvergenceStrategy @@ -1197,6 +1198,7 @@ class ServiceTest(DockerClientTestCase): service.create_container(number=next_number) service.create_container(number=next_number + 1) + ParallelStreamWriter.instance = None with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr: service.scale(2) for container in service.containers(): @@ -1220,6 +1222,7 @@ class ServiceTest(DockerClientTestCase): for container in service.containers(): assert not container.is_running + ParallelStreamWriter.instance = None with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr: service.scale(2) diff --git a/tests/unit/parallel_test.py b/tests/unit/parallel_test.py index 4ebc24d8c..0735bfccb 100644 --- a/tests/unit/parallel_test.py +++ b/tests/unit/parallel_test.py @@ -143,6 +143,7 @@ class ParallelTest(unittest.TestCase): def test_parallel_execute_alignment(capsys): + ParallelStreamWriter.instance = None results, errors = parallel_execute( objects=["short", "a very long name"], func=lambda x: x, @@ -158,6 +159,7 @@ def test_parallel_execute_alignment(capsys): def test_parallel_execute_ansi(capsys): + ParallelStreamWriter.instance = None ParallelStreamWriter.set_noansi(value=False) results, errors = parallel_execute( objects=["something", "something more"], @@ -173,6 +175,7 @@ def test_parallel_execute_ansi(capsys): def test_parallel_execute_noansi(capsys): + ParallelStreamWriter.instance = None ParallelStreamWriter.set_noansi() results, errors = parallel_execute( objects=["something", "something more"], diff --git a/tests/unit/service_test.py b/tests/unit/service_test.py index c315dcc4d..9128b9550 100644 --- a/tests/unit/service_test.py +++ b/tests/unit/service_test.py @@ -20,6 +20,7 @@ from compose.const import LABEL_PROJECT from compose.const import LABEL_SERVICE from compose.const import SECRETS_PATH from compose.container import Container +from compose.parallel import ParallelStreamWriter from compose.project import OneOffFilter from compose.service import build_ulimits from compose.service import build_volume_binding @@ -727,6 +728,7 @@ class ServiceTest(unittest.TestCase): @mock.patch('compose.service.log', autospec=True) def test_only_log_warning_when_host_ports_clash(self, mock_log): self.mock_client.inspect_image.return_value = {'Id': 'abcd'} + ParallelStreamWriter.instance = None name = 'foo' service = Service( name,