Merge pull request #1707 from docker/up_follow

This commit is contained in:
Nicolas De loof 2021-05-31 10:54:40 +02:00 committed by GitHub
commit 2315a1dad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 226 additions and 163 deletions

View File

@ -379,7 +379,9 @@ type ContainerEvent struct {
Container string
Service string
Line string
ExitCode int
// ContainerEventExit only
ExitCode int
Restarting bool
}
const (

109
api/compose/printer.go Normal file
View File

@ -0,0 +1,109 @@
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compose
import (
"fmt"
"github.com/sirupsen/logrus"
)
// LogPrinter watch application containers an collect their logs
type LogPrinter interface {
HandleEvent(event ContainerEvent)
Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error)
Cancel()
}
// NewLogPrinter builds a LogPrinter passing containers logs to LogConsumer
func NewLogPrinter(consumer LogConsumer) LogPrinter {
queue := make(chan ContainerEvent)
printer := printer{
consumer: consumer,
queue: queue,
}
return &printer
}
func (p *printer) Cancel() {
p.queue <- ContainerEvent{
Type: UserCancel,
}
}
type printer struct {
queue chan ContainerEvent
consumer LogConsumer
}
func (p *printer) HandleEvent(event ContainerEvent) {
p.queue <- event
}
func (p *printer) Run(cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) {
var (
aborting bool
exitCode int
)
containers := map[string]struct{}{}
for {
event := <-p.queue
container := event.Container
switch event.Type {
case UserCancel:
aborting = true
case ContainerEventAttach:
if _, ok := containers[container]; ok {
continue
}
containers[container] = struct{}{}
p.consumer.Register(container)
case ContainerEventExit:
if !event.Restarting {
delete(containers, container)
}
if !aborting {
p.consumer.Status(container, fmt.Sprintf("exited with code %d", event.ExitCode))
}
if cascadeStop {
if !aborting {
aborting = true
fmt.Println("Aborting on container exit...")
err := stopFn()
if err != nil {
return 0, err
}
}
if exitCodeFrom == "" {
exitCodeFrom = event.Service
}
if exitCodeFrom == event.Service {
logrus.Error(event.ExitCode)
exitCode = event.ExitCode
}
}
if len(containers) == 0 {
// Last container terminated, done
return exitCode, nil
}
case ContainerEventLog:
if !aborting {
p.consumer.Log(container, event.Service, event.Line)
}
}
}
}

View File

@ -29,7 +29,6 @@ import (
"github.com/compose-spec/compose-go/types"
"github.com/docker/cli/cli"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
@ -275,13 +274,12 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions
return nil
}
queue := make(chan compose.ContainerEvent)
printer := printer{
queue: queue,
}
consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix)
printer := compose.NewLogPrinter(consumer)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
stopFunc := func() error {
ctx := context.Background()
_, err := progress.Run(ctx, func(ctx context.Context) (string, error) {
@ -296,27 +294,21 @@ func runCreateStart(ctx context.Context, backend compose.Service, opts upOptions
}
go func() {
<-signalChan
queue <- compose.ContainerEvent{
Type: compose.UserCancel,
}
printer.Cancel()
fmt.Println("Gracefully stopping... (press Ctrl+C again to force)")
stopFunc() // nolint:errcheck
}()
consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix)
var exitCode int
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
code, err := printer.run(opts.cascadeStop, opts.exitCodeFrom, consumer, stopFunc)
code, err := printer.Run(opts.cascadeStop, opts.exitCodeFrom, stopFunc)
exitCode = code
return err
})
err = backend.Start(ctx, project, compose.StartOptions{
Attach: func(event compose.ContainerEvent) {
queue <- event
},
Attach: printer.HandleEvent,
Services: services,
})
if err != nil {
@ -341,11 +333,7 @@ func setServiceScale(project *types.Project, name string, replicas int) error {
if err != nil {
return err
}
if service.Deploy == nil {
service.Deploy = &types.DeployConfig{}
}
count := uint64(replicas)
service.Deploy.Replicas = &count
service.Scale = replicas
project.Services[i] = service
return nil
}
@ -392,49 +380,3 @@ func setup(opts composeOptions, services []string) (*types.Project, error) {
return project, nil
}
type printer struct {
queue chan compose.ContainerEvent
}
func (p printer) run(cascadeStop bool, exitCodeFrom string, consumer compose.LogConsumer, stopFn func() error) (int, error) {
var aborting bool
var count int
for {
event := <-p.queue
switch event.Type {
case compose.UserCancel:
aborting = true
case compose.ContainerEventAttach:
consumer.Register(event.Container)
count++
case compose.ContainerEventExit:
if !aborting {
consumer.Status(event.Container, fmt.Sprintf("exited with code %d", event.ExitCode))
}
if cascadeStop {
if !aborting {
aborting = true
fmt.Println("Aborting on container exit...")
err := stopFn()
if err != nil {
return 0, err
}
}
if exitCodeFrom == "" || exitCodeFrom == event.Service {
logrus.Error(event.ExitCode)
return event.ExitCode, nil
}
}
count--
if count == 0 {
// Last container terminated, done
return 0, nil
}
case compose.ContainerEventLog:
if !aborting {
consumer.Log(event.Container, event.Service, event.Line)
}
}
}
}

View File

@ -39,5 +39,5 @@ func TestApplyScaleOpt(t *testing.T) {
assert.NilError(t, err)
foo, err := p.GetService("foo")
assert.NilError(t, err)
assert.Check(t, *foo.Deploy.Replicas == 2)
assert.Equal(t, foo.Scale, 2)
}

View File

@ -33,10 +33,6 @@ import (
)
func (s *composeService) attach(ctx context.Context, project *types.Project, listener compose.ContainerEventListener, selectedServices []string) (Containers, error) {
if len(selectedServices) == 0 {
selectedServices = project.ServiceNames()
}
containers, err := s.getContainers(ctx, project.Name, oneOffExclude, true, selectedServices...)
if err != nil {
return nil, err
@ -57,44 +53,6 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
return nil, err
}
}
// Watch events to capture container restart and re-attach
go func() {
crashed := map[string]struct{}{}
s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck
Services: selectedServices,
Consumer: func(event compose.Event) error {
if event.Status == "die" {
crashed[event.Container] = struct{}{}
return nil
}
if _, ok := crashed[event.Container]; ok {
inspect, err := s.apiClient.ContainerInspect(ctx, event.Container)
if err != nil {
return err
}
container := moby.Container{
ID: event.Container,
Names: []string{inspect.Name},
State: convert.ContainerRunning,
Labels: map[string]string{
projectLabel: project.Name,
serviceLabel: event.Service,
},
}
// Just ignore errors when reattaching to already crashed containers
s.attachContainer(ctx, container, listener, project) // nolint: errcheck
delete(crashed, event.Container)
s.waitContainer(container, listener)
}
return nil
},
})
}()
return containers, err
}

View File

@ -191,18 +191,22 @@ func (s *composeService) removeVolume(ctx context.Context, id string, w progress
}
func (s *composeService) stopContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error {
eg, ctx := errgroup.WithContext(ctx)
for _, container := range containers {
toStop := container
eventName := getContainerProgressName(toStop)
w.Event(progress.StoppingEvent(eventName))
err := s.apiClient.ContainerStop(ctx, toStop.ID, timeout)
if err != nil {
w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
return err
}
w.Event(progress.StoppedEvent(eventName))
container := container
eg.Go(func() error {
eventName := getContainerProgressName(container)
w.Event(progress.StoppingEvent(eventName))
err := s.apiClient.ContainerStop(ctx, container.ID, timeout)
if err != nil {
w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
return err
}
w.Event(progress.StoppedEvent(eventName))
return nil
})
}
return nil
return eg.Wait()
}
func (s *composeService) removeContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration) error {

View File

@ -24,22 +24,26 @@ import (
"github.com/compose-spec/compose-go/types"
moby "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
func (s *composeService) Start(ctx context.Context, project *types.Project, options compose.StartOptions) error {
listener := options.Attach
if len(options.Services) == 0 {
options.Services = project.ServiceNames()
}
var containers Containers
if options.Attach != nil {
c, err := s.attach(ctx, project, options.Attach, options.Services)
eg, ctx := errgroup.WithContext(ctx)
if listener != nil {
attached, err := s.attach(ctx, project, listener, options.Services)
if err != nil {
return err
}
containers = c
eg.Go(func() error {
return s.watchContainers(project, options.Services, listener, attached)
})
}
err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
@ -51,32 +55,79 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
if err != nil {
return err
}
return eg.Wait()
}
if options.Attach == nil {
// watchContainers uses engine events to capture container start/die and notify ContainerEventListener
func (s *composeService) watchContainers(project *types.Project, services []string, listener compose.ContainerEventListener, containers Containers) error {
watched := map[string]int{}
for _, c := range containers {
watched[c.ID] = 0
}
ctx, stop := context.WithCancel(context.Background())
err := s.Events(ctx, project.Name, compose.EventsOptions{
Services: services,
Consumer: func(event compose.Event) error {
inspected, err := s.apiClient.ContainerInspect(ctx, event.Container)
if err != nil {
return err
}
container := moby.Container{
ID: inspected.ID,
Names: []string{inspected.Name},
Labels: inspected.Config.Labels,
}
name := getContainerNameWithoutProject(container)
if event.Status == "die" {
restarted := watched[container.ID]
watched[container.ID] = restarted + 1
// Container terminated.
willRestart := inspected.HostConfig.RestartPolicy.MaximumRetryCount > restarted
listener(compose.ContainerEvent{
Type: compose.ContainerEventExit,
Container: name,
Service: container.Labels[serviceLabel],
ExitCode: inspected.State.ExitCode,
Restarting: willRestart,
})
if !willRestart {
// we're done with this one
delete(watched, container.ID)
}
if len(watched) == 0 {
// all project containers stopped, we're done
stop()
}
return nil
}
if event.Status == "start" {
count, ok := watched[container.ID]
mustAttach := ok && count > 0 // Container restarted, need to re-attach
if !ok {
// A new container has just been added to service by scale
watched[container.ID] = 0
mustAttach = true
}
if mustAttach {
// Container restarted, need to re-attach
err := s.attachContainer(ctx, container, listener, project)
if err != nil {
return err
}
}
}
return nil
},
})
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
for _, c := range containers {
c := c
go func() {
s.waitContainer(c, options.Attach)
}()
}
return nil
}
func (s *composeService) waitContainer(c moby.Container, listener compose.ContainerEventListener) {
statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning)
name := getContainerNameWithoutProject(c)
select {
case status := <-statusC:
listener(compose.ContainerEvent{
Type: compose.ContainerEventExit,
Container: name,
Service: c.Labels[serviceLabel],
ExitCode: int(status.StatusCode),
})
case err := <-errC:
logrus.Warnf("Unexpected API error for %s : %s", name, err.Error())
}
return err
}

View File

@ -38,7 +38,7 @@ func TestStopTimeout(t *testing.T) {
tested.apiClient = api
ctx := context.Background()
api.EXPECT().ContainerList(ctx, projectFilterListOpt()).Return(
api.EXPECT().ContainerList(gomock.Any(), projectFilterListOpt()).Return(
[]moby.Container{
testContainer("service1", "123"),
testContainer("service1", "456"),
@ -46,9 +46,9 @@ func TestStopTimeout(t *testing.T) {
}, nil)
timeout := time.Duration(2) * time.Second
api.EXPECT().ContainerStop(ctx, "123", &timeout).Return(nil)
api.EXPECT().ContainerStop(ctx, "456", &timeout).Return(nil)
api.EXPECT().ContainerStop(ctx, "789", &timeout).Return(nil)
api.EXPECT().ContainerStop(gomock.Any(), "123", &timeout).Return(nil)
api.EXPECT().ContainerStop(gomock.Any(), "456", &timeout).Return(nil)
api.EXPECT().ContainerStop(gomock.Any(), "789", &timeout).Return(nil)
err := tested.Stop(ctx, &types.Project{
Name: testProject,

View File

@ -27,6 +27,7 @@ import (
"testing"
"time"
testify "github.com/stretchr/testify/assert"
"gotest.tools/v3/assert"
"gotest.tools/v3/icmd"
@ -197,10 +198,10 @@ func TestAttachRestart(t *testing.T) {
c.WaitForCondition(func() (bool, string) {
debug := res.Combined()
return strings.Count(res.Stdout(), "another_1 exited with code 1") == 3, fmt.Sprintf("'another_1 exited with code 1' not found 3 times in : \n%s\n", debug)
return strings.Count(res.Stdout(), "failing_1 exited with code 1") == 3, fmt.Sprintf("'failing_1 exited with code 1' not found 3 times in : \n%s\n", debug)
}, 2*time.Minute, 2*time.Second)
assert.Equal(t, strings.Count(res.Stdout(), "another_1 | world"), 3, res.Combined())
assert.Equal(t, strings.Count(res.Stdout(), "failing_1 | world"), 3, res.Combined())
}
func TestInitContainer(t *testing.T) {
@ -208,7 +209,5 @@ func TestInitContainer(t *testing.T) {
res := c.RunDockerOrExitError("compose", "--ansi=never", "--project-directory", "./fixtures/init-container", "up")
defer c.RunDockerOrExitError("compose", "-p", "init-container", "down")
output := res.Stdout()
assert.Assert(t, strings.Contains(output, "foo_1 | hello\nbar_1 | world"), res.Combined())
testify.Regexp(t, "foo_1 | hello(?m:.*)bar_1 | world", res.Stdout())
}

View File

@ -1,8 +1,5 @@
services:
simple:
image: alpine
command: sh -c "sleep infinity"
another:
failing:
image: alpine
command: sh -c "sleep 0.1 && echo world && /bin/false"
deploy:

View File

@ -29,6 +29,7 @@ func ServiceHash(o types.ServiceConfig) (string, error) {
// remove the Build config when generating the service hash
o.Build = nil
o.PullPolicy = ""
o.Scale = 1
bytes, err := json.Marshal(o)
if err != nil {
return "", err