Merge pull request #1878 from ndeloof/exec_streams

This commit is contained in:
Nicolas De loof 2021-07-02 11:05:06 +02:00 committed by GitHub
commit 4049feb74d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 34 additions and 30 deletions

View File

@ -92,8 +92,9 @@ func runExec(ctx context.Context, backend api.Service, opts execOpts) error {
Detach: opts.detach, Detach: opts.detach,
WorkingDir: opts.workingDir, WorkingDir: opts.workingDir,
Writer: os.Stdout, Stdin: os.Stdin,
Reader: os.Stdin, Stdout: os.Stdout,
Stderr: os.Stderr,
} }
if execOpts.Tty { if execOpts.Tty {
@ -107,8 +108,9 @@ func runExec(ctx context.Context, backend api.Service, opts execOpts) error {
} }
}() }()
execOpts.Writer = con execOpts.Stdin = con
execOpts.Reader = con execOpts.Stdout = con
execOpts.Stderr = con
} }
exitCode, err := backend.Exec(ctx, project, execOpts) exitCode, err := backend.Exec(ctx, project, execOpts)
if exitCode != 0 { if exitCode != 0 {

View File

@ -189,8 +189,9 @@ func runRun(ctx context.Context, backend api.Service, project *types.Project, op
Command: opts.Command, Command: opts.Command,
Detach: opts.Detach, Detach: opts.Detach,
AutoRemove: opts.Remove, AutoRemove: opts.Remove,
Writer: os.Stdout, Stdin: os.Stdin,
Reader: os.Stdin, Stdout: os.Stdout,
Stderr: os.Stderr,
Tty: !opts.noTty, Tty: !opts.noTty,
WorkingDir: opts.workdir, WorkingDir: opts.workdir,
User: opts.user, User: opts.user,

View File

@ -125,7 +125,7 @@ func (kc KubeClient) Exec(ctx context.Context, projectName string, opts api.RunO
TTY: opts.Tty, TTY: opts.Tty,
} }
if opts.Reader == nil { if opts.Stdin == nil {
option.Stdin = false option.Stdin = false
} }
@ -141,9 +141,9 @@ func (kc KubeClient) Exec(ctx context.Context, projectName string, opts api.RunO
return err return err
} }
return exec.Stream(remotecommand.StreamOptions{ return exec.Stream(remotecommand.StreamOptions{
Stdin: opts.Reader, Stdin: opts.Stdin,
Stdout: opts.Writer, Stdout: opts.Stdout,
Stderr: opts.Writer, Stderr: opts.Stdout,
Tty: opts.Tty, Tty: opts.Tty,
}) })
} }

View File

@ -208,8 +208,9 @@ type RunOptions struct {
Entrypoint []string Entrypoint []string
Detach bool Detach bool
AutoRemove bool AutoRemove bool
Writer io.WriteCloser Stdin io.ReadCloser
Reader io.ReadCloser Stdout io.WriteCloser
Stderr io.WriteCloser
Tty bool Tty bool
WorkingDir string WorkingDir string
User string User string

View File

@ -78,21 +78,21 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con
Line: line, Line: line,
}) })
}) })
_, _, err = s.attachContainerStreams(ctx, container.ID, service.Tty, nil, w) _, _, err = s.attachContainerStreams(ctx, container.ID, service.Tty, nil, w, w)
return err return err
} }
func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, r io.ReadCloser, w io.Writer) (func(), chan bool, error) { func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, stdin io.ReadCloser, stdout, stderr io.Writer) (func(), chan bool, error) {
detached := make(chan bool) detached := make(chan bool)
var ( var (
in *streams.In in *streams.In
restore = func() { /* noop */ } restore = func() { /* noop */ }
) )
if r != nil { if stdin != nil {
in = streams.NewIn(r) in = streams.NewIn(stdin)
} }
stdin, stdout, err := s.getContainerStreams(ctx, container) streamIn, streamOut, err := s.getContainerStreams(ctx, container)
if err != nil { if err != nil {
return restore, detached, err return restore, detached, err
} }
@ -102,10 +102,10 @@ func (s *composeService) attachContainerStreams(ctx context.Context, container s
if in != nil { if in != nil {
in.Close() //nolint:errcheck in.Close() //nolint:errcheck
} }
stdout.Close() //nolint:errcheck streamOut.Close() //nolint:errcheck
}() }()
if in != nil && stdin != nil { if in != nil && streamIn != nil {
if in.IsTerminal() { if in.IsTerminal() {
state, err := term.SetRawTerminal(in.FD()) state, err := term.SetRawTerminal(in.FD())
if err != nil { if err != nil {
@ -116,19 +116,19 @@ func (s *composeService) attachContainerStreams(ctx context.Context, container s
} }
} }
go func() { go func() {
_, err := io.Copy(stdin, r) _, err := io.Copy(streamIn, stdin)
if _, ok := err.(term.EscapeError); ok { if _, ok := err.(term.EscapeError); ok {
close(detached) close(detached)
} }
}() }()
} }
if w != nil { if stdout != nil {
go func() { go func() {
if tty { if tty {
io.Copy(w, stdout) // nolint:errcheck io.Copy(stdout, streamOut) // nolint:errcheck
} else { } else {
stdcopy.StdCopy(w, w, stdout) // nolint:errcheck stdcopy.StdCopy(stdout, stderr, streamOut) // nolint:errcheck
} }
}() }()
} }

View File

@ -96,12 +96,12 @@ func (s *composeService) interactiveExec(ctx context.Context, opts api.RunOption
stdout := ContainerStdout{HijackedResponse: resp} stdout := ContainerStdout{HijackedResponse: resp}
stdin := ContainerStdin{HijackedResponse: resp} stdin := ContainerStdin{HijackedResponse: resp}
r, err := s.getEscapeKeyProxy(opts.Reader) r, err := s.getEscapeKeyProxy(opts.Stdin)
if err != nil { if err != nil {
return err return err
} }
in := streams.NewIn(opts.Reader) in := streams.NewIn(opts.Stdin)
if in.IsTerminal() { if in.IsTerminal() {
state, err := term.SetRawTerminal(in.FD()) state, err := term.SetRawTerminal(in.FD())
if err != nil { if err != nil {
@ -112,10 +112,10 @@ func (s *composeService) interactiveExec(ctx context.Context, opts api.RunOption
go func() { go func() {
if opts.Tty { if opts.Tty {
_, err := io.Copy(opts.Writer, stdout) _, err := io.Copy(opts.Stdout, stdout)
outputDone <- err outputDone <- err
} else { } else {
_, err := stdcopy.StdCopy(opts.Writer, opts.Writer, stdout) _, err := stdcopy.StdCopy(opts.Stdout, opts.Stderr, stdout)
outputDone <- err outputDone <- err
} }
}() }()

View File

@ -74,15 +74,15 @@ func (s *composeService) RunOneOffContainer(ctx context.Context, project *types.
if err != nil { if err != nil {
return 0, err return 0, err
} }
fmt.Fprintln(opts.Writer, containerID) fmt.Fprintln(opts.Stdout, containerID)
return 0, nil return 0, nil
} }
r, err := s.getEscapeKeyProxy(opts.Reader) r, err := s.getEscapeKeyProxy(opts.Stdin)
if err != nil { if err != nil {
return 0, err return 0, err
} }
restore, detachC, err := s.attachContainerStreams(ctx, containerID, service.Tty, r, opts.Writer) restore, detachC, err := s.attachContainerStreams(ctx, containerID, service.Tty, r, opts.Stdout, opts.Stderr)
if err != nil { if err != nil {
return 0, err return 0, err
} }