Refactor the interceptors

Avoid having a function that takes in two contexts as parameters
This commit is contained in:
Djordje Lukic 2020-05-21 14:17:23 +02:00 committed by Djordje Lukic
parent 5945e6a56c
commit 36c01f511b
2 changed files with 43 additions and 28 deletions

View File

@ -49,9 +49,7 @@ func runServe(ctx context.Context, opts serveOpts) error {
p := proxy.NewContainerAPI() p := proxy.NewContainerAPI()
containersv1.RegisterContainersServer(s, p) containersv1.RegisterContainersServer(s, p)
cliv1.RegisterCliServer(s, &cliServer{ cliv1.RegisterCliServer(s, &cliServer{})
ctx,
})
go func() { go func() {
<-ctx.Done() <-ctx.Done()
@ -66,7 +64,6 @@ func runServe(ctx context.Context, opts serveOpts) error {
} }
type cliServer struct { type cliServer struct {
ctx context.Context
} }
func (cs *cliServer) Contexts(ctx context.Context, request *cliv1.ContextsRequest) (*cliv1.ContextsResponse, error) { func (cs *cliServer) Contexts(ctx context.Context, request *cliv1.ContextsRequest) (*cliv1.ContextsResponse, error) {

View File

@ -48,12 +48,12 @@ import (
func New(ctx context.Context) *grpc.Server { func New(ctx context.Context) *grpc.Server {
s := grpc.NewServer( s := grpc.NewServer(
grpc.ChainUnaryInterceptor( grpc.ChainUnaryInterceptor(
unaryMeta(ctx), unaryServerInterceptor(ctx),
unary, unary,
), ),
grpc.ChainStreamInterceptor( grpc.ChainStreamInterceptor(
grpc.StreamServerInterceptor(stream), grpc.StreamServerInterceptor(stream),
grpc.StreamServerInterceptor(streamMeta(ctx)), grpc.StreamServerInterceptor(streamServerInterceptor(ctx)),
), ),
) )
hs := health.NewServer() hs := health.NewServer()
@ -77,9 +77,11 @@ func stream(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
return grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler) return grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
} }
func unaryMeta(clictx context.Context) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // unaryServerInterceptor configures the context and sends it to the next handler
func unaryServerInterceptor(clictx context.Context) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
configuredCtx, err := configureContext(ctx, clictx) currentContext := getContext(ctx)
configuredCtx, err := configureContext(clictx, currentContext)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -88,35 +90,48 @@ func unaryMeta(clictx context.Context) func(ctx context.Context, req interface{}
} }
} }
func streamMeta(clictx context.Context) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { // streamServerInterceptor configures the context and sends it to the next handler
func streamServerInterceptor(clictx context.Context) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx, err := configureContext(ss.Context(), clictx) currentContext := getContext(ss.Context())
ctx, err := configureContext(clictx, currentContext)
if err != nil { if err != nil {
return err return err
} }
nss := newServerStream(ctx, ss) return handler(srv, newServerStream(ctx, ss))
return handler(srv, nss)
} }
} }
// nolint: golint // getContext returns the current context name sent in the request metadata, it
func configureContext(ctx context.Context, clictx context.Context) (context.Context, error) { // returns an empty string if there is no metadata
// not present
func getContext(ctx context.Context) string {
md, ok := metadata.FromIncomingContext(ctx) md, ok := metadata.FromIncomingContext(ctx)
if !ok { if !ok {
return ctx, nil return ""
} }
key, ok := md[apicontext.Key] key, ok := md[apicontext.Key]
if !ok { if !ok {
return ctx, nil return ""
} }
if len(key) == 1 { if len(key) == 1 {
s := store.ContextStore(clictx) return key[0]
}
return ""
}
// configureContext populates the request context with objects the client
// needs: the context store and the api client
func configureContext(ctx context.Context, currentContext string) (context.Context, error) {
s := store.ContextStore(ctx)
ctx = store.WithContextStore(ctx, s) ctx = store.WithContextStore(ctx, s)
ctx = apicontext.WithCurrentContext(ctx, key[0]) if currentContext != "" {
ctx = apicontext.WithCurrentContext(ctx, currentContext)
}
c, err := client.New(ctx) c, err := client.New(ctx)
if err != nil { if err != nil {
@ -127,11 +142,14 @@ func configureContext(ctx context.Context, clictx context.Context) (context.Cont
if err != nil { if err != nil {
return nil, err return nil, err
} }
}
return ctx, nil return ctx, nil
} }
// A gRPC server stream will only let you get its context but
// there is no way to set a new (augmented context) to the next
// handler (like we do for a unary request). We need to wrap the grpc.ServerSteam
// to be able to set a new context that will be sent to the next stream interceptor.
type contextServerStream struct { type contextServerStream struct {
s grpc.ServerStream s grpc.ServerStream
ctx context.Context ctx context.Context