detect required service are gone to stop watching

explicit API to stop the log printer

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2022-12-07 14:43:07 +01:00 committed by Nicolas De loof
parent 6b4ad0d1db
commit 804d7163a7
8 changed files with 73 additions and 45 deletions

View File

@ -214,6 +214,7 @@ func runUp(ctx context.Context, backend api.Service, createOptions createOptions
ExitCodeFrom: upOptions.exitCodeFrom, ExitCodeFrom: upOptions.exitCodeFrom,
CascadeStop: upOptions.cascadeStop, CascadeStop: upOptions.cascadeStop,
Wait: upOptions.wait, Wait: upOptions.wait,
Services: services,
}, },
}) })
} }

View File

@ -141,3 +141,14 @@ func (containers Containers) sorted() Containers {
}) })
return containers return containers
} }
func (containers Containers) remove(id string) Containers {
for i, c := range containers {
if c.ID == id {
l := len(containers) - 1
containers[i] = containers[l]
return containers[:l]
}
}
return containers
}

View File

@ -65,19 +65,16 @@ func (s *composeService) Logs(
if options.Follow { if options.Follow {
printer := newLogPrinter(consumer) printer := newLogPrinter(consumer)
eg.Go(func() error { for _, c := range containers {
for _, c := range containers { printer.HandleEvent(api.ContainerEvent{
printer.HandleEvent(api.ContainerEvent{ Type: api.ContainerEventAttach,
Type: api.ContainerEventAttach, Container: getContainerNameWithoutProject(c),
Container: getContainerNameWithoutProject(c), Service: c.Labels[api.ServiceLabel],
Service: c.Labels[api.ServiceLabel], })
}) }
}
return nil
})
eg.Go(func() error { eg.Go(func() error {
return s.watchContainers(ctx, projectName, options.Services, printer.HandleEvent, containers, func(c types.Container) error { return s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c types.Container) error {
printer.HandleEvent(api.ContainerEvent{ printer.HandleEvent(api.ContainerEvent{
Type: api.ContainerEventAttach, Type: api.ContainerEventAttach,
Container: getContainerNameWithoutProject(c), Container: getContainerNameWithoutProject(c),

View File

@ -28,19 +28,23 @@ type logPrinter interface {
HandleEvent(event api.ContainerEvent) HandleEvent(event api.ContainerEvent)
Run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) Run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
Cancel() Cancel()
Stop()
} }
type printer struct { type printer struct {
queue chan api.ContainerEvent queue chan api.ContainerEvent
consumer api.LogConsumer consumer api.LogConsumer
stopCh chan struct{}
} }
// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer // newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
func newLogPrinter(consumer api.LogConsumer) logPrinter { func newLogPrinter(consumer api.LogConsumer) logPrinter {
queue := make(chan api.ContainerEvent) queue := make(chan api.ContainerEvent)
stopCh := make(chan struct{}, 1) // printer MAY stop on his own, so Stop MUST not be blocking
printer := printer{ printer := printer{
consumer: consumer, consumer: consumer,
queue: queue, queue: queue,
stopCh: stopCh,
} }
return &printer return &printer
} }
@ -51,6 +55,10 @@ func (p *printer) Cancel() {
} }
} }
func (p *printer) Stop() {
p.stopCh <- struct{}{}
}
func (p *printer) HandleEvent(event api.ContainerEvent) { func (p *printer) HandleEvent(event api.ContainerEvent) {
p.queue <- event p.queue <- event
} }
@ -64,6 +72,8 @@ func (p *printer) Run(ctx context.Context, cascadeStop bool, exitCodeFrom string
containers := map[string]struct{}{} containers := map[string]struct{}{}
for { for {
select { select {
case <-p.stopCh:
return exitCode, nil
case <-ctx.Done(): case <-ctx.Done():
return exitCode, ctx.Err() return exitCode, ctx.Err()
case event := <-p.queue: case event := <-p.queue:

View File

@ -21,6 +21,7 @@ import (
"strings" "strings"
"github.com/compose-spec/compose-go/types" "github.com/compose-spec/compose-go/types"
"github.com/docker/compose/v2/pkg/utils"
moby "github.com/docker/docker/api/types" moby "github.com/docker/docker/api/types"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -50,13 +51,6 @@ func (s *composeService) start(ctx context.Context, projectName string, options
} }
} }
if len(options.Services) > 0 {
err := project.ForServices(options.Services)
if err != nil {
return err
}
}
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
if listener != nil { if listener != nil {
attached, err := s.attach(ctx, project, listener, options.AttachTo) attached, err := s.attach(ctx, project, listener, options.AttachTo)
@ -65,7 +59,7 @@ func (s *composeService) start(ctx context.Context, projectName string, options
} }
eg.Go(func() error { eg.Go(func() error {
return s.watchContainers(context.Background(), project.Name, options.AttachTo, listener, attached, func(container moby.Container) error { return s.watchContainers(context.Background(), project.Name, options.AttachTo, options.Services, listener, attached, func(container moby.Container) error {
return s.attachContainer(ctx, container, listener) return s.attachContainer(ctx, container, listener)
}) })
}) })
@ -116,9 +110,20 @@ func getDependencyCondition(service types.ServiceConfig, project *types.Project)
type containerWatchFn func(container moby.Container) error type containerWatchFn func(container moby.Container) error
// watchContainers uses engine events to capture container start/die and notify ContainerEventListener // watchContainers uses engine events to capture container start/die and notify ContainerEventListener
func (s *composeService) watchContainers(ctx context.Context, projectName string, services []string, listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error { func (s *composeService) watchContainers(ctx context.Context, projectName string, services, required []string,
watched := map[string]int{} listener api.ContainerEventListener, containers Containers, onStart containerWatchFn) error {
if len(required) == 0 {
required = services
}
var (
expected Containers
watched = map[string]int{}
)
for _, c := range containers { for _, c := range containers {
if utils.Contains(required, c.Labels[api.ServiceLabel]) {
expected = append(expected, c)
}
watched[c.ID] = 0 watched[c.ID] = 0
} }
@ -143,22 +148,18 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
} }
name := getContainerNameWithoutProject(container) name := getContainerNameWithoutProject(container)
if event.Status == "stop" { service := container.Labels[api.ServiceLabel]
switch event.Status {
case "stop":
listener(api.ContainerEvent{ listener(api.ContainerEvent{
Type: api.ContainerEventStopped, Type: api.ContainerEventStopped,
Container: name, Container: name,
Service: container.Labels[api.ServiceLabel], Service: service,
}) })
delete(watched, container.ID) delete(watched, container.ID)
if len(watched) == 0 { expected = expected.remove(container.ID)
// all project containers stopped, we're done case "die":
stop()
}
return nil
}
if event.Status == "die" {
restarted := watched[container.ID] restarted := watched[container.ID]
watched[container.ID] = restarted + 1 watched[container.ID] = restarted + 1
// Container terminated. // Container terminated.
@ -167,7 +168,7 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
listener(api.ContainerEvent{ listener(api.ContainerEvent{
Type: api.ContainerEventExit, Type: api.ContainerEventExit,
Container: name, Container: name,
Service: container.Labels[api.ServiceLabel], Service: service,
ExitCode: inspected.State.ExitCode, ExitCode: inspected.State.ExitCode,
Restarting: willRestart, Restarting: willRestart,
}) })
@ -175,21 +176,15 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
if !willRestart { if !willRestart {
// we're done with this one // we're done with this one
delete(watched, container.ID) delete(watched, container.ID)
expected = expected.remove(container.ID)
} }
case "start":
if len(watched) == 0 {
// all project containers stopped, we're done
stop()
}
return nil
}
if event.Status == "start" {
count, ok := watched[container.ID] count, ok := watched[container.ID]
mustAttach := ok && count > 0 // Container restarted, need to re-attach mustAttach := ok && count > 0 // Container restarted, need to re-attach
if !ok { if !ok {
// A new container has just been added to service by scale // A new container has just been added to service by scale
watched[container.ID] = 0 watched[container.ID] = 0
expected = append(expected, container)
mustAttach = true mustAttach = true
} }
if mustAttach { if mustAttach {
@ -200,7 +195,9 @@ func (s *composeService) watchContainers(ctx context.Context, projectName string
} }
} }
} }
if len(expected) == 0 {
stop()
}
return nil return nil
}, },
}) })

View File

@ -23,11 +23,10 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"
"github.com/compose-spec/compose-go/types" "github.com/compose-spec/compose-go/types"
"github.com/docker/cli/cli" "github.com/docker/cli/cli"
"github.com/docker/compose/v2/pkg/api"
"github.com/docker/compose/v2/pkg/progress"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -92,6 +91,7 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
return err return err
} }
printer.Stop()
err = eg.Wait() err = eg.Wait()
if exitCode != 0 { if exitCode != 0 {
errMsg := "" errMsg := ""

View File

@ -259,3 +259,13 @@ networks:
name: compose-e2e-convert-interpolate_default`, filepath.Join(wd, "fixtures", "simple-build-test", "nginx-build")), ExitCode: 0}) name: compose-e2e-convert-interpolate_default`, filepath.Join(wd, "fixtures", "simple-build-test", "nginx-build")), ExitCode: 0})
}) })
} }
func TestStopWithDependeciesAttached(t *testing.T) {
const projectName = "compose-e2e-stop-with-deps"
c := NewParallelCLI(t, WithEnv("COMMAND=echo hello"))
t.Run("up", func(t *testing.T) {
res := c.RunDockerComposeCmd(t, "-f", "./fixtures/dependencies/compose.yaml", "-p", projectName, "up", "--attach-dependencies", "foo")
res.Assert(t, icmd.Expected{Out: "exited with code 0"})
})
}

View File

@ -1,8 +1,10 @@
services: services:
foo: foo:
image: nginx:alpine image: nginx:alpine
command: "${COMMAND}"
depends_on: depends_on:
- bar - bar
bar: bar:
image: nginx:alpine image: nginx:alpine
scale: 2