diff --git a/api/compose/printer.go b/api/compose/printer.go new file mode 100644 index 000000000..21665f25b --- /dev/null +++ b/api/compose/printer.go @@ -0,0 +1,106 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "fmt" + + "github.com/sirupsen/logrus" +) + +// LogPrinter watch application containers an collect their logs +type LogPrinter interface { + HandleEvent(event ContainerEvent) + Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) + Cancel() +} + +// NewLogPrinter builds a LogPrinter passing containers logs to LogConsumer +func NewLogPrinter(consumer LogConsumer) LogPrinter { + queue := make(chan ContainerEvent) + printer := printer{ + consumer: consumer, + queue: queue, + } + return &printer +} + +func (p *printer) Cancel() { + p.queue <- ContainerEvent{ + Type: UserCancel, + } +} + +type printer struct { + queue chan ContainerEvent + consumer LogConsumer +} + +func (p *printer) HandleEvent(event ContainerEvent) { + p.queue <- event +} + +func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) { + var ( + aborting bool + exitCode int + ) + containers := map[string]struct{}{} + for { + event := <-p.queue + switch event.Type { + case UserCancel: + aborting = true + case ContainerEventAttach: + if _, ok := containers[event.Container]; ok { + continue + } + containers[event.Container] = struct{}{} + p.consumer.Register(event.Container) + case ContainerEventExit: + delete(containers, event.Container) + if !aborting { + p.consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode)) + } + if cascadeStop { + if !aborting { + aborting = true + fmt.Println("Aborting on container exit...") + err := stopFn() + if err != nil { + return 0, err + } + } + if exitCodeFrom == "" { + exitCodeFrom = event.Service + } + if exitCodeFrom == event.Service { + logrus.Error(event.ExitCode) + exitCode = event.ExitCode + } + } + if len(containers) == 0 { + // Last container terminated, done + return exitCode, nil + } + case ContainerEventLog: + if !aborting { + p.consumer.Log(event.Container, event.Service, event.Line) + } + } + } +} diff --git a/cli/cmd/compose/up.go b/cli/cmd/compose/up.go index bb3b80f7b..21465f64a 100644 --- a/cli/cmd/compose/up.go +++ b/cli/cmd/compose/up.go @@ -29,7 +29,6 @@ import ( "github.com/compose-spec/compose-go/types" "github.com/docker/cli/cli" - "github.com/sirupsen/logrus" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" @@ -275,13 +274,12 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions return nil } - queue := make(chan compose.ContainerEvent) - printer := printer{ - queue: queue, - } + consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix) + printer := compose.NewLogPrinter(consumer) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + stopFunc := func() error { ctx := context.Background() _, err := progress.Run(ctx, func(ctx context.Context) (string, error) { @@ -296,27 +294,21 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions } go func() { <-signalChan - queue <- compose.ContainerEvent{ - Type: compose.UserCancel, - } + printer.Cancel() fmt.Println("Gracefully stopping... (press Ctrl+C again to force)") stopFunc() // nolint:errcheck }() - consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix) - var exitCode int eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - code, err := printer.run(opts.cascadeStop, opts.exitCodeFrom, consumer, stopFunc) + code, err := printer.Run(opts.cascadeStop, opts.exitCodeFrom, stopFunc) exitCode = code return err }) err = backend.Start(ctx, project, compose.StartOptions{ - Attach: func(event compose.ContainerEvent) { - queue <- event - }, + Attach: printer.HandleEvent, Services: services, }) if err != nil { @@ -341,11 +333,7 @@ func setServiceScale(project *types.Project, name string, replicas int) error { if err != nil { return err } - if service.Deploy == nil { - service.Deploy = &types.DeployConfig{} - } - count := uint64(replicas) - service.Deploy.Replicas = &count + service.Scale = replicas project.Services[i] = service return nil } @@ -392,49 +380,3 @@ func setup(opts composeOptions, services []string) (*types.Project, error) { return project, nil } - -type printer struct { - queue chan compose.ContainerEvent -} - -func (p printer) run(cascadeStop bool, exitCodeFrom string, consumer compose.LogConsumer, stopFn func() error) (int, error) { - var aborting bool - var count int - for { - event := <-p.queue - switch event.Type { - case compose.UserCancel: - aborting = true - case compose.ContainerEventAttach: - consumer.Register(event.Container) - count++ - case compose.ContainerEventExit: - if !aborting { - consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode)) - } - if cascadeStop { - if !aborting { - aborting = true - fmt.Println("Aborting on container exit...") - err := stopFn() - if err != nil { - return 0, err - } - } - if exitCodeFrom == "" || exitCodeFrom == event.Service { - logrus.Error(event.ExitCode) - return event.ExitCode, nil - } - } - count-- - if count == 0 { - // Last container terminated, done - return 0, nil - } - case compose.ContainerEventLog: - if !aborting { - consumer.Log(event.Container, event.Service, event.Line) - } - } - } -} diff --git a/cli/cmd/compose/up_test.go b/cli/cmd/compose/up_test.go index 566a68476..4042d9ca1 100644 --- a/cli/cmd/compose/up_test.go +++ b/cli/cmd/compose/up_test.go @@ -39,5 +39,5 @@ func TestApplyScaleOpt(t *testing.T) { assert.NilError(t, err) foo, err := p.GetService("foo") assert.NilError(t, err) - assert.Check(t, *foo.Deploy.Replicas == 2) + assert.Equal(t, foo.Scale, 2) } diff --git a/local/compose/attach.go b/local/compose/attach.go index ddef6eb40..fc8f67398 100644 --- a/local/compose/attach.go +++ b/local/compose/attach.go @@ -33,10 +33,6 @@ import ( ) func (s *composeService) attach(ctx context.Context, project *types.Project, listener compose.ContainerEventListener, selectedServices []string) (Containers, error) { - if len(selectedServices) == 0 { - selectedServices = project.ServiceNames() - } - containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, selectedServices...) if err != nil { return nil, err @@ -57,44 +53,6 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis return nil, err } } - - // Watch events to capture container restart and re-attach - go func() { - crashed := map[string]struct{}{} - s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck - Services: selectedServices, - Consumer: func(event compose.Event) error { - if event.Status == "die" { - crashed[event.Container] = struct{}{} - return nil - } - if _, ok := crashed[event.Container]; ok { - inspect, err := s.apiClient.ContainerInspect(ctx, event.Container) - if err != nil { - return err - } - - container := moby.Container{ - ID: event.Container, - Names: []string{inspect.Name}, - State: convert.ContainerRunning, - Labels: map[string]string{ - projectLabel: project.Name, - serviceLabel: event.Service, - }, - } - - // Just ignore errors when reattaching to already crashed containers - s.attachContainer(ctx, container, listener, project) // nolint: errcheck - delete(crashed, event.Container) - - s.waitContainer(container, listener) - } - return nil - }, - }) - }() - return containers, err } diff --git a/local/compose/down.go b/local/compose/down.go index 73582c36e..0d4a92201 100644 --- a/local/compose/down.go +++ b/local/compose/down.go @@ -191,18 +191,22 @@ func (s *composeService) removeVolume(ctx context.Context, id string, w progress } func (s *composeService) stopContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error { + eg, ctx := errgroup.WithContext(ctx) for _, container := range containers { - toStop := container - eventName := getContainerProgressName(toStop) - w.Event(progress.StoppingEvent(eventName)) - err := s.apiClient.ContainerStop(ctx, toStop.ID, timeout) - if err != nil { - w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping")) - return err - } - w.Event(progress.StoppedEvent(eventName)) + container := container + eg.Go(func() error { + eventName := getContainerProgressName(container) + w.Event(progress.StoppingEvent(eventName)) + err := s.apiClient.ContainerStop(ctx, container.ID, timeout) + if err != nil { + w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping")) + return err + } + w.Event(progress.StoppedEvent(eventName)) + return nil + }) } - return nil + return eg.Wait() } func (s *composeService) removeContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error { diff --git a/local/compose/start.go b/local/compose/start.go index b0f6354d4..626007048 100644 --- a/local/compose/start.go +++ b/local/compose/start.go @@ -20,12 +20,13 @@ import ( "context" "github.com/docker/compose-cli/api/compose" + convert "github.com/docker/compose-cli/local/moby" "github.com/docker/compose-cli/utils" "github.com/compose-spec/compose-go/types" moby "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" - "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error { @@ -35,11 +36,52 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti var containers Containers if options.Attach != nil { - c, err := s.attach(ctx, project, options.Attach, options.Services) + attached, err := s.attach(ctx, project, options.Attach, options.Services) if err != nil { return err } - containers = c + containers = attached + + // Watch events to capture container restart and re-attach + go func() { + watched := map[string]struct{}{} + for _, c := range containers { + watched[c.ID] = struct{}{} + } + s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck + Services: options.Services, + Consumer: func(event compose.Event) error { + if event.Status == "start" { + inspect, err := s.apiClient.ContainerInspect(ctx, event.Container) + if err != nil { + return err + } + + container := moby.Container{ + ID: event.Container, + Names: []string{inspect.Name}, + State: convert.ContainerRunning, + Labels: map[string]string{ + projectLabel: project.Name, + serviceLabel: event.Service, + }, + } + + // Just ignore errors when reattaching to already crashed containers + s.attachContainer(ctx, container, options.Attach, project) // nolint: errcheck + + if _, ok := watched[inspect.ID]; !ok { + // a container has been added to the service, see --scale option + watched[inspect.ID] = struct{}{} + go func() { + s.waitContainer(container, options.Attach) // nolint: errcheck + }() + } + } + return nil + }, + }) + }() } err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { @@ -56,16 +98,17 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti return nil } + eg, ctx := errgroup.WithContext(ctx) for _, c := range containers { c := c - go func() { - s.waitContainer(c, options.Attach) - }() + eg.Go(func() error { + return s.waitContainer(c, options.Attach) + }) } - return nil + return eg.Wait() } -func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) { +func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) error { statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning) name := getContainerNameWithoutProject(c) select { @@ -76,7 +119,8 @@ func (s *composeService) waitContainer(c moby.Container, listener compose.Contai Service: c.Labels[serviceLabel], ExitCode: int(status.StatusCode), }) + return nil case err := <-errC: - logrus.Warnf("Unexpected API error for %s : %s", name, err.Error()) + return err } } diff --git a/local/compose/stop_test.go b/local/compose/stop_test.go index ea15b8a93..a3ce3818c 100644 --- a/local/compose/stop_test.go +++ b/local/compose/stop_test.go @@ -38,7 +38,7 @@ func TestStopTimeout(t *testing.T) { tested.apiClient = api ctx := context.Background() - api.EXPECT().ContainerList(ctx, projectFilterListOpt()).Return( + api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt()).Return( []moby.Container{ testContainer("service1", "123"), testContainer("service1", "456"), @@ -46,9 +46,9 @@ func TestStopTimeout(t *testing.T) { }, nil) timeout := time.Duration(2) * time.Second - api.EXPECT().ContainerStop(ctx, "123", &timeout).Return(nil) - api.EXPECT().ContainerStop(ctx, "456", &timeout).Return(nil) - api.EXPECT().ContainerStop(ctx, "789", &timeout).Return(nil) + api.EXPECT().ContainerStop(gomock.Any(), "123", &timeout).Return(nil) + api.EXPECT().ContainerStop(gomock.Any(), "456", &timeout).Return(nil) + api.EXPECT().ContainerStop(gomock.Any(), "789", &timeout).Return(nil) err := tested.Stop(ctx, &types.Project{ Name: testProject, diff --git a/utils/hash.go b/utils/hash.go index 2715f5b0d..ffe30cffb 100644 --- a/utils/hash.go +++ b/utils/hash.go @@ -29,6 +29,7 @@ func ServiceHash(o types.ServiceConfig) (string, error) { // remove the Build config when generating the service hash o.Build = nil o.PullPolicy = "" + o.Scale = 1 bytes, err := json.Marshal(o) if err != nil { return "", err