diff --git a/server/proxy/containers.go b/server/proxy/containers.go index 9509e5043..e401e02a6 100644 --- a/server/proxy/containers.go +++ b/server/proxy/containers.go @@ -6,6 +6,7 @@ import ( "github.com/docker/api/containers" containersv1 "github.com/docker/api/protos/containers/v1" + "github.com/docker/api/server/proxy/streams" ) func portsToGrpc(ports []containers.Port) []*containersv1.Port { @@ -97,7 +98,10 @@ func (p *proxy) Exec(ctx context.Context, request *containersv1.ExecRequest) (*c return &containersv1.ExecResponse{}, errors.New("unknown stream id") } - err := Client(ctx).ContainerService().Exec(ctx, request.GetId(), request.GetCommand(), &reader{stream}, &writer{stream}) + io := &streams.IO{ + Stream: stream, + } + err := Client(ctx).ContainerService().Exec(ctx, request.GetId(), request.GetCommand(), io, io) return &containersv1.ExecResponse{}, err } @@ -108,6 +112,8 @@ func (p *proxy) Logs(request *containersv1.LogsRequest, stream containersv1.Cont return c.ContainerService().Logs(ctx, request.GetContainerId(), containers.LogsRequest{ Follow: request.Follow, - Writer: &logStream{stream}, + Writer: &streams.Log{ + Stream: stream, + }, }) } diff --git a/server/proxy/logstream.go b/server/proxy/logstream.go deleted file mode 100644 index 95e63f352..000000000 --- a/server/proxy/logstream.go +++ /dev/null @@ -1,25 +0,0 @@ -package proxy - -import ( - "io" - - "google.golang.org/grpc" - - containersv1 "github.com/docker/api/protos/containers/v1" -) - -type logStream struct { - stream grpc.ServerStream -} - -func newStreamWriter(stream grpc.ServerStream) io.Writer { - return &logStream{ - stream: stream, - } -} - -func (w *logStream) Write(p []byte) (n int, err error) { - return len(p), w.stream.SendMsg(&containersv1.LogsResponse{ - Value: p, - }) -} diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index ef3f62c27..1756ef61e 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -8,6 +8,7 @@ import ( containersv1 "github.com/docker/api/protos/containers/v1" contextsv1 "github.com/docker/api/protos/contexts/v1" streamsv1 "github.com/docker/api/protos/streams/v1" + "github.com/docker/api/server/proxy/streams" ) type clientKey struct{} @@ -34,7 +35,7 @@ type Proxy interface { type proxy struct { currentContext string mu sync.Mutex - streams map[string]*Stream + streams map[string]*streams.Stream contextsProxy *contextsProxy } @@ -42,7 +43,7 @@ type proxy struct { func New(currentContext string) Proxy { return &proxy{ currentContext: currentContext, - streams: map[string]*Stream{}, + streams: map[string]*streams.Stream{}, contextsProxy: &contextsProxy{}, } } diff --git a/server/proxy/streams.go b/server/proxy/streams.go index 60acb9c6e..3efa1c056 100644 --- a/server/proxy/streams.go +++ b/server/proxy/streams.go @@ -1,41 +1,14 @@ package proxy import ( - "sync" - "github.com/containerd/containerd/log" - "github.com/golang/protobuf/ptypes" "github.com/google/uuid" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" streamsv1 "github.com/docker/api/protos/streams/v1" + "github.com/docker/api/server/proxy/streams" ) -// Stream is a bidirectional stream for container IO -type Stream struct { - streamsv1.Streaming_NewStreamServer - - errm sync.Mutex - errChan chan<- error -} - -// CloseWithError sends the result of an action to the errChan or nil -// if no erros -func (s *Stream) CloseWithError(err error) error { - s.errm.Lock() - defer s.errm.Unlock() - - if s.errChan != nil { - if err != nil { - s.errChan <- err - } - close(s.errChan) - s.errChan = nil - } - return nil -} - func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error { var ( ctx = stream.Context() @@ -53,9 +26,9 @@ func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error { errc := make(chan error) p.mu.Lock() - p.streams[id] = &Stream{ + p.streams[id] = &streams.Stream{ Streaming_NewStreamServer: stream, - errChan: errc, + ErrChan: errc, } p.mu.Unlock() @@ -73,46 +46,3 @@ func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error { return ctx.Err() } } - -// io.Reader that forwards everything to the stream -type reader struct { - stream *Stream -} - -func (r reader) Read(p []byte) (int, error) { - a, err := r.stream.Recv() - if err != nil { - return 0, err - } - - var m streamsv1.BytesMessage - err = ptypes.UnmarshalAny(a, &m) - if err != nil { - return 0, err - } - - return copy(p, m.Value), nil -} - -// io.Writer that writes -type writer struct { - stream grpc.ServerStream -} - -func (w *writer) Write(p []byte) (n int, err error) { - if len(p) == 0 { - return 0, nil - } - - message := streamsv1.BytesMessage{ - Type: streamsv1.IOStream_STDOUT, - Value: p, - } - - m, err := ptypes.MarshalAny(&message) - if err != nil { - return 0, err - } - - return len(message.Value), w.stream.SendMsg(m) -} diff --git a/server/proxy/streams/io.go b/server/proxy/streams/io.go new file mode 100644 index 000000000..a957d611a --- /dev/null +++ b/server/proxy/streams/io.go @@ -0,0 +1,45 @@ +package streams + +import ( + "github.com/golang/protobuf/ptypes" + + streamsv1 "github.com/docker/api/protos/streams/v1" +) + +// IO implements an io.ReadWriter that forwards everything to the stream +type IO struct { + Stream *Stream +} + +func (io *IO) Read(p []byte) (int, error) { + a, err := io.Stream.Recv() + if err != nil { + return 0, err + } + + var m streamsv1.BytesMessage + err = ptypes.UnmarshalAny(a, &m) + if err != nil { + return 0, err + } + + return copy(p, m.Value), nil +} + +func (io *IO) Write(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + + message := streamsv1.BytesMessage{ + Type: streamsv1.IOStream_STDOUT, + Value: p, + } + + m, err := ptypes.MarshalAny(&message) + if err != nil { + return 0, err + } + + return len(message.Value), io.Stream.SendMsg(m) +} diff --git a/server/proxy/streams/logs.go b/server/proxy/streams/logs.go new file mode 100644 index 000000000..15f24f5f1 --- /dev/null +++ b/server/proxy/streams/logs.go @@ -0,0 +1,26 @@ +package streams + +import ( + "io" + + "google.golang.org/grpc" + + containersv1 "github.com/docker/api/protos/containers/v1" +) + +// Log implements an io.Writer that proxies logs over a gRPC stream +type Log struct { + Stream grpc.ServerStream +} + +func newStreamWriter(stream grpc.ServerStream) io.Writer { + return &Log{ + Stream: stream, + } +} + +func (w *Log) Write(p []byte) (n int, err error) { + return len(p), w.Stream.SendMsg(&containersv1.LogsResponse{ + Value: p, + }) +} diff --git a/server/proxy/logstream_test.go b/server/proxy/streams/logs_test.go similarity index 98% rename from server/proxy/logstream_test.go rename to server/proxy/streams/logs_test.go index fa045a009..fbf1b8df0 100644 --- a/server/proxy/logstream_test.go +++ b/server/proxy/streams/logs_test.go @@ -1,4 +1,4 @@ -package proxy +package streams import ( "context" diff --git a/server/proxy/streams/stream.go b/server/proxy/streams/stream.go new file mode 100644 index 000000000..218eb5c1a --- /dev/null +++ b/server/proxy/streams/stream.go @@ -0,0 +1,31 @@ +package streams + +import ( + "sync" + + streamsv1 "github.com/docker/api/protos/streams/v1" +) + +// Stream is a bidirectional stream for container IO +type Stream struct { + streamsv1.Streaming_NewStreamServer + + errm sync.Mutex + ErrChan chan<- error +} + +// CloseWithError sends the result of an action to the errChan or nil +// if no erros +func (s *Stream) CloseWithError(err error) error { + s.errm.Lock() + defer s.errm.Unlock() + + if s.ErrChan != nil { + if err != nil { + s.ErrChan <- err + } + close(s.ErrChan) + s.ErrChan = nil + } + return nil +} diff --git a/server/proxy/streams_test.go b/server/proxy/streams/stream_test.go similarity index 92% rename from server/proxy/streams_test.go rename to server/proxy/streams/stream_test.go index 90c6ae5db..ecbb5f111 100644 --- a/server/proxy/streams_test.go +++ b/server/proxy/streams/stream_test.go @@ -1,4 +1,4 @@ -package proxy +package streams import ( "context" @@ -54,7 +54,7 @@ func (bs *byteStream) RecvMsg(m interface{}) error { return nil } -func getReader(t *testing.T, in []byte, errResult error) reader { +func getReader(t *testing.T, in []byte, errResult error) IO { message := streamsv1.BytesMessage{ Type: streamsv1.IOStream_STDOUT, Value: in, @@ -62,8 +62,8 @@ func getReader(t *testing.T, in []byte, errResult error) reader { m, err := ptypes.MarshalAny(&message) require.Nil(t, err) - return reader{ - stream: &Stream{ + return IO{ + Stream: &Stream{ Streaming_NewStreamServer: &byteStream{ recvResult: m, recvErr: errResult, @@ -109,7 +109,11 @@ func TestStreamWriter(t *testing.T) { expected := getAny(t, in) bs := byteStream{} - w := writer{stream: &bs} + w := IO{ + Stream: &Stream{ + Streaming_NewStreamServer: &bs, + }, + } n, err := w.Write(in) assert.Nil(t, err) diff --git a/server/server.go b/server/server.go index 533810fc6..159362488 100644 --- a/server/server.go +++ b/server/server.go @@ -126,8 +126,6 @@ func getContext(ctx context.Context) string { // configureContext populates the request context with objects the client // needs: the context store and the api client func configureContext(ctx context.Context, currentContext string) (context.Context, error) { - // s := store.ContextStore(ctx) - // ctx = store.WithContextStore(ctx, s) if currentContext != "" { ctx = apicontext.WithCurrentContext(ctx, currentContext) }