diff --git a/aci/compose.go b/aci/compose.go index 7756b1f87..93976e7ba 100644 --- a/aci/compose.go +++ b/aci/compose.go @@ -19,7 +19,6 @@ package aci import ( "context" "fmt" - "io" "net/http" "github.com/compose-spec/compose-go/types" @@ -60,7 +59,7 @@ func (cs *aciComposeService) Create(ctx context.Context, project *types.Project) return errdefs.ErrNotImplemented } -func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { +func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error { return errdefs.ErrNotImplemented } @@ -176,7 +175,7 @@ func (cs *aciComposeService) List(ctx context.Context, project string) ([]compos return stacks, nil } -func (cs *aciComposeService) Logs(ctx context.Context, project string, w io.Writer) error { +func (cs *aciComposeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error { return errdefs.ErrNotImplemented } diff --git a/api/client/compose.go b/api/client/compose.go index 21e374e91..3503e83a4 100644 --- a/api/client/compose.go +++ b/api/client/compose.go @@ -18,12 +18,11 @@ package client import ( "context" - "io" - - "github.com/compose-spec/compose-go/types" "github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/errdefs" + + "github.com/compose-spec/compose-go/types" ) type composeService struct { @@ -45,7 +44,7 @@ func (c *composeService) Create(ctx context.Context, project *types.Project) err return errdefs.ErrNotImplemented } -func (c *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { +func (c *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error { return errdefs.ErrNotImplemented } @@ -57,7 +56,7 @@ func (c *composeService) Down(context.Context, string) error { return errdefs.ErrNotImplemented } -func (c *composeService) Logs(context.Context, string, io.Writer) error { +func (c *composeService) Logs(context.Context, string, compose.LogConsumer) error { return errdefs.ErrNotImplemented } diff --git a/api/compose/api.go b/api/compose/api.go index 0f578430b..95cbb5b98 100644 --- a/api/compose/api.go +++ b/api/compose/api.go @@ -18,7 +18,6 @@ package compose import ( "context" - "io" "github.com/compose-spec/compose-go/types" ) @@ -34,13 +33,13 @@ type Service interface { // Create executes the equivalent to a `compose create` Create(ctx context.Context, project *types.Project) error // Start executes the equivalent to a `compose start` - Start(ctx context.Context, project *types.Project, w io.Writer) error + Start(ctx context.Context, project *types.Project, consumer LogConsumer) error // Up executes the equivalent to a `compose up` Up(ctx context.Context, project *types.Project, detach bool) error // Down executes the equivalent to a `compose down` Down(ctx context.Context, projectName string) error // Logs executes the equivalent to a `compose logs` - Logs(ctx context.Context, projectName string, w io.Writer) error + Logs(ctx context.Context, projectName string, consumer LogConsumer) error // Ps executes the equivalent to a `compose ps` Ps(ctx context.Context, projectName string) ([]ServiceStatus, error) // List executes the equivalent to a `docker stack ls` @@ -89,3 +88,8 @@ type Stack struct { Status string Reason string } + +// LogConsumer is a callback to process log messages from services +type LogConsumer interface { + Log(service, container, message string) +} diff --git a/cli/cmd/compose/logs.go b/cli/cmd/compose/logs.go index 4f08f3aad..05de69873 100644 --- a/cli/cmd/compose/logs.go +++ b/cli/cmd/compose/logs.go @@ -20,9 +20,10 @@ import ( "context" "os" - "github.com/spf13/cobra" - "github.com/docker/compose-cli/api/client" + "github.com/docker/compose-cli/formatter" + + "github.com/spf13/cobra" ) func logsCommand() *cobra.Command { @@ -50,5 +51,6 @@ func runLogs(ctx context.Context, opts composeOptions) error { if err != nil { return err } - return c.ComposeService().Logs(ctx, projectName, os.Stdout) + consumer := formatter.NewLogConsumer(ctx, os.Stdout) + return c.ComposeService().Logs(ctx, projectName, consumer) } diff --git a/cli/cmd/compose/up.go b/cli/cmd/compose/up.go index 14fa2cba4..d1eb2bee4 100644 --- a/cli/cmd/compose/up.go +++ b/cli/cmd/compose/up.go @@ -20,16 +20,17 @@ import ( "context" "errors" "fmt" - "io" "os" + "github.com/docker/compose-cli/api/client" + "github.com/docker/compose-cli/api/compose" + "github.com/docker/compose-cli/context/store" + "github.com/docker/compose-cli/formatter" + "github.com/docker/compose-cli/progress" + "github.com/compose-spec/compose-go/cli" "github.com/compose-spec/compose-go/types" "github.com/spf13/cobra" - - "github.com/docker/compose-cli/api/client" - "github.com/docker/compose-cli/context/store" - "github.com/docker/compose-cli/progress" ) func upCommand(contextType string) *cobra.Command { @@ -83,12 +84,12 @@ func runCreateStart(ctx context.Context, opts composeOptions, services []string) return err } - var w io.Writer + var consumer compose.LogConsumer if !opts.Detach { - w = os.Stdout + consumer = formatter.NewLogConsumer(ctx, os.Stdout) } - err = c.ComposeService().Start(ctx, project, w) + err = c.ComposeService().Start(ctx, project, consumer) if errors.Is(ctx.Err(), context.Canceled) { fmt.Println("Gracefully stopping...") ctx = context.Background() diff --git a/ecs/local/compose.go b/ecs/local/compose.go index adeb06baf..f1ad7b58d 100644 --- a/ecs/local/compose.go +++ b/ecs/local/compose.go @@ -22,7 +22,6 @@ import ( "context" "encoding/json" "fmt" - "io" "os" "os/exec" "path/filepath" @@ -56,7 +55,7 @@ func (e ecsLocalSimulation) Create(ctx context.Context, project *types.Project) return errdefs.ErrNotImplemented } -func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, w io.Writer) error { +func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error { return errdefs.ErrNotImplemented } @@ -181,7 +180,7 @@ services: return cmd.Run() } -func (e ecsLocalSimulation) Logs(ctx context.Context, projectName string, w io.Writer) error { +func (e ecsLocalSimulation) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error { list, err := e.moby.ContainerList(ctx, types2.ContainerListOptions{ Filters: filters.NewArgs(filters.Arg("label", "com.docker.compose.project="+projectName)), }) diff --git a/ecs/logs.go b/ecs/logs.go index bdda7451a..fd991e961 100644 --- a/ecs/logs.go +++ b/ecs/logs.go @@ -18,13 +18,11 @@ package ecs import ( "context" - "io" - "github.com/docker/compose-cli/formatter" + "github.com/docker/compose-cli/api/compose" ) -func (b *ecsAPIService) Logs(ctx context.Context, project string, w io.Writer) error { - consumer := formatter.NewLogConsumer(ctx, w) - err := b.aws.GetLogs(ctx, project, consumer.Log) +func (b *ecsAPIService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error { + err := b.aws.GetLogs(ctx, projectName, consumer.Log) return err } diff --git a/ecs/up.go b/ecs/up.go index cab0b6f06..0af2fe70c 100644 --- a/ecs/up.go +++ b/ecs/up.go @@ -19,13 +19,14 @@ package ecs import ( "context" "fmt" - "io" "os" "os/signal" "syscall" - "github.com/compose-spec/compose-go/types" + "github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/errdefs" + + "github.com/compose-spec/compose-go/types" ) func (b *ecsAPIService) Build(ctx context.Context, project *types.Project) error { @@ -44,7 +45,7 @@ func (b *ecsAPIService) Create(ctx context.Context, project *types.Project) erro return errdefs.ErrNotImplemented } -func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, w io.Writer) error { +func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error { return errdefs.ErrNotImplemented } diff --git a/example/backend.go b/example/backend.go index e6c1fc6df..540849505 100644 --- a/example/backend.go +++ b/example/backend.go @@ -22,9 +22,6 @@ import ( "context" "errors" "fmt" - "io" - - "github.com/compose-spec/compose-go/types" "github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/containers" @@ -34,6 +31,8 @@ import ( "github.com/docker/compose-cli/backend" "github.com/docker/compose-cli/context/cloud" "github.com/docker/compose-cli/errdefs" + + "github.com/compose-spec/compose-go/types" ) type apiService struct { @@ -155,7 +154,7 @@ func (cs *composeService) Create(ctx context.Context, project *types.Project) er return errdefs.ErrNotImplemented } -func (cs *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { +func (cs *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error { return errdefs.ErrNotImplemented } @@ -176,7 +175,7 @@ func (cs *composeService) Ps(ctx context.Context, project string) ([]compose.Ser func (cs *composeService) List(ctx context.Context, project string) ([]compose.Stack, error) { return nil, errdefs.ErrNotImplemented } -func (cs *composeService) Logs(ctx context.Context, project string, w io.Writer) error { +func (cs *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error { return errdefs.ErrNotImplemented } diff --git a/formatter/logs.go b/formatter/logs.go index d01b9d01f..b70b9f458 100644 --- a/formatter/logs.go +++ b/formatter/logs.go @@ -23,11 +23,13 @@ import ( "io" "strconv" "strings" + + "github.com/docker/compose-cli/api/compose" ) // NewLogConsumer creates a new LogConsumer -func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer { - return LogConsumer{ +func NewLogConsumer(ctx context.Context, w io.Writer) compose.LogConsumer { + return &logConsumer{ ctx: ctx, colors: map[string]colorFunc{}, width: 0, @@ -36,7 +38,7 @@ func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer { } // Log formats a log message as received from service/container -func (l *LogConsumer) Log(service, container, message string) { +func (l *logConsumer) Log(service, container, message string) { if l.ctx.Err() != nil { return } @@ -54,16 +56,7 @@ func (l *LogConsumer) Log(service, container, message string) { } } -// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer -func (l *LogConsumer) GetWriter(service, container string) io.Writer { - return splitBuffer{ - service: service, - container: container, - consumer: l, - } -} - -func (l *LogConsumer) computeWidth() { +func (l *logConsumer) computeWidth() { width := 0 for n := range l.colors { if len(n) > width { @@ -74,25 +67,9 @@ func (l *LogConsumer) computeWidth() { } // LogConsumer consume logs from services and format them -type LogConsumer struct { +type logConsumer struct { ctx context.Context colors map[string]colorFunc width int writer io.Writer } - -type splitBuffer struct { - service string - container string - consumer *LogConsumer -} - -func (s splitBuffer) Write(b []byte) (n int, err error) { - split := bytes.Split(b, []byte{'\n'}) - for _, line := range split { - if len(line) != 0 { - s.consumer.Log(s.service, s.container, string(line)) - } - } - return len(b), nil -} diff --git a/local/compose.go b/local/compose.go index 3caabdd66..ea6ac9c82 100644 --- a/local/compose.go +++ b/local/compose.go @@ -19,6 +19,7 @@ package local import ( + "bytes" "context" "encoding/base64" "encoding/json" @@ -54,7 +55,6 @@ import ( "github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/config" errdefs2 "github.com/docker/compose-cli/errdefs" - "github.com/docker/compose-cli/formatter" "github.com/docker/compose-cli/progress" ) @@ -341,10 +341,10 @@ func (s *composeService) Create(ctx context.Context, project *types.Project) err }) } -func (s *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { +func (s *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error { var group *errgroup.Group - if w != nil { - eg, err := s.attach(ctx, project, w) + if consumer != nil { + eg, err := s.attach(ctx, project, consumer) if err != nil { return err } @@ -363,8 +363,7 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, w io return nil } -func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) (*errgroup.Group, error) { - consumer := formatter.NewLogConsumer(ctx, w) +func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.LogConsumer) (*errgroup.Group, error) { containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( projectFilter(project.Name), @@ -391,34 +390,49 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, w i return eg, nil } -func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer formatter.LogConsumer, project *types.Project) error { +func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error { serviceName := container.Labels[serviceLabel] - w := consumer.GetWriter(serviceName, container.ID) + w := getWriter(serviceName, container.ID, consumer) + service, err := project.GetService(serviceName) if err != nil { return err } - reader, err := s.getContainerStdout(ctx, container) + return s.attachContainerStreams(ctx, container, service.Tty, nil, w) +} + +func (s *composeService) attachContainerStreams(ctx context.Context, container moby.Container, tty bool, r io.Reader, w io.Writer) error { + stdin, stdout, err := s.getContainerStreams(ctx, container) if err != nil { return err } go func() { <-ctx.Done() - reader.Close() //nolint:errcheck + stdout.Close() //nolint:errcheck + stdin.Close() //nolint:errcheck }() - if service.Tty { - _, err = io.Copy(w, reader) - } else { - _, err = stdcopy.StdCopy(w, w, reader) + if r != nil && stdin != nil { + go func() { + io.Copy(stdin, r) //nolint:errcheck + }() + } + + if w != nil { + if tty { + _, err = io.Copy(w, stdout) + } else { + _, err = stdcopy.StdCopy(w, w, stdout) + } } return err } -func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.ReadCloser, error) { - var reader io.ReadCloser +func (s *composeService) getContainerStreams(ctx context.Context, container moby.Container) (io.WriteCloser, io.ReadCloser, error) { + var stdout io.ReadCloser + var stdin io.WriteCloser if container.State == containerRunning { logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{ ShowStdout: true, @@ -426,9 +440,9 @@ func (s *composeService) getContainerStdout(ctx context.Context, container moby. Follow: true, }) if err != nil { - return nil, err + return nil, nil, err } - reader = logs + stdout = logs } else { cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{ Stream: true, @@ -437,16 +451,12 @@ func (s *composeService) getContainerStdout(ctx context.Context, container moby. Stderr: true, }) if err != nil { - return nil, err - } - reader = containerStdout{cnx} - - err = s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{}) - if err != nil { - return nil, err + return nil, nil, err } + stdout = containerStdout{cnx} + stdin = containerStdin{cnx} } - return reader, nil + return stdin, stdout, nil } func getContainerName(c moby.Container) string { @@ -575,7 +585,7 @@ func loadProjectOptionsFromLabels(c moby.Container) (*cli.ProjectOptions, error) cli.WithName(c.Labels[projectLabel])) } -func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writer) error { +func (s *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error { list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( projectFilter(projectName), @@ -584,7 +594,6 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ if err != nil { return err } - consumer := formatter.NewLogConsumer(ctx, w) eg, ctx := errgroup.WithContext(ctx) for _, c := range list { service := c.Labels[serviceLabel] @@ -604,7 +613,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ if err != nil { return err } - w := consumer.GetWriter(service, container.ID) + w := getWriter(service, container.ID, consumer) if container.Config.Tty { _, err = io.Copy(w, r) } else { @@ -616,6 +625,31 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ return eg.Wait() } +type splitBuffer struct { + service string + container string + consumer compose.LogConsumer +} + +// getWriter creates a io.Writer that will actually split by line and format by LogConsumer +func getWriter(service, container string, l compose.LogConsumer) io.Writer { + return splitBuffer{ + service: service, + container: container, + consumer: l, + } +} + +func (s splitBuffer) Write(b []byte) (n int, err error) { + split := bytes.Split(b, []byte{'\n'}) + for _, line := range split { + if len(line) != 0 { + s.consumer.Log(s.service, s.container, string(line)) + } + } + return len(b), nil +} + func (s *composeService) Ps(ctx context.Context, projectName string) ([]compose.ServiceStatus, error) { list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( @@ -804,7 +838,7 @@ func getContainerCreateOptions(p *types.Project, s types.ServiceConfig, number i StopTimeout: toSeconds(s.StopGracePeriod), } - mountOptions, err := buildContainerMountOptions(p, s, inherit) + mountOptions, err := buildContainerMountOptions(s, inherit) if err != nil { return nil, nil, nil, err } @@ -851,7 +885,7 @@ func buildContainerBindingOptions(s types.ServiceConfig) nat.PortMap { return bindings } -func buildContainerMountOptions(p *types.Project, s types.ServiceConfig, inherit *moby.Container) ([]mount.Mount, error) { +func buildContainerMountOptions(s types.ServiceConfig, inherit *moby.Container) ([]mount.Mount, error) { mounts := []mount.Mount{} var inherited []string if inherit != nil {