Merge pull request #1396 from docker/events

introduce docker compose events
This commit is contained in:
Nicolas De loof 2021-03-08 14:37:15 +01:00 committed by GitHub
commit 4a8a1aeb48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 403 additions and 52 deletions

View File

@ -229,3 +229,7 @@ func (cs *aciComposeService) Exec(ctx context.Context, project *types.Project, o
func (cs *aciComposeService) Top(ctx context.Context, projectName string, services []string) ([]compose.ContainerProcSummary, error) {
return nil, errdefs.ErrNotImplemented
}
func (cs *aciComposeService) Events(ctx context.Context, project string, options compose.EventsOptions) error {
return errdefs.ErrNotImplemented
}

View File

@ -103,3 +103,7 @@ func (c *composeService) UnPause(ctx context.Context, project *types.Project) er
func (c *composeService) Top(ctx context.Context, projectName string, services []string) ([]compose.ContainerProcSummary, error) {
return nil, errdefs.ErrNotImplemented
}
func (c *composeService) Events(ctx context.Context, project string, options compose.EventsOptions) error {
return errdefs.ErrNotImplemented
}

View File

@ -18,6 +18,7 @@ package compose
import (
"context"
"fmt"
"io"
"strings"
"time"
@ -65,6 +66,8 @@ type Service interface {
UnPause(ctx context.Context, project *types.Project) error
// Top executes the equivalent to a `compose top`
Top(ctx context.Context, projectName string, services []string) ([]ContainerProcSummary, error)
// Events executes the equivalent to a `compose events`
Events(ctx context.Context, project string, options EventsOptions) error
}
// BuildOptions group options of the Build API
@ -156,7 +159,7 @@ type RemoveOptions struct {
Force bool
}
// RunOptions options to execute compose run
// RunOptions group options of the Run API
type RunOptions struct {
Name string
Service string
@ -177,6 +180,31 @@ type RunOptions struct {
Index int
}
// EventsOptions group options of the Events API
type EventsOptions struct {
Services []string
Consumer func(event Event) error
}
// Event is a container runtime event served by Events API
type Event struct {
Timestamp time.Time
Service string
Container string
Status string
Attributes map[string]string
}
func (e Event) String() string {
t := e.Timestamp.Format("2006-01-02 15:04:05.000000")
var attr []string
for k, v := range e.Attributes {
attr = append(attr, fmt.Sprintf("%s=%s", k, v))
}
return fmt.Sprintf("%s container %s %s (%s)\n", t, e.Status, e.Container, strings.Join(attr, ", "))
}
// EnvironmentMap return RunOptions.Environment as a MappingWithEquals
func (opts *RunOptions) EnvironmentMap() types.MappingWithEquals {
environment := types.MappingWithEquals{}

View File

@ -27,6 +27,7 @@ import (
"github.com/spf13/pflag"
"github.com/docker/compose-cli/api/context/store"
"github.com/docker/compose-cli/cli/formatter"
)
// Warning is a global warning to be displayed to user on command failure
@ -100,11 +101,13 @@ func (o *projectOptions) toProjectOptions() (*cli.ProjectOptions, error) {
// Command returns the compose command with its child commands
func Command(contextType string) *cobra.Command {
opts := projectOptions{}
var ansi string
command := &cobra.Command{
Short: "Docker Compose",
Use: "compose",
TraverseChildren: true,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
formatter.SetANSIMode(ansi)
if opts.WorkDir != "" {
if opts.ProjectDir != "" {
return errors.New(aec.Apply(`cannot specify DEPRECATED "--workdir" and "--project-directory". Please use only "--project-directory" instead.`, aec.RedF))
@ -136,6 +139,7 @@ func Command(contextType string) *cobra.Command {
pauseCommand(&opts),
unpauseCommand(&opts),
topCommand(&opts),
eventsCommand(&opts),
)
if contextType == store.LocalContextType || contextType == store.DefaultContextType {
@ -148,5 +152,6 @@ func Command(contextType string) *cobra.Command {
}
command.Flags().SetInterspersed(false)
opts.addProjectFlags(command.Flags())
command.Flags().StringVar(&ansi, "ansi", "auto", `Control when to print ANSI control characters ("never"|"always"|"auto")`)
return command
}

86
cli/cmd/compose/events.go Normal file
View File

@ -0,0 +1,86 @@
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compose
import (
"context"
"encoding/json"
"fmt"
"github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/api/compose"
"github.com/spf13/cobra"
)
type eventsOpts struct {
*composeOptions
json bool
}
func eventsCommand(p *projectOptions) *cobra.Command {
opts := eventsOpts{
composeOptions: &composeOptions{
projectOptions: p,
},
}
cmd := &cobra.Command{
Use: "events [options] [--] [SERVICE...]",
Short: "Receive real time events from containers.",
RunE: func(cmd *cobra.Command, args []string) error {
return runEvents(cmd.Context(), opts, args)
},
}
cmd.Flags().BoolVar(&opts.json, "json", false, "Output events as a stream of json objects")
return cmd
}
func runEvents(ctx context.Context, opts eventsOpts, services []string) error {
c, err := client.NewWithDefaultLocalBackend(ctx)
if err != nil {
return err
}
project, err := opts.toProjectName()
if err != nil {
return err
}
return c.ComposeService().Events(ctx, project, compose.EventsOptions{
Services: services,
Consumer: func(event compose.Event) error {
if opts.json {
marshal, err := json.Marshal(map[string]interface{}{
"time": event.Timestamp,
"type": "container",
"service": event.Service,
"id": event.Container,
"action": event.Status,
"attributes": event.Attributes,
})
if err != nil {
return err
}
fmt.Println(string(marshal))
} else {
fmt.Println(event)
}
return nil
},
})
}

View File

@ -18,7 +18,10 @@ package formatter
import (
"fmt"
"os"
"strconv"
"github.com/mattn/go-isatty"
)
var names = []string{
@ -32,6 +35,36 @@ var names = []string{
"white",
}
const (
// Never use ANSI codes
Never = "never"
// Always use ANSI codes
Always = "always"
// Auto detect terminal is a tty and can use ANSI codes
Auto = "auto"
)
// SetANSIMode configure formatter for colored output on ANSI-compliant console
func SetANSIMode(ansi string) {
if !useAnsi(ansi) {
nextColor = func() colorFunc {
return monochrome
}
}
}
func useAnsi(ansi string) bool {
switch ansi {
case Always:
return true
case Auto:
return isatty.IsTerminal(os.Stdout.Fd())
}
return false
}
// colorFunc use ANSI codes to render colored text on console
type colorFunc func(s string) string
@ -53,6 +86,12 @@ func makeColorFunc(code string) colorFunc {
}
}
var nextColor func() colorFunc = rainbowColor
func rainbowColor() colorFunc {
return <-loop
}
var loop = make(chan colorFunc)
func init() {

View File

@ -45,7 +45,7 @@ func (l *logConsumer) Register(name string, id string) {
func (l *logConsumer) register(name string, id string) *presenter {
cf := monochrome
if l.color {
cf = <-loop
cf = nextColor()
}
p := &presenter{
colors: cf,

View File

@ -195,3 +195,7 @@ func (e ecsLocalSimulation) UnPause(ctx context.Context, project *types.Project)
func (e ecsLocalSimulation) Top(ctx context.Context, projectName string, services []string) ([]compose.ContainerProcSummary, error) {
return e.compose.Top(ctx, projectName, services)
}
func (e ecsLocalSimulation) Events(ctx context.Context, project string, options compose.EventsOptions) error {
return e.compose.Events(ctx, project, options)
}

View File

@ -63,6 +63,10 @@ func (b *ecsAPIService) UnPause(ctx context.Context, project *types.Project) err
return errdefs.ErrNotImplemented
}
func (b *ecsAPIService) Events(ctx context.Context, project string, options compose.EventsOptions) error {
return errdefs.ErrNotImplemented
}
func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, options compose.UpOptions) error {
logrus.Debugf("deploying on AWS with region=%q", b.Region)
err := b.aws.CheckRequirements(ctx, b.Region)

1
go.mod
View File

@ -39,6 +39,7 @@ require (
github.com/joho/godotenv v1.3.0
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 // indirect
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-shellwords v1.0.11
github.com/moby/buildkit v0.8.1-0.20201205083753-0af7b1b9c693
github.com/moby/term v0.0.0-20201110203204-bea5bbe245bf

View File

@ -258,3 +258,7 @@ func (s *composeService) UnPause(ctx context.Context, project *types.Project) er
func (s *composeService) Top(ctx context.Context, projectName string, services []string) ([]compose.ContainerProcSummary, error) {
return nil, errdefs.ErrNotImplemented
}
func (s *composeService) Events(ctx context.Context, project string, options compose.EventsOptions) error {
return errdefs.ErrNotImplemented
}

View File

@ -31,7 +31,7 @@ import (
"github.com/docker/docker/pkg/stdcopy"
)
func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.ContainerEventListener, selectedServices []string) (Containers, error) {
func (s *composeService) attach(ctx context.Context, project *types.Project, listener compose.ContainerEventListener, selectedServices []string) (Containers, error) {
containers, err := s.getContainers(ctx, project, oneOffExclude, selectedServices)
if err != nil {
return nil, err
@ -47,33 +47,72 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con
fmt.Printf("Attaching to %s\n", strings.Join(names, ", "))
for _, container := range containers {
consumer(compose.ContainerEvent{
Type: compose.ContainerEventAttach,
Source: container.ID,
Name: getContainerNameWithoutProject(container),
Service: container.Labels[serviceLabel],
})
err := s.attachContainer(ctx, container, consumer, project)
err := s.attachContainer(ctx, container, listener, project)
if err != nil {
return nil, err
}
}
return containers, nil
// Watch events to capture container restart and re-attach
go func() {
crashed := map[string]struct{}{}
s.Events(ctx, project.Name, compose.EventsOptions{ // nolint: errcheck
Services: selectedServices,
Consumer: func(event compose.Event) error {
if event.Status == "die" {
crashed[event.Container] = struct{}{}
return nil
}
if _, ok := crashed[event.Container]; ok {
inspect, err := s.apiClient.ContainerInspect(ctx, event.Container)
if err != nil {
return err
}
container := moby.Container{
ID: event.Container,
Names: []string{inspect.Name},
State: convert.ContainerRunning,
Labels: map[string]string{
projectLabel: project.Name,
serviceLabel: event.Service,
},
}
// Just ignore errors when reattaching to already crashed containers
s.attachContainer(ctx, container, listener, project) // nolint: errcheck
delete(crashed, event.Container)
s.waitContainer(ctx, container, listener)
}
return nil
},
})
}()
return containers, err
}
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.ContainerEventListener, project *types.Project) error {
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, listener compose.ContainerEventListener, project *types.Project) error {
serviceName := container.Labels[serviceLabel]
w := utils.GetWriter(getContainerNameWithoutProject(container), serviceName, container.ID, consumer)
w := utils.GetWriter(getContainerNameWithoutProject(container), serviceName, container.ID, listener)
service, err := project.GetService(serviceName)
if err != nil {
return err
}
return s.attachContainerStreams(ctx, container, service.Tty, nil, w)
listener(compose.ContainerEvent{
Type: compose.ContainerEventAttach,
Source: container.ID,
Name: getContainerNameWithoutProject(container),
Service: container.Labels[serviceLabel],
})
return s.attachContainerStreams(ctx, container.ID, service.Tty, nil, w)
}
func (s *composeService) attachContainerStreams(ctx context.Context, container moby.Container, tty bool, r io.Reader, w io.Writer) error {
func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, r io.Reader, w io.Writer) error {
stdin, stdout, err := s.getContainerStreams(ctx, container)
if err != nil {
return err
@ -105,32 +144,30 @@ func (s *composeService) attachContainerStreams(ctx context.Context, container m
return nil
}
func (s *composeService) getContainerStreams(ctx context.Context, container moby.Container) (io.WriteCloser, io.ReadCloser, error) {
func (s *composeService) getContainerStreams(ctx context.Context, container string) (io.WriteCloser, io.ReadCloser, error) {
var stdout io.ReadCloser
var stdin io.WriteCloser
if container.State == convert.ContainerRunning {
logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
})
if err != nil {
return nil, nil, err
}
stdout = logs
} else {
cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{
Stream: true,
Stdin: true,
Stdout: true,
Stderr: true,
Logs: false,
})
if err != nil {
return nil, nil, err
}
cnx, err := s.apiClient.ContainerAttach(ctx, container, moby.ContainerAttachOptions{
Stream: true,
Stdin: true,
Stdout: true,
Stderr: true,
Logs: false,
})
if err == nil {
stdout = convert.ContainerStdout{HijackedResponse: cnx}
stdin = convert.ContainerStdin{HijackedResponse: cnx}
return stdin, stdout, nil
}
return stdin, stdout, nil
// Fallback to logs API
logs, err := s.apiClient.ContainerLogs(ctx, container, moby.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
})
if err != nil {
return nil, nil, err
}
return stdin, logs, nil
}

View File

@ -272,6 +272,7 @@ func (s *composeService) getCreateOptions(ctx context.Context, p *types.Project,
if err != nil {
return nil, nil, nil, err
}
hostConfig := container.HostConfig{
AutoRemove: autoRemove,
Binds: binds,
@ -281,6 +282,7 @@ func (s *composeService) getCreateOptions(ctx context.Context, p *types.Project,
NetworkMode: networkMode,
Init: service.Init,
ReadonlyRootfs: service.ReadOnly,
RestartPolicy: getRestartPolicy(service),
// ShmSize: , TODO
Sysctls: service.Sysctls,
PortBindings: portBindings,
@ -293,6 +295,33 @@ func (s *composeService) getCreateOptions(ctx context.Context, p *types.Project,
return &containerConfig, &hostConfig, networkConfig, nil
}
func getRestartPolicy(service types.ServiceConfig) container.RestartPolicy {
var restart container.RestartPolicy
if service.Restart != "" {
split := strings.Split(service.Restart, ":")
var attempts int
if len(split) > 1 {
attempts, _ = strconv.Atoi(split[1])
}
restart = container.RestartPolicy{
Name: split[0],
MaximumRetryCount: attempts,
}
}
if service.Deploy != nil && service.Deploy.RestartPolicy != nil {
policy := *service.Deploy.RestartPolicy
var attempts int
if policy.MaxAttempts != nil {
attempts = int(*policy.MaxAttempts)
}
restart = container.RestartPolicy{
Name: policy.Condition,
MaximumRetryCount: attempts,
}
}
return restart
}
func getDeployResources(s types.ServiceConfig) container.Resources {
resources := container.Resources{}
if s.Deploy == nil {

75
local/compose/events.go Normal file
View File

@ -0,0 +1,75 @@
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package compose
import (
"context"
"strings"
"time"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/utils"
moby "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
)
func (s *composeService) Events(ctx context.Context, project string, options compose.EventsOptions) error {
events, errors := s.apiClient.Events(ctx, moby.EventsOptions{
Filters: filters.NewArgs(projectFilter(project)),
})
for {
select {
case event := <-events:
// TODO: support other event types
if event.Type != "container" {
continue
}
service := event.Actor.Attributes[serviceLabel]
if len(options.Services) > 0 && !utils.StringContains(options.Services, service) {
continue
}
attributes := map[string]string{}
for k, v := range event.Actor.Attributes {
if strings.HasPrefix(k, "com.docker.compose.") {
continue
}
attributes[k] = v
}
timestamp := time.Unix(event.Time, 0)
if event.TimeNano != 0 {
timestamp = time.Unix(0, event.TimeNano)
}
err := options.Consumer(compose.Event{
Timestamp: timestamp,
Service: service,
Container: event.ID,
Status: event.Status,
Attributes: attributes,
})
if err != nil {
return err
}
case err := <-errors:
return err
}
}
}

View File

@ -86,7 +86,7 @@ func (s *composeService) RunOneOffContainer(ctx context.Context, project *types.
return 0, err
}
oneoffContainer := containers[0]
err = s.attachContainerStreams(ctx, oneoffContainer, service.Tty, opts.Reader, opts.Writer)
err = s.attachContainerStreams(ctx, oneoffContainer.ID, service.Tty, opts.Reader, opts.Writer)
if err != nil {
return 0, err
}

View File

@ -22,6 +22,7 @@ import (
"github.com/docker/compose-cli/api/compose"
"github.com/compose-spec/compose-go/types"
moby "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/sirupsen/logrus"
)
@ -50,20 +51,25 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, opti
for _, c := range containers {
c := c
go func() {
statusC, errC := s.apiClient.ContainerWait(context.Background(), c.ID, container.WaitConditionNotRunning)
select {
case status := <-statusC:
options.Attach(compose.ContainerEvent{
Type: compose.ContainerEventExit,
Source: c.ID,
Name: getCanonicalContainerName(c),
Service: c.Labels[serviceLabel],
ExitCode: int(status.StatusCode),
})
case err := <-errC:
logrus.Warnf("Unexpected API error for %s : %s\n", getCanonicalContainerName(c), err.Error())
}
s.waitContainer(ctx, c, options.Attach)
}()
}
return nil
}
func (s *composeService) waitContainer(ctx context.Context, c moby.Container, listener compose.ContainerEventListener) {
statusC, errC := s.apiClient.ContainerWait(ctx, c.ID, container.WaitConditionNotRunning)
name := getCanonicalContainerName(c)
select {
case status := <-statusC:
listener(compose.ContainerEvent{
Type: compose.ContainerEventExit,
Source: c.ID,
Name: name,
Service: c.Labels[serviceLabel],
ExitCode: int(status.StatusCode),
})
case err := <-errC:
logrus.Warnf("Unexpected API error for %s : %s", name, err.Error())
}
}

View File

@ -132,3 +132,17 @@ func TestComposePull(t *testing.T) {
assert.Assert(t, strings.Contains(output, "simple Pulled"))
assert.Assert(t, strings.Contains(output, "another Pulled"))
}
func TestAttachRestart(t *testing.T) {
c := NewParallelE2eCLI(t, binDir)
res := c.RunDockerOrExitError("compose", "--ansi=never", "--project-directory", "fixtures/attach-restart", "up")
output := res.Stdout()
assert.Assert(t, strings.Contains(output, `another_1 | world
attach-restart_another_1 exited with code 1
another_1 | world
attach-restart_another_1 exited with code 1
another_1 | world
attach-restart_another_1 exited with code 1`), res.Combined())
}

View File

@ -0,0 +1,11 @@
services:
simple:
image: busybox:1.31.0-uclibc
command: sh -c "sleep 5"
another:
image: busybox:1.31.0-uclibc
command: sh -c "sleep 0.1 && echo world && /bin/false"
deploy:
restart_policy:
condition: "on-failure"
max_attempts: 2