Merge pull request #998 from docker/attach

This commit is contained in:
Nicolas De loof 2020-12-04 09:06:58 +01:00 committed by GitHub
commit 54c2f03424
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 387 additions and 62 deletions

View File

@ -56,6 +56,14 @@ func (cs *aciComposeService) Pull(ctx context.Context, project *types.Project) e
return errdefs.ErrNotImplemented
}
func (cs *aciComposeService) Create(ctx context.Context, project *types.Project) error {
return errdefs.ErrNotImplemented
}
func (cs *aciComposeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
return errdefs.ErrNotImplemented
}
func (cs *aciComposeService) Up(ctx context.Context, project *types.Project, detach bool) error {
logrus.Debugf("Up on project with name %q", project.Name)

View File

@ -41,6 +41,14 @@ func (c *composeService) Pull(ctx context.Context, project *types.Project) error
return errdefs.ErrNotImplemented
}
func (c *composeService) Create(ctx context.Context, project *types.Project) error {
return errdefs.ErrNotImplemented
}
func (c *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
return errdefs.ErrNotImplemented
}
func (c *composeService) Up(context.Context, *types.Project, bool) error {
return errdefs.ErrNotImplemented
}

View File

@ -31,6 +31,10 @@ type Service interface {
Push(ctx context.Context, project *types.Project) error
// Pull executes the equivalent of a `compose pull`
Pull(ctx context.Context, project *types.Project) error
// Create executes the equivalent to a `compose create`
Create(ctx context.Context, project *types.Project) error
// Start executes the equivalent to a `compose start`
Start(ctx context.Context, project *types.Project, w io.Writer) error
// Up executes the equivalent to a `compose up`
Up(ctx context.Context, project *types.Project, detach bool) error
// Down executes the equivalent to a `compose down`

View File

@ -18,8 +18,13 @@ package compose
import (
"context"
"errors"
"fmt"
"io"
"os"
"github.com/compose-spec/compose-go/cli"
"github.com/compose-spec/compose-go/types"
"github.com/spf13/cobra"
"github.com/docker/compose-cli/api/client"
@ -32,7 +37,12 @@ func upCommand(contextType string) *cobra.Command {
upCmd := &cobra.Command{
Use: "up [SERVICE...]",
RunE: func(cmd *cobra.Command, args []string) error {
return runUp(cmd.Context(), opts, args)
switch contextType {
case store.LocalContextType:
return runCreateStart(cmd.Context(), opts, args)
default:
return runUp(cmd.Context(), opts, args)
}
},
}
upCmd.Flags().StringVarP(&opts.Name, "project-name", "p", "", "Project name")
@ -49,30 +59,68 @@ func upCommand(contextType string) *cobra.Command {
}
func runUp(ctx context.Context, opts composeOptions, services []string) error {
c, err := client.New(ctx)
c, project, err := setup(ctx, opts, services)
if err != nil {
return err
}
_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
options, err := opts.toProjectOptions()
if err != nil {
return "", err
}
project, err := cli.ProjectFromOptions(options)
if err != nil {
return "", err
}
if opts.DomainName != "" {
// arbitrarily set the domain name on the first service ; ACI backend will expose the entire project
project.Services[0].DomainName = opts.DomainName
}
err = filter(project, services)
if err != nil {
return "", err
}
return "", c.ComposeService().Up(ctx, project, opts.Detach)
})
return err
}
func runCreateStart(ctx context.Context, opts composeOptions, services []string) error {
c, project, err := setup(ctx, opts, services)
if err != nil {
return err
}
_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
return "", c.ComposeService().Create(ctx, project)
})
if err != nil {
return err
}
var w io.Writer
if !opts.Detach {
w = os.Stdout
}
err = c.ComposeService().Start(ctx, project, w)
if errors.Is(ctx.Err(), context.Canceled) {
fmt.Println("Gracefully stopping...")
ctx = context.Background()
_, err = progress.Run(ctx, func(ctx context.Context) (string, error) {
return "", c.ComposeService().Down(ctx, project.Name)
})
}
return err
}
func setup(ctx context.Context, opts composeOptions, services []string) (*client.Client, *types.Project, error) {
c, err := client.New(ctx)
if err != nil {
return nil, nil, err
}
options, err := opts.toProjectOptions()
if err != nil {
return nil, nil, err
}
project, err := cli.ProjectFromOptions(options)
if err != nil {
return nil, nil, err
}
if opts.DomainName != "" {
// arbitrarily set the domain name on the first service ; ACI backend will expose the entire project
project.Services[0].DomainName = opts.DomainName
}
err = filter(project, services)
if err != nil {
return nil, nil, err
}
return c, project, nil
}

View File

@ -28,17 +28,16 @@ import (
"path/filepath"
"strings"
types2 "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/errdefs"
"github.com/aws/aws-sdk-go/aws"
"github.com/compose-spec/compose-go/types"
types2 "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/pkg/errors"
"github.com/sanathkr/go-yaml"
"golang.org/x/mod/semver"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/errdefs"
)
func (e ecsLocalSimulation) Build(ctx context.Context, project *types.Project) error {
@ -53,7 +52,16 @@ func (e ecsLocalSimulation) Pull(ctx context.Context, project *types.Project) er
return errdefs.ErrNotImplemented
}
func (e ecsLocalSimulation) Create(ctx context.Context, project *types.Project) error {
return errdefs.ErrNotImplemented
}
func (e ecsLocalSimulation) Start(ctx context.Context, project *types.Project, w io.Writer) error {
return errdefs.ErrNotImplemented
}
func (e ecsLocalSimulation) Up(ctx context.Context, project *types.Project, detach bool) error {
cmd := exec.Command("docker-compose", "version", "--short")
b := bytes.Buffer{}
b.WriteString("v")

View File

@ -24,7 +24,7 @@ import (
)
func (b *ecsAPIService) Logs(ctx context.Context, project string, w io.Writer) error {
consumer := formatter.NewLogConsumer(w)
consumer := formatter.NewLogConsumer(ctx, w)
err := b.aws.GetLogs(ctx, project, consumer.Log)
return err
}

View File

@ -19,6 +19,7 @@ package ecs
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"syscall"
@ -39,7 +40,16 @@ func (b *ecsAPIService) Pull(ctx context.Context, project *types.Project) error
return errdefs.ErrNotImplemented
}
func (b *ecsAPIService) Create(ctx context.Context, project *types.Project) error {
return errdefs.ErrNotImplemented
}
func (b *ecsAPIService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
return errdefs.ErrNotImplemented
}
func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, detach bool) error {
err := b.aws.CheckRequirements(ctx, b.Region)
if err != nil {
return err

View File

@ -151,7 +151,16 @@ func (cs *composeService) Pull(ctx context.Context, project *types.Project) erro
return errdefs.ErrNotImplemented
}
func (cs *composeService) Create(ctx context.Context, project *types.Project) error {
return errdefs.ErrNotImplemented
}
func (cs *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
return errdefs.ErrNotImplemented
}
func (cs *composeService) Up(ctx context.Context, project *types.Project, detach bool) error {
fmt.Printf("Up command on project %q", project.Name)
return nil
}

View File

@ -18,6 +18,7 @@ package formatter
import (
"bytes"
"context"
"fmt"
"io"
"strconv"
@ -25,8 +26,9 @@ import (
)
// NewLogConsumer creates a new LogConsumer
func NewLogConsumer(w io.Writer) LogConsumer {
func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer {
return LogConsumer{
ctx: ctx,
colors: map[string]colorFunc{},
width: 0,
writer: w,
@ -35,6 +37,9 @@ func NewLogConsumer(w io.Writer) LogConsumer {
// Log formats a log message as received from service/container
func (l *LogConsumer) Log(service, container, message string) {
if l.ctx.Err() != nil {
return
}
cf, ok := l.colors[service]
if !ok {
cf = <-loop
@ -70,6 +75,7 @@ func (l *LogConsumer) computeWidth() {
// LogConsumer consume logs from services and format them
type LogConsumer struct {
ctx context.Context
colors map[string]colorFunc
width int
writer io.Writer

View File

@ -53,6 +53,7 @@ import (
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/config"
errdefs2 "github.com/docker/compose-cli/errdefs"
"github.com/docker/compose-cli/formatter"
"github.com/docker/compose-cli/progress"
)
@ -294,12 +295,17 @@ func toProgressEvent(prefix string, jm jsonmessage.JSONMessage, w progress.Write
}
func (s *composeService) Up(ctx context.Context, project *types.Project, detach bool) error {
return errdefs2.ErrNotImplemented
}
func (s *composeService) Create(ctx context.Context, project *types.Project) error {
err := s.ensureImagesExists(ctx, project)
if err != nil {
return err
}
for k, network := range project.Networks {
if !network.External.External && network.Name == k {
if !network.External.External && network.Name != "" {
network.Name = fmt.Sprintf("%s_%s", project.Name, k)
project.Networks[k] = network
}
@ -326,12 +332,119 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, detach
}
}
err = InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
return InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
return s.ensureService(c, project, service)
})
}
func (s *composeService) Start(ctx context.Context, project *types.Project, w io.Writer) error {
var group *errgroup.Group
if w != nil {
eg, err := s.attach(ctx, project, w)
if err != nil {
return err
}
group = eg
}
err := InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
return s.startService(ctx, project, service)
})
if err != nil {
return err
}
if group != nil {
return group.Wait()
}
return nil
}
func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) (*errgroup.Group, error) {
consumer := formatter.NewLogConsumer(ctx, w)
containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
),
All: true,
})
if err != nil {
return nil, err
}
var names []string
for _, c := range containers {
names = append(names, getContainerName(c))
}
fmt.Printf("Attaching to %s\n", strings.Join(names, ", "))
eg, ctx := errgroup.WithContext(ctx)
for _, c := range containers {
container := c
eg.Go(func() error {
return s.attachContainer(ctx, container, consumer, project)
})
}
return eg, nil
}
func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer formatter.LogConsumer, project *types.Project) error {
serviceName := container.Labels[serviceLabel]
w := consumer.GetWriter(serviceName, container.ID)
service, err := project.GetService(serviceName)
if err != nil {
return err
}
reader, err := s.getContainerStdout(ctx, container)
if err != nil {
return err
}
go func() {
<-ctx.Done()
reader.Close() //nolint:errcheck
}()
if service.Tty {
_, err = io.Copy(w, reader)
} else {
_, err = stdcopy.StdCopy(w, w, reader)
}
return err
}
func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.ReadCloser, error) {
var reader io.ReadCloser
if container.State == containerRunning {
logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
})
if err != nil {
return nil, err
}
reader = logs
} else {
cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{
Stream: true,
Stdin: true,
Stdout: true,
Stderr: true,
})
if err != nil {
return nil, err
}
reader = containerStdout{cnx}
err = s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err != nil {
return nil, err
}
}
return reader, nil
}
func getContainerName(c moby.Container) string {
// Names return container canonical name /foo + link aliases /linked_by/foo
for _, name := range c.Names {
@ -467,7 +580,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ
if err != nil {
return err
}
consumer := formatter.NewLogConsumer(w)
consumer := formatter.NewLogConsumer(ctx, w)
eg, ctx := errgroup.WithContext(ctx)
for _, c := range list {
service := c.Labels[serviceLabel]
@ -521,7 +634,7 @@ func containersToServiceStatus(containers []moby.Container) ([]compose.ServiceSt
containers := containersByLabel[service]
runnningContainers := []moby.Container{}
for _, container := range containers {
if container.State == "running" {
if container.State == containerRunning {
runnningContainers = append(runnningContainers, container)
}
}

64
local/container.go Normal file
View File

@ -0,0 +1,64 @@
// +build local
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"io"
moby "github.com/docker/docker/api/types"
)
const (
containerCreated = "created"
containerRestarting = "restarting"
containerRunning = "running"
containerRemoving = "removing" //nolint
containerPaused = "paused" //nolint
containerExited = "exited" //nolint
containerDead = "dead" //nolint
)
var _ io.ReadCloser = containerStdout{}
type containerStdout struct {
moby.HijackedResponse
}
func (l containerStdout) Read(p []byte) (n int, err error) {
return l.Reader.Read(p)
}
func (l containerStdout) Close() error {
l.HijackedResponse.Close()
return nil
}
var _ io.WriteCloser = containerStdin{}
type containerStdin struct {
moby.HijackedResponse
}
func (c containerStdin) Write(p []byte) (n int, err error) {
return c.Conn.Write(p)
}
func (c containerStdin) Close() error {
return c.CloseWrite()
}

View File

@ -143,7 +143,11 @@ func (cs *containerService) Run(ctx context.Context, r containers.ContainerConfi
return cs.apiClient.ContainerStart(ctx, id, types.ContainerStartOptions{})
}
func (cs *containerService) create(ctx context.Context, containerConfig *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *specs.Platform, name string) (string, error) {
func (cs *containerService) create(ctx context.Context,
containerConfig *container.Config,
hostConfig *container.HostConfig,
networkingConfig *network.NetworkingConfig,
platform *specs.Platform, name string) (string, error) {
created, err := cs.apiClient.ContainerCreate(ctx, containerConfig, hostConfig, networkingConfig, platform, name)
if err != nil {

View File

@ -39,16 +39,12 @@ const (
)
func (s *composeService) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
err := s.waitDependencies(ctx, project, service)
if err != nil {
return err
}
actual, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
filters.Arg("label", fmt.Sprintf("%s=%s", projectLabel, project.Name)),
filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, service.Name)),
projectFilter(project.Name),
serviceFilter(service.Name),
),
All: true,
})
if err != nil {
return err
@ -93,6 +89,8 @@ func (s *composeService) ensureService(ctx context.Context, project *types.Proje
for _, container := range actual {
container := container
name := getContainerName(container)
diverged := container.Labels[configHashLabel] != expected
if diverged || service.Extensions[extLifecycle] == forceRecreate {
eg.Go(func() error {
@ -101,14 +99,18 @@ func (s *composeService) ensureService(ctx context.Context, project *types.Proje
continue
}
if container.State == "running" {
// already running, skip
continue
w := progress.ContextWriter(ctx)
switch container.State {
case containerRunning:
w.Event(progress.RunningEvent(name))
case containerCreated:
case containerRestarting:
w.Event(progress.CreatedEvent(name))
default:
eg.Go(func() error {
return s.restartContainer(ctx, container)
})
}
eg.Go(func() error {
return s.restartContainer(ctx, service, container)
})
}
return eg.Wait()
}
@ -163,21 +165,19 @@ func getScale(config types.ServiceConfig) int {
}
func (s *composeService) createContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int) error {
eventName := fmt.Sprintf("Service %q", service.Name)
w := progress.ContextWriter(ctx)
w.Event(progress.CreatingEvent(eventName))
w.Event(progress.CreatingEvent(name))
err := s.runContainer(ctx, project, service, name, number, nil)
if err != nil {
return err
}
w.Event(progress.CreatedEvent(eventName))
w.Event(progress.CreatedEvent(name))
return nil
}
func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, container moby.Container) error {
w := progress.ContextWriter(ctx)
eventName := fmt.Sprintf("Service %q", service.Name)
w.Event(progress.NewEvent(eventName, progress.Working, "Recreate"))
w.Event(progress.NewEvent(getContainerName(container), progress.Working, "Recreate"))
err := s.apiClient.ContainerStop(ctx, container.ID, nil)
if err != nil {
return err
@ -200,7 +200,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
if err != nil {
return err
}
w.Event(progress.NewEvent(eventName, progress.Done, "Recreated"))
w.Event(progress.NewEvent(getContainerName(container), progress.Done, "Recreated"))
setDependentLifecycle(project, service.Name, forceRecreate)
return nil
}
@ -218,15 +218,14 @@ func setDependentLifecycle(project *types.Project, service string, strategy stri
}
}
func (s *composeService) restartContainer(ctx context.Context, service types.ServiceConfig, container moby.Container) error {
func (s *composeService) restartContainer(ctx context.Context, container moby.Container) error {
w := progress.ContextWriter(ctx)
eventName := fmt.Sprintf("Service %q", service.Name)
w.Event(progress.NewEvent(eventName, progress.Working, "Restart"))
w.Event(progress.NewEvent(getContainerName(container), progress.Working, "Restart"))
err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err != nil {
return err
}
w.Event(progress.NewEvent(eventName, progress.Done, "Restarted"))
w.Event(progress.NewEvent(getContainerName(container), progress.Done, "Restarted"))
return nil
}
@ -247,10 +246,6 @@ func (s *composeService) runContainer(ctx context.Context, project *types.Projec
return err
}
}
err = s.apiClient.ContainerStart(ctx, id, moby.ContainerStartOptions{})
if err != nil {
return err
}
return nil
}
@ -291,5 +286,38 @@ func (s *composeService) isServiceHealthy(ctx context.Context, project *types.Pr
}
}
return true, nil
}
func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
err := s.waitDependencies(ctx, project, service)
if err != nil {
return err
}
containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
serviceFilter(service.Name),
),
All: true,
})
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for _, c := range containers {
container := c
if container.State == containerRunning {
continue
}
eg.Go(func() error {
w := progress.ContextWriter(ctx)
w.Event(progress.StartingEvent(getContainerName(container)))
err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{})
if err == nil {
w.Event(progress.StartedEvent(getContainerName(container)))
}
return err
})
}
return eg.Wait()
}

View File

@ -45,7 +45,7 @@ func TestLocalBackendComposeUp(t *testing.T) {
})
t.Run("up", func(t *testing.T) {
c.RunDockerCmd("compose", "up", "-f", "../../tests/composefiles/demo_multi_port.yaml", "--project-name", projectName)
c.RunDockerCmd("compose", "up", "-f", "../../tests/composefiles/demo_multi_port.yaml", "--project-name", projectName, "-d")
})
t.Run("check running project", func(t *testing.T) {

View File

@ -58,6 +58,21 @@ func CreatingEvent(ID string) Event {
return NewEvent(ID, Working, "Creating")
}
// StartingEvent creates a new Starting in progress Event
func StartingEvent(ID string) Event {
return NewEvent(ID, Working, "Starting")
}
// StartedEvent creates a new Started in progress Event
func StartedEvent(ID string) Event {
return NewEvent(ID, Done, "Started")
}
// RunningEvent creates a new Running in progress Event
func RunningEvent(ID string) Event {
return NewEvent(ID, Done, "Running")
}
// CreatedEvent creates a new Created (done) Event
func CreatedEvent(ID string) Event {
return NewEvent(ID, Done, "Created")