mirror of https://github.com/docker/compose.git
271 lines
8.3 KiB
Python
271 lines
8.3 KiB
Python
from __future__ import absolute_import
|
|
from __future__ import unicode_literals
|
|
|
|
import sys
|
|
from collections import namedtuple
|
|
from itertools import cycle
|
|
from threading import Thread
|
|
|
|
from docker.errors import APIError
|
|
from six.moves import _thread as thread
|
|
from six.moves.queue import Empty
|
|
from six.moves.queue import Queue
|
|
|
|
from . import colors
|
|
from compose import utils
|
|
from compose.cli.signals import ShutdownException
|
|
from compose.utils import split_buffer
|
|
|
|
|
|
class LogPresenter(object):
|
|
|
|
def __init__(self, prefix_width, color_func):
|
|
self.prefix_width = prefix_width
|
|
self.color_func = color_func
|
|
|
|
def present(self, container, line):
|
|
prefix = container.name_without_project.ljust(self.prefix_width)
|
|
return '{prefix} {line}'.format(
|
|
prefix=self.color_func(prefix + ' |'),
|
|
line=line)
|
|
|
|
|
|
def build_log_presenters(service_names, monochrome):
|
|
"""Return an iterable of functions.
|
|
|
|
Each function can be used to format the logs output of a container.
|
|
"""
|
|
prefix_width = max_name_width(service_names)
|
|
|
|
def no_color(text):
|
|
return text
|
|
|
|
for color_func in cycle([no_color] if monochrome else colors.rainbow()):
|
|
yield LogPresenter(prefix_width, color_func)
|
|
|
|
|
|
def max_name_width(service_names, max_index_width=3):
|
|
"""Calculate the maximum width of container names so we can make the log
|
|
prefixes line up like so:
|
|
|
|
db_1 | Listening
|
|
web_1 | Listening
|
|
"""
|
|
return max(len(name) for name in service_names) + max_index_width
|
|
|
|
|
|
class LogPrinter(object):
|
|
"""Print logs from many containers to a single output stream."""
|
|
|
|
def __init__(self,
|
|
containers,
|
|
presenters,
|
|
event_stream,
|
|
output=sys.stdout,
|
|
cascade_stop=False,
|
|
log_args=None):
|
|
self.containers = containers
|
|
self.presenters = presenters
|
|
self.event_stream = event_stream
|
|
self.output = utils.get_output_stream(output)
|
|
self.cascade_stop = cascade_stop
|
|
self.log_args = log_args or {}
|
|
|
|
def run(self):
|
|
if not self.containers:
|
|
return
|
|
|
|
queue = Queue()
|
|
thread_args = queue, self.log_args
|
|
thread_map = build_thread_map(self.containers, self.presenters, thread_args)
|
|
start_producer_thread((
|
|
thread_map,
|
|
self.event_stream,
|
|
self.presenters,
|
|
thread_args))
|
|
|
|
for line in consume_queue(queue, self.cascade_stop):
|
|
remove_stopped_threads(thread_map)
|
|
|
|
if self.cascade_stop:
|
|
matching_container = [cont.name for cont in self.containers if cont.name == line]
|
|
if line in matching_container:
|
|
# Returning the name of the container that started the
|
|
# the cascade_stop so we can return the correct exit code
|
|
return line
|
|
|
|
if not line:
|
|
if not thread_map:
|
|
# There are no running containers left to tail, so exit
|
|
return
|
|
# We got an empty line because of a timeout, but there are still
|
|
# active containers to tail, so continue
|
|
continue
|
|
|
|
self.write(line)
|
|
|
|
def write(self, line):
|
|
try:
|
|
self.output.write(line)
|
|
except UnicodeEncodeError:
|
|
# This may happen if the user's locale settings don't support UTF-8
|
|
# and UTF-8 characters are present in the log line. The following
|
|
# will output a "degraded" log with unsupported characters
|
|
# replaced by `?`
|
|
self.output.write(line.encode('ascii', 'replace').decode())
|
|
self.output.flush()
|
|
|
|
|
|
def remove_stopped_threads(thread_map):
|
|
for container_id, tailer_thread in list(thread_map.items()):
|
|
if not tailer_thread.is_alive():
|
|
thread_map.pop(container_id, None)
|
|
|
|
|
|
def build_thread(container, presenter, queue, log_args):
|
|
tailer = Thread(
|
|
target=tail_container_logs,
|
|
args=(container, presenter, queue, log_args))
|
|
tailer.daemon = True
|
|
tailer.start()
|
|
return tailer
|
|
|
|
|
|
def build_thread_map(initial_containers, presenters, thread_args):
|
|
return {
|
|
container.id: build_thread(container, next(presenters), *thread_args)
|
|
# Container order is unspecified, so they are sorted by name in order to make
|
|
# container:presenter (log color) assignment deterministic when given a list of containers
|
|
# with the same names.
|
|
for container in sorted(initial_containers, key=lambda c: c.name)
|
|
}
|
|
|
|
|
|
class QueueItem(namedtuple('_QueueItem', 'item is_stop exc')):
|
|
|
|
@classmethod
|
|
def new(cls, item):
|
|
return cls(item, None, None)
|
|
|
|
@classmethod
|
|
def exception(cls, exc):
|
|
return cls(None, None, exc)
|
|
|
|
@classmethod
|
|
def stop(cls, item=None):
|
|
return cls(item, True, None)
|
|
|
|
|
|
def tail_container_logs(container, presenter, queue, log_args):
|
|
generator = get_log_generator(container)
|
|
|
|
try:
|
|
for item in generator(container, log_args):
|
|
queue.put(QueueItem.new(presenter.present(container, item)))
|
|
except Exception as e:
|
|
queue.put(QueueItem.exception(e))
|
|
return
|
|
if log_args.get('follow'):
|
|
queue.put(QueueItem.new(presenter.color_func(wait_on_exit(container))))
|
|
queue.put(QueueItem.stop(container.name))
|
|
|
|
|
|
def get_log_generator(container):
|
|
if container.has_api_logs:
|
|
return build_log_generator
|
|
return build_no_log_generator
|
|
|
|
|
|
def build_no_log_generator(container, log_args):
|
|
"""Return a generator that prints a warning about logs and waits for
|
|
container to exit.
|
|
"""
|
|
yield "WARNING: no logs are available with the '{}' log driver\n".format(
|
|
container.log_driver)
|
|
|
|
|
|
def build_log_generator(container, log_args):
|
|
# if the container doesn't have a log_stream we need to attach to container
|
|
# before log printer starts running
|
|
if container.log_stream is None:
|
|
stream = container.logs(stdout=True, stderr=True, stream=True, **log_args)
|
|
else:
|
|
stream = container.log_stream
|
|
|
|
return split_buffer(stream)
|
|
|
|
|
|
def wait_on_exit(container):
|
|
try:
|
|
exit_code = container.wait()
|
|
return "%s exited with code %s\n" % (container.name, exit_code)
|
|
except APIError as e:
|
|
return "Unexpected API error for %s (HTTP code %s)\nResponse body:\n%s\n" % (
|
|
container.name, e.response.status_code,
|
|
e.response.text or '[empty]'
|
|
)
|
|
|
|
|
|
def start_producer_thread(thread_args):
|
|
producer = Thread(target=watch_events, args=thread_args)
|
|
producer.daemon = True
|
|
producer.start()
|
|
|
|
|
|
def watch_events(thread_map, event_stream, presenters, thread_args):
|
|
crashed_containers = set()
|
|
for event in event_stream:
|
|
if event['action'] == 'stop':
|
|
thread_map.pop(event['id'], None)
|
|
|
|
if event['action'] == 'die':
|
|
thread_map.pop(event['id'], None)
|
|
crashed_containers.add(event['id'])
|
|
|
|
if event['action'] != 'start':
|
|
continue
|
|
|
|
if event['id'] in thread_map:
|
|
if thread_map[event['id']].is_alive():
|
|
continue
|
|
# Container was stopped and started, we need a new thread
|
|
thread_map.pop(event['id'], None)
|
|
|
|
# Container crashed so we should reattach to it
|
|
if event['id'] in crashed_containers:
|
|
container = event['container']
|
|
if not container.is_restarting:
|
|
try:
|
|
container.attach_log_stream()
|
|
except APIError:
|
|
# Just ignore errors when reattaching to already crashed containers
|
|
pass
|
|
crashed_containers.remove(event['id'])
|
|
|
|
thread_map[event['id']] = build_thread(
|
|
event['container'],
|
|
next(presenters),
|
|
*thread_args
|
|
)
|
|
|
|
|
|
def consume_queue(queue, cascade_stop):
|
|
"""Consume the queue by reading lines off of it and yielding them."""
|
|
while True:
|
|
try:
|
|
item = queue.get(timeout=0.1)
|
|
except Empty:
|
|
yield None
|
|
continue
|
|
# See https://github.com/docker/compose/issues/189
|
|
except thread.error:
|
|
raise ShutdownException()
|
|
|
|
if item.exc:
|
|
raise item.exc
|
|
|
|
if item.is_stop and not cascade_stop:
|
|
continue
|
|
|
|
yield item.item
|