pkg/compose: composeService.Up: rewrite without go-multierror

- Use a errgroup.Group and add a appendErr utility to not fail-fast,
  but collect errors.
- replace doneCh for a global context to cancel goroutines
- Commented out attachCtx code, as it didn't appear to be functional
  (as it wouldn't be cancelled).

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2025-08-22 18:16:39 +02:00 committed by Nicolas De loof
parent 6078b4d99d
commit 1d69f4a68c
2 changed files with 60 additions and 33 deletions

2
go.mod
View File

@ -23,7 +23,6 @@ require (
github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203 github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203
github.com/fsnotify/fsevents v0.2.0 github.com/fsnotify/fsevents v0.2.0
github.com/google/go-cmp v0.7.0 github.com/google/go-cmp v0.7.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-version v1.7.0 github.com/hashicorp/go-version v1.7.0
github.com/jonboulle/clockwork v0.5.0 github.com/jonboulle/clockwork v0.5.0
github.com/mattn/go-shellwords v1.0.12 github.com/mattn/go-shellwords v1.0.12
@ -117,6 +116,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/in-toto/in-toto-golang v0.9.0 // indirect github.com/in-toto/in-toto-golang v0.9.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf // indirect github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf // indirect

View File

@ -18,10 +18,12 @@ package compose
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
"slices" "slices"
"sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
@ -33,7 +35,6 @@ import (
"github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress" "github.com/docker/compose/v2/pkg/progress"
"github.com/eiannone/keyboard" "github.com/eiannone/keyboard"
"github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -61,14 +62,11 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err return err
} }
var eg multierror.Group
// if we get a second signal during shutdown, we kill the services // if we get a second signal during shutdown, we kill the services
// immediately, so the channel needs to have sufficient capacity or // immediately, so the channel needs to have sufficient capacity or
// we might miss a signal while setting up the second channel read // we might miss a signal while setting up the second channel read
// (this is also why signal.Notify is used vs signal.NotifyContext) // (this is also why signal.Notify is used vs signal.NotifyContext)
signalChan := make(chan os.Signal, 2) signalChan := make(chan os.Signal, 2)
defer close(signalChan)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(signalChan) defer signal.Stop(signalChan)
var isTerminated atomic.Bool var isTerminated atomic.Bool
@ -103,26 +101,45 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
printer := newLogPrinter(logConsumer) printer := newLogPrinter(logConsumer)
doneCh := make(chan bool) // global context to handle canceling goroutines
globalCtx, cancel := context.WithCancel(ctx)
defer cancel()
var (
eg errgroup.Group
mu sync.Mutex
errs []error
)
appendErr := func(err error) {
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
}
eg.Go(func() error { eg.Go(func() error {
first := true first := true
gracefulTeardown := func() { gracefulTeardown := func() {
first = false first = false
fmt.Println("Gracefully Stopping... press Ctrl+C again to force") fmt.Println("Gracefully Stopping... press Ctrl+C again to force")
eg.Go(func() error { eg.Go(func() error {
return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { err := progress.RunWithLog(context.WithoutCancel(globalCtx), func(c context.Context) error {
return s.stop(ctx, project.Name, api.StopOptions{ return s.stop(c, project.Name, api.StopOptions{
Services: options.Create.Services, Services: options.Create.Services,
Project: project, Project: project,
}, printer.HandleEvent) }, printer.HandleEvent)
}, s.stdinfo(), logConsumer) }, s.stdinfo(), logConsumer)
appendErr(err)
return nil
}) })
isTerminated.Store(true) isTerminated.Store(true)
} }
for { for {
select { select {
case <-doneCh: case <-globalCtx.Done():
if watcher != nil { if watcher != nil {
return watcher.Stop() return watcher.Stop()
} }
@ -133,12 +150,12 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
} }
case <-signalChan: case <-signalChan:
if first { if first {
keyboard.Close() //nolint:errcheck _ = keyboard.Close()
gracefulTeardown() gracefulTeardown()
break break
} }
eg.Go(func() error { eg.Go(func() error {
err := s.kill(context.WithoutCancel(ctx), project.Name, api.KillOptions{ err := s.kill(context.WithoutCancel(globalCtx), project.Name, api.KillOptions{
Services: options.Create.Services, Services: options.Create.Services,
Project: project, Project: project,
All: true, All: true,
@ -148,18 +165,21 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return nil return nil
} }
return err appendErr(err)
return nil
}) })
return nil return nil
case event := <-kEvents: case event := <-kEvents:
navigationMenu.HandleKeyEvents(ctx, event, project, options) navigationMenu.HandleKeyEvents(globalCtx, event, project, options)
} }
} }
}) })
if options.Start.Watch && watcher != nil { if options.Start.Watch && watcher != nil {
err = watcher.Start(ctx) if err := watcher.Start(globalCtx); err != nil {
if err != nil { // cancel the global context to terminate background goroutines
cancel()
_ = eg.Wait()
return err return err
} }
} }
@ -186,12 +206,14 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
exitCode = event.ExitCode exitCode = event.ExitCode
_, _ = fmt.Fprintln(s.stdinfo(), progress.ErrorColor("Aborting on container exit...")) _, _ = fmt.Fprintln(s.stdinfo(), progress.ErrorColor("Aborting on container exit..."))
eg.Go(func() error { eg.Go(func() error {
return progress.RunWithLog(context.WithoutCancel(ctx), func(ctx context.Context) error { err := progress.RunWithLog(context.WithoutCancel(globalCtx), func(c context.Context) error {
return s.stop(ctx, project.Name, api.StopOptions{ return s.stop(c, project.Name, api.StopOptions{
Services: options.Create.Services, Services: options.Create.Services,
Project: project, Project: project,
}, printer.HandleEvent) }, printer.HandleEvent)
}, s.stdinfo(), logConsumer) }, s.stdinfo(), logConsumer)
appendErr(err)
return nil
}) })
} }
}) })
@ -208,13 +230,10 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
}) })
} }
// use an independent context tied to the errgroup for background attach operations containers, err := s.attach(globalCtx, project, printer.HandleEvent, options.Start.AttachTo)
// the primary context is still used for other operations
// this means that once any attach operation fails, all other attaches are cancelled,
// but an attach failing won't interfere with the rest of the start
_, attachCtx := errgroup.WithContext(ctx)
containers, err := s.attach(attachCtx, project, printer.HandleEvent, options.Start.AttachTo)
if err != nil { if err != nil {
cancel()
_ = eg.Wait()
return err return err
} }
attached := make([]string, len(containers)) attached := make([]string, len(containers))
@ -230,38 +249,46 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return return
} }
eg.Go(func() error { eg.Go(func() error {
ctr, err := s.apiClient().ContainerInspect(ctx, event.ID) ctr, err := s.apiClient().ContainerInspect(globalCtx, event.ID)
if err != nil { if err != nil {
return err appendErr(err)
return nil
} }
err = s.doLogContainer(ctx, options.Start.Attach, event.Source, ctr, api.LogOptions{ err = s.doLogContainer(globalCtx, options.Start.Attach, event.Source, ctr, api.LogOptions{
Follow: true, Follow: true,
Since: ctr.State.StartedAt, Since: ctr.State.StartedAt,
}) })
if errdefs.IsNotImplemented(err) { if errdefs.IsNotImplemented(err) {
// container may be configured with logging_driver: none // container may be configured with logging_driver: none
// as container already started, we might miss the very first logs. But still better than none // as container already started, we might miss the very first logs. But still better than none
return s.doAttachContainer(ctx, event.Service, event.ID, event.Source, printer.HandleEvent) err := s.doAttachContainer(globalCtx, event.Service, event.ID, event.Source, printer.HandleEvent)
appendErr(err)
return nil
} }
return err appendErr(err)
return nil
}) })
}) })
eg.Go(func() error { eg.Go(func() error {
err := monitor.Start(context.Background()) err := monitor.Start(globalCtx)
// Signal for the signal-handler goroutines to stop // cancel the global context to terminate signal-handler goroutines
close(doneCh) cancel()
return err appendErr(err)
return nil
}) })
// We use the parent context without cancellation as we manage sigterm to stop the stack // We use the parent context without cancellation as we manage sigterm to stop the stack
err = s.start(context.WithoutCancel(ctx), project.Name, options.Start, printer.HandleEvent) err = s.start(context.WithoutCancel(ctx), project.Name, options.Start, printer.HandleEvent)
if err != nil && !isTerminated.Load() { // Ignore error if the process is terminated if err != nil && !isTerminated.Load() { // Ignore error if the process is terminated
cancel()
_ = eg.Wait()
return err return err
} }
err = eg.Wait().ErrorOrNil() _ = eg.Wait()
err = errors.Join(errs...)
if exitCode != 0 { if exitCode != 0 {
errMsg := "" errMsg := ""
if err != nil { if err != nil {