compose/tests/unit/multiplexer_test.py

62 lines
1.4 KiB
Python

from __future__ import absolute_import
from __future__ import unicode_literals
import unittest
from time import sleep
from compose.cli.multiplexer import Multiplexer
class MultiplexerTest(unittest.TestCase):
def test_no_iterators(self):
mux = Multiplexer([])
self.assertEqual([], list(mux.loop()))
def test_empty_iterators(self):
mux = Multiplexer([
(x for x in []),
(x for x in []),
])
self.assertEqual([], list(mux.loop()))
def test_aggregates_output(self):
mux = Multiplexer([
(x for x in [0, 2, 4]),
(x for x in [1, 3, 5]),
])
self.assertEqual(
[0, 1, 2, 3, 4, 5],
sorted(list(mux.loop())),
)
def test_exception(self):
class Problem(Exception):
pass
def problematic_iterator():
yield 0
yield 2
raise Problem(":(")
mux = Multiplexer([
problematic_iterator(),
(x for x in [1, 3, 5]),
])
with self.assertRaises(Problem):
list(mux.loop())
def test_cascade_stop(self):
def fast_stream():
for num in range(3):
yield "stream1 %s" % num
def slow_stream():
sleep(5)
yield "stream2 FAIL"
mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True)
assert "stream2 FAIL" not in set(mux.loop())