wait for exec stdout to complete

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2021-06-09 17:41:17 +02:00
parent b1d2c18dfc
commit e7f228ecf9
2 changed files with 69 additions and 41 deletions

View File

@ -22,8 +22,9 @@ import (
"io" "io"
"github.com/compose-spec/compose-go/types" "github.com/compose-spec/compose-go/types"
apitypes "github.com/docker/docker/api/types" moby "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/compose-cli/api/compose" "github.com/docker/compose-cli/api/compose"
) )
@ -34,34 +35,14 @@ func (s *composeService) Exec(ctx context.Context, project *types.Project, opts
return 0, err return 0, err
} }
containers, err := s.apiClient.ContainerList(ctx, apitypes.ContainerListOptions{ container, err := s.getExecTarget(ctx, project, service, opts)
Filters: filters.NewArgs(
projectFilter(project.Name),
serviceFilter(service.Name),
filters.Arg("label", fmt.Sprintf("%s=%d", containerNumberLabel, opts.Index)),
),
})
if err != nil { if err != nil {
return 0, err return 0, err
} }
if len(containers) < 1 {
return 0, fmt.Errorf("container %s not running", getContainerName(project.Name, service, opts.Index))
}
container := containers[0]
var env []string exec, err := s.apiClient.ContainerExecCreate(ctx, container.ID, moby.ExecConfig{
for k, v := range service.Environment.OverrideBy(types.NewMappingWithEquals(opts.Environment)).
Resolve(func(s string) (string, bool) {
v, ok := project.Environment[s]
return v, ok
}).
RemoveEmpty() {
env = append(env, fmt.Sprintf("%s=%s", k, *v))
}
exec, err := s.apiClient.ContainerExecCreate(ctx, container.ID, apitypes.ExecConfig{
Cmd: opts.Command, Cmd: opts.Command,
Env: env, Env: s.getExecEnvironment(project, service, opts),
User: opts.User, User: opts.User,
Privileged: opts.Privileged, Privileged: opts.Privileged,
Tty: opts.Tty, Tty: opts.Tty,
@ -77,15 +58,14 @@ func (s *composeService) Exec(ctx context.Context, project *types.Project, opts
} }
if opts.Detach { if opts.Detach {
return 0, s.apiClient.ContainerExecStart(ctx, exec.ID, apitypes.ExecStartCheck{ return 0, s.apiClient.ContainerExecStart(ctx, exec.ID, moby.ExecStartCheck{
Detach: true, Detach: true,
Tty: opts.Tty, Tty: opts.Tty,
}) })
} }
resp, err := s.apiClient.ContainerExecAttach(ctx, exec.ID, apitypes.ExecStartCheck{ resp, err := s.apiClient.ContainerExecAttach(ctx, exec.ID, moby.ExecStartCheck{
Detach: false, Tty: opts.Tty,
Tty: opts.Tty,
}) })
if err != nil { if err != nil {
return 0, err return 0, err
@ -99,30 +79,78 @@ func (s *composeService) Exec(ctx context.Context, project *types.Project, opts
} }
} }
readChannel := make(chan error) err = s.interactiveExec(ctx, opts, resp)
writeChannel := make(chan error) if err != nil {
return 0, err
}
return s.getExecExitStatus(ctx, exec.ID)
}
// inspired by https://github.com/docker/cli/blob/master/cli/command/container/exec.go#L116
func (s *composeService) interactiveExec(ctx context.Context, opts compose.RunOptions, resp moby.HijackedResponse) error {
outputDone := make(chan error)
inputDone := make(chan error)
go func() { go func() {
_, err := io.Copy(opts.Writer, resp.Reader) if opts.Tty {
readChannel <- err _, err := io.Copy(opts.Writer, resp.Reader)
outputDone <- err
} else {
_, err := stdcopy.StdCopy(opts.Writer, opts.Writer, resp.Reader)
outputDone <- err
}
}() }()
go func() { go func() {
_, err := io.Copy(resp.Conn, opts.Reader) _, err := io.Copy(resp.Conn, opts.Reader)
writeChannel <- err inputDone <- err
}() }()
select { for {
case err = <-readChannel: select {
break case err := <-outputDone:
case err = <-writeChannel: return err
break case err := <-inputDone:
if err != nil {
return err
}
// Wait for output to complete streaming
case <-ctx.Done():
return ctx.Err()
}
} }
}
func (s *composeService) getExecTarget(ctx context.Context, project *types.Project, service types.ServiceConfig, opts compose.RunOptions) (moby.Container, error) {
containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{
Filters: filters.NewArgs(
projectFilter(project.Name),
serviceFilter(service.Name),
filters.Arg("label", fmt.Sprintf("%s=%d", containerNumberLabel, opts.Index)),
),
})
if err != nil { if err != nil {
return 0, err return moby.Container{}, err
} }
return s.getExecExitStatus(ctx, exec.ID) if len(containers) < 1 {
return moby.Container{}, fmt.Errorf("container %s not running", getContainerName(project.Name, service, opts.Index))
}
container := containers[0]
return container, nil
}
func (s *composeService) getExecEnvironment(project *types.Project, service types.ServiceConfig, opts compose.RunOptions) []string {
var env []string
for k, v := range service.Environment.OverrideBy(types.NewMappingWithEquals(opts.Environment)).
Resolve(func(s string) (string, bool) {
v, ok := project.Environment[s]
return v, ok
}).
RemoveEmpty() {
env = append(env, fmt.Sprintf("%s=%s", k, *v))
}
return env
} }
func (s *composeService) getExecExitStatus(ctx context.Context, execID string) (int, error) { func (s *composeService) getExecExitStatus(ctx context.Context, execID string) (int, error) {

View File

@ -82,7 +82,7 @@ func TestNetworkAliasses(t *testing.T) {
t.Run("curl", func(t *testing.T) { t.Run("curl", func(t *testing.T) {
res := c.RunDockerCmd("compose", "-f", "./fixtures/network-alias/compose.yaml", "--project-name", projectName, "exec", "-T", "container1", "curl", "http://alias-of-container2/") res := c.RunDockerCmd("compose", "-f", "./fixtures/network-alias/compose.yaml", "--project-name", projectName, "exec", "-T", "container1", "curl", "http://alias-of-container2/")
assert.Assert(t, !strings.Contains(res.Stdout(), "Welcome to nginx!"), res.Stdout()) assert.Assert(t, strings.Contains(res.Stdout(), "Welcome to nginx!"), res.Stdout())
}) })
t.Run("down", func(t *testing.T) { t.Run("down", func(t *testing.T) {