diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index ce7e10653..9c5d35e18 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -4,7 +4,7 @@ import sys from itertools import cycle -from .multiplexer import Multiplexer, STOP +from .multiplexer import Multiplexer from . import colors from .utils import split_buffer @@ -61,7 +61,6 @@ class LogPrinter(object): exit_code = container.wait() yield color_fn("%s exited with code %s\n" % (container.name, exit_code)) - yield STOP def _generate_prefix(self, container): """ diff --git a/compose/cli/multiplexer.py b/compose/cli/multiplexer.py index 849dbd66a..955af6322 100644 --- a/compose/cli/multiplexer.py +++ b/compose/cli/multiplexer.py @@ -7,36 +7,48 @@ except ImportError: from queue import Queue, Empty # Python 3.x -# Yield STOP from an input generator to stop the -# top-level loop without processing any more input. STOP = object() class Multiplexer(object): - def __init__(self, generators): - self.generators = generators + """ + Create a single iterator from several iterators by running all of them in + parallel and yielding results as they come in. + """ + + def __init__(self, iterators): + self.iterators = iterators + self._num_running = len(iterators) self.queue = Queue() def loop(self): self._init_readers() - while True: + while self._num_running > 0: try: - item = self.queue.get(timeout=0.1) + item, exception = self.queue.get(timeout=0.1) + + if exception: + raise exception + if item is STOP: - break + self._num_running -= 1 else: yield item except Empty: pass def _init_readers(self): - for generator in self.generators: - t = Thread(target=_enqueue_output, args=(generator, self.queue)) + for iterator in self.iterators: + t = Thread(target=_enqueue_output, args=(iterator, self.queue)) t.daemon = True t.start() -def _enqueue_output(generator, queue): - for item in generator: - queue.put(item) +def _enqueue_output(iterator, queue): + try: + for item in iterator: + queue.put((item, None)) + queue.put((STOP, None)) + except Exception as e: + queue.put((None, e)) diff --git a/tests/unit/multiplexer_test.py b/tests/unit/multiplexer_test.py new file mode 100644 index 000000000..d565d39d1 --- /dev/null +++ b/tests/unit/multiplexer_test.py @@ -0,0 +1,45 @@ +import unittest + +from compose.cli.multiplexer import Multiplexer + + +class MultiplexerTest(unittest.TestCase): + def test_no_iterators(self): + mux = Multiplexer([]) + self.assertEqual([], list(mux.loop())) + + def test_empty_iterators(self): + mux = Multiplexer([ + (x for x in []), + (x for x in []), + ]) + + self.assertEqual([], list(mux.loop())) + + def test_aggregates_output(self): + mux = Multiplexer([ + (x for x in [0, 2, 4]), + (x for x in [1, 3, 5]), + ]) + + self.assertEqual( + [0, 1, 2, 3, 4, 5], + sorted(list(mux.loop())), + ) + + def test_exception(self): + class Problem(Exception): + pass + + def problematic_iterator(): + yield 0 + yield 2 + raise Problem(":(") + + mux = Multiplexer([ + problematic_iterator(), + (x for x in [1, 3, 5]), + ]) + + with self.assertRaises(Problem): + list(mux.loop())