attach to log stream by default on `up`

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2020-12-01 20:58:03 +01:00
parent 809c2bc45a
commit 39e4107e12
No known key found for this signature in database
GPG Key ID: 9858809D6F8F6E7E
15 changed files with 256 additions and 61 deletions

View File

@ -56,7 +56,7 @@ func (cs *aciComposeService) Pull(ctx context.Context, project *types.Project) e
return errdefs.ErrNotImplemented
}
func (cs *aciComposeService) Up(ctx context.Context, project *types.Project, detach bool) error {
func (cs *aciComposeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error {
logrus.Debugf("Up on project with name %q", project.Name)
if err := autocreateFileshares(ctx, project); err != nil {

View File

@ -41,7 +41,7 @@ func (c *composeService) Pull(ctx context.Context, project *types.Project) error
return errdefs.ErrNotImplemented
}
func (c *composeService) Up(context.Context, *types.Project, bool) error {
func (c *composeService) Up(context.Context, *types.Project, bool, io.Writer) error {
return errdefs.ErrNotImplemented
}

View File

@ -32,7 +32,7 @@ type Service interface {
// Pull executes the equivalent of a `compose pull`
Pull(ctx context.Context, project *types.Project) error
// Up executes the equivalent to a `compose up`
Up(ctx context.Context, project *types.Project, detach bool) error
Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error
// Down executes the equivalent to a `compose down`
Down(ctx context.Context, projectName string) error
// Logs executes the equivalent to a `compose logs`

View File

@ -18,13 +18,14 @@ package compose
import (
"context"
"github.com/docker/compose-cli/progress"
"os"
"github.com/compose-spec/compose-go/cli"
"github.com/spf13/cobra"
"github.com/docker/compose-cli/api/client"
"github.com/docker/compose-cli/context/store"
"github.com/docker/compose-cli/progress"
)
func upCommand(contextType string) *cobra.Command {
@ -54,25 +55,26 @@ func runUp(ctx context.Context, opts composeOptions, services []string) error {
return err
}
_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
options, err := opts.toProjectOptions()
if err != nil {
return "", err
}
project, err := cli.ProjectFromOptions(options)
if err != nil {
return "", err
}
if opts.DomainName != "" {
// arbitrarily set the domain name on the first service ; ACI backend will expose the entire project
project.Services[0].DomainName = opts.DomainName
}
options, err := opts.toProjectOptions()
if err != nil {
return err
}
project, err := cli.ProjectFromOptions(options)
if err != nil {
return err
}
if opts.DomainName != "" {
// arbitrarily set the domain name on the first service ; ACI backend will expose the entire project
project.Services[0].DomainName = opts.DomainName
}
err = filter(project, services)
if err != nil {
return "", err
}
return "", c.ComposeService().Up(ctx, project, opts.Detach)
err = filter(project, services)
if err != nil {
return err
}
_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
return "", c.ComposeService().Up(ctx, project, opts.Detach, os.Stdout)
})
return err
}

View File

@ -53,7 +53,7 @@ func (e ecsLocalSimulation) Pull(ctx context.Context, project *types.Project) er
return errdefs.ErrNotImplemented
}
func (e ecsLocalSimulation) Up(ctx context.Context, project *types.Project, detach bool) error {
func (e ecsLocalSimulation) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error {
cmd := exec.Command("docker-compose", "version", "--short")
b := bytes.Buffer{}
b.WriteString("v")

View File

@ -24,7 +24,7 @@ import (
)
func (b *ecsAPIService) Logs(ctx context.Context, project string, w io.Writer) error {
consumer := formatter.NewLogConsumer(w)
consumer := formatter.NewLogConsumer(ctx, w)
err := b.aws.GetLogs(ctx, project, consumer.Log)
return err
}

View File

@ -19,6 +19,7 @@ package ecs
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"syscall"
@ -39,7 +40,7 @@ func (b *ecsAPIService) Pull(ctx context.Context, project *types.Project) error
return errdefs.ErrNotImplemented
}
func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, detach bool) error {
func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error {
err := b.aws.CheckRequirements(ctx, b.Region)
if err != nil {
return err

View File

@ -151,7 +151,7 @@ func (cs *composeService) Pull(ctx context.Context, project *types.Project) erro
return errdefs.ErrNotImplemented
}
func (cs *composeService) Up(ctx context.Context, project *types.Project, detach bool) error {
func (cs *composeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error {
fmt.Printf("Up command on project %q", project.Name)
return nil
}

View File

@ -18,6 +18,7 @@ package formatter
import (
"bytes"
"context"
"fmt"
"io"
"strconv"
@ -25,8 +26,9 @@ import (
)
// NewLogConsumer creates a new LogConsumer
func NewLogConsumer(w io.Writer) LogConsumer {
func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer {
return LogConsumer{
ctx: ctx,
colors: map[string]colorFunc{},
width: 0,
writer: w,
@ -35,6 +37,9 @@ func NewLogConsumer(w io.Writer) LogConsumer {
// Log formats a log message as received from service/container
func (l *LogConsumer) Log(service, container, message string) {
if l.ctx.Err() == context.Canceled {
return
}
cf, ok := l.colors[service]
if !ok {
cf = <-loop
@ -70,6 +75,7 @@ func (l *LogConsumer) computeWidth() {
// LogConsumer consume logs from services and format them
type LogConsumer struct {
ctx context.Context
colors map[string]colorFunc
width int
writer io.Writer

View File

@ -293,13 +293,14 @@ func toProgressEvent(prefix string, jm jsonmessage.JSONMessage, w progress.Write
})
}
func (s *composeService) Up(ctx context.Context, project *types.Project, detach bool) error {
func (s *composeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error {
err := s.ensureImagesExists(ctx, project)
if err != nil {
return err
}
for k, network := range project.Networks {
if !network.External.External && network.Name == k {
if !network.External.External && network.Name != "" {
network.Name = fmt.Sprintf("%s_%s", project.Name, k)
project.Networks[k] = network
}
@ -329,9 +330,117 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, detach
err = InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
return s.ensureService(c, project, service)
})
if err != nil {
return err
}
if detach {
err = inDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
return s.startService(ctx, project, service)
})
return err
}
if detach {
return nil
}
progress.ContextWriter(ctx).Stop()
return s.attach(ctx, project, w)
}
func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) error {
consumer := formatter.NewLogConsumer(ctx, w)
containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
),
All: true,
})
if err != nil {
return err
}
var names []string
for _, c := range containers {
names = append(names, getContainerName(c))
}
fmt.Printf("Attaching to %s\n", strings.Join(names, ", "))
eg, ctx := errgroup.WithContext(ctx)
for _, c := range containers {
container := c
eg.Go(func() error {
return s.attachContainer(ctx, container, project, consumer)
})
}
eg.Go(func() error {
<-ctx.Done()
fmt.Println("Gracefully stopping...")
ctx = context.Background()
_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
return "", s.Down(ctx, project.Name)
})
return nil
})
return eg.Wait()
}
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, project *types.Project, consumer formatter.LogConsumer) error {
serviceName := container.Labels[serviceLabel]
service, err := project.GetService(serviceName)
if err != nil {
return err
}
reader, err := s.getContainerStdout(ctx, container)
if err != nil {
return err
}
w := consumer.GetWriter(serviceName, container.ID)
if service.Tty {
_, err = io.Copy(w, reader)
} else {
_, err = stdcopy.StdCopy(w, w, reader)
}
return err
}
func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.Reader, error) {
var reader io.Reader
if container.State == containerRunning {
logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
})
if err != nil {
return nil, err
}
reader = logs
} else {
cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{
Stream: true,
Stdin: true,
Stdout: true,
Stderr: true,
})
if err != nil {
return nil, err
}
reader = cnx.Reader
err = s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err != nil {
return nil, err
}
}
return reader, nil
}
func getContainerName(c moby.Container) string {
// Names return container canonical name /foo + link aliases /linked_by/foo
for _, name := range c.Names {
@ -367,6 +476,7 @@ func (s *composeService) Down(ctx context.Context, projectName string) error {
Filters: filters.NewArgs(
projectFilter(projectName),
),
All: true,
})
if err != nil {
return err
@ -467,7 +577,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
if err != nil {
return err
}
consumer := formatter.NewLogConsumer(w)
consumer := formatter.NewLogConsumer(ctx, w)
eg, ctx := errgroup.WithContext(ctx)
for _, c := range list {
service := c.Labels[serviceLabel]
@ -521,7 +631,7 @@ func containersToServiceStatus(containers []moby.Container) ([]compose.ServiceSt
containers := containersByLabel[service]
runnningContainers := []moby.Container{}
for _, container := range containers {
if container.State == "running" {
if container.State == containerRunning {
runnningContainers = append(runnningContainers, container)
}
}

29
local/container.go Normal file
View File

@ -0,0 +1,29 @@
// +build local
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
const (
containerCreated = "created"
containerRestarting = "restarting"
containerRunning = "running"
containerRemoving = "removing" //nolint
containerPaused = "paused" //nolint
containerExited = "exited" //nolint
containerDead = "dead" //nolint
)

View File

@ -39,16 +39,12 @@ const (
)
func (s *composeService) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
err := s.waitDependencies(ctx, project, service)
if err != nil {
return err
}
actual, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
filters.Arg("label", fmt.Sprintf("%s=%s", projectLabel, project.Name)),
filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, service.Name)),
projectFilter(project.Name),
serviceFilter(service.Name),
),
All: true,
})
if err != nil {
return err
@ -93,6 +89,8 @@ func (s *composeService) ensureService(ctx context.Context, project *types.Proje
for _, container := range actual {
container := container
name := getContainerName(container)
diverged := container.Labels[configHashLabel] != expected
if diverged || service.Extensions[extLifecycle] == forceRecreate {
eg.Go(func() error {
@ -101,14 +99,18 @@ func (s *composeService) ensureService(ctx context.Context, project *types.Proje
continue
}
if container.State == "running" {
// already running, skip
continue
w := progress.ContextWriter(ctx)
switch container.State {
case containerRunning:
w.Event(progress.RunningEvent(name))
case containerCreated:
case containerRestarting:
w.Event(progress.CreatedEvent(name))
default:
eg.Go(func() error {
return s.restartContainer(ctx, container)
})
}
eg.Go(func() error {
return s.restartContainer(ctx, service, container)
})
}
return eg.Wait()
}
@ -163,21 +165,19 @@ func getScale(config types.ServiceConfig) int {
}
func (s *composeService) createContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int) error {
eventName := fmt.Sprintf("Service %q", service.Name)
w := progress.ContextWriter(ctx)
w.Event(progress.CreatingEvent(eventName))
w.Event(progress.CreatingEvent(name))
err := s.runContainer(ctx, project, service, name, number, nil)
if err != nil {
return err
}
w.Event(progress.CreatedEvent(eventName))
w.Event(progress.CreatedEvent(name))
return nil
}
func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, container moby.Container) error {
w := progress.ContextWriter(ctx)
eventName := fmt.Sprintf("Service %q", service.Name)
w.Event(progress.NewEvent(eventName, progress.Working, "Recreate"))
w.Event(progress.NewEvent(getContainerName(container), progress.Working, "Recreate"))
err := s.apiClient.ContainerStop(ctx, container.ID, nil)
if err != nil {
return err
@ -200,7 +200,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
if err != nil {
return err
}
w.Event(progress.NewEvent(eventName, progress.Done, "Recreated"))
w.Event(progress.NewEvent(getContainerName(container), progress.Done, "Recreated"))
setDependentLifecycle(project, service.Name, forceRecreate)
return nil
}
@ -218,15 +218,14 @@ func setDependentLifecycle(project *types.Project, service string, strategy stri
}
}
func (s *composeService) restartContainer(ctx context.Context, service types.ServiceConfig, container moby.Container) error {
func (s *composeService) restartContainer(ctx context.Context, container moby.Container) error {
w := progress.ContextWriter(ctx)
eventName := fmt.Sprintf("Service %q", service.Name)
w.Event(progress.NewEvent(eventName, progress.Working, "Restart"))
w.Event(progress.NewEvent(getContainerName(container), progress.Working, "Restart"))
err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err != nil {
return err
}
w.Event(progress.NewEvent(eventName, progress.Done, "Restarted"))
w.Event(progress.NewEvent(getContainerName(container), progress.Done, "Restarted"))
return nil
}
@ -247,10 +246,6 @@ func (s *composeService) runContainer(ctx context.Context, project *types.Projec
return err
}
}
err = s.apiClient.ContainerStart(ctx, id, moby.ContainerStartOptions{})
if err != nil {
return err
}
return nil
}
@ -291,5 +286,38 @@ func (s *composeService) isServiceHealthy(ctx context.Context, project *types.Pr
}
}
return true, nil
}
func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
err := s.waitDependencies(ctx, project, service)
if err != nil {
return err
}
containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
serviceFilter(service.Name),
),
All: true,
})
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for _, c := range containers {
container := c
if container.State == containerRunning {
continue
}
eg.Go(func() error {
w := progress.ContextWriter(ctx)
w.Event(progress.StartingEvent(getContainerName(container)))
err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err == nil {
w.Event(progress.StartedEvent(getContainerName(container)))
}
return err
})
}
return eg.Wait()
}

View File

@ -51,3 +51,7 @@ func serviceFilter(serviceName string) filters.KeyValuePair {
func hasProjectLabelFilter() filters.KeyValuePair {
return filters.Arg("label", projectLabel)
}
func serviceFilter(serviceName string) filters.KeyValuePair {
return filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, serviceName))
}

View File

@ -58,6 +58,21 @@ func CreatingEvent(ID string) Event {
return NewEvent(ID, Working, "Creating")
}
// StartingEvent creates a new Starting in progress Event
func StartingEvent(ID string) Event {
return NewEvent(ID, Working, "Starting")
}
// StartedEvent creates a new Started in progress Event
func StartedEvent(ID string) Event {
return NewEvent(ID, Done, "Started")
}
// RunningEvent creates a new Running in progress Event
func RunningEvent(ID string) Event {
return NewEvent(ID, Done, "Running")
}
// CreatedEvent creates a new Created (done) Event
func CreatedEvent(ID string) Event {
return NewEvent(ID, Done, "Created")

View File

@ -30,7 +30,7 @@ func (p *proxy) Up(ctx context.Context, request *composev1.ComposeUpRequest) (*c
if err != nil {
return nil, err
}
return &composev1.ComposeUpResponse{ProjectName: project.Name}, Client(ctx).ComposeService().Up(ctx, project, true)
return &composev1.ComposeUpResponse{ProjectName: project.Name}, Client(ctx).ComposeService().Up(ctx, project, true, nil)
}
func (p *proxy) Down(ctx context.Context, request *composev1.ComposeDownRequest) (*composev1.ComposeDownResponse, error) {