From 3661e8bc7419ae34e4639edec91df2e1db707312 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 2 Oct 2015 19:47:27 -0400 Subject: [PATCH] Fix build against the swarm cluster by joining buffered output before parsing json. Signed-off-by: Daniel Nephin --- compose/cli/log_printer.py | 4 ++-- compose/cli/utils.py | 26 ---------------------- compose/progress_stream.py | 6 +----- compose/service.py | 4 +++- compose/utils.py | 38 +++++++++++++++++++++++++++++++++ tests/integration/testcases.py | 6 ++++-- tests/unit/split_buffer_test.py | 2 +- 7 files changed, 49 insertions(+), 37 deletions(-) diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index 845f799b7..6e1499e1d 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -6,8 +6,8 @@ from itertools import cycle from . import colors from .multiplexer import Multiplexer -from .utils import split_buffer from compose import utils +from compose.utils import split_buffer class LogPrinter(object): @@ -75,7 +75,7 @@ def build_no_log_generator(container, prefix, color_func): def build_log_generator(container, prefix, color_func): # Attach to container before log printer starts running stream = container.attach(stdout=True, stderr=True, stream=True, logs=True) - line_generator = split_buffer(stream, u'\n') + line_generator = split_buffer(stream) for line in line_generator: yield prefix + line diff --git a/compose/cli/utils.py b/compose/cli/utils.py index 5840f0a8c..07510e2f3 100644 --- a/compose/cli/utils.py +++ b/compose/cli/utils.py @@ -7,7 +7,6 @@ import platform import ssl import subprocess -import six from docker import version as docker_py_version from six.moves import input @@ -36,31 +35,6 @@ def yesno(prompt, default=None): return None -def split_buffer(reader, separator): - """ - Given a generator which yields strings and a separator string, - joins all input, splits on the separator and yields each chunk. - - Unlike string.split(), each chunk includes the trailing - separator, except for the last one if none was found on the end - of the input. - """ - buffered = six.text_type('') - separator = six.text_type(separator) - - for data in reader: - buffered += data.decode('utf-8') - while True: - index = buffered.find(separator) - if index == -1: - break - yield buffered[:index + 1] - buffered = buffered[index + 1:] - - if len(buffered) > 0: - yield buffered - - def call_silently(*args, **kwargs): """ Like subprocess.call(), but redirects stdout and stderr to /dev/null. diff --git a/compose/progress_stream.py b/compose/progress_stream.py index c44b33e56..ca8f35135 100644 --- a/compose/progress_stream.py +++ b/compose/progress_stream.py @@ -1,7 +1,5 @@ import json -import six - from compose import utils @@ -16,9 +14,7 @@ def stream_output(output, stream): lines = {} diff = 0 - for chunk in output: - if six.PY3: - chunk = chunk.decode('utf-8') + for chunk in utils.stream_as_text(output): event = json.loads(chunk) all_events.append(event) diff --git a/compose/service.py b/compose/service.py index c9ca00ae4..bce2e534c 100644 --- a/compose/service.py +++ b/compose/service.py @@ -33,6 +33,8 @@ from .progress_stream import stream_output from .progress_stream import StreamOutputError from .utils import json_hash from .utils import parallel_execute +from .utils import split_buffer + log = logging.getLogger(__name__) @@ -722,7 +724,7 @@ class Service(object): ) try: - all_events = stream_output(build_output, sys.stdout) + all_events = stream_output(split_buffer(build_output), sys.stdout) except StreamOutputError as e: raise BuildError(self, six.text_type(e)) diff --git a/compose/utils.py b/compose/utils.py index e0304ba50..f201e2d6c 100644 --- a/compose/utils.py +++ b/compose/utils.py @@ -83,6 +83,44 @@ def get_output_stream(stream): return codecs.getwriter('utf-8')(stream) +def stream_as_text(stream): + """Given a stream of bytes or text, if any of the items in the stream + are bytes convert them to text. + + This function can be removed once docker-py returns text streams instead + of byte streams. + """ + for data in stream: + if not isinstance(data, six.text_type): + data = data.decode('utf-8') + yield data + + +def split_buffer(reader, separator=u'\n'): + """ + Given a generator which yields strings and a separator string, + joins all input, splits on the separator and yields each chunk. + + Unlike string.split(), each chunk includes the trailing + separator, except for the last one if none was found on the end + of the input. + """ + buffered = six.text_type('') + separator = six.text_type(separator) + + for data in stream_as_text(reader): + buffered += data + while True: + index = buffered.find(separator) + if index == -1: + break + yield buffered[:index + 1] + buffered = buffered[index + 1:] + + if len(buffered) > 0: + yield buffered + + def write_out_msg(stream, lines, msg_index, msg, status="done"): """ Using special ANSI code characters we can write out the msg over the top of diff --git a/tests/integration/testcases.py b/tests/integration/testcases.py index 26a0a108a..7dec3728b 100644 --- a/tests/integration/testcases.py +++ b/tests/integration/testcases.py @@ -9,6 +9,8 @@ from compose.config.config import ServiceLoader from compose.const import LABEL_PROJECT from compose.progress_stream import stream_output from compose.service import Service +from compose.utils import split_buffer +from compose.utils import stream_as_text def pull_busybox(client): @@ -71,5 +73,5 @@ class DockerClientTestCase(unittest.TestCase): def check_build(self, *args, **kwargs): kwargs.setdefault('rm', True) - build_output = self.client.build(*args, **kwargs) - stream_output(build_output, open('/dev/null', 'w')) + build_output = stream_as_text(self.client.build(*args, **kwargs)) + stream_output(split_buffer(build_output), open('/dev/null', 'w')) diff --git a/tests/unit/split_buffer_test.py b/tests/unit/split_buffer_test.py index 47c72f086..1775e4cb1 100644 --- a/tests/unit/split_buffer_test.py +++ b/tests/unit/split_buffer_test.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from __future__ import unicode_literals from .. import unittest -from compose.cli.utils import split_buffer +from compose.utils import split_buffer class SplitBufferTest(unittest.TestCase):