Merge pull request from docker/logConsumer

Revisit logs/up API to pass a LogConsumer vs io.Writer
This commit is contained in:
Nicolas De loof 2020-12-07 14:57:43 +01:00 committed by GitHub
commit 576aa46d21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 110 additions and 97 deletions
aci
api
client
compose
cli/cmd/compose
ecs
example
formatter
local

View File

@ -19,7 +19,6 @@ package aci
import (
"context"
"fmt"
"io"
"net/http"
"github.com/compose-spec/compose-go/types"
@ -60,7 +59,7 @@ func (cs *aciComposeService) Create(ctx context.Context, project *types.Project)
return errdefs.ErrNotImplemented
}
func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}
@ -176,7 +175,7 @@ func (cs *aciComposeService) List(ctx context.Context, project string) ([]compos
return stacks, nil
}
func (cs *aciComposeService) Logs(ctx context.Context, project string, w io.Writer) error {
func (cs *aciComposeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}

View File

@ -18,12 +18,11 @@ package client
import (
"context"
"io"
"github.com/compose-spec/compose-go/types"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/errdefs"
"github.com/compose-spec/compose-go/types"
)
type composeService struct {
@ -45,7 +44,7 @@ func (c *composeService) Create(ctx context.Context, project *types.Project) err
return errdefs.ErrNotImplemented
}
func (c *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
func (c *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}
@ -57,7 +56,7 @@ func (c *composeService) Down(context.Context, string) error {
return errdefs.ErrNotImplemented
}
func (c *composeService) Logs(context.Context, string, io.Writer) error {
func (c *composeService) Logs(context.Context, string, compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}

View File

@ -18,7 +18,6 @@ package compose
import (
"context"
"io"
"github.com/compose-spec/compose-go/types"
)
@ -34,13 +33,13 @@ type Service interface {
// Create executes the equivalent to a `compose create`
Create(ctx context.Context, project *types.Project) error
// Start executes the equivalent to a `compose start`
Start(ctx context.Context, project *types.Project, w io.Writer) error
Start(ctx context.Context, project *types.Project, consumer LogConsumer) error
// Up executes the equivalent to a `compose up`
Up(ctx context.Context, project *types.Project, detach bool) error
// Down executes the equivalent to a `compose down`
Down(ctx context.Context, projectName string) error
// Logs executes the equivalent to a `compose logs`
Logs(ctx context.Context, projectName string, w io.Writer) error
Logs(ctx context.Context, projectName string, consumer LogConsumer) error
// Ps executes the equivalent to a `compose ps`
Ps(ctx context.Context, projectName string) ([]ServiceStatus, error)
// List executes the equivalent to a `docker stack ls`
@ -89,3 +88,8 @@ type Stack struct {
Status string
Reason string
}
// LogConsumer is a callback to process log messages from services
type LogConsumer interface {
Log(service, container, message string)
}

View File

@ -20,9 +20,10 @@ import (
"context"
"os"
"github.com/spf13/cobra"
"github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/formatter"
"github.com/spf13/cobra"
)
func logsCommand() *cobra.Command {
@ -50,5 +51,6 @@ func runLogs(ctx context.Context, opts composeOptions) error {
if err != nil {
return err
}
return c.ComposeService().Logs(ctx, projectName, os.Stdout)
consumer := formatter.NewLogConsumer(ctx, os.Stdout)
return c.ComposeService().Logs(ctx, projectName, consumer)
}

View File

@ -20,16 +20,17 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/context/store"
"github.com/docker/compose-cli/formatter"
"github.com/docker/compose-cli/progress"
"github.com/compose-spec/compose-go/cli"
"github.com/compose-spec/compose-go/types"
"github.com/spf13/cobra"
"github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/context/store"
"github.com/docker/compose-cli/progress"
)
func upCommand(contextType string) *cobra.Command {
@ -83,12 +84,12 @@ func runCreateStart(ctx context.Context, opts composeOptions, services []string)
return err
}
var w io.Writer
var consumer compose.LogConsumer
if !opts.Detach {
w = os.Stdout
consumer = formatter.NewLogConsumer(ctx, os.Stdout)
}
err = c.ComposeService().Start(ctx, project, w)
err = c.ComposeService().Start(ctx, project, consumer)
if errors.Is(ctx.Err(), context.Canceled) {
fmt.Println("Gracefully stopping...")
ctx = context.Background()

View File

@ -22,7 +22,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
@ -56,7 +55,7 @@ func (e ecsLocalSimulation) Create(ctx context.Context, project *types.Project)
return errdefs.ErrNotImplemented
}
func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, w io.Writer) error {
func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}
@ -181,7 +180,7 @@ services:
return cmd.Run()
}
func (e ecsLocalSimulation) Logs(ctx context.Context, projectName string, w io.Writer) error {
func (e ecsLocalSimulation) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
list, err := e.moby.ContainerList(ctx, types2.ContainerListOptions{
Filters: filters.NewArgs(filters.Arg("label", "com.docker.compose.project="+projectName)),
})

View File

@ -18,13 +18,11 @@ package ecs
import (
"context"
"io"
"github.com/docker/compose-cli/formatter"
"github.com/docker/compose-cli/api/compose"
)
func (b *ecsAPIService) Logs(ctx context.Context, project string, w io.Writer) error {
consumer := formatter.NewLogConsumer(ctx, w)
err := b.aws.GetLogs(ctx, project, consumer.Log)
func (b *ecsAPIService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
err := b.aws.GetLogs(ctx, projectName, consumer.Log)
return err
}

View File

@ -19,13 +19,14 @@ package ecs
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"syscall"
"github.com/compose-spec/compose-go/types"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/errdefs"
"github.com/compose-spec/compose-go/types"
)
func (b *ecsAPIService) Build(ctx context.Context, project *types.Project) error {
@ -44,7 +45,7 @@ func (b *ecsAPIService) Create(ctx context.Context, project *types.Project) erro
return errdefs.ErrNotImplemented
}
func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}

View File

@ -22,9 +22,6 @@ import (
"context"
"errors"
"fmt"
"io"
"github.com/compose-spec/compose-go/types"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/api/containers"
@ -34,6 +31,8 @@ import (
"github.com/docker/compose-cli/backend"
"github.com/docker/compose-cli/context/cloud"
"github.com/docker/compose-cli/errdefs"
"github.com/compose-spec/compose-go/types"
)
type apiService struct {
@ -155,7 +154,7 @@ func (cs *composeService) Create(ctx context.Context, project *types.Project) er
return errdefs.ErrNotImplemented
}
func (cs *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
func (cs *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}
@ -176,7 +175,7 @@ func (cs *composeService) Ps(ctx context.Context, project string) ([]compose.Ser
func (cs *composeService) List(ctx context.Context, project string) ([]compose.Stack, error) {
return nil, errdefs.ErrNotImplemented
}
func (cs *composeService) Logs(ctx context.Context, project string, w io.Writer) error {
func (cs *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
return errdefs.ErrNotImplemented
}

View File

@ -23,11 +23,13 @@ import (
"io"
"strconv"
"strings"
"github.com/docker/compose-cli/api/compose"
)
// NewLogConsumer creates a new LogConsumer
func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer {
return LogConsumer{
func NewLogConsumer(ctx context.Context, w io.Writer) compose.LogConsumer {
return &logConsumer{
ctx: ctx,
colors: map[string]colorFunc{},
width: 0,
@ -36,7 +38,7 @@ func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer {
}
// Log formats a log message as received from service/container
func (l *LogConsumer) Log(service, container, message string) {
func (l *logConsumer) Log(service, container, message string) {
if l.ctx.Err() != nil {
return
}
@ -54,16 +56,7 @@ func (l *LogConsumer) Log(service, container, message string) {
}
}
// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer
func (l *LogConsumer) GetWriter(service, container string) io.Writer {
return splitBuffer{
service: service,
container: container,
consumer: l,
}
}
func (l *LogConsumer) computeWidth() {
func (l *logConsumer) computeWidth() {
width := 0
for n := range l.colors {
if len(n) > width {
@ -74,25 +67,9 @@ func (l *LogConsumer) computeWidth() {
}
// LogConsumer consume logs from services and format them
type LogConsumer struct {
type logConsumer struct {
ctx context.Context
colors map[string]colorFunc
width int
writer io.Writer
}
type splitBuffer struct {
service string
container string
consumer *LogConsumer
}
func (s splitBuffer) Write(b []byte) (n int, err error) {
split := bytes.Split(b, []byte{'\n'})
for _, line := range split {
if len(line) != 0 {
s.consumer.Log(s.service, s.container, string(line))
}
}
return len(b), nil
}

View File

@ -17,6 +17,7 @@
package local
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
@ -52,7 +53,6 @@ import (
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/config"
errdefs2 "github.com/docker/compose-cli/errdefs"
"github.com/docker/compose-cli/formatter"
"github.com/docker/compose-cli/progress"
)
@ -339,10 +339,10 @@ func (s *composeService) Create(ctx context.Context, project *types.Project) err
})
}
func (s *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
func (s *composeService) Start(ctx context.Context, project *types.Project, consumer compose.LogConsumer) error {
var group *errgroup.Group
if w != nil {
eg, err := s.attach(ctx, project, w)
if consumer != nil {
eg, err := s.attach(ctx, project, consumer)
if err != nil {
return err
}
@ -361,8 +361,7 @@ func (s *composeService) Start(ctx context.Context, project *types.Project, w io
return nil
}
func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) (*errgroup.Group, error) {
consumer := formatter.NewLogConsumer(ctx, w)
func (s *composeService) attach(ctx context.Context, project *types.Project, consumer compose.LogConsumer) (*errgroup.Group, error) {
containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
@ -389,34 +388,49 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, w i
return eg, nil
}
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer formatter.LogConsumer, project *types.Project) error {
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error {
serviceName := container.Labels[serviceLabel]
w := consumer.GetWriter(serviceName, container.ID)
w := getWriter(serviceName, container.ID, consumer)
service, err := project.GetService(serviceName)
if err != nil {
return err
}
reader, err := s.getContainerStdout(ctx, container)
return s.attachContainerStreams(ctx, container, service.Tty, nil, w)
}
func (s *composeService) attachContainerStreams(ctx context.Context, container moby.Container, tty bool, r io.Reader, w io.Writer) error {
stdin, stdout, err := s.getContainerStreams(ctx, container)
if err != nil {
return err
}
go func() {
<-ctx.Done()
reader.Close() //nolint:errcheck
stdout.Close() //nolint:errcheck
stdin.Close() //nolint:errcheck
}()
if service.Tty {
_, err = io.Copy(w, reader)
} else {
_, err = stdcopy.StdCopy(w, w, reader)
if r != nil && stdin != nil {
go func() {
io.Copy(stdin, r) //nolint:errcheck
}()
}
if w != nil {
if tty {
_, err = io.Copy(w, stdout)
} else {
_, err = stdcopy.StdCopy(w, w, stdout)
}
}
return err
}
func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.ReadCloser, error) {
var reader io.ReadCloser
func (s *composeService) getContainerStreams(ctx context.Context, container moby.Container) (io.WriteCloser, io.ReadCloser, error) {
var stdout io.ReadCloser
var stdin io.WriteCloser
if container.State == containerRunning {
logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{
ShowStdout: true,
@ -424,9 +438,9 @@ func (s *composeService) getContainerStdout(ctx context.Context, container moby.
Follow: true,
})
if err != nil {
return nil, err
return nil, nil, err
}
reader = logs
stdout = logs
} else {
cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{
Stream: true,
@ -435,16 +449,12 @@ func (s *composeService) getContainerStdout(ctx context.Context, container moby.
Stderr: true,
})
if err != nil {
return nil, err
}
reader = containerStdout{cnx}
err = s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err != nil {
return nil, err
return nil, nil, err
}
stdout = containerStdout{cnx}
stdin = containerStdin{cnx}
}
return reader, nil
return stdin, stdout, nil
}
func getContainerName(c moby.Container) string {
@ -575,7 +585,7 @@ func loadProjectOptionsFromLabels(c moby.Container) (*cli.ProjectOptions, error)
cli.WithName(c.Labels[projectLabel]))
}
func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writer) error {
func (s *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer) error {
list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(projectName),
@ -584,7 +594,6 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
if err != nil {
return err
}
consumer := formatter.NewLogConsumer(ctx, w)
eg, ctx := errgroup.WithContext(ctx)
for _, c := range list {
service := c.Labels[serviceLabel]
@ -604,7 +613,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
if err != nil {
return err
}
w := consumer.GetWriter(service, container.ID)
w := getWriter(service, container.ID, consumer)
if container.Config.Tty {
_, err = io.Copy(w, r)
} else {
@ -616,6 +625,31 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
return eg.Wait()
}
type splitBuffer struct {
service string
container string
consumer compose.LogConsumer
}
// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
func getWriter(service, container string, l compose.LogConsumer) io.Writer {
return splitBuffer{
service: service,
container: container,
consumer: l,
}
}
func (s splitBuffer) Write(b []byte) (n int, err error) {
split := bytes.Split(b, []byte{'\n'})
for _, line := range split {
if len(line) != 0 {
s.consumer.Log(s.service, s.container, string(line))
}
}
return len(b), nil
}
func (s *composeService) Ps(ctx context.Context, projectName string) ([]compose.ServiceStatus, error) {
list, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(