From 6f6ae071d64a514137350c86c2d1a8d97bf686ee Mon Sep 17 00:00:00 2001 From: Nicolas De Loof Date: Wed, 26 May 2021 12:17:37 +0200 Subject: [PATCH] improve container events watch robustness Signed-off-by: Nicolas De Loof --- api/compose/api.go | 4 +- api/compose/printer.go | 15 +- local/compose/start.go | 151 +++++++++--------- local/e2e/compose/compose_test.go | 9 +- .../fixtures/attach-restart/compose.yaml | 5 +- 5 files changed, 96 insertions(+), 88 deletions(-) diff --git a/api/compose/api.go b/api/compose/api.go index 41a250f4b..115dbc09d 100644 --- a/api/compose/api.go +++ b/api/compose/api.go @@ -379,7 +379,9 @@ type ContainerEvent struct { Container string Service string Line string - ExitCode int + // ContainerEventExit only + ExitCode int + Restarting bool } const ( diff --git a/api/compose/printer.go b/api/compose/printer.go index 21665f25b..31c8f1d35 100644 --- a/api/compose/printer.go +++ b/api/compose/printer.go @@ -62,19 +62,22 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error containers := map[string]struct{}{} for { event := <-p.queue + container := event.Container switch event.Type { case UserCancel: aborting = true case ContainerEventAttach: - if _, ok := containers[event.Container]; ok { + if _, ok := containers[container]; ok { continue } - containers[event.Container] = struct{}{} - p.consumer.Register(event.Container) + containers[container] = struct{}{} + p.consumer.Register(container) case ContainerEventExit: - delete(containers, event.Container) + if !event.Restarting { + delete(containers, container) + } if !aborting { - p.consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode)) + p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode)) } if cascadeStop { if !aborting { @@ -99,7 +102,7 @@ func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error } case ContainerEventLog: if !aborting { - p.consumer.Log(event.Container, event.Service, event.Line) + p.consumer.Log(container, event.Service, event.Line) } } } diff --git a/local/compose/start.go b/local/compose/start.go index 626007048..f09a4368b 100644 --- a/local/compose/start.go +++ b/local/compose/start.go @@ -20,68 +20,30 @@ import ( "context" "github.com/docker/compose-cli/api/compose" - convert "github.com/docker/compose-cli/local/moby" "github.com/docker/compose-cli/utils" "github.com/compose-spec/compose-go/types" moby "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error { + listener := options.Attach if len(options.Services) == 0 { options.Services = project.ServiceNames() } - var containers Containers - if options.Attach != nil { - attached, err := s.attach(ctx, project, options.Attach, options.Services) + eg, ctx := errgroup.WithContext(ctx) + if listener != nil { + attached, err := s.attach(ctx, project, listener, options.Services) if err != nil { return err } - containers = attached - // Watch events to capture container restart and re-attach - go func() { - watched := map[string]struct{}{} - for _, c := range containers { - watched[c.ID] = struct{}{} - } - s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck - Services: options.Services, - Consumer: func(event compose.Event) error { - if event.Status == "start" { - inspect, err := s.apiClient.ContainerInspect(ctx, event.Container) - if err != nil { - return err - } - - container := moby.Container{ - ID: event.Container, - Names: []string{inspect.Name}, - State: convert.ContainerRunning, - Labels: map[string]string{ - projectLabel: project.Name, - serviceLabel: event.Service, - }, - } - - // Just ignore errors when reattaching to already crashed containers - s.attachContainer(ctx, container, options.Attach, project) // nolint: errcheck - - if _, ok := watched[inspect.ID]; !ok { - // a container has been added to the service, see --scale option - watched[inspect.ID] = struct{}{} - go func() { - s.waitContainer(container, options.Attach) // nolint: errcheck - }() - } - } - return nil - }, - }) - }() + eg.Go(func() error { + return s.watchContainers(project, options.Services, listener, attached) + }) } err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { @@ -93,34 +55,79 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti if err != nil { return err } - - if options.Attach == nil { - return nil - } - - eg, ctx := errgroup.WithContext(ctx) - for _, c := range containers { - c := c - eg.Go(func() error { - return s.waitContainer(c, options.Attach) - }) - } return eg.Wait() } -func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) error { - statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning) - name := getContainerNameWithoutProject(c) - select { - case status := <-statusC: - listener(compose.ContainerEvent{ - Type: compose.ContainerEventExit, - Container: name, - Service: c.Labels[serviceLabel], - ExitCode: int(status.StatusCode), - }) - return nil - case err := <-errC: - return err +// watchContainers uses engine events to capture container start/die and notify ContainerEventListener +func (s *composeService) watchContainers(project *types.Project, services []string, listener compose.ContainerEventListener, containers Containers) error { + watched := map[string]int{} + for _, c := range containers { + watched[c.ID] = 0 } + + ctx, stop := context.WithCancel(context.Background()) + err := s.Events(ctx, project.Name, compose.EventsOptions{ + Services: services, + Consumer: func(event compose.Event) error { + inspected, err := s.apiClient.ContainerInspect(ctx, event.Container) + if err != nil { + return err + } + container := moby.Container{ + ID: inspected.ID, + Names: []string{inspected.Name}, + Labels: inspected.Config.Labels, + } + name := getContainerNameWithoutProject(container) + + if event.Status == "die" { + restarted := watched[container.ID] + watched[container.ID] = restarted + 1 + // Container terminated. + willRestart := inspected.HostConfig.RestartPolicy.MaximumRetryCount > restarted + + listener(compose.ContainerEvent{ + Type: compose.ContainerEventExit, + Container: name, + Service: container.Labels[serviceLabel], + ExitCode: inspected.State.ExitCode, + Restarting: willRestart, + }) + + if !willRestart { + // we're done with this one + delete(watched, container.ID) + } + + if len(watched) == 0 { + // all project containers stopped, we're done + stop() + } + return nil + } + + if event.Status == "start" { + count, ok := watched[container.ID] + mustAttach := ok && count > 0 // Container restarted, need to re-attach + if !ok { + // A new container has just been added to service by scale + watched[container.ID] = 0 + mustAttach = true + } + if mustAttach { + // Container restarted, need to re-attach + err := s.attachContainer(ctx, container, listener, project) + if err != nil { + return err + } + } + } + + return nil + }, + }) + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + return err } diff --git a/local/e2e/compose/compose_test.go b/local/e2e/compose/compose_test.go index 459e96c41..e90db4f3b 100644 --- a/local/e2e/compose/compose_test.go +++ b/local/e2e/compose/compose_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + testify "github.com/stretchr/testify/assert" "gotest.tools/v3/assert" "gotest.tools/v3/icmd" @@ -197,10 +198,10 @@ func TestAttachRestart(t *testing.T) { c.WaitForCondition(func() (bool, string) { debug := res.Combined() - return strings.Count(res.Stdout(), "another_1 exited with code 1") == 3, fmt.Sprintf("'another_1 exited with code 1' not found 3 times in : \n%s\n", debug) + return strings.Count(res.Stdout(), "failing_1 exited with code 1") == 3, fmt.Sprintf("'failing_1 exited with code 1' not found 3 times in : \n%s\n", debug) }, 2*time.Minute, 2*time.Second) - assert.Equal(t, strings.Count(res.Stdout(), "another_1 | world"), 3, res.Combined()) + assert.Equal(t, strings.Count(res.Stdout(), "failing_1 | world"), 3, res.Combined()) } func TestInitContainer(t *testing.T) { @@ -208,7 +209,5 @@ func TestInitContainer(t *testing.T) { res := c.RunDockerOrExitError("compose", "--ansi=never", "--project-directory", "./fixtures/init-container", "up") defer c.RunDockerOrExitError("compose", "-p", "init-container", "down") - output := res.Stdout() - - assert.Assert(t, strings.Contains(output, "foo_1 | hello\nbar_1 | world"), res.Combined()) + testify.Regexp(t, "foo_1 | hello(?m:.*)bar_1 | world", res.Stdout()) } diff --git a/local/e2e/compose/fixtures/attach-restart/compose.yaml b/local/e2e/compose/fixtures/attach-restart/compose.yaml index eb364bda7..d92143677 100644 --- a/local/e2e/compose/fixtures/attach-restart/compose.yaml +++ b/local/e2e/compose/fixtures/attach-restart/compose.yaml @@ -1,8 +1,5 @@ services: - simple: - image: alpine - command: sh -c "sleep infinity" - another: + failing: image: alpine command: sh -c "sleep 0.1 && echo world && /bin/false" deploy: