Add tests for reactive log printing.

Signed-off-by: Daniel Nephin <dnephin@docker.com>
This commit is contained in:
Daniel Nephin 2016-03-02 17:04:52 -05:00
parent 65797558f8
commit 44c1747127
6 changed files with 218 additions and 133 deletions

View File

@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import unicode_literals from __future__ import unicode_literals
import sys import sys
from collections import namedtuple
from itertools import cycle from itertools import cycle
from threading import Thread from threading import Thread
@ -15,9 +16,6 @@ from compose.cli.signals import ShutdownException
from compose.utils import split_buffer from compose.utils import split_buffer
STOP = object()
class LogPresenter(object): class LogPresenter(object):
def __init__(self, prefix_width, color_func): def __init__(self, prefix_width, color_func):
@ -79,51 +77,74 @@ class LogPrinter(object):
queue = Queue() queue = Queue()
thread_args = queue, self.log_args thread_args = queue, self.log_args
thread_map = build_thread_map(self.containers, self.presenters, thread_args) thread_map = build_thread_map(self.containers, self.presenters, thread_args)
start_producer_thread( start_producer_thread((
thread_map, thread_map,
self.event_stream, self.event_stream,
self.presenters, self.presenters,
thread_args) thread_args))
for line in consume_queue(queue, self.cascade_stop): for line in consume_queue(queue, self.cascade_stop):
remove_stopped_threads(thread_map)
if not line:
if not thread_map:
return
continue
self.output.write(line) self.output.write(line)
self.output.flush() self.output.flush()
# TODO: this needs more logic
# TODO: does consume_queue need to yield Nones to get to this point? def remove_stopped_threads(thread_map):
if not thread_map: for container_id, tailer_thread in list(thread_map.items()):
return if not tailer_thread.is_alive():
thread_map.pop(container_id, None)
def build_thread_map(initial_containers, presenters, thread_args): def build_thread(container, presenter, queue, log_args):
def build_thread(container):
tailer = Thread( tailer = Thread(
target=tail_container_logs, target=tail_container_logs,
args=(container, presenters.next()) + thread_args) args=(container, presenter, queue, log_args))
tailer.daemon = True tailer.daemon = True
tailer.start() tailer.start()
return tailer return tailer
def build_thread_map(initial_containers, presenters, thread_args):
return { return {
container.id: build_thread(container) container.id: build_thread(container, presenters.next(), *thread_args)
for container in initial_containers for container in initial_containers
} }
class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):
@classmethod
def new(cls, item):
return cls(item, None, None)
@classmethod
def exception(cls, exc):
return cls(None, None, exc)
@classmethod
def stop(cls):
return cls(None, True, None)
def tail_container_logs(container, presenter, queue, log_args): def tail_container_logs(container, presenter, queue, log_args):
generator = get_log_generator(container) generator = get_log_generator(container)
try: try:
for item in generator(container, log_args): for item in generator(container, log_args):
queue.put((item, None)) queue.put(QueueItem.new(presenter.present(container, item)))
except Exception as e:
queue.put(QueueItem.exception(e))
return
if log_args.get('follow'): if log_args.get('follow'):
yield presenter.color_func(wait_on_exit(container)) queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))
queue.put(QueueItem.stop())
queue.put((STOP, None))
except Exception as e:
queue.put((None, e))
def get_log_generator(container): def get_log_generator(container):
@ -156,37 +177,48 @@ def wait_on_exit(container):
return "%s exited with code %s\n" % (container.name, exit_code) return "%s exited with code %s\n" % (container.name, exit_code)
def start_producer_thread(thread_map, event_stream, presenters, thread_args): def start_producer_thread(thread_args):
queue, log_args = thread_args producer = Thread(target=watch_events, args=thread_args)
def watch_events():
for event in event_stream:
# TODO: handle start and stop events
pass
producer = Thread(target=watch_events)
producer.daemon = True producer.daemon = True
producer.start() producer.start()
def watch_events(thread_map, event_stream, presenters, thread_args):
for event in event_stream:
if event['action'] != 'start':
continue
if event['id'] in thread_map:
if thread_map[event['id']].is_alive():
continue
# Container was stopped and started, we need a new thread
thread_map.pop(event['id'], None)
thread_map[event['id']] = build_thread(
event['container'],
presenters.next(),
*thread_args)
def consume_queue(queue, cascade_stop): def consume_queue(queue, cascade_stop):
"""Consume the queue by reading lines off of it and yielding them.""" """Consume the queue by reading lines off of it and yielding them."""
while True: while True:
try: try:
item, exception = queue.get(timeout=0.1) item = queue.get(timeout=0.1)
except Empty: except Empty:
pass yield None
continue
# See https://github.com/docker/compose/issues/189 # See https://github.com/docker/compose/issues/189
except thread.error: except thread.error:
raise ShutdownException() raise ShutdownException()
if exception: if item.exc:
raise exception raise item.exc
if item is STOP: if item.is_stop:
if cascade_stop: if cascade_stop:
raise StopIteration raise StopIteration
else: else:
continue continue
yield item yield item.item

View File

@ -35,6 +35,7 @@ from .docopt_command import NoSuchCommand
from .errors import UserError from .errors import UserError
from .formatter import ConsoleWarningFormatter from .formatter import ConsoleWarningFormatter
from .formatter import Formatter from .formatter import Formatter
from .log_printer import build_log_presenters
from .log_printer import LogPrinter from .log_printer import LogPrinter
from .utils import get_version_info from .utils import get_version_info
from .utils import yesno from .utils import yesno
@ -277,6 +278,7 @@ class TopLevelCommand(object):
def json_format_event(event): def json_format_event(event):
event['time'] = event['time'].isoformat() event['time'] = event['time'].isoformat()
event.pop('container')
return json.dumps(event) return json.dumps(event)
for event in self.project.events(): for event in self.project.events():
@ -374,7 +376,6 @@ class TopLevelCommand(object):
""" """
containers = self.project.containers(service_names=options['SERVICE'], stopped=True) containers = self.project.containers(service_names=options['SERVICE'], stopped=True)
monochrome = options['--no-color']
tail = options['--tail'] tail = options['--tail']
if tail is not None: if tail is not None:
if tail.isdigit(): if tail.isdigit():
@ -387,7 +388,11 @@ class TopLevelCommand(object):
'timestamps': options['--timestamps'] 'timestamps': options['--timestamps']
} }
print("Attaching to", list_containers(containers)) print("Attaching to", list_containers(containers))
LogPrinter(containers, monochrome=monochrome, log_args=log_args).run() log_printer_from_project(
project,
containers,
options['--no-color'],
log_args).run()
def pause(self, options): def pause(self, options):
""" """
@ -693,7 +698,6 @@ class TopLevelCommand(object):
when attached or when containers are already when attached or when containers are already
running. (default: 10) running. (default: 10)
""" """
monochrome = options['--no-color']
start_deps = not options['--no-deps'] start_deps = not options['--no-deps']
cascade_stop = options['--abort-on-container-exit'] cascade_stop = options['--abort-on-container-exit']
service_names = options['SERVICE'] service_names = options['SERVICE']
@ -704,7 +708,10 @@ class TopLevelCommand(object):
raise UserError("--abort-on-container-exit and -d cannot be combined.") raise UserError("--abort-on-container-exit and -d cannot be combined.")
with up_shutdown_context(self.project, service_names, timeout, detached): with up_shutdown_context(self.project, service_names, timeout, detached):
to_attach = self.project.up( # start the event stream first so we don't lose any events
event_stream = project.events()
to_attach = project.up(
service_names=service_names, service_names=service_names,
start_deps=start_deps, start_deps=start_deps,
strategy=convergence_strategy_from_opts(options), strategy=convergence_strategy_from_opts(options),
@ -714,8 +721,14 @@ class TopLevelCommand(object):
if detached: if detached:
return return
log_args = {'follow': True}
log_printer = build_log_printer(to_attach, service_names, monochrome, cascade_stop, log_args) log_printer = log_printer_from_project(
project,
filter_containers_to_service_names(to_attach, service_names),
options['--no-color'],
{'follow': True},
cascade_stop,
event_stream=event_stream)
print("Attaching to", list_containers(log_printer.containers)) print("Attaching to", list_containers(log_printer.containers))
log_printer.run() log_printer.run()
@ -827,13 +840,30 @@ def run_one_off_container(container_options, project, service, options):
sys.exit(exit_code) sys.exit(exit_code)
def build_log_printer(containers, service_names, monochrome, cascade_stop, log_args): def log_printer_from_project(
if service_names: project,
containers = [ containers,
monochrome,
log_args,
cascade_stop=False,
event_stream=None,
):
return LogPrinter(
containers,
build_log_presenters(project.service_names, monochrome),
event_stream or project.events(),
cascade_stop=cascade_stop,
log_args=log_args)
def filter_containers_to_service_names(containers, service_names):
if not service_names:
return containers
return [
container container
for container in containers if container.service in service_names for container in containers if container.service in service_names
] ]
return LogPrinter(containers, monochrome=monochrome, cascade_stop=cascade_stop, log_args=log_args)
@contextlib.contextmanager @contextlib.contextmanager

View File

@ -324,6 +324,7 @@ class Project(object):
continue continue
# TODO: get labels from the API v1.22 , see github issue 2618 # TODO: get labels from the API v1.22 , see github issue 2618
# TODO: this can fail if the conatiner is removed, wrap in try/except
container = Container.from_id(self.client, event['id']) container = Container.from_id(self.client, event['id'])
if container.service not in service_names: if container.service not in service_names:
continue continue

View File

@ -1188,7 +1188,7 @@ class CLITestCase(DockerClientTestCase):
def test_logs_follow(self): def test_logs_follow(self):
self.base_dir = 'tests/fixtures/echo-services' self.base_dir = 'tests/fixtures/echo-services'
self.dispatch(['up', '-d'], None) self.dispatch(['up', '-d'])
result = self.dispatch(['logs', '-f']) result = self.dispatch(['logs', '-f'])
@ -1197,29 +1197,43 @@ class CLITestCase(DockerClientTestCase):
assert 'another' in result.stdout assert 'another' in result.stdout
assert 'exited with code 0' in result.stdout assert 'exited with code 0' in result.stdout
def test_logs_unfollow(self): def test_logs_follow_logs_from_new_containers(self):
self.base_dir = 'tests/fixtures/logs-composefile' self.base_dir = 'tests/fixtures/logs-composefile'
self.dispatch(['up', '-d'], None) self.dispatch(['up', '-d', 'simple'])
proc = start_process(self.base_dir, ['logs', '-f'])
self.dispatch(['up', '-d', 'another'])
wait_on_condition(ContainerStateCondition(
self.project.client,
'logscomposefile_another_1',
running=False))
os.kill(proc.pid, signal.SIGINT)
result = wait_on_process(proc, returncode=1)
assert 'test' in result.stdout
def test_logs_default(self):
self.base_dir = 'tests/fixtures/logs-composefile'
self.dispatch(['up', '-d'])
result = self.dispatch(['logs']) result = self.dispatch(['logs'])
assert 'hello' in result.stdout
assert result.stdout.count('\n') >= 1 assert 'test' in result.stdout
assert 'exited with code 0' not in result.stdout assert 'exited with' not in result.stdout
def test_logs_timestamps(self): def test_logs_timestamps(self):
self.base_dir = 'tests/fixtures/echo-services' self.base_dir = 'tests/fixtures/echo-services'
self.dispatch(['up', '-d'], None) self.dispatch(['up', '-d'])
result = self.dispatch(['logs', '-f', '-t'], None)
result = self.dispatch(['logs', '-f', '-t'])
self.assertRegexpMatches(result.stdout, '(\d{4})-(\d{2})-(\d{2})T(\d{2})\:(\d{2})\:(\d{2})') self.assertRegexpMatches(result.stdout, '(\d{4})-(\d{2})-(\d{2})T(\d{2})\:(\d{2})\:(\d{2})')
def test_logs_tail(self): def test_logs_tail(self):
self.base_dir = 'tests/fixtures/logs-tail-composefile' self.base_dir = 'tests/fixtures/logs-tail-composefile'
self.dispatch(['up'], None) self.dispatch(['up'])
result = self.dispatch(['logs', '--tail', '2'], None)
result = self.dispatch(['logs', '--tail', '2'])
assert result.stdout.count('\n') == 3 assert result.stdout.count('\n') == 3
def test_kill(self): def test_kill(self):

View File

@ -5,9 +5,11 @@ import pytest
import six import six
from six.moves.queue import Queue from six.moves.queue import Queue
from compose.cli.log_printer import build_log_generator
from compose.cli.log_printer import build_log_presenters
from compose.cli.log_printer import build_no_log_generator
from compose.cli.log_printer import consume_queue from compose.cli.log_printer import consume_queue
from compose.cli.log_printer import LogPrinter from compose.cli.log_printer import QueueItem
from compose.cli.log_printer import STOP
from compose.cli.log_printer import wait_on_exit from compose.cli.log_printer import wait_on_exit
from compose.container import Container from compose.container import Container
from tests import mock from tests import mock
@ -34,55 +36,25 @@ def output_stream():
@pytest.fixture @pytest.fixture
def mock_container(): def mock_container():
def reader(*args, **kwargs): return mock.Mock(spec=Container, name_without_project='web_1')
yield b"hello\nworld"
return build_mock_container(reader)
@pytest.mark.skipif(True, reason="wip") class TestLogPresenter(object):
class TestLogPrinter(object):
def test_single_container(self, output_stream, mock_container): def test_monochrome(self, mock_container):
LogPrinter([mock_container], output=output_stream, log_args={'follow': True}).run() presenters = build_log_presenters(['foo', 'bar'], True)
presenter = presenters.next()
actual = presenter.present(mock_container, "this line")
assert actual == "web_1 | this line"
output = output_stream.getvalue() def test_polychrome(self, mock_container):
assert 'hello' in output presenters = build_log_presenters(['foo', 'bar'], False)
assert 'world' in output presenter = presenters.next()
# Call count is 2 lines + "container exited line" actual = presenter.present(mock_container, "this line")
assert output_stream.flush.call_count == 3 assert '\033[' in actual
def test_single_container_without_stream(self, output_stream, mock_container):
LogPrinter([mock_container], output=output_stream).run()
output = output_stream.getvalue() def test_wait_on_exit():
assert 'hello' in output
assert 'world' in output
# Call count is 2 lines
assert output_stream.flush.call_count == 2
def test_monochrome(self, output_stream, mock_container):
LogPrinter([mock_container], output=output_stream, monochrome=True).run()
assert '\033[' not in output_stream.getvalue()
def test_polychrome(self, output_stream, mock_container):
LogPrinter([mock_container], output=output_stream).run()
assert '\033[' in output_stream.getvalue()
def test_unicode(self, output_stream):
glyph = u'\u2022'
def reader(*args, **kwargs):
yield glyph.encode('utf-8') + b'\n'
container = build_mock_container(reader)
LogPrinter([container], output=output_stream).run()
output = output_stream.getvalue()
if six.PY2:
output = output.decode('utf-8')
assert glyph in output
def test_wait_on_exit(self):
exit_status = 3 exit_status = 3
mock_container = mock.Mock( mock_container = mock.Mock(
spec=Container, spec=Container,
@ -92,16 +64,47 @@ class TestLogPrinter(object):
expected = '{} exited with code {}\n'.format(mock_container.name, exit_status) expected = '{} exited with code {}\n'.format(mock_container.name, exit_status)
assert expected == wait_on_exit(mock_container) assert expected == wait_on_exit(mock_container)
def test_generator_with_no_logs(self, mock_container, output_stream):
def test_build_no_log_generator(mock_container):
mock_container.has_api_logs = False mock_container.has_api_logs = False
mock_container.log_driver = 'none' mock_container.log_driver = 'none'
LogPrinter([mock_container], output=output_stream).run() output, = build_no_log_generator(mock_container, None)
output = output_stream.getvalue()
assert "WARNING: no logs are available with the 'none' log driver\n" in output assert "WARNING: no logs are available with the 'none' log driver\n" in output
assert "exited with code" not in output assert "exited with code" not in output
class TestBuildLogGenerator(object):
def test_no_log_stream(self, mock_container):
mock_container.log_stream = None
mock_container.logs.return_value = iter([b"hello\nworld"])
log_args = {'follow': True}
generator = build_log_generator(mock_container, log_args)
assert generator.next() == "hello\n"
assert generator.next() == "world"
mock_container.logs.assert_called_once_with(
stdout=True,
stderr=True,
stream=True,
**log_args)
def test_with_log_stream(self, mock_container):
mock_container.log_stream = iter([b"hello\nworld"])
log_args = {'follow': True}
generator = build_log_generator(mock_container, log_args)
assert generator.next() == "hello\n"
assert generator.next() == "world"
def test_unicode(self, output_stream):
glyph = u'\u2022\n'
mock_container.log_stream = iter([glyph.encode('utf-8')])
generator = build_log_generator(mock_container, {})
assert generator.next() == glyph
class TestConsumeQueue(object): class TestConsumeQueue(object):
def test_item_is_an_exception(self): def test_item_is_an_exception(self):
@ -111,7 +114,7 @@ class TestConsumeQueue(object):
queue = Queue() queue = Queue()
error = Problem('oops') error = Problem('oops')
for item in ('a', None), ('b', None), (None, error): for item in QueueItem.new('a'), QueueItem.new('b'), QueueItem.exception(error):
queue.put(item) queue.put(item)
generator = consume_queue(queue, False) generator = consume_queue(queue, False)
@ -122,7 +125,7 @@ class TestConsumeQueue(object):
def test_item_is_stop_without_cascade_stop(self): def test_item_is_stop_without_cascade_stop(self):
queue = Queue() queue = Queue()
for item in (STOP, None), ('a', None), ('b', None): for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'):
queue.put(item) queue.put(item)
generator = consume_queue(queue, False) generator = consume_queue(queue, False)
@ -131,7 +134,12 @@ class TestConsumeQueue(object):
def test_item_is_stop_with_cascade_stop(self): def test_item_is_stop_with_cascade_stop(self):
queue = Queue() queue = Queue()
for item in (STOP, None), ('a', None), ('b', None): for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'):
queue.put(item) queue.put(item)
assert list(consume_queue(queue, True)) == [] assert list(consume_queue(queue, True)) == []
def test_item_is_none_when_timeout_is_hit(self):
queue = Queue()
generator = consume_queue(queue, False)
assert generator.next() is None

View File

@ -8,8 +8,8 @@ import pytest
from compose import container from compose import container
from compose.cli.errors import UserError from compose.cli.errors import UserError
from compose.cli.formatter import ConsoleWarningFormatter from compose.cli.formatter import ConsoleWarningFormatter
from compose.cli.main import build_log_printer
from compose.cli.main import convergence_strategy_from_opts from compose.cli.main import convergence_strategy_from_opts
from compose.cli.main import filter_containers_to_service_names
from compose.cli.main import setup_console_handler from compose.cli.main import setup_console_handler
from compose.service import ConvergenceStrategy from compose.service import ConvergenceStrategy
from tests import mock from tests import mock
@ -32,7 +32,7 @@ def logging_handler():
class TestCLIMainTestCase(object): class TestCLIMainTestCase(object):
def test_build_log_printer(self): def test_filter_containers_to_service_names(self):
containers = [ containers = [
mock_container('web', 1), mock_container('web', 1),
mock_container('web', 2), mock_container('web', 2),
@ -41,18 +41,18 @@ class TestCLIMainTestCase(object):
mock_container('another', 1), mock_container('another', 1),
] ]
service_names = ['web', 'db'] service_names = ['web', 'db']
log_printer = build_log_printer(containers, service_names, True, False, {'follow': True}) actual = filter_containers_to_service_names(containers, service_names)
assert log_printer.containers == containers[:3] assert actual == containers[:3]
def test_build_log_printer_all_services(self): def test_filter_containers_to_service_names_all(self):
containers = [ containers = [
mock_container('web', 1), mock_container('web', 1),
mock_container('db', 1), mock_container('db', 1),
mock_container('other', 1), mock_container('other', 1),
] ]
service_names = [] service_names = []
log_printer = build_log_printer(containers, service_names, True, False, {'follow': True}) actual = filter_containers_to_service_names(containers, service_names)
assert log_printer.containers == containers assert actual == containers
class TestSetupConsoleHandlerTestCase(object): class TestSetupConsoleHandlerTestCase(object):