From 15d0c60a73bf700400de826bd122f3f1c30bd0c0 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 5 Oct 2015 12:56:10 -0400 Subject: [PATCH] Fix split buffer with inconsistently delimited json objects. Signed-off-by: Daniel Nephin --- compose/progress_stream.py | 5 +--- compose/service.py | 3 +- compose/utils.py | 52 ++++++++++++++++++++++++++------- tests/integration/testcases.py | 6 ++-- tests/unit/split_buffer_test.py | 2 +- tests/unit/utils_test.py | 16 ++++++++++ 6 files changed, 62 insertions(+), 22 deletions(-) create mode 100644 tests/unit/utils_test.py diff --git a/compose/progress_stream.py b/compose/progress_stream.py index ca8f35135..ac8e4b410 100644 --- a/compose/progress_stream.py +++ b/compose/progress_stream.py @@ -1,5 +1,3 @@ -import json - from compose import utils @@ -14,8 +12,7 @@ def stream_output(output, stream): lines = {} diff = 0 - for chunk in utils.stream_as_text(output): - event = json.loads(chunk) + for event in utils.json_stream(output): all_events.append(event) if 'progress' in event or 'progressDetail' in event: diff --git a/compose/service.py b/compose/service.py index bce2e534c..698ab4844 100644 --- a/compose/service.py +++ b/compose/service.py @@ -33,7 +33,6 @@ from .progress_stream import stream_output from .progress_stream import StreamOutputError from .utils import json_hash from .utils import parallel_execute -from .utils import split_buffer log = logging.getLogger(__name__) @@ -724,7 +723,7 @@ class Service(object): ) try: - all_events = stream_output(split_buffer(build_output), sys.stdout) + all_events = stream_output(build_output, sys.stdout) except StreamOutputError as e: raise BuildError(self, six.text_type(e)) diff --git a/compose/utils.py b/compose/utils.py index f201e2d6c..c8fddc5f1 100644 --- a/compose/utils.py +++ b/compose/utils.py @@ -1,6 +1,7 @@ import codecs import hashlib import json +import json.decoder import logging import sys from threading import Thread @@ -13,6 +14,8 @@ from six.moves.queue import Queue log = logging.getLogger(__name__) +json_decoder = json.JSONDecoder() + def parallel_execute(objects, obj_callable, msg_index, msg): """ @@ -96,29 +99,56 @@ def stream_as_text(stream): yield data -def split_buffer(reader, separator=u'\n'): - """ - Given a generator which yields strings and a separator string, +def line_splitter(buffer, separator=u'\n'): + index = buffer.find(six.text_type(separator)) + if index == -1: + return None, None + return buffer[:index + 1], buffer[index + 1:] + + +def split_buffer(stream, splitter=None, decoder=lambda a: a): + """Given a generator which yields strings and a splitter function, joins all input, splits on the separator and yields each chunk. Unlike string.split(), each chunk includes the trailing separator, except for the last one if none was found on the end of the input. """ + splitter = splitter or line_splitter buffered = six.text_type('') - separator = six.text_type(separator) - for data in stream_as_text(reader): + for data in stream_as_text(stream): buffered += data while True: - index = buffered.find(separator) - if index == -1: + item, rest = splitter(buffered) + if not item: break - yield buffered[:index + 1] - buffered = buffered[index + 1:] - if len(buffered) > 0: - yield buffered + buffered = rest + yield item + + if buffered: + yield decoder(buffered) + + +def json_splitter(buffer): + """Attempt to parse a json object from a buffer. If there is at least one + object, return it and the rest of the buffer, otherwise return None. + """ + try: + obj, index = json_decoder.raw_decode(buffer) + rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():] + return obj, rest + except ValueError: + return None, None + + +def json_stream(stream): + """Given a stream of text, return a stream of json objects. + This handles streams which are inconsistently buffered (some entries may + be newline delimited, and others are not). + """ + return split_buffer(stream_as_text(stream), json_splitter, json_decoder.decode) def write_out_msg(stream, lines, msg_index, msg, status="done"): diff --git a/tests/integration/testcases.py b/tests/integration/testcases.py index 7dec3728b..26a0a108a 100644 --- a/tests/integration/testcases.py +++ b/tests/integration/testcases.py @@ -9,8 +9,6 @@ from compose.config.config import ServiceLoader from compose.const import LABEL_PROJECT from compose.progress_stream import stream_output from compose.service import Service -from compose.utils import split_buffer -from compose.utils import stream_as_text def pull_busybox(client): @@ -73,5 +71,5 @@ class DockerClientTestCase(unittest.TestCase): def check_build(self, *args, **kwargs): kwargs.setdefault('rm', True) - build_output = stream_as_text(self.client.build(*args, **kwargs)) - stream_output(split_buffer(build_output), open('/dev/null', 'w')) + build_output = self.client.build(*args, **kwargs) + stream_output(build_output, open('/dev/null', 'w')) diff --git a/tests/unit/split_buffer_test.py b/tests/unit/split_buffer_test.py index 1775e4cb1..c41ea27d4 100644 --- a/tests/unit/split_buffer_test.py +++ b/tests/unit/split_buffer_test.py @@ -47,7 +47,7 @@ class SplitBufferTest(unittest.TestCase): self.assert_produces(reader, [string]) def assert_produces(self, reader, expectations): - split = split_buffer(reader(), u'\n') + split = split_buffer(reader()) for (actual, expected) in zip(split, expectations): self.assertEqual(type(actual), type(expected)) diff --git a/tests/unit/utils_test.py b/tests/unit/utils_test.py new file mode 100644 index 000000000..b272c7349 --- /dev/null +++ b/tests/unit/utils_test.py @@ -0,0 +1,16 @@ +from .. import unittest +from compose import utils + + +class JsonSplitterTestCase(unittest.TestCase): + + def test_json_splitter_no_object(self): + data = '{"foo": "bar' + self.assertEqual(utils.json_splitter(data), (None, None)) + + def test_json_splitter_with_object(self): + data = '{"foo": "bar"}\n \n{"next": "obj"}' + self.assertEqual( + utils.json_splitter(data), + ({'foo': 'bar'}, '{"next": "obj"}') + )