Merge pull request #221 from rumpl/feat-context-metadata

Use the context from the metadata if it exists
This commit is contained in:
Djordje Lukic 2020-06-16 00:43:36 -07:00 committed by GitHub
commit 50c68ce4dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 275 additions and 107 deletions

View File

@ -0,0 +1,41 @@
package server
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// A gRPC server stream will only let you get its context but
// there is no way to set a new (augmented context) to the next
// handler (like we do for a unary request). We need to wrap the grpc.ServerSteam
// to be able to set a new context that will be sent to the next stream interceptor.
type contextServerStream struct {
ss grpc.ServerStream
ctx context.Context
}
func (css *contextServerStream) SetHeader(md metadata.MD) error {
return css.ss.SetHeader(md)
}
func (css *contextServerStream) SendHeader(md metadata.MD) error {
return css.ss.SendHeader(md)
}
func (css *contextServerStream) SetTrailer(md metadata.MD) {
css.ss.SetTrailer(md)
}
func (css *contextServerStream) Context() context.Context {
return css.ctx
}
func (css *contextServerStream) SendMsg(m interface{}) error {
return css.ss.SendMsg(m)
}
func (css *contextServerStream) RecvMsg(m interface{}) error {
return css.ss.RecvMsg(m)
}

111
server/interceptor.go Normal file
View File

@ -0,0 +1,111 @@
package server
import (
"context"
"errors"
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/docker/api/client"
"github.com/docker/api/config"
apicontext "github.com/docker/api/context"
"github.com/docker/api/context/store"
"github.com/docker/api/server/proxy"
)
// key is the key where the current docker context is stored in the metadata
// of a gRPC request
const key = "context_key"
// unaryServerInterceptor configures the context and sends it to the next handler
func unaryServerInterceptor(clictx context.Context) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
currentContext, err := getIncomingContext(ctx)
if err != nil {
currentContext, err = getConfigContext(clictx)
if err != nil {
return nil, err
}
}
configuredCtx, err := configureContext(clictx, currentContext, info.FullMethod)
if err != nil {
return nil, err
}
return handler(configuredCtx, req)
}
}
// streamServerInterceptor configures the context and sends it to the next handler
func streamServerInterceptor(clictx context.Context) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
currentContext, err := getIncomingContext(ss.Context())
if err != nil {
currentContext, err = getConfigContext(clictx)
if err != nil {
return err
}
}
ctx, err := configureContext(clictx, currentContext, info.FullMethod)
if err != nil {
return err
}
return handler(srv, &contextServerStream{
ss: ss,
ctx: ctx,
})
}
}
// Returns the current context from the configuration file
func getConfigContext(ctx context.Context) (string, error) {
configDir := config.Dir(ctx)
configFile, err := config.LoadFile(configDir)
if err != nil {
return "", err
}
return configFile.CurrentContext, nil
}
// Returns the context set by the caller if any, error otherwise
func getIncomingContext(ctx context.Context) (string, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
if key, ok := md[key]; ok {
return key[0], nil
}
}
return "", errors.New("not found")
}
// configureContext populates the request context with objects the client
// needs: the context store and the api client
func configureContext(ctx context.Context, currentContext string, method string) (context.Context, error) {
configDir := config.Dir(ctx)
ctx = apicontext.WithCurrentContext(ctx, currentContext)
// The contexts service doesn't need the client
if !strings.Contains(method, "/com.docker.api.protos.context.v1.Contexts") {
c, err := client.New(ctx)
if err != nil {
return nil, err
}
ctx, err = proxy.WithClient(ctx, c)
if err != nil {
return nil, err
}
}
s, err := store.New(store.WithRoot(configDir))
if err != nil {
return nil, err
}
ctx = store.WithContextStore(ctx, s)
return ctx, nil
}

121
server/interceptor_test.go Normal file
View File

@ -0,0 +1,121 @@
package server
import (
"context"
"io/ioutil"
"os"
"path"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/docker/api/config"
apicontext "github.com/docker/api/context"
)
type interceptorSuite struct {
suite.Suite
dir string
ctx context.Context
}
func (is *interceptorSuite) BeforeTest(suiteName, testName string) {
dir, err := ioutil.TempDir("", "example")
require.Nil(is.T(), err)
ctx := context.Background()
ctx = config.WithDir(ctx, dir)
err = ioutil.WriteFile(path.Join(dir, "config.json"), []byte(`{"currentContext": "default"}`), 0644)
require.Nil(is.T(), err)
is.dir = dir
is.ctx = ctx
}
func (is *interceptorSuite) AfterTest(suiteName, tesName string) {
err := os.RemoveAll(is.dir)
require.Nil(is.T(), err)
}
func (is *interceptorSuite) TestUnaryGetCurrentContext() {
interceptor := unaryServerInterceptor(is.ctx)
currentContext := is.callUnary(context.Background(), interceptor)
assert.Equal(is.T(), "default", currentContext)
}
func (is *interceptorSuite) TestUnaryContextFromMetadata() {
contextName := "test"
interceptor := unaryServerInterceptor(is.ctx)
reqCtx := context.Background()
reqCtx = metadata.NewIncomingContext(reqCtx, metadata.MD{
(key): []string{contextName},
})
currentContext := is.callUnary(reqCtx, interceptor)
assert.Equal(is.T(), contextName, currentContext)
}
func (is *interceptorSuite) TestStreamGetCurrentContext() {
interceptor := streamServerInterceptor(is.ctx)
currentContext := is.callStream(context.Background(), interceptor)
assert.Equal(is.T(), "default", currentContext)
}
func (is *interceptorSuite) TestStreamContextFromMetadata() {
contextName := "test"
interceptor := streamServerInterceptor(is.ctx)
reqCtx := context.Background()
reqCtx = metadata.NewIncomingContext(reqCtx, metadata.MD{
(key): []string{contextName},
})
currentContext := is.callStream(reqCtx, interceptor)
assert.Equal(is.T(), contextName, currentContext)
}
func (is *interceptorSuite) callStream(ctx context.Context, interceptor grpc.StreamServerInterceptor) string {
currentContext := ""
err := interceptor(nil, &contextServerStream{
ctx: ctx,
}, &grpc.StreamServerInfo{
FullMethod: "/com.docker.api.protos.context.v1.Contexts/test",
}, func(srv interface{}, stream grpc.ServerStream) error {
currentContext = apicontext.CurrentContext(stream.Context())
return nil
})
require.Nil(is.T(), err)
return currentContext
}
func (is *interceptorSuite) callUnary(ctx context.Context, interceptor grpc.UnaryServerInterceptor) string {
currentContext := ""
resp, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{
FullMethod: "/com.docker.api.protos.context.v1.Contexts/test",
}, func(ctx context.Context, req interface{}) (interface{}, error) {
currentContext = apicontext.CurrentContext(ctx)
return nil, nil
})
require.Nil(is.T(), err)
require.Nil(is.T(), resp)
return currentContext
}
func TestInterceptor(t *testing.T) {
suite.Run(t, new(interceptorSuite))
}

View File

@ -35,13 +35,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"github.com/docker/api/client"
"github.com/docker/api/config"
apicontext "github.com/docker/api/context"
"github.com/docker/api/context/store"
"github.com/docker/api/server/proxy"
)
// New returns a new GRPC server.
@ -55,109 +48,11 @@ func New(ctx context.Context) *grpc.Server {
return s
}
//CreateListener creates a listener either on tcp://, or local listener, supporting unix:// for unix socket or npipe:// for named pipes on windows
// CreateListener creates a listener either on tcp://, or local listener,
// supporting unix:// for unix socket or npipe:// for named pipes on windows
func CreateListener(address string) (net.Listener, error) {
if strings.HasPrefix(address, "tcp://") {
return net.Listen("tcp", strings.TrimPrefix(address, "tcp://"))
}
return createLocalListener(address)
}
// unaryServerInterceptor configures the context and sends it to the next handler
func unaryServerInterceptor(clictx context.Context) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
configuredCtx, err := configureContext(clictx, info.FullMethod)
if err != nil {
return nil, err
}
return handler(configuredCtx, req)
}
}
// streamServerInterceptor configures the context and sends it to the next handler
func streamServerInterceptor(clictx context.Context) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx, err := configureContext(clictx, info.FullMethod)
if err != nil {
return err
}
return handler(srv, newServerStream(ctx, ss))
}
}
// configureContext populates the request context with objects the client
// needs: the context store and the api client
func configureContext(ctx context.Context, method string) (context.Context, error) {
configDir := config.Dir(ctx)
configFile, err := config.LoadFile(configDir)
if err != nil {
return nil, err
}
if configFile.CurrentContext != "" {
ctx = apicontext.WithCurrentContext(ctx, configFile.CurrentContext)
}
// The contexts service doesn't need the client
if !strings.Contains(method, "/com.docker.api.protos.context.v1.Contexts") {
c, err := client.New(ctx)
if err != nil {
return nil, err
}
ctx, err = proxy.WithClient(ctx, c)
if err != nil {
return nil, err
}
}
s, err := store.New(store.WithRoot(configDir))
if err != nil {
return nil, err
}
ctx = store.WithContextStore(ctx, s)
return ctx, nil
}
// A gRPC server stream will only let you get its context but
// there is no way to set a new (augmented context) to the next
// handler (like we do for a unary request). We need to wrap the grpc.ServerSteam
// to be able to set a new context that will be sent to the next stream interceptor.
type contextServerStream struct {
s grpc.ServerStream
ctx context.Context
}
func newServerStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &contextServerStream{
s: s,
ctx: ctx,
}
}
func (css *contextServerStream) SetHeader(md metadata.MD) error {
return css.s.SetHeader(md)
}
func (css *contextServerStream) SendHeader(md metadata.MD) error {
return css.s.SendHeader(md)
}
func (css *contextServerStream) SetTrailer(md metadata.MD) {
css.s.SetTrailer(md)
}
func (css *contextServerStream) Context() context.Context {
return css.ctx
}
func (css *contextServerStream) SendMsg(m interface{}) error {
return css.s.SendMsg(m)
}
func (css *contextServerStream) RecvMsg(m interface{}) error {
return css.s.RecvMsg(m)
}