Move streams into own package

This commit is contained in:
Djordje Lukic 2020-06-06 22:46:35 +02:00
parent bb69de1db3
commit c8079d61ee
10 changed files with 126 additions and 110 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/docker/api/containers"
containersv1 "github.com/docker/api/protos/containers/v1"
"github.com/docker/api/server/proxy/streams"
)
func portsToGrpc(ports []containers.Port) []*containersv1.Port {
@ -97,7 +98,10 @@ func (p *proxy) Exec(ctx context.Context, request *containersv1.ExecRequest) (*c
return &containersv1.ExecResponse{}, errors.New("unknown stream id")
}
err := Client(ctx).ContainerService().Exec(ctx, request.GetId(), request.GetCommand(), &reader{stream}, &writer{stream})
io := &streams.IO{
Stream: stream,
}
err := Client(ctx).ContainerService().Exec(ctx, request.GetId(), request.GetCommand(), io, io)
return &containersv1.ExecResponse{}, err
}
@ -108,6 +112,8 @@ func (p *proxy) Logs(request *containersv1.LogsRequest, stream containersv1.Cont
return c.ContainerService().Logs(ctx, request.GetContainerId(), containers.LogsRequest{
Follow: request.Follow,
Writer: &logStream{stream},
Writer: &streams.Log{
Stream: stream,
},
})
}

View File

@ -1,25 +0,0 @@
package proxy
import (
"io"
"google.golang.org/grpc"
containersv1 "github.com/docker/api/protos/containers/v1"
)
type logStream struct {
stream grpc.ServerStream
}
func newStreamWriter(stream grpc.ServerStream) io.Writer {
return &logStream{
stream: stream,
}
}
func (w *logStream) Write(p []byte) (n int, err error) {
return len(p), w.stream.SendMsg(&containersv1.LogsResponse{
Value: p,
})
}

View File

@ -8,6 +8,7 @@ import (
containersv1 "github.com/docker/api/protos/containers/v1"
contextsv1 "github.com/docker/api/protos/contexts/v1"
streamsv1 "github.com/docker/api/protos/streams/v1"
"github.com/docker/api/server/proxy/streams"
)
type clientKey struct{}
@ -34,7 +35,7 @@ type Proxy interface {
type proxy struct {
currentContext string
mu sync.Mutex
streams map[string]*Stream
streams map[string]*streams.Stream
contextsProxy *contextsProxy
}
@ -42,7 +43,7 @@ type proxy struct {
func New(currentContext string) Proxy {
return &proxy{
currentContext: currentContext,
streams: map[string]*Stream{},
streams: map[string]*streams.Stream{},
contextsProxy: &contextsProxy{},
}
}

View File

@ -1,41 +1,14 @@
package proxy
import (
"sync"
"github.com/containerd/containerd/log"
"github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
streamsv1 "github.com/docker/api/protos/streams/v1"
"github.com/docker/api/server/proxy/streams"
)
// Stream is a bidirectional stream for container IO
type Stream struct {
streamsv1.Streaming_NewStreamServer
errm sync.Mutex
errChan chan<- error
}
// CloseWithError sends the result of an action to the errChan or nil
// if no erros
func (s *Stream) CloseWithError(err error) error {
s.errm.Lock()
defer s.errm.Unlock()
if s.errChan != nil {
if err != nil {
s.errChan <- err
}
close(s.errChan)
s.errChan = nil
}
return nil
}
func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error {
var (
ctx = stream.Context()
@ -53,9 +26,9 @@ func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error {
errc := make(chan error)
p.mu.Lock()
p.streams[id] = &Stream{
p.streams[id] = &streams.Stream{
Streaming_NewStreamServer: stream,
errChan: errc,
ErrChan: errc,
}
p.mu.Unlock()
@ -73,46 +46,3 @@ func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error {
return ctx.Err()
}
}
// io.Reader that forwards everything to the stream
type reader struct {
stream *Stream
}
func (r reader) Read(p []byte) (int, error) {
a, err := r.stream.Recv()
if err != nil {
return 0, err
}
var m streamsv1.BytesMessage
err = ptypes.UnmarshalAny(a, &m)
if err != nil {
return 0, err
}
return copy(p, m.Value), nil
}
// io.Writer that writes
type writer struct {
stream grpc.ServerStream
}
func (w *writer) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
message := streamsv1.BytesMessage{
Type: streamsv1.IOStream_STDOUT,
Value: p,
}
m, err := ptypes.MarshalAny(&message)
if err != nil {
return 0, err
}
return len(message.Value), w.stream.SendMsg(m)
}

View File

@ -0,0 +1,45 @@
package streams
import (
"github.com/golang/protobuf/ptypes"
streamsv1 "github.com/docker/api/protos/streams/v1"
)
// IO implements an io.ReadWriter that forwards everything to the stream
type IO struct {
Stream *Stream
}
func (io *IO) Read(p []byte) (int, error) {
a, err := io.Stream.Recv()
if err != nil {
return 0, err
}
var m streamsv1.BytesMessage
err = ptypes.UnmarshalAny(a, &m)
if err != nil {
return 0, err
}
return copy(p, m.Value), nil
}
func (io *IO) Write(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
message := streamsv1.BytesMessage{
Type: streamsv1.IOStream_STDOUT,
Value: p,
}
m, err := ptypes.MarshalAny(&message)
if err != nil {
return 0, err
}
return len(message.Value), io.Stream.SendMsg(m)
}

View File

@ -0,0 +1,26 @@
package streams
import (
"io"
"google.golang.org/grpc"
containersv1 "github.com/docker/api/protos/containers/v1"
)
// Log implements an io.Writer that proxies logs over a gRPC stream
type Log struct {
Stream grpc.ServerStream
}
func newStreamWriter(stream grpc.ServerStream) io.Writer {
return &Log{
Stream: stream,
}
}
func (w *Log) Write(p []byte) (n int, err error) {
return len(p), w.Stream.SendMsg(&containersv1.LogsResponse{
Value: p,
})
}

View File

@ -1,4 +1,4 @@
package proxy
package streams
import (
"context"

View File

@ -0,0 +1,31 @@
package streams
import (
"sync"
streamsv1 "github.com/docker/api/protos/streams/v1"
)
// Stream is a bidirectional stream for container IO
type Stream struct {
streamsv1.Streaming_NewStreamServer
errm sync.Mutex
ErrChan chan<- error
}
// CloseWithError sends the result of an action to the errChan or nil
// if no erros
func (s *Stream) CloseWithError(err error) error {
s.errm.Lock()
defer s.errm.Unlock()
if s.ErrChan != nil {
if err != nil {
s.ErrChan <- err
}
close(s.ErrChan)
s.ErrChan = nil
}
return nil
}

View File

@ -1,4 +1,4 @@
package proxy
package streams
import (
"context"
@ -54,7 +54,7 @@ func (bs *byteStream) RecvMsg(m interface{}) error {
return nil
}
func getReader(t *testing.T, in []byte, errResult error) reader {
func getReader(t *testing.T, in []byte, errResult error) IO {
message := streamsv1.BytesMessage{
Type: streamsv1.IOStream_STDOUT,
Value: in,
@ -62,8 +62,8 @@ func getReader(t *testing.T, in []byte, errResult error) reader {
m, err := ptypes.MarshalAny(&message)
require.Nil(t, err)
return reader{
stream: &Stream{
return IO{
Stream: &Stream{
Streaming_NewStreamServer: &byteStream{
recvResult: m,
recvErr: errResult,
@ -109,7 +109,11 @@ func TestStreamWriter(t *testing.T) {
expected := getAny(t, in)
bs := byteStream{}
w := writer{stream: &bs}
w := IO{
Stream: &Stream{
Streaming_NewStreamServer: &bs,
},
}
n, err := w.Write(in)
assert.Nil(t, err)

View File

@ -126,8 +126,6 @@ func getContext(ctx context.Context) string {
// configureContext populates the request context with objects the client
// needs: the context store and the api client
func configureContext(ctx context.Context, currentContext string) (context.Context, error) {
// s := store.ContextStore(ctx)
// ctx = store.WithContextStore(ctx, s)
if currentContext != "" {
ctx = apicontext.WithCurrentContext(ctx, currentContext)
}