Attach to a container's log_stream before they're started

So we're not displaying output of all previous logs for a container, we attach,
if possible, to a container before the container is started.

LogPrinter checks if a container has a log_stream already attached and
print from that rather than always attempting to attach one itself.

Signed-off-by: Mazz Mosley <mazz@houseofmnowster.com>
This commit is contained in:
Mazz Mosley 2015-10-21 17:28:16 +01:00
parent a4b002a76e
commit 7603ebea9b
6 changed files with 66 additions and 27 deletions

View File

@ -73,9 +73,13 @@ def build_no_log_generator(container, prefix, color_func):
def build_log_generator(container, prefix, color_func): def build_log_generator(container, prefix, color_func):
# Attach to container before log printer starts running # if the container doesn't have a log_stream we need to attach to container
stream = container.attach(stdout=True, stderr=True, stream=True, logs=True) # before log printer starts running
line_generator = split_buffer(stream) if container.log_stream is None:
stream = container.attach(stdout=True, stderr=True, stream=True, logs=True)
line_generator = split_buffer(stream)
else:
line_generator = split_buffer(container.log_stream)
for line in line_generator: for line in line_generator:
yield prefix + line yield prefix + line

View File

@ -565,16 +565,18 @@ class TopLevelCommand(DocoptCommand):
start_deps = not options['--no-deps'] start_deps = not options['--no-deps']
service_names = options['SERVICE'] service_names = options['SERVICE']
timeout = int(options.get('--timeout') or DEFAULT_TIMEOUT) timeout = int(options.get('--timeout') or DEFAULT_TIMEOUT)
detached = options.get('-d')
to_attach = project.up( to_attach = project.up(
service_names=service_names, service_names=service_names,
start_deps=start_deps, start_deps=start_deps,
strategy=convergence_strategy_from_opts(options), strategy=convergence_strategy_from_opts(options),
do_build=not options['--no-build'], do_build=not options['--no-build'],
timeout=timeout timeout=timeout,
detached=detached
) )
if not options['-d']: if not detached:
log_printer = build_log_printer(to_attach, service_names, monochrome) log_printer = build_log_printer(to_attach, service_names, monochrome)
attach_to_logs(project, log_printer, service_names, timeout) attach_to_logs(project, log_printer, service_names, timeout)
@ -636,7 +638,10 @@ def convergence_strategy_from_opts(options):
def build_log_printer(containers, service_names, monochrome): def build_log_printer(containers, service_names, monochrome):
if service_names: if service_names:
containers = [c for c in containers if c.service in service_names] containers = [
container
for container in containers if container.service in service_names
]
return LogPrinter(containers, monochrome=monochrome) return LogPrinter(containers, monochrome=monochrome)

View File

@ -19,6 +19,7 @@ class Container(object):
self.client = client self.client = client
self.dictionary = dictionary self.dictionary = dictionary
self.has_been_inspected = has_been_inspected self.has_been_inspected = has_been_inspected
self.log_stream = None
@classmethod @classmethod
def from_ps(cls, client, dictionary, **kwargs): def from_ps(cls, client, dictionary, **kwargs):
@ -146,6 +147,13 @@ class Container(object):
log_type = self.log_driver log_type = self.log_driver
return not log_type or log_type == 'json-file' return not log_type or log_type == 'json-file'
def attach_log_stream(self):
"""A log stream can only be attached if the container uses a json-file
log driver.
"""
if self.has_api_logs:
self.log_stream = self.attach(stdout=True, stderr=True, stream=True)
def get(self, key): def get(self, key):
"""Return a value from the container or None if the value is not set. """Return a value from the container or None if the value is not set.

View File

@ -290,7 +290,8 @@ class Project(object):
start_deps=True, start_deps=True,
strategy=ConvergenceStrategy.changed, strategy=ConvergenceStrategy.changed,
do_build=True, do_build=True,
timeout=DEFAULT_TIMEOUT): timeout=DEFAULT_TIMEOUT,
detached=False):
services = self.get_services(service_names, include_deps=start_deps) services = self.get_services(service_names, include_deps=start_deps)
@ -308,7 +309,8 @@ class Project(object):
for container in service.execute_convergence_plan( for container in service.execute_convergence_plan(
plans[service.name], plans[service.name],
do_build=do_build, do_build=do_build,
timeout=timeout timeout=timeout,
detached=detached
) )
] ]

View File

@ -395,11 +395,17 @@ class Service(object):
def execute_convergence_plan(self, def execute_convergence_plan(self,
plan, plan,
do_build=True, do_build=True,
timeout=DEFAULT_TIMEOUT): timeout=DEFAULT_TIMEOUT,
detached=False):
(action, containers) = plan (action, containers) = plan
should_attach_logs = not detached
if action == 'create': if action == 'create':
container = self.create_container(do_build=do_build) container = self.create_container(do_build=do_build)
if should_attach_logs:
container.attach_log_stream()
self.start_container(container) self.start_container(container)
return [container] return [container]
@ -407,15 +413,16 @@ class Service(object):
elif action == 'recreate': elif action == 'recreate':
return [ return [
self.recreate_container( self.recreate_container(
c, container,
timeout=timeout timeout=timeout,
attach_logs=should_attach_logs
) )
for c in containers for container in containers
] ]
elif action == 'start': elif action == 'start':
for c in containers: for container in containers:
self.start_container_if_stopped(c) self.start_container_if_stopped(container, attach_logs=should_attach_logs)
return containers return containers
@ -428,16 +435,7 @@ class Service(object):
else: else:
raise Exception("Invalid action: {}".format(action)) raise Exception("Invalid action: {}".format(action))
def recreate_container(self, def _recreate_stop_container(self, container, timeout):
container,
timeout=DEFAULT_TIMEOUT):
"""Recreate a container.
The original container is renamed to a temporary name so that data
volumes can be copied to the new container, before the original
container is removed.
"""
log.info("Recreating %s" % container.name)
try: try:
container.stop(timeout=timeout) container.stop(timeout=timeout)
except APIError as e: except APIError as e:
@ -448,26 +446,46 @@ class Service(object):
else: else:
raise raise
def _recreate_rename_container(self, container):
# Use a hopefully unique container name by prepending the short id # Use a hopefully unique container name by prepending the short id
self.client.rename( self.client.rename(
container.id, container.id,
'%s_%s' % (container.short_id, container.name)) '%s_%s' % (container.short_id, container.name)
)
def recreate_container(self,
container,
timeout=DEFAULT_TIMEOUT,
attach_logs=False):
"""Recreate a container.
The original container is renamed to a temporary name so that data
volumes can be copied to the new container, before the original
container is removed.
"""
log.info("Recreating %s" % container.name)
self._recreate_stop_container(container, timeout)
self._recreate_rename_container(container)
new_container = self.create_container( new_container = self.create_container(
do_build=False, do_build=False,
previous_container=container, previous_container=container,
number=container.labels.get(LABEL_CONTAINER_NUMBER), number=container.labels.get(LABEL_CONTAINER_NUMBER),
quiet=True, quiet=True,
) )
if attach_logs:
new_container.attach_log_stream()
self.start_container(new_container) self.start_container(new_container)
container.remove() container.remove()
return new_container return new_container
def start_container_if_stopped(self, container): def start_container_if_stopped(self, container, attach_logs=False):
if container.is_running: if container.is_running:
return container return container
else: else:
log.info("Starting %s" % container.name) log.info("Starting %s" % container.name)
if attach_logs:
container.attach_log_stream()
return self.start_container(container) return self.start_container(container)
def start_container(self, container): def start_container(self, container):

View File

@ -18,6 +18,7 @@ from compose.service import ConvergenceStrategy
class ProjectTestCase(DockerClientTestCase): class ProjectTestCase(DockerClientTestCase):
def run_up(self, cfg, **kwargs): def run_up(self, cfg, **kwargs):
kwargs.setdefault('timeout', 1) kwargs.setdefault('timeout', 1)
kwargs.setdefault('detached', True)
project = self.make_project(cfg) project = self.make_project(cfg)
project.up(**kwargs) project.up(**kwargs)
@ -184,7 +185,8 @@ def converge(service,
do_build=True): do_build=True):
"""Create a converge plan from a strategy and execute the plan.""" """Create a converge plan from a strategy and execute the plan."""
plan = service.convergence_plan(strategy) plan = service.convergence_plan(strategy)
return service.execute_convergence_plan(plan, do_build=do_build, timeout=1) containers, logging_threads = zip(*service.execute_convergence_plan(plan, do_build=do_build, timeout=1))
return containers
class ServiceStateTest(DockerClientTestCase): class ServiceStateTest(DockerClientTestCase):