Change the current context of the client on each request

* the interceptor takes the current context from the metadat
* each handler needs to call `client.SetContext()` before using the
sevices
This commit is contained in:
Djordje Lukic 2020-04-29 23:10:06 +02:00
parent 40a3a20f78
commit 9ea91791b4
7 changed files with 65 additions and 27 deletions

View File

@ -21,7 +21,7 @@ var PsCommand = cobra.Command{
return errors.Wrap(err, "cannot connect to backend")
}
containers, err := c.ContainerService(ctx).List(ctx)
containers, err := c.ContainerService().List(ctx)
if err != nil {
return errors.Wrap(err, "fetch containers")
}

View File

@ -50,7 +50,7 @@ func runServe(ctx context.Context, opts serveOpts) error {
return err
}
p := proxy.NewContainerApi(c.ContainerService(ctx))
p := proxy.NewContainerApi(c)
containersv1.RegisterContainersServer(s, p)
cliv1.RegisterCliServer(s, &cliServer{

View File

@ -119,7 +119,19 @@ func main() {
ctx, cancel := util.NewSigContext()
defer cancel()
ctx, err := apicontext.WithCurrentContext(ctx, opts.Config, opts.Context)
config, err := apicontext.LoadConfigFile(opts.Config, "config.json")
if err != nil {
logrus.Fatal("unable ot find configuration")
}
currentContext := opts.Context
if currentContext == "" {
currentContext = config.CurrentContext
}
if currentContext == "" {
currentContext = "default"
}
ctx = apicontext.WithCurrentContext(ctx, opts.Context)
if err != nil {
logrus.Fatal(err)
}

View File

@ -69,6 +69,21 @@ type Client struct {
cc containers.ContainerService
}
func (c *Client) ContainerService(ctx context.Context) containers.ContainerService {
func (c *Client) SetContext(ctx context.Context, contextType string) error {
b, err := backend.Get(ctx, contextType)
if err != nil {
return err
}
ba, ok := b.(containers.ContainerService)
if !ok {
return errors.New("unknown context type")
}
c.cc = ba
return nil
}
func (c *Client) ContainerService() containers.ContainerService {
return c.cc
}

View File

@ -3,23 +3,29 @@ package proxy
import (
"context"
"github.com/docker/api/containers"
"github.com/docker/api/client"
v1 "github.com/docker/api/containers/v1"
apicontext "github.com/docker/api/context"
"github.com/golang/protobuf/ptypes/empty"
)
func NewContainerApi(client containers.ContainerService) v1.ContainersServer {
func NewContainerApi(client *client.Client) v1.ContainersServer {
return &proxyContainerApi{
client: client,
}
}
type proxyContainerApi struct {
client containers.ContainerService
client *client.Client
}
func (p *proxyContainerApi) List(ctx context.Context, _ *v1.ListRequest) (*v1.ListResponse, error) {
c, err := p.client.List(ctx)
currentContext := apicontext.CurrentContext(ctx)
if err := p.client.SetContext(ctx, currentContext); err != nil {
return &v1.ListResponse{}, nil
}
c, err := p.client.ContainerService().List(ctx)
if err != nil {
return &v1.ListResponse{}, nil
}

View File

@ -3,29 +3,15 @@ package context
import (
gocontext "context"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
const KEY = "context_key"
type currentContextKey struct{}
func WithCurrentContext(ctx gocontext.Context, configName string, contextName string) (context.Context, error) {
config, err := LoadConfigFile(configName, "config.json")
if err != nil {
return ctx, err
}
currentContext := contextName
if currentContext == "" {
currentContext = config.CurrentContext
}
if currentContext == "" {
currentContext = "default"
}
logrus.Debugf("Current context %q", currentContext)
return context.WithValue(ctx, currentContextKey{}, currentContext), nil
func WithCurrentContext(ctx gocontext.Context, contextName string) context.Context {
return context.WithValue(ctx, currentContextKey{}, contextName)
}
// CurrentContext returns the current context name

View File

@ -29,17 +29,23 @@ package server
import (
"context"
"errors"
apicontext "github.com/docker/api/context"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
)
// New returns a new GRPC server.
func New() *grpc.Server {
s := grpc.NewServer(
grpc.UnaryInterceptor(unary),
grpc.ChainUnaryInterceptor(
unaryMeta,
unary,
),
grpc.StreamInterceptor(stream),
)
hs := health.NewServer()
@ -54,3 +60,16 @@ func unary(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, han
func stream(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
}
func unaryMeta(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errors.New("missing metadata")
}
key := md[apicontext.KEY]
if len(key) == 1 {
ctx = apicontext.WithCurrentContext(ctx, key[0])
}
m, err := handler(ctx, req)
return m, err
}