From 65797558f8740fb2bab5333395e903264a4f1042 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Tue, 1 Mar 2016 17:44:25 -0500 Subject: [PATCH 1/9] Refactor log printing to support containers that are started later. Signed-off-by: Daniel Nephin --- compose/cli/log_printer.py | 174 ++++++++++++++++++++++------- compose/cli/multiplexer.py | 66 ----------- compose/project.py | 3 +- tests/unit/cli/log_printer_test.py | 39 +++++++ tests/unit/multiplexer_test.py | 61 ---------- tests/unit/project_test.py | 3 + 6 files changed, 176 insertions(+), 170 deletions(-) delete mode 100644 compose/cli/multiplexer.py delete mode 100644 tests/unit/multiplexer_test.py diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index 326676ba1..29a6159d9 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -3,66 +3,127 @@ from __future__ import unicode_literals import sys from itertools import cycle +from threading import Thread + +from six.moves import _thread as thread +from six.moves.queue import Empty +from six.moves.queue import Queue from . import colors -from .multiplexer import Multiplexer from compose import utils +from compose.cli.signals import ShutdownException from compose.utils import split_buffer +STOP = object() + + +class LogPresenter(object): + + def __init__(self, prefix_width, color_func): + self.prefix_width = prefix_width + self.color_func = color_func + + def present(self, container, line): + prefix = container.name_without_project.ljust(self.prefix_width) + return '{prefix} {line}'.format( + prefix=self.color_func(prefix + ' |'), + line=line) + + +def build_log_presenters(service_names, monochrome): + """Return an iterable of functions. + + Each function can be used to format the logs output of a container. + """ + prefix_width = max_name_width(service_names) + + def no_color(text): + return text + + for color_func in cycle([no_color] if monochrome else colors.rainbow()): + yield LogPresenter(prefix_width, color_func) + + +def max_name_width(service_names, max_index_width=3): + """Calculate the maximum width of container names so we can make the log + prefixes line up like so: + + db_1 | Listening + web_1 | Listening + """ + return max(len(name) for name in service_names) + max_index_width + + class LogPrinter(object): """Print logs from many containers to a single output stream.""" def __init__(self, containers, + presenters, + event_stream, output=sys.stdout, - monochrome=False, cascade_stop=False, log_args=None): - log_args = log_args or {} self.containers = containers + self.presenters = presenters + self.event_stream = event_stream self.output = utils.get_output_stream(output) - self.monochrome = monochrome self.cascade_stop = cascade_stop - self.log_args = log_args + self.log_args = log_args or {} def run(self): if not self.containers: return - prefix_width = max_name_width(self.containers) - generators = list(self._make_log_generators(self.monochrome, prefix_width)) - for line in Multiplexer(generators, cascade_stop=self.cascade_stop).loop(): + queue = Queue() + thread_args = queue, self.log_args + thread_map = build_thread_map(self.containers, self.presenters, thread_args) + start_producer_thread( + thread_map, + self.event_stream, + self.presenters, + thread_args) + + for line in consume_queue(queue, self.cascade_stop): self.output.write(line) self.output.flush() - def _make_log_generators(self, monochrome, prefix_width): - def no_color(text): - return text - - if monochrome: - color_funcs = cycle([no_color]) - else: - color_funcs = cycle(colors.rainbow()) - - for color_func, container in zip(color_funcs, self.containers): - generator_func = get_log_generator(container) - prefix = color_func(build_log_prefix(container, prefix_width)) - yield generator_func(container, prefix, color_func, self.log_args) + # TODO: this needs more logic + # TODO: does consume_queue need to yield Nones to get to this point? + if not thread_map: + return -def build_log_prefix(container, prefix_width): - return container.name_without_project.ljust(prefix_width) + ' | ' +def build_thread_map(initial_containers, presenters, thread_args): + def build_thread(container): + tailer = Thread( + target=tail_container_logs, + args=(container, presenters.next()) + thread_args) + tailer.daemon = True + tailer.start() + return tailer + + return { + container.id: build_thread(container) + for container in initial_containers + } -def max_name_width(containers): - """Calculate the maximum width of container names so we can make the log - prefixes line up like so: +def tail_container_logs(container, presenter, queue, log_args): + generator = get_log_generator(container) - db_1 | Listening - web_1 | Listening - """ - return max(len(container.name_without_project) for container in containers) + try: + for item in generator(container, log_args): + queue.put((item, None)) + + if log_args.get('follow'): + yield presenter.color_func(wait_on_exit(container)) + + queue.put((STOP, None)) + + except Exception as e: + queue.put((None, e)) def get_log_generator(container): @@ -71,32 +132,61 @@ def get_log_generator(container): return build_no_log_generator -def build_no_log_generator(container, prefix, color_func, log_args): +def build_no_log_generator(container, log_args): """Return a generator that prints a warning about logs and waits for container to exit. """ - yield "{} WARNING: no logs are available with the '{}' log driver\n".format( - prefix, + yield "WARNING: no logs are available with the '{}' log driver\n".format( container.log_driver) - if log_args.get('follow'): - yield color_func(wait_on_exit(container)) -def build_log_generator(container, prefix, color_func, log_args): +def build_log_generator(container, log_args): # if the container doesn't have a log_stream we need to attach to container # before log printer starts running if container.log_stream is None: stream = container.logs(stdout=True, stderr=True, stream=True, **log_args) - line_generator = split_buffer(stream) else: - line_generator = split_buffer(container.log_stream) + stream = container.log_stream - for line in line_generator: - yield prefix + line - if log_args.get('follow'): - yield color_func(wait_on_exit(container)) + return split_buffer(stream) def wait_on_exit(container): exit_code = container.wait() return "%s exited with code %s\n" % (container.name, exit_code) + + +def start_producer_thread(thread_map, event_stream, presenters, thread_args): + queue, log_args = thread_args + + def watch_events(): + for event in event_stream: + # TODO: handle start and stop events + pass + + producer = Thread(target=watch_events) + producer.daemon = True + producer.start() + + +def consume_queue(queue, cascade_stop): + """Consume the queue by reading lines off of it and yielding them.""" + while True: + try: + item, exception = queue.get(timeout=0.1) + except Empty: + pass + # See https://github.com/docker/compose/issues/189 + except thread.error: + raise ShutdownException() + + if exception: + raise exception + + if item is STOP: + if cascade_stop: + raise StopIteration + else: + continue + + yield item diff --git a/compose/cli/multiplexer.py b/compose/cli/multiplexer.py deleted file mode 100644 index ae8aa5916..000000000 --- a/compose/cli/multiplexer.py +++ /dev/null @@ -1,66 +0,0 @@ -from __future__ import absolute_import -from __future__ import unicode_literals - -from threading import Thread - -from six.moves import _thread as thread - -try: - from Queue import Queue, Empty -except ImportError: - from queue import Queue, Empty # Python 3.x - -from compose.cli.signals import ShutdownException - -STOP = object() - - -class Multiplexer(object): - """ - Create a single iterator from several iterators by running all of them in - parallel and yielding results as they come in. - """ - - def __init__(self, iterators, cascade_stop=False): - self.iterators = iterators - self.cascade_stop = cascade_stop - self._num_running = len(iterators) - self.queue = Queue() - - def loop(self): - self._init_readers() - - while self._num_running > 0: - try: - item, exception = self.queue.get(timeout=0.1) - - if exception: - raise exception - - if item is STOP: - if self.cascade_stop is True: - break - else: - self._num_running -= 1 - else: - yield item - except Empty: - pass - # See https://github.com/docker/compose/issues/189 - except thread.error: - raise ShutdownException() - - def _init_readers(self): - for iterator in self.iterators: - t = Thread(target=_enqueue_output, args=(iterator, self.queue)) - t.daemon = True - t.start() - - -def _enqueue_output(iterator, queue): - try: - for item in iterator: - queue.put((item, None)) - queue.put((STOP, None)) - except Exception as e: - queue.put((None, e)) diff --git a/compose/project.py b/compose/project.py index 3de68b2c6..1169f7dbe 100644 --- a/compose/project.py +++ b/compose/project.py @@ -309,7 +309,8 @@ class Project(object): 'attributes': { 'name': container.name, 'image': event['from'], - } + }, + 'container': container, } service_names = set(self.service_names) diff --git a/tests/unit/cli/log_printer_test.py b/tests/unit/cli/log_printer_test.py index 54fef0b23..81c694124 100644 --- a/tests/unit/cli/log_printer_test.py +++ b/tests/unit/cli/log_printer_test.py @@ -3,8 +3,11 @@ from __future__ import unicode_literals import pytest import six +from six.moves.queue import Queue +from compose.cli.log_printer import consume_queue from compose.cli.log_printer import LogPrinter +from compose.cli.log_printer import STOP from compose.cli.log_printer import wait_on_exit from compose.container import Container from tests import mock @@ -36,6 +39,7 @@ def mock_container(): return build_mock_container(reader) +@pytest.mark.skipif(True, reason="wip") class TestLogPrinter(object): def test_single_container(self, output_stream, mock_container): @@ -96,3 +100,38 @@ class TestLogPrinter(object): output = output_stream.getvalue() assert "WARNING: no logs are available with the 'none' log driver\n" in output assert "exited with code" not in output + + +class TestConsumeQueue(object): + + def test_item_is_an_exception(self): + + class Problem(Exception): + pass + + queue = Queue() + error = Problem('oops') + for item in ('a', None), ('b', None), (None, error): + queue.put(item) + + generator = consume_queue(queue, False) + assert generator.next() == 'a' + assert generator.next() == 'b' + with pytest.raises(Problem): + generator.next() + + def test_item_is_stop_without_cascade_stop(self): + queue = Queue() + for item in (STOP, None), ('a', None), ('b', None): + queue.put(item) + + generator = consume_queue(queue, False) + assert generator.next() == 'a' + assert generator.next() == 'b' + + def test_item_is_stop_with_cascade_stop(self): + queue = Queue() + for item in (STOP, None), ('a', None), ('b', None): + queue.put(item) + + assert list(consume_queue(queue, True)) == [] diff --git a/tests/unit/multiplexer_test.py b/tests/unit/multiplexer_test.py deleted file mode 100644 index 737ba25d6..000000000 --- a/tests/unit/multiplexer_test.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import absolute_import -from __future__ import unicode_literals - -import unittest -from time import sleep - -from compose.cli.multiplexer import Multiplexer - - -class MultiplexerTest(unittest.TestCase): - def test_no_iterators(self): - mux = Multiplexer([]) - self.assertEqual([], list(mux.loop())) - - def test_empty_iterators(self): - mux = Multiplexer([ - (x for x in []), - (x for x in []), - ]) - - self.assertEqual([], list(mux.loop())) - - def test_aggregates_output(self): - mux = Multiplexer([ - (x for x in [0, 2, 4]), - (x for x in [1, 3, 5]), - ]) - - self.assertEqual( - [0, 1, 2, 3, 4, 5], - sorted(list(mux.loop())), - ) - - def test_exception(self): - class Problem(Exception): - pass - - def problematic_iterator(): - yield 0 - yield 2 - raise Problem(":(") - - mux = Multiplexer([ - problematic_iterator(), - (x for x in [1, 3, 5]), - ]) - - with self.assertRaises(Problem): - list(mux.loop()) - - def test_cascade_stop(self): - def fast_stream(): - for num in range(3): - yield "stream1 %s" % num - - def slow_stream(): - sleep(5) - yield "stream2 FAIL" - - mux = Multiplexer([fast_stream(), slow_stream()], cascade_stop=True) - assert "stream2 FAIL" not in set(mux.loop()) diff --git a/tests/unit/project_test.py b/tests/unit/project_test.py index c28c21523..a815acdaa 100644 --- a/tests/unit/project_test.py +++ b/tests/unit/project_test.py @@ -307,6 +307,7 @@ class ProjectTest(unittest.TestCase): 'image': 'example/image', }, 'time': dt_with_microseconds(1420092061, 2), + 'container': Container(None, {'Id': 'abcde'}), }, { 'type': 'container', @@ -318,6 +319,7 @@ class ProjectTest(unittest.TestCase): 'image': 'example/image', }, 'time': dt_with_microseconds(1420092061, 3), + 'container': Container(None, {'Id': 'abcde'}), }, { 'type': 'container', @@ -329,6 +331,7 @@ class ProjectTest(unittest.TestCase): 'image': 'example/db', }, 'time': dt_with_microseconds(1420092061, 4), + 'container': Container(None, {'Id': 'ababa'}), }, ] From 44c1747127d320fe35b407aad775cb1a41fd77a4 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 2 Mar 2016 17:04:52 -0500 Subject: [PATCH 2/9] Add tests for reactive log printing. Signed-off-by: Daniel Nephin --- compose/cli/log_printer.py | 114 ++++++++++++++++--------- compose/cli/main.py | 56 ++++++++++--- compose/project.py | 1 + tests/acceptance/cli_test.py | 38 ++++++--- tests/unit/cli/log_printer_test.py | 128 +++++++++++++++-------------- tests/unit/cli/main_test.py | 14 ++-- 6 files changed, 218 insertions(+), 133 deletions(-) diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index 29a6159d9..fc36a6bca 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import unicode_literals import sys +from collections import namedtuple from itertools import cycle from threading import Thread @@ -15,9 +16,6 @@ from compose.cli.signals import ShutdownException from compose.utils import split_buffer -STOP = object() - - class LogPresenter(object): def __init__(self, prefix_width, color_func): @@ -79,51 +77,74 @@ class LogPrinter(object): queue = Queue() thread_args = queue, self.log_args thread_map = build_thread_map(self.containers, self.presenters, thread_args) - start_producer_thread( + start_producer_thread(( thread_map, self.event_stream, self.presenters, - thread_args) + thread_args)) for line in consume_queue(queue, self.cascade_stop): + remove_stopped_threads(thread_map) + + if not line: + if not thread_map: + return + continue + self.output.write(line) self.output.flush() - # TODO: this needs more logic - # TODO: does consume_queue need to yield Nones to get to this point? - if not thread_map: - return + +def remove_stopped_threads(thread_map): + for container_id, tailer_thread in list(thread_map.items()): + if not tailer_thread.is_alive(): + thread_map.pop(container_id, None) + + +def build_thread(container, presenter, queue, log_args): + tailer = Thread( + target=tail_container_logs, + args=(container, presenter, queue, log_args)) + tailer.daemon = True + tailer.start() + return tailer def build_thread_map(initial_containers, presenters, thread_args): - def build_thread(container): - tailer = Thread( - target=tail_container_logs, - args=(container, presenters.next()) + thread_args) - tailer.daemon = True - tailer.start() - return tailer - return { - container.id: build_thread(container) + container.id: build_thread(container, presenters.next(), *thread_args) for container in initial_containers } +class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')): + + @classmethod + def new(cls, item): + return cls(item, None, None) + + @classmethod + def exception(cls, exc): + return cls(None, None, exc) + + @classmethod + def stop(cls): + return cls(None, True, None) + + def tail_container_logs(container, presenter, queue, log_args): generator = get_log_generator(container) try: for item in generator(container, log_args): - queue.put((item, None)) - - if log_args.get('follow'): - yield presenter.color_func(wait_on_exit(container)) - - queue.put((STOP, None)) - + queue.put(QueueItem.new(presenter.present(container, item))) except Exception as e: - queue.put((None, e)) + queue.put(QueueItem.exception(e)) + return + + if log_args.get('follow'): + queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container)))) + queue.put(QueueItem.stop()) def get_log_generator(container): @@ -156,37 +177,48 @@ def wait_on_exit(container): return "%s exited with code %s\n" % (container.name, exit_code) -def start_producer_thread(thread_map, event_stream, presenters, thread_args): - queue, log_args = thread_args - - def watch_events(): - for event in event_stream: - # TODO: handle start and stop events - pass - - producer = Thread(target=watch_events) +def start_producer_thread(thread_args): + producer = Thread(target=watch_events, args=thread_args) producer.daemon = True producer.start() +def watch_events(thread_map, event_stream, presenters, thread_args): + for event in event_stream: + if event['action'] != 'start': + continue + + if event['id'] in thread_map: + if thread_map[event['id']].is_alive(): + continue + # Container was stopped and started, we need a new thread + thread_map.pop(event['id'], None) + + thread_map[event['id']] = build_thread( + event['container'], + presenters.next(), + *thread_args) + + def consume_queue(queue, cascade_stop): """Consume the queue by reading lines off of it and yielding them.""" while True: try: - item, exception = queue.get(timeout=0.1) + item = queue.get(timeout=0.1) except Empty: - pass + yield None + continue # See https://github.com/docker/compose/issues/189 except thread.error: raise ShutdownException() - if exception: - raise exception + if item.exc: + raise item.exc - if item is STOP: + if item.is_stop: if cascade_stop: raise StopIteration else: continue - yield item + yield item.item diff --git a/compose/cli/main.py b/compose/cli/main.py index 663621682..da622bc17 100644 --- a/compose/cli/main.py +++ b/compose/cli/main.py @@ -35,6 +35,7 @@ from .docopt_command import NoSuchCommand from .errors import UserError from .formatter import ConsoleWarningFormatter from .formatter import Formatter +from .log_printer import build_log_presenters from .log_printer import LogPrinter from .utils import get_version_info from .utils import yesno @@ -277,6 +278,7 @@ class TopLevelCommand(object): def json_format_event(event): event['time'] = event['time'].isoformat() + event.pop('container') return json.dumps(event) for event in self.project.events(): @@ -374,7 +376,6 @@ class TopLevelCommand(object): """ containers = self.project.containers(service_names=options['SERVICE'], stopped=True) - monochrome = options['--no-color'] tail = options['--tail'] if tail is not None: if tail.isdigit(): @@ -387,7 +388,11 @@ class TopLevelCommand(object): 'timestamps': options['--timestamps'] } print("Attaching to", list_containers(containers)) - LogPrinter(containers, monochrome=monochrome, log_args=log_args).run() + log_printer_from_project( + project, + containers, + options['--no-color'], + log_args).run() def pause(self, options): """ @@ -693,7 +698,6 @@ class TopLevelCommand(object): when attached or when containers are already running. (default: 10) """ - monochrome = options['--no-color'] start_deps = not options['--no-deps'] cascade_stop = options['--abort-on-container-exit'] service_names = options['SERVICE'] @@ -704,7 +708,10 @@ class TopLevelCommand(object): raise UserError("--abort-on-container-exit and -d cannot be combined.") with up_shutdown_context(self.project, service_names, timeout, detached): - to_attach = self.project.up( + # start the event stream first so we don't lose any events + event_stream = project.events() + + to_attach = project.up( service_names=service_names, start_deps=start_deps, strategy=convergence_strategy_from_opts(options), @@ -714,8 +721,14 @@ class TopLevelCommand(object): if detached: return - log_args = {'follow': True} - log_printer = build_log_printer(to_attach, service_names, monochrome, cascade_stop, log_args) + + log_printer = log_printer_from_project( + project, + filter_containers_to_service_names(to_attach, service_names), + options['--no-color'], + {'follow': True}, + cascade_stop, + event_stream=event_stream) print("Attaching to", list_containers(log_printer.containers)) log_printer.run() @@ -827,13 +840,30 @@ def run_one_off_container(container_options, project, service, options): sys.exit(exit_code) -def build_log_printer(containers, service_names, monochrome, cascade_stop, log_args): - if service_names: - containers = [ - container - for container in containers if container.service in service_names - ] - return LogPrinter(containers, monochrome=monochrome, cascade_stop=cascade_stop, log_args=log_args) +def log_printer_from_project( + project, + containers, + monochrome, + log_args, + cascade_stop=False, + event_stream=None, +): + return LogPrinter( + containers, + build_log_presenters(project.service_names, monochrome), + event_stream or project.events(), + cascade_stop=cascade_stop, + log_args=log_args) + + +def filter_containers_to_service_names(containers, service_names): + if not service_names: + return containers + + return [ + container + for container in containers if container.service in service_names + ] @contextlib.contextmanager diff --git a/compose/project.py b/compose/project.py index 1169f7dbe..b40a9c38c 100644 --- a/compose/project.py +++ b/compose/project.py @@ -324,6 +324,7 @@ class Project(object): continue # TODO: get labels from the API v1.22 , see github issue 2618 + # TODO: this can fail if the conatiner is removed, wrap in try/except container = Container.from_id(self.client, event['id']) if container.service not in service_names: continue diff --git a/tests/acceptance/cli_test.py b/tests/acceptance/cli_test.py index 825b97bed..c2116553e 100644 --- a/tests/acceptance/cli_test.py +++ b/tests/acceptance/cli_test.py @@ -1188,7 +1188,7 @@ class CLITestCase(DockerClientTestCase): def test_logs_follow(self): self.base_dir = 'tests/fixtures/echo-services' - self.dispatch(['up', '-d'], None) + self.dispatch(['up', '-d']) result = self.dispatch(['logs', '-f']) @@ -1197,29 +1197,43 @@ class CLITestCase(DockerClientTestCase): assert 'another' in result.stdout assert 'exited with code 0' in result.stdout - def test_logs_unfollow(self): + def test_logs_follow_logs_from_new_containers(self): self.base_dir = 'tests/fixtures/logs-composefile' - self.dispatch(['up', '-d'], None) + self.dispatch(['up', '-d', 'simple']) + + proc = start_process(self.base_dir, ['logs', '-f']) + + self.dispatch(['up', '-d', 'another']) + wait_on_condition(ContainerStateCondition( + self.project.client, + 'logscomposefile_another_1', + running=False)) + + os.kill(proc.pid, signal.SIGINT) + result = wait_on_process(proc, returncode=1) + assert 'test' in result.stdout + + def test_logs_default(self): + self.base_dir = 'tests/fixtures/logs-composefile' + self.dispatch(['up', '-d']) result = self.dispatch(['logs']) - - assert result.stdout.count('\n') >= 1 - assert 'exited with code 0' not in result.stdout + assert 'hello' in result.stdout + assert 'test' in result.stdout + assert 'exited with' not in result.stdout def test_logs_timestamps(self): self.base_dir = 'tests/fixtures/echo-services' - self.dispatch(['up', '-d'], None) - - result = self.dispatch(['logs', '-f', '-t'], None) + self.dispatch(['up', '-d']) + result = self.dispatch(['logs', '-f', '-t']) self.assertRegexpMatches(result.stdout, '(\d{4})-(\d{2})-(\d{2})T(\d{2})\:(\d{2})\:(\d{2})') def test_logs_tail(self): self.base_dir = 'tests/fixtures/logs-tail-composefile' - self.dispatch(['up'], None) - - result = self.dispatch(['logs', '--tail', '2'], None) + self.dispatch(['up']) + result = self.dispatch(['logs', '--tail', '2']) assert result.stdout.count('\n') == 3 def test_kill(self): diff --git a/tests/unit/cli/log_printer_test.py b/tests/unit/cli/log_printer_test.py index 81c694124..7be1d3039 100644 --- a/tests/unit/cli/log_printer_test.py +++ b/tests/unit/cli/log_printer_test.py @@ -5,9 +5,11 @@ import pytest import six from six.moves.queue import Queue +from compose.cli.log_printer import build_log_generator +from compose.cli.log_printer import build_log_presenters +from compose.cli.log_printer import build_no_log_generator from compose.cli.log_printer import consume_queue -from compose.cli.log_printer import LogPrinter -from compose.cli.log_printer import STOP +from compose.cli.log_printer import QueueItem from compose.cli.log_printer import wait_on_exit from compose.container import Container from tests import mock @@ -34,72 +36,73 @@ def output_stream(): @pytest.fixture def mock_container(): - def reader(*args, **kwargs): - yield b"hello\nworld" - return build_mock_container(reader) + return mock.Mock(spec=Container, name_without_project='web_1') -@pytest.mark.skipif(True, reason="wip") -class TestLogPrinter(object): +class TestLogPresenter(object): - def test_single_container(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream, log_args={'follow': True}).run() + def test_monochrome(self, mock_container): + presenters = build_log_presenters(['foo', 'bar'], True) + presenter = presenters.next() + actual = presenter.present(mock_container, "this line") + assert actual == "web_1 | this line" - output = output_stream.getvalue() - assert 'hello' in output - assert 'world' in output - # Call count is 2 lines + "container exited line" - assert output_stream.flush.call_count == 3 + def test_polychrome(self, mock_container): + presenters = build_log_presenters(['foo', 'bar'], False) + presenter = presenters.next() + actual = presenter.present(mock_container, "this line") + assert '\033[' in actual - def test_single_container_without_stream(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream).run() - output = output_stream.getvalue() - assert 'hello' in output - assert 'world' in output - # Call count is 2 lines - assert output_stream.flush.call_count == 2 +def test_wait_on_exit(): + exit_status = 3 + mock_container = mock.Mock( + spec=Container, + name='cname', + wait=mock.Mock(return_value=exit_status)) - def test_monochrome(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream, monochrome=True).run() - assert '\033[' not in output_stream.getvalue() + expected = '{} exited with code {}\n'.format(mock_container.name, exit_status) + assert expected == wait_on_exit(mock_container) - def test_polychrome(self, output_stream, mock_container): - LogPrinter([mock_container], output=output_stream).run() - assert '\033[' in output_stream.getvalue() + +def test_build_no_log_generator(mock_container): + mock_container.has_api_logs = False + mock_container.log_driver = 'none' + output, = build_no_log_generator(mock_container, None) + assert "WARNING: no logs are available with the 'none' log driver\n" in output + assert "exited with code" not in output + + +class TestBuildLogGenerator(object): + + def test_no_log_stream(self, mock_container): + mock_container.log_stream = None + mock_container.logs.return_value = iter([b"hello\nworld"]) + log_args = {'follow': True} + + generator = build_log_generator(mock_container, log_args) + assert generator.next() == "hello\n" + assert generator.next() == "world" + mock_container.logs.assert_called_once_with( + stdout=True, + stderr=True, + stream=True, + **log_args) + + def test_with_log_stream(self, mock_container): + mock_container.log_stream = iter([b"hello\nworld"]) + log_args = {'follow': True} + + generator = build_log_generator(mock_container, log_args) + assert generator.next() == "hello\n" + assert generator.next() == "world" def test_unicode(self, output_stream): - glyph = u'\u2022' + glyph = u'\u2022\n' + mock_container.log_stream = iter([glyph.encode('utf-8')]) - def reader(*args, **kwargs): - yield glyph.encode('utf-8') + b'\n' - - container = build_mock_container(reader) - LogPrinter([container], output=output_stream).run() - output = output_stream.getvalue() - if six.PY2: - output = output.decode('utf-8') - - assert glyph in output - - def test_wait_on_exit(self): - exit_status = 3 - mock_container = mock.Mock( - spec=Container, - name='cname', - wait=mock.Mock(return_value=exit_status)) - - expected = '{} exited with code {}\n'.format(mock_container.name, exit_status) - assert expected == wait_on_exit(mock_container) - - def test_generator_with_no_logs(self, mock_container, output_stream): - mock_container.has_api_logs = False - mock_container.log_driver = 'none' - LogPrinter([mock_container], output=output_stream).run() - - output = output_stream.getvalue() - assert "WARNING: no logs are available with the 'none' log driver\n" in output - assert "exited with code" not in output + generator = build_log_generator(mock_container, {}) + assert generator.next() == glyph class TestConsumeQueue(object): @@ -111,7 +114,7 @@ class TestConsumeQueue(object): queue = Queue() error = Problem('oops') - for item in ('a', None), ('b', None), (None, error): + for item in QueueItem.new('a'), QueueItem.new('b'), QueueItem.exception(error): queue.put(item) generator = consume_queue(queue, False) @@ -122,7 +125,7 @@ class TestConsumeQueue(object): def test_item_is_stop_without_cascade_stop(self): queue = Queue() - for item in (STOP, None), ('a', None), ('b', None): + for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'): queue.put(item) generator = consume_queue(queue, False) @@ -131,7 +134,12 @@ class TestConsumeQueue(object): def test_item_is_stop_with_cascade_stop(self): queue = Queue() - for item in (STOP, None), ('a', None), ('b', None): + for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'): queue.put(item) assert list(consume_queue(queue, True)) == [] + + def test_item_is_none_when_timeout_is_hit(self): + queue = Queue() + generator = consume_queue(queue, False) + assert generator.next() is None diff --git a/tests/unit/cli/main_test.py b/tests/unit/cli/main_test.py index 9b24776f8..dc5278800 100644 --- a/tests/unit/cli/main_test.py +++ b/tests/unit/cli/main_test.py @@ -8,8 +8,8 @@ import pytest from compose import container from compose.cli.errors import UserError from compose.cli.formatter import ConsoleWarningFormatter -from compose.cli.main import build_log_printer from compose.cli.main import convergence_strategy_from_opts +from compose.cli.main import filter_containers_to_service_names from compose.cli.main import setup_console_handler from compose.service import ConvergenceStrategy from tests import mock @@ -32,7 +32,7 @@ def logging_handler(): class TestCLIMainTestCase(object): - def test_build_log_printer(self): + def test_filter_containers_to_service_names(self): containers = [ mock_container('web', 1), mock_container('web', 2), @@ -41,18 +41,18 @@ class TestCLIMainTestCase(object): mock_container('another', 1), ] service_names = ['web', 'db'] - log_printer = build_log_printer(containers, service_names, True, False, {'follow': True}) - assert log_printer.containers == containers[:3] + actual = filter_containers_to_service_names(containers, service_names) + assert actual == containers[:3] - def test_build_log_printer_all_services(self): + def test_filter_containers_to_service_names_all(self): containers = [ mock_container('web', 1), mock_container('db', 1), mock_container('other', 1), ] service_names = [] - log_printer = build_log_printer(containers, service_names, True, False, {'follow': True}) - assert log_printer.containers == containers + actual = filter_containers_to_service_names(containers, service_names) + assert actual == containers class TestSetupConsoleHandlerTestCase(object): From 4cad2a0c5f973c51675e26b67cb84bb1fa03b0f8 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 3 Mar 2016 18:53:47 -0500 Subject: [PATCH 3/9] Handle events for removed containers. Signed-off-by: Daniel Nephin --- compose/project.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/compose/project.py b/compose/project.py index b40a9c38c..9a2b46e1b 100644 --- a/compose/project.py +++ b/compose/project.py @@ -324,8 +324,11 @@ class Project(object): continue # TODO: get labels from the API v1.22 , see github issue 2618 - # TODO: this can fail if the conatiner is removed, wrap in try/except - container = Container.from_id(self.client, event['id']) + try: + # this can fail if the conatiner has been removed + container = Container.from_id(self.client, event['id']) + except APIError: + continue if container.service not in service_names: continue yield build_container_event(event, container) From 4312c93eae2594aafacb695be50480ac6b0341d5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 3 Mar 2016 18:57:07 -0500 Subject: [PATCH 4/9] Add an acceptance test to show logs behaves properly for stopped containers. Signed-off-by: Daniel Nephin --- tests/acceptance/cli_test.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/acceptance/cli_test.py b/tests/acceptance/cli_test.py index c2116553e..d3d4b3c06 100644 --- a/tests/acceptance/cli_test.py +++ b/tests/acceptance/cli_test.py @@ -1222,6 +1222,15 @@ class CLITestCase(DockerClientTestCase): assert 'test' in result.stdout assert 'exited with' not in result.stdout + def test_logs_on_stopped_containers_exits(self): + self.base_dir = 'tests/fixtures/echo-services' + self.dispatch(['up']) + + result = self.dispatch(['logs']) + assert 'simple' in result.stdout + assert 'another' in result.stdout + assert 'exited with' not in result.stdout + def test_logs_timestamps(self): self.base_dir = 'tests/fixtures/echo-services' self.dispatch(['up', '-d']) From 48ed68eeaa371ee31b8aac7186681d86eb84015e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 7 Mar 2016 14:56:14 -0500 Subject: [PATCH 5/9] Fix geneartors for python3. Signed-off-by: Daniel Nephin --- compose/cli/log_printer.py | 4 ++-- tests/unit/cli/log_printer_test.py | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index fc36a6bca..22312c008 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -112,7 +112,7 @@ def build_thread(container, presenter, queue, log_args): def build_thread_map(initial_containers, presenters, thread_args): return { - container.id: build_thread(container, presenters.next(), *thread_args) + container.id: build_thread(container, next(presenters), *thread_args) for container in initial_containers } @@ -196,7 +196,7 @@ def watch_events(thread_map, event_stream, presenters, thread_args): thread_map[event['id']] = build_thread( event['container'], - presenters.next(), + next(presenters), *thread_args) diff --git a/tests/unit/cli/log_printer_test.py b/tests/unit/cli/log_printer_test.py index 7be1d3039..33b2f1669 100644 --- a/tests/unit/cli/log_printer_test.py +++ b/tests/unit/cli/log_printer_test.py @@ -43,13 +43,13 @@ class TestLogPresenter(object): def test_monochrome(self, mock_container): presenters = build_log_presenters(['foo', 'bar'], True) - presenter = presenters.next() + presenter = next(presenters) actual = presenter.present(mock_container, "this line") assert actual == "web_1 | this line" def test_polychrome(self, mock_container): presenters = build_log_presenters(['foo', 'bar'], False) - presenter = presenters.next() + presenter = next(presenters) actual = presenter.present(mock_container, "this line") assert '\033[' in actual @@ -81,8 +81,8 @@ class TestBuildLogGenerator(object): log_args = {'follow': True} generator = build_log_generator(mock_container, log_args) - assert generator.next() == "hello\n" - assert generator.next() == "world" + assert next(generator) == "hello\n" + assert next(generator) == "world" mock_container.logs.assert_called_once_with( stdout=True, stderr=True, @@ -94,15 +94,15 @@ class TestBuildLogGenerator(object): log_args = {'follow': True} generator = build_log_generator(mock_container, log_args) - assert generator.next() == "hello\n" - assert generator.next() == "world" + assert next(generator) == "hello\n" + assert next(generator) == "world" def test_unicode(self, output_stream): glyph = u'\u2022\n' mock_container.log_stream = iter([glyph.encode('utf-8')]) generator = build_log_generator(mock_container, {}) - assert generator.next() == glyph + assert next(generator) == glyph class TestConsumeQueue(object): @@ -118,10 +118,10 @@ class TestConsumeQueue(object): queue.put(item) generator = consume_queue(queue, False) - assert generator.next() == 'a' - assert generator.next() == 'b' + assert next(generator) == 'a' + assert next(generator) == 'b' with pytest.raises(Problem): - generator.next() + next(generator) def test_item_is_stop_without_cascade_stop(self): queue = Queue() @@ -129,8 +129,8 @@ class TestConsumeQueue(object): queue.put(item) generator = consume_queue(queue, False) - assert generator.next() == 'a' - assert generator.next() == 'b' + assert next(generator) == 'a' + assert next(generator) == 'b' def test_item_is_stop_with_cascade_stop(self): queue = Queue() @@ -142,4 +142,4 @@ class TestConsumeQueue(object): def test_item_is_none_when_timeout_is_hit(self): queue = Queue() generator = consume_queue(queue, False) - assert generator.next() is None + assert next(generator) is None From 3f7e5bf76895413048ed5af88279899261394b32 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 7 Mar 2016 15:04:42 -0500 Subject: [PATCH 6/9] Filter logs by service names. Signed-off-by: Daniel Nephin --- compose/cli/main.py | 2 +- compose/project.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/compose/cli/main.py b/compose/cli/main.py index da622bc17..468e10c4e 100644 --- a/compose/cli/main.py +++ b/compose/cli/main.py @@ -709,7 +709,7 @@ class TopLevelCommand(object): with up_shutdown_context(self.project, service_names, timeout, detached): # start the event stream first so we don't lose any events - event_stream = project.events() + event_stream = project.events(service_names=service_names) to_attach = project.up( service_names=service_names, diff --git a/compose/project.py b/compose/project.py index 9a2b46e1b..4e25e498c 100644 --- a/compose/project.py +++ b/compose/project.py @@ -295,7 +295,7 @@ class Project(object): detached=True, start=False) - def events(self): + def events(self, service_names=None): def build_container_event(event, container): time = datetime.datetime.fromtimestamp(event['time']) time = time.replace( @@ -313,7 +313,7 @@ class Project(object): 'container': container, } - service_names = set(self.service_names) + service_names = set(service_names or self.service_names) for event in self.client.events( filters={'label': self.labels()}, decode=True From 8d9adc0902bf7c4b056007d7e6fb6188f2193fdf Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 7 Mar 2016 15:08:31 -0500 Subject: [PATCH 7/9] Fix flaky log test by using container status, instead of boolean state. Signed-off-by: Daniel Nephin --- tests/acceptance/cli_test.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/acceptance/cli_test.py b/tests/acceptance/cli_test.py index d3d4b3c06..095fb3f17 100644 --- a/tests/acceptance/cli_test.py +++ b/tests/acceptance/cli_test.py @@ -78,21 +78,20 @@ class ContainerCountCondition(object): class ContainerStateCondition(object): - def __init__(self, client, name, running): + def __init__(self, client, name, status): self.client = client self.name = name - self.running = running + self.status = status def __call__(self): try: container = self.client.inspect_container(self.name) - return container['State']['Running'] == self.running + return container['State']['Status'] == self.status except errors.APIError: return False def __str__(self): - state = 'running' if self.running else 'stopped' - return "waiting for container to be %s" % state + return "waiting for container to be %s" % self.status class CLITestCase(DockerClientTestCase): @@ -1073,26 +1072,26 @@ class CLITestCase(DockerClientTestCase): wait_on_condition(ContainerStateCondition( self.project.client, 'simplecomposefile_simple_run_1', - running=True)) + 'running')) os.kill(proc.pid, signal.SIGINT) wait_on_condition(ContainerStateCondition( self.project.client, 'simplecomposefile_simple_run_1', - running=False)) + 'exited')) def test_run_handles_sigterm(self): proc = start_process(self.base_dir, ['run', '-T', 'simple', 'top']) wait_on_condition(ContainerStateCondition( self.project.client, 'simplecomposefile_simple_run_1', - running=True)) + 'running')) os.kill(proc.pid, signal.SIGTERM) wait_on_condition(ContainerStateCondition( self.project.client, 'simplecomposefile_simple_run_1', - running=False)) + 'exited')) def test_rm(self): service = self.project.get_service('simple') @@ -1207,7 +1206,7 @@ class CLITestCase(DockerClientTestCase): wait_on_condition(ContainerStateCondition( self.project.client, 'logscomposefile_another_1', - running=False)) + 'exited')) os.kill(proc.pid, signal.SIGINT) result = wait_on_process(proc, returncode=1) From e8a93821d43753f19f0511ae8903fe05dac534d5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 7 Mar 2016 15:34:53 -0500 Subject: [PATCH 8/9] Fix race condition where a container stopping and starting again would cause logs to miss logs. Signed-off-by: Daniel Nephin --- compose/cli/log_printer.py | 3 ++ tests/unit/cli/log_printer_test.py | 56 +++++++++++++++++++++++------- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index 22312c008..367a534eb 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -185,6 +185,9 @@ def start_producer_thread(thread_args): def watch_events(thread_map, event_stream, presenters, thread_args): for event in event_stream: + if event['action'] == 'stop': + thread_map.pop(event['id'], None) + if event['action'] != 'start': continue diff --git a/tests/unit/cli/log_printer_test.py b/tests/unit/cli/log_printer_test.py index 33b2f1669..ab48eefc0 100644 --- a/tests/unit/cli/log_printer_test.py +++ b/tests/unit/cli/log_printer_test.py @@ -1,6 +1,8 @@ from __future__ import absolute_import from __future__ import unicode_literals +import itertools + import pytest import six from six.moves.queue import Queue @@ -11,22 +13,11 @@ from compose.cli.log_printer import build_no_log_generator from compose.cli.log_printer import consume_queue from compose.cli.log_printer import QueueItem from compose.cli.log_printer import wait_on_exit +from compose.cli.log_printer import watch_events from compose.container import Container from tests import mock -def build_mock_container(reader): - return mock.Mock( - spec=Container, - name='myapp_web_1', - name_without_project='web_1', - has_api_logs=True, - log_stream=None, - logs=reader, - wait=mock.Mock(return_value=0), - ) - - @pytest.fixture def output_stream(): output = six.StringIO() @@ -105,6 +96,47 @@ class TestBuildLogGenerator(object): assert next(generator) == glyph +@pytest.fixture +def thread_map(): + return {'cid': mock.Mock()} + + +@pytest.fixture +def mock_presenters(): + return itertools.cycle([mock.Mock()]) + + +class TestWatchEvents(object): + + def test_stop_event(self, thread_map, mock_presenters): + event_stream = [{'action': 'stop', 'id': 'cid'}] + watch_events(thread_map, event_stream, mock_presenters, ()) + assert not thread_map + + def test_start_event(self, thread_map, mock_presenters): + container_id = 'abcd' + event = {'action': 'start', 'id': container_id, 'container': mock.Mock()} + event_stream = [event] + thread_args = 'foo', 'bar' + + with mock.patch( + 'compose.cli.log_printer.build_thread', + autospec=True + ) as mock_build_thread: + watch_events(thread_map, event_stream, mock_presenters, thread_args) + mock_build_thread.assert_called_once_with( + event['container'], + next(mock_presenters), + *thread_args) + assert container_id in thread_map + + def test_other_event(self, thread_map, mock_presenters): + container_id = 'abcd' + event_stream = [{'action': 'create', 'id': container_id}] + watch_events(thread_map, event_stream, mock_presenters, ()) + assert container_id not in thread_map + + class TestConsumeQueue(object): def test_item_is_an_exception(self): From bf96edfe11789d4ce13b869be578cc274794cdfc Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 7 Mar 2016 15:58:25 -0500 Subject: [PATCH 9/9] Reduce the args of some functions by including presenters as part of the thread_args. Signed-off-by: Daniel Nephin --- compose/cli/log_printer.py | 3 +++ compose/cli/main.py | 11 ++++------- tests/acceptance/cli_test.py | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/compose/cli/log_printer.py b/compose/cli/log_printer.py index 367a534eb..b48462ff5 100644 --- a/compose/cli/log_printer.py +++ b/compose/cli/log_printer.py @@ -88,7 +88,10 @@ class LogPrinter(object): if not line: if not thread_map: + # There are no running containers left to tail, so exit return + # We got an empty line because of a timeout, but there are still + # active containers to tail, so continue continue self.output.write(line) diff --git a/compose/cli/main.py b/compose/cli/main.py index 468e10c4e..52b4a03bb 100644 --- a/compose/cli/main.py +++ b/compose/cli/main.py @@ -389,7 +389,7 @@ class TopLevelCommand(object): } print("Attaching to", list_containers(containers)) log_printer_from_project( - project, + self.project, containers, options['--no-color'], log_args).run() @@ -708,10 +708,7 @@ class TopLevelCommand(object): raise UserError("--abort-on-container-exit and -d cannot be combined.") with up_shutdown_context(self.project, service_names, timeout, detached): - # start the event stream first so we don't lose any events - event_stream = project.events(service_names=service_names) - - to_attach = project.up( + to_attach = self.project.up( service_names=service_names, start_deps=start_deps, strategy=convergence_strategy_from_opts(options), @@ -723,12 +720,12 @@ class TopLevelCommand(object): return log_printer = log_printer_from_project( - project, + self.project, filter_containers_to_service_names(to_attach, service_names), options['--no-color'], {'follow': True}, cascade_stop, - event_stream=event_stream) + event_stream=self.project.events(service_names=service_names)) print("Attaching to", list_containers(log_printer.containers)) log_printer.run() diff --git a/tests/acceptance/cli_test.py b/tests/acceptance/cli_test.py index 095fb3f17..ab74f14e6 100644 --- a/tests/acceptance/cli_test.py +++ b/tests/acceptance/cli_test.py @@ -396,8 +396,8 @@ class CLITestCase(DockerClientTestCase): self.base_dir = 'tests/fixtures/echo-services' result = self.dispatch(['up', '--no-color']) - assert 'simple_1 | simple' in result.stdout - assert 'another_1 | another' in result.stdout + assert 'simple_1 | simple' in result.stdout + assert 'another_1 | another' in result.stdout assert 'simple_1 exited with code 0' in result.stdout assert 'another_1 exited with code 0' in result.stdout