From 1c37f1abb6883b241db91c5b78079525f8c26a75 Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Thu, 5 Jun 2025 15:45:51 +0200 Subject: [PATCH] use logs API with Since to collect the very first logs after restart Signed-off-by: Nicolas De Loof --- cmd/formatter/logs.go | 4 --- pkg/api/api.go | 2 +- pkg/compose/convergence.go | 9 ++++-- pkg/compose/logs.go | 13 +++++--- pkg/compose/monitor.go | 61 +++++++++++++++++++++---------------- pkg/compose/printer.go | 62 ++++++-------------------------------- pkg/compose/up.go | 49 +++++++++++++++++++----------- 7 files changed, 92 insertions(+), 108 deletions(-) diff --git a/cmd/formatter/logs.go b/cmd/formatter/logs.go index 2faa77b65..430cf1b03 100644 --- a/cmd/formatter/logs.go +++ b/cmd/formatter/logs.go @@ -183,7 +183,3 @@ func (l logDecorator) Status(container, msg string) { l.decorated.Status(container, msg) l.After() } - -func (l logDecorator) Register(container string) { - l.decorated.Register(container) -} diff --git a/pkg/api/api.go b/pkg/api/api.go index 0bec86a6e..b57a142a6 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -667,7 +667,7 @@ type ContainerEvent struct { ID string Service string Line string - // ContainerEventExited only + // ExitCode is only set on ContainerEventExited events ExitCode int Restarting bool } diff --git a/pkg/compose/convergence.go b/pkg/compose/convergence.go index a4b0d0e6c..c43f6cc76 100644 --- a/pkg/compose/convergence.go +++ b/pkg/compose/convergence.go @@ -635,13 +635,18 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P if inherit { inherited = &replaced } + + replacedContainerName := service.ContainerName + if replacedContainerName == "" { + replacedContainerName = service.Name + api.Separator + strconv.Itoa(number) + } name := getContainerName(project.Name, service, number) tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name) opts := createOptions{ AutoRemove: false, AttachStdin: false, UseNetworkAliases: true, - Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replaced.ID), + Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replacedContainerName), } created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w) if err != nil { @@ -659,7 +664,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P return created, err } - err = s.apiClient().ContainerRename(ctx, created.ID, name) + err = s.apiClient().ContainerRename(ctx, tmpName, name) if err != nil { return created, err } diff --git a/pkg/compose/logs.go b/pkg/compose/logs.go index 2976dace4..b3b44d53e 100644 --- a/pkg/compose/logs.go +++ b/pkg/compose/logs.go @@ -19,7 +19,6 @@ package compose import ( "context" "io" - "time" "github.com/containerd/errdefs" "github.com/docker/docker/api/types/container" @@ -73,9 +72,14 @@ func (s *composeService) Logs( if options.Follow { printer := newLogPrinter(consumer) - eg.Go(printer.Run) - monitor := newMonitor(s.apiClient(), options.Project) + monitor := newMonitor(s.apiClient(), projectName) + if len(options.Services) > 0 { + monitor.withServices(options.Services) + } else if options.Project != nil { + monitor.withServices(options.Project.ServiceNames()) + } + monitor.withListener(printer.HandleEvent) monitor.withListener(func(event api.ContainerEvent) { if event.Type == api.ContainerEventStarted { eg.Go(func() error { @@ -86,7 +90,7 @@ func (s *composeService) Logs( err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{ Follow: options.Follow, - Since: time.Unix(0, event.Time).Format(time.RFC3339Nano), + Since: ctr.State.StartedAt, Until: options.Until, Tail: options.Tail, Timestamps: options.Timestamps, @@ -100,7 +104,6 @@ func (s *composeService) Logs( } }) eg.Go(func() error { - defer printer.Stop() return monitor.Start(ctx) }) } diff --git a/pkg/compose/monitor.go b/pkg/compose/monitor.go index 624f01d5d..b0f9cc0af 100644 --- a/pkg/compose/monitor.go +++ b/pkg/compose/monitor.go @@ -20,7 +20,6 @@ import ( "context" "strconv" - "github.com/compose-spec/compose-go/v2/types" "github.com/containerd/errdefs" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/events" @@ -34,23 +33,23 @@ import ( type monitor struct { api client.APIClient - project *types.Project + project string // services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command services map[string]bool listeners []api.ContainerEventListener } -func newMonitor(api client.APIClient, project *types.Project) *monitor { - services := map[string]bool{} - if project != nil { - for name := range project.Services { - services[name] = true - } - } +func newMonitor(api client.APIClient, project string) *monitor { return &monitor{ api: api, project: project, - services: services, + services: map[string]bool{}, + } +} + +func (c *monitor) withServices(services []string) { + for _, name := range services { + c.services[name] = true } } @@ -62,7 +61,7 @@ func (c *monitor) Start(ctx context.Context) error { initialState, err := c.api.ContainerList(ctx, container.ListOptions{ All: true, Filters: filters.NewArgs( - projectFilter(c.project.Name), + projectFilter(c.project), oneOffFilter(false), hasConfigHashLabel(), ), @@ -78,22 +77,24 @@ func (c *monitor) Start(ctx context.Context) error { containers.Add(ctr.ID) } } - restarting := utils.Set[string]{} evtCh, errCh := c.api.Events(ctx, events.ListOptions{ Filters: filters.NewArgs( filters.Arg("type", "container"), - projectFilter(c.project.Name)), + projectFilter(c.project)), }) for { + if len(containers) == 0 { + return nil + } select { case <-ctx.Done(): return nil case err := <-errCh: return err case event := <-evtCh: - if !c.services[event.Actor.Attributes[api.ServiceLabel]] { + if len(c.services) > 0 && !c.services[event.Actor.Attributes[api.ServiceLabel]] { continue } ctr, err := c.getContainerSummary(event) @@ -103,24 +104,35 @@ func (c *monitor) Start(ctx context.Context) error { switch event.Action { case events.ActionCreate: - containers.Add(ctr.ID) + if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] { + containers.Add(ctr.ID) + } + evtType := api.ContainerEventCreated + if _, ok := ctr.Labels[api.ContainerReplaceLabel]; ok { + evtType = api.ContainerEventRecreated + } for _, listener := range c.listeners { - listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated)) + listener(newContainerEvent(event.TimeNano, ctr, evtType)) } logrus.Debugf("container %s created", ctr.Name) case events.ActionStart: restarted := restarting.Has(ctr.ID) - for _, listener := range c.listeners { - listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) { - e.Restarting = restarted - })) - } if restarted { logrus.Debugf("container %s restarted", ctr.Name) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) { + e.Restarting = restarted + })) + } } else { logrus.Debugf("container %s started", ctr.Name) + for _, listener := range c.listeners { + listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted)) + } + } + if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] { + containers.Add(ctr.ID) } - containers.Add(ctr.ID) case events.ActionRestart: for _, listener := range c.listeners { listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted)) @@ -159,9 +171,6 @@ func (c *monitor) Start(ctx context.Context) error { } } } - if len(containers) == 0 { - return nil - } } } @@ -192,7 +201,7 @@ func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSumma ctr := &api.ContainerSummary{ ID: event.Actor.ID, Name: event.Actor.Attributes["name"], - Project: c.project.Name, + Project: c.project, Service: event.Actor.Attributes[api.ServiceLabel], Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us } diff --git a/pkg/compose/printer.go b/pkg/compose/printer.go index 55084270a..079736869 100644 --- a/pkg/compose/printer.go +++ b/pkg/compose/printer.go @@ -18,7 +18,6 @@ package compose import ( "fmt" - "sync" "github.com/docker/compose/v2/pkg/api" ) @@ -26,72 +25,29 @@ import ( // logPrinter watch application containers and collect their logs type logPrinter interface { HandleEvent(event api.ContainerEvent) - Run() error - Stop() } type printer struct { - queue chan api.ContainerEvent consumer api.LogConsumer - stopCh chan struct{} // stopCh is a signal channel for producers to stop sending events to the queue - stop sync.Once } // newLogPrinter builds a LogPrinter passing containers logs to LogConsumer func newLogPrinter(consumer api.LogConsumer) logPrinter { printer := printer{ consumer: consumer, - queue: make(chan api.ContainerEvent), - stopCh: make(chan struct{}), - stop: sync.Once{}, } return &printer } -func (p *printer) Stop() { - p.stop.Do(func() { - close(p.stopCh) - for { - select { - case <-p.queue: - // purge the queue to free producers goroutines - // p.queue will be garbage collected - default: - return - } - } - }) -} - func (p *printer) HandleEvent(event api.ContainerEvent) { - select { - case <-p.stopCh: - return - default: - p.queue <- event - } -} - -func (p *printer) Run() error { - defer p.Stop() - - // containers we are tracking. Use true when container is running, false after we receive a stop|die signal - for { - select { - case <-p.stopCh: - return nil - case event := <-p.queue: - switch event.Type { - case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted: - p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) - if event.Type == api.ContainerEventRecreated { - p.consumer.Status(event.Source, "has been recreated") - } - case api.ContainerEventLog, api.HookEventLog: - p.consumer.Log(event.Source, event.Line) - case api.ContainerEventErr: - p.consumer.Err(event.Source, event.Line) - } - } + switch event.Type { + case api.ContainerEventExited: + p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) + case api.ContainerEventRecreated: + p.consumer.Status(event.Container.Labels[api.ContainerReplaceLabel], "has been recreated") + case api.ContainerEventLog, api.HookEventLog: + p.consumer.Log(event.Source, event.Line) + case api.ContainerEventErr: + p.consumer.Err(event.Source, event.Line) } } diff --git a/pkg/compose/up.go b/pkg/compose/up.go index 85fbe1ef6..04cf06e99 100644 --- a/pkg/compose/up.go +++ b/pkg/compose/up.go @@ -18,6 +18,7 @@ package compose import ( "context" + "errors" "fmt" "os" "os/signal" @@ -31,6 +32,7 @@ import ( "github.com/docker/compose/v2/internal/tracing" "github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/progress" + "github.com/docker/docker/errdefs" "github.com/eiannone/keyboard" "github.com/hashicorp/go-multierror" "github.com/sirupsen/logrus" @@ -166,7 +168,12 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } } - monitor := newMonitor(s.apiClient(), project) + monitor := newMonitor(s.apiClient(), project.Name) + if len(options.Start.Services) > 0 { + monitor.withServices(options.Start.Services) + } else { + monitor.withServices(project.ServiceNames()) + } monitor.withListener(printer.HandleEvent) var exitCode int @@ -175,9 +182,12 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options // detect first container to exit to trigger application shutdown monitor.withListener(func(event api.ContainerEvent) { if once && event.Type == api.ContainerEventExited { + if options.Start.OnExit == api.CascadeFail && event.ExitCode == 0 { + return + } + once = false exitCode = event.ExitCode - printer.Stop() - _, _ = fmt.Fprintln(s.stdinfo(), "Aborting on container exit...") + _, _ = fmt.Fprintln(s.stdinfo(), progress.ErrorColor("Aborting on container exit...")) eg.Go(func() error { return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { return s.stop(ctx, project.Name, api.StopOptions{ @@ -186,7 +196,6 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options }, printer.HandleEvent) }, s.stdinfo(), logConsumer) }) - once = false } }) } @@ -203,29 +212,35 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options } monitor.withListener(func(event api.ContainerEvent) { - mustAttach := false - switch event.Type { - case api.ContainerEventCreated: - // A container has been added to the application (scale) - mustAttach = true - case api.ContainerEventStarted: - // A container is restarting - need to re-attach - mustAttach = event.Restarting + if event.Type != api.ContainerEventStarted { + return } - if mustAttach { + if event.Restarting || event.Container.Labels[api.ContainerReplaceLabel] != "" { eg.Go(func() error { - // FIXME as container already started, we might miss the very first logs - return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) + ctr, err := s.apiClient().ContainerInspect(ctx, event.ID) + if err != nil { + return err + } + + err = s.doLogContainer(ctx, options.Start.Attach, event.Source, ctr, api.LogOptions{ + Follow: true, + Since: ctr.State.StartedAt, + }) + var notImplErr errdefs.ErrNotImplemented + if errors.As(err, ¬ImplErr) { + // container may be configured with logging_driver: none + // as container already started, we might miss the very first logs. But still better than none + return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) + } + return err }) } }) eg.Go(func() error { err := monitor.Start(ctx) - fmt.Println("monitor complete") // Signal for the signal-handler goroutines to stop close(doneCh) - printer.Stop() return err })