mirror of
https://github.com/docker/compose.git
synced 2025-07-10 07:14:27 +02:00
Starting with Docker 20.10, the docker daemon has support for "dual logging", which allows reading back logs, irregardless of the logging-driver that is configured (except for "none" as logging driver). This patch removes the local check, which used a hard-coded list of logging drivers that are expected to support reading logs. When using an older version of Docker, the API should return an error that reading logs is not supported, so no local check should be needed. Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
201 lines
6.2 KiB
Python
201 lines
6.2 KiB
Python
import itertools
|
|
from io import StringIO
|
|
from queue import Queue
|
|
|
|
import pytest
|
|
import requests
|
|
from docker.errors import APIError
|
|
|
|
from compose.cli.log_printer import build_log_generator
|
|
from compose.cli.log_printer import build_log_presenters
|
|
from compose.cli.log_printer import consume_queue
|
|
from compose.cli.log_printer import QueueItem
|
|
from compose.cli.log_printer import wait_on_exit
|
|
from compose.cli.log_printer import watch_events
|
|
from compose.container import Container
|
|
from tests import mock
|
|
|
|
|
|
@pytest.fixture
|
|
def output_stream():
|
|
output = StringIO()
|
|
output.flush = mock.Mock()
|
|
return output
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_container():
|
|
return mock.Mock(spec=Container, name_without_project='web_1')
|
|
|
|
|
|
class TestLogPresenter:
|
|
|
|
def test_monochrome(self, mock_container):
|
|
presenters = build_log_presenters(['foo', 'bar'], True)
|
|
presenter = next(presenters)
|
|
actual = presenter.present(mock_container, "this line")
|
|
assert actual == "web_1 | this line"
|
|
|
|
def test_polychrome(self, mock_container):
|
|
presenters = build_log_presenters(['foo', 'bar'], False)
|
|
presenter = next(presenters)
|
|
actual = presenter.present(mock_container, "this line")
|
|
assert '\033[' in actual
|
|
|
|
|
|
def test_wait_on_exit():
|
|
exit_status = 3
|
|
mock_container = mock.Mock(
|
|
spec=Container,
|
|
name='cname',
|
|
wait=mock.Mock(return_value=exit_status))
|
|
|
|
expected = '{} exited with code {}\n'.format(mock_container.name, exit_status)
|
|
assert expected == wait_on_exit(mock_container)
|
|
|
|
|
|
def test_wait_on_exit_raises():
|
|
status_code = 500
|
|
|
|
def mock_wait():
|
|
resp = requests.Response()
|
|
resp.status_code = status_code
|
|
raise APIError('Bad server', resp)
|
|
|
|
mock_container = mock.Mock(
|
|
spec=Container,
|
|
name='cname',
|
|
wait=mock_wait
|
|
)
|
|
|
|
expected = 'Unexpected API error for {} (HTTP code {})\n'.format(
|
|
mock_container.name, status_code,
|
|
)
|
|
assert expected in wait_on_exit(mock_container)
|
|
|
|
|
|
class TestBuildLogGenerator:
|
|
|
|
def test_no_log_stream(self, mock_container):
|
|
mock_container.log_stream = None
|
|
mock_container.logs.return_value = iter([b"hello\nworld"])
|
|
log_args = {'follow': True}
|
|
|
|
generator = build_log_generator(mock_container, log_args)
|
|
assert next(generator) == "hello\n"
|
|
assert next(generator) == "world"
|
|
mock_container.logs.assert_called_once_with(
|
|
stdout=True,
|
|
stderr=True,
|
|
stream=True,
|
|
**log_args)
|
|
|
|
def test_with_log_stream(self, mock_container):
|
|
mock_container.log_stream = iter([b"hello\nworld"])
|
|
log_args = {'follow': True}
|
|
|
|
generator = build_log_generator(mock_container, log_args)
|
|
assert next(generator) == "hello\n"
|
|
assert next(generator) == "world"
|
|
|
|
def test_unicode(self, output_stream):
|
|
glyph = '\u2022\n'
|
|
mock_container.log_stream = iter([glyph.encode('utf-8')])
|
|
|
|
generator = build_log_generator(mock_container, {})
|
|
assert next(generator) == glyph
|
|
|
|
|
|
@pytest.fixture
|
|
def thread_map():
|
|
return {'cid': mock.Mock()}
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_presenters():
|
|
return itertools.cycle([mock.Mock()])
|
|
|
|
|
|
class TestWatchEvents:
|
|
|
|
def test_stop_event(self, thread_map, mock_presenters):
|
|
event_stream = [{'action': 'stop', 'id': 'cid'}]
|
|
watch_events(thread_map, event_stream, mock_presenters, ())
|
|
assert not thread_map
|
|
|
|
def test_start_event(self, thread_map, mock_presenters):
|
|
container_id = 'abcd'
|
|
event = {'action': 'start', 'id': container_id, 'container': mock.Mock()}
|
|
event_stream = [event]
|
|
thread_args = 'foo', 'bar'
|
|
|
|
with mock.patch(
|
|
'compose.cli.log_printer.build_thread',
|
|
autospec=True
|
|
) as mock_build_thread:
|
|
watch_events(thread_map, event_stream, mock_presenters, thread_args)
|
|
mock_build_thread.assert_called_once_with(
|
|
event['container'],
|
|
next(mock_presenters),
|
|
*thread_args)
|
|
assert container_id in thread_map
|
|
|
|
def test_container_attach_event(self, thread_map, mock_presenters):
|
|
container_id = 'abcd'
|
|
mock_container = mock.Mock(is_restarting=False)
|
|
mock_container.attach_log_stream.side_effect = APIError("race condition")
|
|
event_die = {'action': 'die', 'id': container_id}
|
|
event_start = {'action': 'start', 'id': container_id, 'container': mock_container}
|
|
event_stream = [event_die, event_start]
|
|
thread_args = 'foo', 'bar'
|
|
watch_events(thread_map, event_stream, mock_presenters, thread_args)
|
|
assert mock_container.attach_log_stream.called
|
|
|
|
def test_other_event(self, thread_map, mock_presenters):
|
|
container_id = 'abcd'
|
|
event_stream = [{'action': 'create', 'id': container_id}]
|
|
watch_events(thread_map, event_stream, mock_presenters, ())
|
|
assert container_id not in thread_map
|
|
|
|
|
|
class TestConsumeQueue:
|
|
|
|
def test_item_is_an_exception(self):
|
|
|
|
class Problem(Exception):
|
|
pass
|
|
|
|
queue = Queue()
|
|
error = Problem('oops')
|
|
for item in QueueItem.new('a'), QueueItem.new('b'), QueueItem.exception(error):
|
|
queue.put(item)
|
|
|
|
generator = consume_queue(queue, False)
|
|
assert next(generator) == 'a'
|
|
assert next(generator) == 'b'
|
|
with pytest.raises(Problem):
|
|
next(generator)
|
|
|
|
def test_item_is_stop_without_cascade_stop(self):
|
|
queue = Queue()
|
|
for item in QueueItem.stop(), QueueItem.new('a'), QueueItem.new('b'):
|
|
queue.put(item)
|
|
|
|
generator = consume_queue(queue, False)
|
|
assert next(generator) == 'a'
|
|
assert next(generator) == 'b'
|
|
|
|
def test_item_is_stop_with_cascade_stop(self):
|
|
"""Return the name of the container that caused the cascade_stop"""
|
|
queue = Queue()
|
|
for item in QueueItem.stop('foobar-1'), QueueItem.new('a'), QueueItem.new('b'):
|
|
queue.put(item)
|
|
|
|
generator = consume_queue(queue, True)
|
|
assert next(generator) == 'foobar-1'
|
|
|
|
def test_item_is_none_when_timeout_is_hit(self):
|
|
queue = Queue()
|
|
generator = consume_queue(queue, False)
|
|
assert next(generator) is None
|