Revisit logs/up API to pass a LogConsumer vs io.Writer

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2020-12-07 09:09:12 +01:00
parent 4826d5155a
commit ca123e08eb
No known key found for this signature in database
GPG Key ID: 9858809D6F8F6E7E
11 changed files with 112 additions and 99 deletions

View File

@ -19,7 +19,6 @@ package aci
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"net/http" "net/http"
"github.com/compose-spec/compose-go/types" "github.com/compose-spec/compose-go/types"
@ -60,7 +59,7 @@ func (cs *aciComposeService) Create(ctx context.Context, project *types.Project)
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
@ -176,7 +175,7 @@ func (cs *aciComposeService) List(ctx context.Context, project string) ([]compos
return stacks, nil return stacks, nil
} }
func (cs *aciComposeService) Logs(ctx context.Context, project string, w io.Writer) error { func (cs *aciComposeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }

View File

@ -18,12 +18,11 @@ package client
import ( import (
"context" "context"
"io"
"github.com/compose-spec/compose-go/types"
"github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/errdefs" "github.com/docker/compose-cli/errdefs"
"github.com/compose-spec/compose-go/types"
) )
type composeService struct { type composeService struct {
@ -45,7 +44,7 @@ func (c *composeService) Create(ctx context.Context, project *types.Project) err
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
func (c *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { func (c *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
@ -57,7 +56,7 @@ func (c *composeService) Down(context.Context, string) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
func (c *composeService) Logs(context.Context, string, io.Writer) error { func (c *composeService) Logs(context.Context, string, compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }

View File

@ -18,7 +18,6 @@ package compose
import ( import (
"context" "context"
"io"
"github.com/compose-spec/compose-go/types" "github.com/compose-spec/compose-go/types"
) )
@ -34,13 +33,13 @@ type Service interface {
// Create executes the equivalent to a `compose create` // Create executes the equivalent to a `compose create`
Create(ctx context.Context, project *types.Project) error Create(ctx context.Context, project *types.Project) error
// Start executes the equivalent to a `compose start` // Start executes the equivalent to a `compose start`
Start(ctx context.Context, project *types.Project, w io.Writer) error Start(ctx context.Context, project *types.Project, consumer LogConsumer) error
// Up executes the equivalent to a `compose up` // Up executes the equivalent to a `compose up`
Up(ctx context.Context, project *types.Project, detach bool) error Up(ctx context.Context, project *types.Project, detach bool) error
// Down executes the equivalent to a `compose down` // Down executes the equivalent to a `compose down`
Down(ctx context.Context, projectName string) error Down(ctx context.Context, projectName string) error
// Logs executes the equivalent to a `compose logs` // Logs executes the equivalent to a `compose logs`
Logs(ctx context.Context, projectName string, w io.Writer) error Logs(ctx context.Context, projectName string, consumer LogConsumer) error
// Ps executes the equivalent to a `compose ps` // Ps executes the equivalent to a `compose ps`
Ps(ctx context.Context, projectName string) ([]ServiceStatus, error) Ps(ctx context.Context, projectName string) ([]ServiceStatus, error)
// List executes the equivalent to a `docker stack ls` // List executes the equivalent to a `docker stack ls`
@ -89,3 +88,8 @@ type Stack struct {
Status string Status string
Reason string Reason string
} }
// LogConsumer is a callback to process log messages from services
type LogConsumer interface {
Log(service, container, message string)
}

View File

@ -20,9 +20,10 @@ import (
"context" "context"
"os" "os"
"github.com/spf13/cobra"
"github.com/docker/compose-cli/api/client" "github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/formatter"
"github.com/spf13/cobra"
) )
func logsCommand() *cobra.Command { func logsCommand() *cobra.Command {
@ -50,5 +51,6 @@ func runLogs(ctx context.Context, opts composeOptions) error {
if err != nil { if err != nil {
return err return err
} }
return c.ComposeService().Logs(ctx, projectName, os.Stdout) consumer := formatter.NewLogConsumer(ctx, os.Stdout)
return c.ComposeService().Logs(ctx, projectName, consumer)
} }

View File

@ -20,16 +20,17 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"os" "os"
"github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/context/store"
"github.com/docker/compose-cli/formatter"
"github.com/docker/compose-cli/progress"
"github.com/compose-spec/compose-go/cli" "github.com/compose-spec/compose-go/cli"
"github.com/compose-spec/compose-go/types" "github.com/compose-spec/compose-go/types"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/context/store"
"github.com/docker/compose-cli/progress"
) )
func upCommand(contextType string) *cobra.Command { func upCommand(contextType string) *cobra.Command {
@ -83,12 +84,12 @@ func runCreateStart(ctx context.Context, opts composeOptions, services []string)
return err return err
} }
var w io.Writer var consumer compose.LogConsumer
if !opts.Detach { if !opts.Detach {
w = os.Stdout consumer = formatter.NewLogConsumer(ctx, os.Stdout)
} }
err = c.ComposeService().Start(ctx, project, w) err = c.ComposeService().Start(ctx, project, consumer)
if errors.Is(ctx.Err(), context.Canceled) { if errors.Is(ctx.Err(), context.Canceled) {
fmt.Println("Gracefully stopping...") fmt.Println("Gracefully stopping...")
ctx = context.Background() ctx = context.Background()

View File

@ -22,7 +22,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -56,7 +55,7 @@ func (e ecsLocalSimulation) Create(ctx context.Context, project *types.Project)
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, w io.Writer) error { func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
@ -181,7 +180,7 @@ services:
return cmd.Run() return cmd.Run()
} }
func (e ecsLocalSimulation) Logs(ctx context.Context, projectName string, w io.Writer) error { func (e ecsLocalSimulation) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
list, err := e.moby.ContainerList(ctx, types2.ContainerListOptions{ list, err := e.moby.ContainerList(ctx, types2.ContainerListOptions{
Filters: filters.NewArgs(filters.Arg("label", "com.docker.compose.project="+projectName)), Filters: filters.NewArgs(filters.Arg("label", "com.docker.compose.project="+projectName)),
}) })

View File

@ -18,13 +18,11 @@ package ecs
import ( import (
"context" "context"
"io"
"github.com/docker/compose-cli/formatter" "github.com/docker/compose-cli/api/compose"
) )
func (b *ecsAPIService) Logs(ctx context.Context, project string, w io.Writer) error { func (b *ecsAPIService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
consumer := formatter.NewLogConsumer(ctx, w) err := b.aws.GetLogs(ctx, projectName, consumer.Log)
err := b.aws.GetLogs(ctx, project, consumer.Log)
return err return err
} }

View File

@ -19,13 +19,14 @@ package ecs
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/compose-spec/compose-go/types" "github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/errdefs" "github.com/docker/compose-cli/errdefs"
"github.com/compose-spec/compose-go/types"
) )
func (b *ecsAPIService) Build(ctx context.Context, project *types.Project) error { func (b *ecsAPIService) Build(ctx context.Context, project *types.Project) error {
@ -44,7 +45,7 @@ func (b *ecsAPIService) Create(ctx context.Context, project *types.Project) erro
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, w io.Writer) error { func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }

View File

@ -22,9 +22,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"github.com/compose-spec/compose-go/types"
"github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/api/containers" "github.com/docker/compose-cli/api/containers"
@ -34,6 +31,8 @@ import (
"github.com/docker/compose-cli/backend" "github.com/docker/compose-cli/backend"
"github.com/docker/compose-cli/context/cloud" "github.com/docker/compose-cli/context/cloud"
"github.com/docker/compose-cli/errdefs" "github.com/docker/compose-cli/errdefs"
"github.com/compose-spec/compose-go/types"
) )
type apiService struct { type apiService struct {
@ -155,7 +154,7 @@ func (cs *composeService) Create(ctx context.Context, project *types.Project) er
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
func (cs *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { func (cs *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }
@ -176,7 +175,7 @@ func (cs *composeService) Ps(ctx context.Context, project string) ([]compose.Ser
func (cs *composeService) List(ctx context.Context, project string) ([]compose.Stack, error) { func (cs *composeService) List(ctx context.Context, project string) ([]compose.Stack, error) {
return nil, errdefs.ErrNotImplemented return nil, errdefs.ErrNotImplemented
} }
func (cs *composeService) Logs(ctx context.Context, project string, w io.Writer) error { func (cs *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented return errdefs.ErrNotImplemented
} }

View File

@ -23,11 +23,13 @@ import (
"io" "io"
"strconv" "strconv"
"strings" "strings"
"github.com/docker/compose-cli/api/compose"
) )
// NewLogConsumer creates a new LogConsumer // NewLogConsumer creates a new LogConsumer
func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer { func NewLogConsumer(ctx context.Context, w io.Writer) compose.LogConsumer {
return LogConsumer{ return &logConsumer{
ctx: ctx, ctx: ctx,
colors: map[string]colorFunc{}, colors: map[string]colorFunc{},
width: 0, width: 0,
@ -36,7 +38,7 @@ func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer {
} }
// Log formats a log message as received from service/container // Log formats a log message as received from service/container
func (l *LogConsumer) Log(service, container, message string) { func (l *logConsumer) Log(service, container, message string) {
if l.ctx.Err() != nil { if l.ctx.Err() != nil {
return return
} }
@ -54,16 +56,7 @@ func (l *LogConsumer) Log(service, container, message string) {
} }
} }
// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer func (l *logConsumer) computeWidth() {
func (l *LogConsumer) GetWriter(service, container string) io.Writer {
return splitBuffer{
service: service,
container: container,
consumer: l,
}
}
func (l *LogConsumer) computeWidth() {
width := 0 width := 0
for n := range l.colors { for n := range l.colors {
if len(n) > width { if len(n) > width {
@ -74,25 +67,9 @@ func (l *LogConsumer) computeWidth() {
} }
// LogConsumer consume logs from services and format them // LogConsumer consume logs from services and format them
type LogConsumer struct { type logConsumer struct {
ctx context.Context ctx context.Context
colors map[string]colorFunc colors map[string]colorFunc
width int width int
writer io.Writer writer io.Writer
} }
type splitBuffer struct {
service string
container string
consumer *LogConsumer
}
func (s splitBuffer) Write(b []byte) (n int, err error) {
split := bytes.Split(b, []byte{'\n'})
for _, line := range split {
if len(line) != 0 {
s.consumer.Log(s.service, s.container, string(line))
}
}
return len(b), nil
}

View File

@ -19,6 +19,7 @@
package local package local
import ( import (
"bytes"
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
@ -54,7 +55,6 @@ import (
"github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/config" "github.com/docker/compose-cli/config"
errdefs2 "github.com/docker/compose-cli/errdefs" errdefs2 "github.com/docker/compose-cli/errdefs"
"github.com/docker/compose-cli/formatter"
"github.com/docker/compose-cli/progress" "github.com/docker/compose-cli/progress"
) )
@ -341,10 +341,10 @@ func (s *composeService) Create(ctx context.Context, project *types.Project) err
}) })
} }
func (s *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error { func (s *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
var group *errgroup.Group var group *errgroup.Group
if w != nil { if consumer != nil {
eg, err := s.attach(ctx, project, w) eg, err := s.attach(ctx, project, consumer)
if err != nil { if err != nil {
return err return err
} }
@ -363,8 +363,7 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, w io
return nil return nil
} }
func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) (*errgroup.Group, error) { func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.LogConsumer) (*errgroup.Group, error) {
consumer := formatter.NewLogConsumer(ctx, w)
containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs( Filters: filters.NewArgs(
projectFilter(project.Name), projectFilter(project.Name),
@ -391,34 +390,49 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, w i
return eg, nil return eg, nil
} }
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer formatter.LogConsumer, project *types.Project) error { func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error {
serviceName := container.Labels[serviceLabel] serviceName := container.Labels[serviceLabel]
w := consumer.GetWriter(serviceName, container.ID) w := getWriter(serviceName, container.ID, consumer)
service, err := project.GetService(serviceName) service, err := project.GetService(serviceName)
if err != nil { if err != nil {
return err return err
} }
reader, err := s.getContainerStdout(ctx, container) return s.attachContainerStreams(ctx, container, service.Tty, nil, w)
}
func (s *composeService) attachContainerStreams(ctx context.Context, container moby.Container, tty bool, r io.Reader, w io.Writer) error {
stdin, stdout, err := s.getContainerStreams(ctx, container)
if err != nil { if err != nil {
return err return err
} }
go func() { go func() {
<-ctx.Done() <-ctx.Done()
reader.Close() //nolint:errcheck stdout.Close() //nolint:errcheck
stdin.Close() //nolint:errcheck
}() }()
if service.Tty { if r != nil && stdin != nil {
_, err = io.Copy(w, reader) go func() {
io.Copy(stdin, r) //nolint:errcheck
}()
}
if w != nil {
if tty {
_, err = io.Copy(w, stdout)
} else { } else {
_, err = stdcopy.StdCopy(w, w, reader) _, err = stdcopy.StdCopy(w, w, stdout)
}
} }
return err return err
} }
func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.ReadCloser, error) { func (s *composeService) getContainerStreams(ctx context.Context, container moby.Container) (io.WriteCloser, io.ReadCloser, error) {
var reader io.ReadCloser var stdout io.ReadCloser
var stdin io.WriteCloser
if container.State == containerRunning { if container.State == containerRunning {
logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{ logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{
ShowStdout: true, ShowStdout: true,
@ -426,9 +440,9 @@ func (s *composeService) getContainerStdout(ctx context.Context, container moby.
Follow: true, Follow: true,
}) })
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
reader = logs stdout = logs
} else { } else {
cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{ cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{
Stream: true, Stream: true,
@ -437,16 +451,12 @@ func (s *composeService) getContainerStdout(ctx context.Context, container moby.
Stderr: true, Stderr: true,
}) })
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
reader = containerStdout{cnx} stdout = containerStdout{cnx}
stdin = containerStdin{cnx}
err = s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err != nil {
return nil, err
} }
} return stdin, stdout, nil
return reader, nil
} }
func getContainerName(c moby.Container) string { func getContainerName(c moby.Container) string {
@ -575,7 +585,7 @@ func loadProjectOptionsFromLabels(c moby.Container) (*cli.ProjectOptions, error)
cli.WithName(c.Labels[projectLabel])) cli.WithName(c.Labels[projectLabel]))
} }
func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writer) error { func (s *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs( Filters: filters.NewArgs(
projectFilter(projectName), projectFilter(projectName),
@ -584,7 +594,6 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
if err != nil { if err != nil {
return err return err
} }
consumer := formatter.NewLogConsumer(ctx, w)
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
for _, c := range list { for _, c := range list {
service := c.Labels[serviceLabel] service := c.Labels[serviceLabel]
@ -604,7 +613,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
if err != nil { if err != nil {
return err return err
} }
w := consumer.GetWriter(service, container.ID) w := getWriter(service, container.ID, consumer)
if container.Config.Tty { if container.Config.Tty {
_, err = io.Copy(w, r) _, err = io.Copy(w, r)
} else { } else {
@ -616,6 +625,31 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
return eg.Wait() return eg.Wait()
} }
type splitBuffer struct {
service string
container string
consumer compose.LogConsumer
}
// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
func getWriter(service, container string, l compose.LogConsumer) io.Writer {
return splitBuffer{
service: service,
container: container,
consumer: l,
}
}
func (s splitBuffer) Write(b []byte) (n int, err error) {
split := bytes.Split(b, []byte{'\n'})
for _, line := range split {
if len(line) != 0 {
s.consumer.Log(s.service, s.container, string(line))
}
}
return len(b), nil
}
func (s *composeService) Ps(ctx context.Context, projectName string) ([]compose.ServiceStatus, error) { func (s *composeService) Ps(ctx context.Context, projectName string) ([]compose.ServiceStatus, error) {
list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs( Filters: filters.NewArgs(
@ -804,7 +838,7 @@ func getContainerCreateOptions(p *types.Project, s types.ServiceConfig, number i
StopTimeout: toSeconds(s.StopGracePeriod), StopTimeout: toSeconds(s.StopGracePeriod),
} }
mountOptions, err := buildContainerMountOptions(p, s, inherit) mountOptions, err := buildContainerMountOptions(s, inherit)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@ -851,7 +885,7 @@ func buildContainerBindingOptions(s types.ServiceConfig) nat.PortMap {
return bindings return bindings
} }
func buildContainerMountOptions(p *types.Project, s types.ServiceConfig, inherit *moby.Container) ([]mount.Mount, error) { func buildContainerMountOptions(s types.ServiceConfig, inherit *moby.Container) ([]mount.Mount, error) {
mounts := []mount.Mount{} mounts := []mount.Mount{}
var inherited []string var inherited []string
if inherit != nil { if inherit != nil {