Revamp ParallelStreamWriter to fix display issues.

Signed-off-by: Matthieu Nottale <matthieu.nottale@docker.com>
This commit is contained in:
Matthieu Nottale 2018-03-05 14:28:46 +01:00
parent eee55231b8
commit 31dcfcff2a
5 changed files with 69 additions and 54 deletions

View File

@ -43,7 +43,36 @@ class GlobalLimit(object):
cls.global_limiter = Semaphore(value) cls.global_limiter = Semaphore(value)
def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None): def parallel_execute_watch(events, writer, errors, results, msg, get_name):
""" Watch events from a parallel execution, update status and fill errors and results.
Returns exception to re-raise.
"""
error_to_reraise = None
for obj, result, exception in events:
if exception is None:
writer.write(msg, get_name(obj), 'done', green)
results.append(result)
elif isinstance(exception, ImageNotFound):
# This is to bubble up ImageNotFound exceptions to the client so we
# can prompt the user if they want to rebuild.
errors[get_name(obj)] = exception.explanation
writer.write(msg, get_name(obj), 'error', red)
error_to_reraise = exception
elif isinstance(exception, APIError):
errors[get_name(obj)] = exception.explanation
writer.write(msg, get_name(obj), 'error', red)
elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)):
errors[get_name(obj)] = exception.msg
writer.write(msg, get_name(obj), 'error', red)
elif isinstance(exception, UpstreamError):
writer.write(msg, get_name(obj), 'error', red)
else:
errors[get_name(obj)] = exception
error_to_reraise = exception
return error_to_reraise
def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
"""Runs func on objects in parallel while ensuring that func is """Runs func on objects in parallel while ensuring that func is
ran on object only after it is ran on all its dependencies. ran on object only after it is ran on all its dependencies.
@ -53,45 +82,21 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, pa
objects = list(objects) objects = list(objects)
stream = get_output_stream(sys.stderr) stream = get_output_stream(sys.stderr)
writer = ParallelStreamWriter(stream, msg) if ParallelStreamWriter.instance:
writer = ParallelStreamWriter.instance
else:
writer = ParallelStreamWriter(stream)
display_objects = list(parent_objects) if parent_objects else objects
for obj in display_objects:
writer.add_object(get_name(obj))
# write data in a second loop to consider all objects for width alignment
# and avoid duplicates when parent_objects exists
for obj in objects: for obj in objects:
writer.write_initial(get_name(obj)) writer.add_object(msg, get_name(obj))
for obj in objects:
writer.write_initial(msg, get_name(obj))
events = parallel_execute_iter(objects, func, get_deps, limit) events = parallel_execute_iter(objects, func, get_deps, limit)
errors = {} errors = {}
results = [] results = []
error_to_reraise = None error_to_reraise = parallel_execute_watch(events, writer, errors, results, msg, get_name)
for obj, result, exception in events:
if exception is None:
writer.write(get_name(obj), 'done', green)
results.append(result)
elif isinstance(exception, ImageNotFound):
# This is to bubble up ImageNotFound exceptions to the client so we
# can prompt the user if they want to rebuild.
errors[get_name(obj)] = exception.explanation
writer.write(get_name(obj), 'error', red)
error_to_reraise = exception
elif isinstance(exception, APIError):
errors[get_name(obj)] = exception.explanation
writer.write(get_name(obj), 'error', red)
elif isinstance(exception, (OperationFailedError, HealthCheckFailed, NoHealthCheckConfigured)):
errors[get_name(obj)] = exception.msg
writer.write(get_name(obj), 'error', red)
elif isinstance(exception, UpstreamError):
writer.write(get_name(obj), 'error', red)
else:
errors[get_name(obj)] = exception
error_to_reraise = exception
for obj_name, error in errors.items(): for obj_name, error in errors.items():
stream.write("\nERROR: for {} {}\n".format(obj_name, error)) stream.write("\nERROR: for {} {}\n".format(obj_name, error))
@ -253,55 +258,58 @@ class ParallelStreamWriter(object):
noansi = False noansi = False
lock = Lock() lock = Lock()
instance = None
@classmethod @classmethod
def set_noansi(cls, value=True): def set_noansi(cls, value=True):
cls.noansi = value cls.noansi = value
def __init__(self, stream, msg): def __init__(self, stream):
self.stream = stream self.stream = stream
self.msg = msg
self.lines = [] self.lines = []
self.width = 0 self.width = 0
ParallelStreamWriter.instance = self
def add_object(self, obj_index): def add_object(self, msg, obj_index):
self.lines.append(obj_index) if msg is None:
self.width = max(self.width, len(obj_index))
def write_initial(self, obj_index):
if self.msg is None:
return return
self.stream.write("{} {:<{width}} ... \r\n".format( self.lines.append(msg + obj_index)
self.msg, self.lines[self.lines.index(obj_index)], width=self.width)) self.width = max(self.width, len(msg + ' ' + obj_index))
def write_initial(self, msg, obj_index):
if msg is None:
return
self.stream.write("{:<{width}} ... \r\n".format(
msg + ' ' + obj_index, width=self.width))
self.stream.flush() self.stream.flush()
def _write_ansi(self, obj_index, status): def _write_ansi(self, msg, obj_index, status):
self.lock.acquire() self.lock.acquire()
position = self.lines.index(obj_index) position = self.lines.index(msg + obj_index)
diff = len(self.lines) - position diff = len(self.lines) - position
# move up # move up
self.stream.write("%c[%dA" % (27, diff)) self.stream.write("%c[%dA" % (27, diff))
# erase # erase
self.stream.write("%c[2K\r" % 27) self.stream.write("%c[2K\r" % 27)
self.stream.write("{} {:<{width}} ... {}\r".format(self.msg, obj_index, self.stream.write("{:<{width}} ... {}\r".format(msg + ' ' + obj_index,
status, width=self.width)) status, width=self.width))
# move back down # move back down
self.stream.write("%c[%dB" % (27, diff)) self.stream.write("%c[%dB" % (27, diff))
self.stream.flush() self.stream.flush()
self.lock.release() self.lock.release()
def _write_noansi(self, obj_index, status): def _write_noansi(self, msg, obj_index, status):
self.stream.write("{} {:<{width}} ... {}\r\n".format(self.msg, obj_index, self.stream.write("{:<{width}} ... {}\r\n".format(msg + ' ' + obj_index,
status, width=self.width)) status, width=self.width))
self.stream.flush() self.stream.flush()
def write(self, obj_index, status, color_func): def write(self, msg, obj_index, status, color_func):
if self.msg is None: if msg is None:
return return
if self.noansi: if self.noansi:
self._write_noansi(obj_index, status) self._write_noansi(msg, obj_index, status)
else: else:
self._write_ansi(obj_index, color_func(status)) self._write_ansi(msg, obj_index, color_func(status))
def parallel_operation(containers, operation, options, message): def parallel_operation(containers, operation, options, message):

View File

@ -402,8 +402,7 @@ class Service(object):
[ServiceName(self.project, self.name, index) for index in range(i, i + scale)], [ServiceName(self.project, self.name, index) for index in range(i, i + scale)],
lambda service_name: create_and_start(self, service_name.number), lambda service_name: create_and_start(self, service_name.number),
lambda service_name: self.get_container_name(service_name.service, service_name.number), lambda service_name: self.get_container_name(service_name.service, service_name.number),
"Creating", "Creating"
parent_objects=project_services
) )
for error in errors.values(): for error in errors.values():
raise OperationFailedError(error) raise OperationFailedError(error)

View File

@ -35,6 +35,7 @@ from compose.const import LABEL_SERVICE
from compose.const import LABEL_VERSION from compose.const import LABEL_VERSION
from compose.container import Container from compose.container import Container
from compose.errors import OperationFailedError from compose.errors import OperationFailedError
from compose.parallel import ParallelStreamWriter
from compose.project import OneOffFilter from compose.project import OneOffFilter
from compose.service import ConvergencePlan from compose.service import ConvergencePlan
from compose.service import ConvergenceStrategy from compose.service import ConvergenceStrategy
@ -1197,6 +1198,7 @@ class ServiceTest(DockerClientTestCase):
service.create_container(number=next_number) service.create_container(number=next_number)
service.create_container(number=next_number + 1) service.create_container(number=next_number + 1)
ParallelStreamWriter.instance = None
with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr: with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
service.scale(2) service.scale(2)
for container in service.containers(): for container in service.containers():
@ -1220,6 +1222,7 @@ class ServiceTest(DockerClientTestCase):
for container in service.containers(): for container in service.containers():
assert not container.is_running assert not container.is_running
ParallelStreamWriter.instance = None
with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr: with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
service.scale(2) service.scale(2)

View File

@ -143,6 +143,7 @@ class ParallelTest(unittest.TestCase):
def test_parallel_execute_alignment(capsys): def test_parallel_execute_alignment(capsys):
ParallelStreamWriter.instance = None
results, errors = parallel_execute( results, errors = parallel_execute(
objects=["short", "a very long name"], objects=["short", "a very long name"],
func=lambda x: x, func=lambda x: x,
@ -158,6 +159,7 @@ def test_parallel_execute_alignment(capsys):
def test_parallel_execute_ansi(capsys): def test_parallel_execute_ansi(capsys):
ParallelStreamWriter.instance = None
ParallelStreamWriter.set_noansi(value=False) ParallelStreamWriter.set_noansi(value=False)
results, errors = parallel_execute( results, errors = parallel_execute(
objects=["something", "something more"], objects=["something", "something more"],
@ -173,6 +175,7 @@ def test_parallel_execute_ansi(capsys):
def test_parallel_execute_noansi(capsys): def test_parallel_execute_noansi(capsys):
ParallelStreamWriter.instance = None
ParallelStreamWriter.set_noansi() ParallelStreamWriter.set_noansi()
results, errors = parallel_execute( results, errors = parallel_execute(
objects=["something", "something more"], objects=["something", "something more"],

View File

@ -20,6 +20,7 @@ from compose.const import LABEL_PROJECT
from compose.const import LABEL_SERVICE from compose.const import LABEL_SERVICE
from compose.const import SECRETS_PATH from compose.const import SECRETS_PATH
from compose.container import Container from compose.container import Container
from compose.parallel import ParallelStreamWriter
from compose.project import OneOffFilter from compose.project import OneOffFilter
from compose.service import build_ulimits from compose.service import build_ulimits
from compose.service import build_volume_binding from compose.service import build_volume_binding
@ -727,6 +728,7 @@ class ServiceTest(unittest.TestCase):
@mock.patch('compose.service.log', autospec=True) @mock.patch('compose.service.log', autospec=True)
def test_only_log_warning_when_host_ports_clash(self, mock_log): def test_only_log_warning_when_host_ports_clash(self, mock_log):
self.mock_client.inspect_image.return_value = {'Id': 'abcd'} self.mock_client.inspect_image.return_value = {'Id': 'abcd'}
ParallelStreamWriter.instance = None
name = 'foo' name = 'foo'
service = Service( service = Service(
name, name,