Fix flaky multiplex test.

Signed-off-by: Daniel Nephin <dnephin@docker.com>
This commit is contained in:
Daniel Nephin 2016-01-13 20:51:39 -05:00
parent 153185eadb
commit 6877c6ca06
1 changed files with 9 additions and 14 deletions

View File

@ -49,18 +49,13 @@ class MultiplexerTest(unittest.TestCase):
list(mux.loop())
def test_cascade_stop(self):
mux = Multiplexer([
((lambda x: sleep(0.01) or x)(x) for x in ['after 0.01 sec T1',
'after 0.02 sec T1',
'after 0.03 sec T1']),
((lambda x: sleep(0.02) or x)(x) for x in ['after 0.02 sec T2',
'after 0.04 sec T2',
'after 0.06 sec T2']),
], cascade_stop=True)
def fast_stream():
for num in range(3):
yield "stream1 %s" % num
self.assertEqual(
['after 0.01 sec T1',
'after 0.02 sec T1',
'after 0.02 sec T2',
'after 0.03 sec T1'],
sorted(list(mux.loop())))
def slow_stream():
sleep(5)
yield "stream2 FAIL"
mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True)
assert "stream2 FAIL" not in set(mux.loop())