use logs API with Since to collect the very first logs after restart

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2025-06-05 15:45:51 +02:00 committed by Guillaume Lours
parent 485b6200ee
commit 1c37f1abb6
7 changed files with 92 additions and 108 deletions

View File

@ -183,7 +183,3 @@ func (l logDecorator) Status(container, msg string) {
l.decorated.Status(container, msg) l.decorated.Status(container, msg)
l.After() l.After()
} }
func (l logDecorator) Register(container string) {
l.decorated.Register(container)
}

View File

@ -667,7 +667,7 @@ type ContainerEvent struct {
ID string ID string
Service string Service string
Line string Line string
// ContainerEventExited only // ExitCode is only set on ContainerEventExited events
ExitCode int ExitCode int
Restarting bool Restarting bool
} }

View File

@ -635,13 +635,18 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
if inherit { if inherit {
inherited = &replaced inherited = &replaced
} }
replacedContainerName := service.ContainerName
if replacedContainerName == "" {
replacedContainerName = service.Name + api.Separator + strconv.Itoa(number)
}
name := getContainerName(project.Name, service, number) name := getContainerName(project.Name, service, number)
tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name) tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name)
opts := createOptions{ opts := createOptions{
AutoRemove: false, AutoRemove: false,
AttachStdin: false, AttachStdin: false,
UseNetworkAliases: true, UseNetworkAliases: true,
Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replaced.ID), Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replacedContainerName),
} }
created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w) created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w)
if err != nil { if err != nil {
@ -659,7 +664,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
return created, err return created, err
} }
err = s.apiClient().ContainerRename(ctx, created.ID, name) err = s.apiClient().ContainerRename(ctx, tmpName, name)
if err != nil { if err != nil {
return created, err return created, err
} }

View File

@ -19,7 +19,6 @@ package compose
import ( import (
"context" "context"
"io" "io"
"time"
"github.com/containerd/errdefs" "github.com/containerd/errdefs"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
@ -73,9 +72,14 @@ func (s *composeService) Logs(
if options.Follow { if options.Follow {
printer := newLogPrinter(consumer) printer := newLogPrinter(consumer)
eg.Go(printer.Run)
monitor := newMonitor(s.apiClient(), options.Project) monitor := newMonitor(s.apiClient(), projectName)
if len(options.Services) > 0 {
monitor.withServices(options.Services)
} else if options.Project != nil {
monitor.withServices(options.Project.ServiceNames())
}
monitor.withListener(printer.HandleEvent)
monitor.withListener(func(event api.ContainerEvent) { monitor.withListener(func(event api.ContainerEvent) {
if event.Type == api.ContainerEventStarted { if event.Type == api.ContainerEventStarted {
eg.Go(func() error { eg.Go(func() error {
@ -86,7 +90,7 @@ func (s *composeService) Logs(
err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{ err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{
Follow: options.Follow, Follow: options.Follow,
Since: time.Unix(0, event.Time).Format(time.RFC3339Nano), Since: ctr.State.StartedAt,
Until: options.Until, Until: options.Until,
Tail: options.Tail, Tail: options.Tail,
Timestamps: options.Timestamps, Timestamps: options.Timestamps,
@ -100,7 +104,6 @@ func (s *composeService) Logs(
} }
}) })
eg.Go(func() error { eg.Go(func() error {
defer printer.Stop()
return monitor.Start(ctx) return monitor.Start(ctx)
}) })
} }

View File

@ -20,7 +20,6 @@ import (
"context" "context"
"strconv" "strconv"
"github.com/compose-spec/compose-go/v2/types"
"github.com/containerd/errdefs" "github.com/containerd/errdefs"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/events"
@ -34,23 +33,23 @@ import (
type monitor struct { type monitor struct {
api client.APIClient api client.APIClient
project *types.Project project string
// services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command // services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command
services map[string]bool services map[string]bool
listeners []api.ContainerEventListener listeners []api.ContainerEventListener
} }
func newMonitor(api client.APIClient, project *types.Project) *monitor { func newMonitor(api client.APIClient, project string) *monitor {
services := map[string]bool{}
if project != nil {
for name := range project.Services {
services[name] = true
}
}
return &monitor{ return &monitor{
api: api, api: api,
project: project, project: project,
services: services, services: map[string]bool{},
}
}
func (c *monitor) withServices(services []string) {
for _, name := range services {
c.services[name] = true
} }
} }
@ -62,7 +61,7 @@ func (c *monitor) Start(ctx context.Context) error {
initialState, err := c.api.ContainerList(ctx, container.ListOptions{ initialState, err := c.api.ContainerList(ctx, container.ListOptions{
All: true, All: true,
Filters: filters.NewArgs( Filters: filters.NewArgs(
projectFilter(c.project.Name), projectFilter(c.project),
oneOffFilter(false), oneOffFilter(false),
hasConfigHashLabel(), hasConfigHashLabel(),
), ),
@ -78,22 +77,24 @@ func (c *monitor) Start(ctx context.Context) error {
containers.Add(ctr.ID) containers.Add(ctr.ID)
} }
} }
restarting := utils.Set[string]{} restarting := utils.Set[string]{}
evtCh, errCh := c.api.Events(ctx, events.ListOptions{ evtCh, errCh := c.api.Events(ctx, events.ListOptions{
Filters: filters.NewArgs( Filters: filters.NewArgs(
filters.Arg("type", "container"), filters.Arg("type", "container"),
projectFilter(c.project.Name)), projectFilter(c.project)),
}) })
for { for {
if len(containers) == 0 {
return nil
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
case err := <-errCh: case err := <-errCh:
return err return err
case event := <-evtCh: case event := <-evtCh:
if !c.services[event.Actor.Attributes[api.ServiceLabel]] { if len(c.services) > 0 && !c.services[event.Actor.Attributes[api.ServiceLabel]] {
continue continue
} }
ctr, err := c.getContainerSummary(event) ctr, err := c.getContainerSummary(event)
@ -103,24 +104,35 @@ func (c *monitor) Start(ctx context.Context) error {
switch event.Action { switch event.Action {
case events.ActionCreate: case events.ActionCreate:
if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] {
containers.Add(ctr.ID) containers.Add(ctr.ID)
}
evtType := api.ContainerEventCreated
if _, ok := ctr.Labels[api.ContainerReplaceLabel]; ok {
evtType = api.ContainerEventRecreated
}
for _, listener := range c.listeners { for _, listener := range c.listeners {
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated)) listener(newContainerEvent(event.TimeNano, ctr, evtType))
} }
logrus.Debugf("container %s created", ctr.Name) logrus.Debugf("container %s created", ctr.Name)
case events.ActionStart: case events.ActionStart:
restarted := restarting.Has(ctr.ID) restarted := restarting.Has(ctr.ID)
if restarted {
logrus.Debugf("container %s restarted", ctr.Name)
for _, listener := range c.listeners { for _, listener := range c.listeners {
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) { listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) {
e.Restarting = restarted e.Restarting = restarted
})) }))
} }
if restarted {
logrus.Debugf("container %s restarted", ctr.Name)
} else { } else {
logrus.Debugf("container %s started", ctr.Name) logrus.Debugf("container %s started", ctr.Name)
for _, listener := range c.listeners {
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted))
} }
}
if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] {
containers.Add(ctr.ID) containers.Add(ctr.ID)
}
case events.ActionRestart: case events.ActionRestart:
for _, listener := range c.listeners { for _, listener := range c.listeners {
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted)) listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted))
@ -159,9 +171,6 @@ func (c *monitor) Start(ctx context.Context) error {
} }
} }
} }
if len(containers) == 0 {
return nil
}
} }
} }
@ -192,7 +201,7 @@ func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSumma
ctr := &api.ContainerSummary{ ctr := &api.ContainerSummary{
ID: event.Actor.ID, ID: event.Actor.ID,
Name: event.Actor.Attributes["name"], Name: event.Actor.Attributes["name"],
Project: c.project.Name, Project: c.project,
Service: event.Actor.Attributes[api.ServiceLabel], Service: event.Actor.Attributes[api.ServiceLabel],
Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us
} }

View File

@ -18,7 +18,6 @@ package compose
import ( import (
"fmt" "fmt"
"sync"
"github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/api"
) )
@ -26,72 +25,29 @@ import (
// logPrinter watch application containers and collect their logs // logPrinter watch application containers and collect their logs
type logPrinter interface { type logPrinter interface {
HandleEvent(event api.ContainerEvent) HandleEvent(event api.ContainerEvent)
Run() error
Stop()
} }
type printer struct { type printer struct {
queue chan api.ContainerEvent
consumer api.LogConsumer consumer api.LogConsumer
stopCh chan struct{} // stopCh is a signal channel for producers to stop sending events to the queue
stop sync.Once
} }
// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer // newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
func newLogPrinter(consumer api.LogConsumer) logPrinter { func newLogPrinter(consumer api.LogConsumer) logPrinter {
printer := printer{ printer := printer{
consumer: consumer, consumer: consumer,
queue: make(chan api.ContainerEvent),
stopCh: make(chan struct{}),
stop: sync.Once{},
} }
return &printer return &printer
} }
func (p *printer) Stop() {
p.stop.Do(func() {
close(p.stopCh)
for {
select {
case <-p.queue:
// purge the queue to free producers goroutines
// p.queue will be garbage collected
default:
return
}
}
})
}
func (p *printer) HandleEvent(event api.ContainerEvent) { func (p *printer) HandleEvent(event api.ContainerEvent) {
select {
case <-p.stopCh:
return
default:
p.queue <- event
}
}
func (p *printer) Run() error {
defer p.Stop()
// containers we are tracking. Use true when container is running, false after we receive a stop|die signal
for {
select {
case <-p.stopCh:
return nil
case event := <-p.queue:
switch event.Type { switch event.Type {
case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted: case api.ContainerEventExited:
p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
if event.Type == api.ContainerEventRecreated { case api.ContainerEventRecreated:
p.consumer.Status(event.Source, "has been recreated") p.consumer.Status(event.Container.Labels[api.ContainerReplaceLabel], "has been recreated")
}
case api.ContainerEventLog, api.HookEventLog: case api.ContainerEventLog, api.HookEventLog:
p.consumer.Log(event.Source, event.Line) p.consumer.Log(event.Source, event.Line)
case api.ContainerEventErr: case api.ContainerEventErr:
p.consumer.Err(event.Source, event.Line) p.consumer.Err(event.Source, event.Line)
} }
} }
}
}

View File

@ -18,6 +18,7 @@ package compose
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
@ -31,6 +32,7 @@ import (
"github.com/docker/compose/v2/internal/tracing" "github.com/docker/compose/v2/internal/tracing"
"github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress" "github.com/docker/compose/v2/pkg/progress"
"github.com/docker/docker/errdefs"
"github.com/eiannone/keyboard" "github.com/eiannone/keyboard"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -166,7 +168,12 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
} }
} }
monitor := newMonitor(s.apiClient(), project) monitor := newMonitor(s.apiClient(), project.Name)
if len(options.Start.Services) > 0 {
monitor.withServices(options.Start.Services)
} else {
monitor.withServices(project.ServiceNames())
}
monitor.withListener(printer.HandleEvent) monitor.withListener(printer.HandleEvent)
var exitCode int var exitCode int
@ -175,9 +182,12 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
// detect first container to exit to trigger application shutdown // detect first container to exit to trigger application shutdown
monitor.withListener(func(event api.ContainerEvent) { monitor.withListener(func(event api.ContainerEvent) {
if once && event.Type == api.ContainerEventExited { if once && event.Type == api.ContainerEventExited {
if options.Start.OnExit == api.CascadeFail && event.ExitCode == 0 {
return
}
once = false
exitCode = event.ExitCode exitCode = event.ExitCode
printer.Stop() _, _ = fmt.Fprintln(s.stdinfo(), progress.ErrorColor("Aborting on container exit..."))
_, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...")
eg.Go(func() error { eg.Go(func() error {
return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error {
return s.stop(ctx, project.Name, api.StopOptions{ return s.stop(ctx, project.Name, api.StopOptions{
@ -186,7 +196,6 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
}, printer.HandleEvent) }, printer.HandleEvent)
}, s.stdinfo(), logConsumer) }, s.stdinfo(), logConsumer)
}) })
once = false
} }
}) })
} }
@ -203,29 +212,35 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
} }
monitor.withListener(func(event api.ContainerEvent) { monitor.withListener(func(event api.ContainerEvent) {
mustAttach := false if event.Type != api.ContainerEventStarted {
switch event.Type { return
case api.ContainerEventCreated:
// A container has been added to the application (scale)
mustAttach = true
case api.ContainerEventStarted:
// A container is restarting - need to re-attach
mustAttach = event.Restarting
} }
if mustAttach { if event.Restarting || event.Container.Labels[api.ContainerReplaceLabel] != "" {
eg.Go(func() error { eg.Go(func() error {
// FIXME as container already started, we might miss the very first logs ctr, err := s.apiClient().ContainerInspect(ctx, event.ID)
if err != nil {
return err
}
err = s.doLogContainer(ctx, options.Start.Attach, event.Source, ctr, api.LogOptions{
Follow: true,
Since: ctr.State.StartedAt,
})
var notImplErr errdefs.ErrNotImplemented
if errors.As(err, &notImplErr) {
// container may be configured with logging_driver: none
// as container already started, we might miss the very first logs. But still better than none
return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent)
}
return err
}) })
} }
}) })
eg.Go(func() error { eg.Go(func() error {
err := monitor.Start(ctx) err := monitor.Start(ctx)
fmt.Println("monitor complete")
// Signal for the signal-handler goroutines to stop // Signal for the signal-handler goroutines to stop
close(doneCh) close(doneCh)
printer.Stop()
return err return err
}) })