Add `Stop` command on the gRPC side.

This commit is contained in:
Djordje Lukic 2020-05-16 12:13:51 +02:00
parent f118856076
commit ce7cbd4463
10 changed files with 537 additions and 365 deletions

View File

@ -101,7 +101,7 @@ type aciContainerService struct {
ctx store.AciContext
}
func (cs *aciContainerService) List(ctx context.Context) ([]containers.Container, error) {
func (cs *aciContainerService) List(ctx context.Context, _ bool) ([]containers.Container, error) {
var containerGroups []containerinstance.ContainerGroup
result, err := cs.containerGroupsClient.ListByResourceGroup(ctx, cs.ctx.ResourceGroup)
if err != nil {
@ -177,6 +177,10 @@ func (cs *aciContainerService) Run(ctx context.Context, r containers.ContainerCo
return createACIContainers(ctx, cs.ctx, groupDefinition)
}
func (cs *aciContainerService) Stop(ctx context.Context, containerName string) error {
return errors.New("not implemented")
}
func getGroupAndContainerName(containerID string) (groupName string, containerName string) {
tokens := strings.Split(containerID, "_")
groupName = tokens[0]

View File

@ -15,6 +15,7 @@ import (
)
type psOpts struct {
all bool
quiet bool
}
@ -30,6 +31,7 @@ func PsCommand() *cobra.Command {
}
cmd.Flags().BoolVarP(&opts.quiet, "quiet", "q", false, "Only display IDs")
cmd.Flags().BoolVarP(&opts.quiet, "all", "a", false, "Show all containers (default shows just running)")
return cmd
}
@ -40,7 +42,7 @@ func runPs(ctx context.Context, opts psOpts) error {
return errors.Wrap(err, "cannot connect to backend")
}
containers, err := c.ContainerService().List(ctx)
containers, err := c.ContainerService().List(ctx, opts.all)
if err != nil {
return errors.Wrap(err, "fetch containers")
}

View File

@ -54,7 +54,9 @@ type LogsRequest struct {
// Service interacts with the underlying container backend
type Service interface {
// List returns all the containers
List(ctx context.Context) ([]Container, error)
List(ctx context.Context, all bool) ([]Container, error)
// Stop stops the running container
Stop(ctx context.Context, containerName string) error
// Run creates and starts a container
Run(ctx context.Context, config ContainerConfig) error
// Exec executes a command inside a running container

File diff suppressed because it is too large Load Diff

View File

@ -42,16 +42,25 @@ service Containers {
rpc Exec(ExecRequest) returns (ExecResponse);
}
message Port {
uint32 host_port = 1;
uint32 container_port = 2;
string protocol = 3;
string host_ip = 4;
}
message Container {
string id = 1;
string image = 2;
string status = 3;
uint64 cpu_time = 4;
uint64 memory_usage = 5;
uint64 memory_limit = 6;
uint64 pids_current = 7;
uint64 pids_limit = 8;
map<string, string> labels = 9;
string command = 4;
uint64 cpu_time = 5;
uint64 memory_usage = 6;
uint64 memory_limit = 7;
uint64 pids_current = 8;
uint64 pids_limit = 9;
repeated string labels = 10;
repeated Port ports = 11;
}
message CreateRequest {
@ -139,7 +148,7 @@ message KillResponse {
}
message ListRequest {
repeated string filters = 1;
bool all = 1;
}
message ListResponse {

View File

@ -2,6 +2,7 @@ package example
import (
"context"
"errors"
"fmt"
"io"
@ -37,7 +38,7 @@ func init() {
type containerService struct{}
func (cs *containerService) List(ctx context.Context) ([]containers.Container, error) {
func (cs *containerService) List(ctx context.Context, _ bool) ([]containers.Container, error) {
return []containers.Container{
{
ID: "id",
@ -55,6 +56,10 @@ func (cs *containerService) Run(ctx context.Context, r containers.ContainerConfi
return nil
}
func (cs *containerService) Stop(ctx context.Context, containerName string) error {
return errors.New("not implemented")
}
func (cs *containerService) Exec(ctx context.Context, name string, command string, reader io.Reader, writer io.Writer) error {
fmt.Printf("Executing command %q on container %q", command, name)
return nil

View File

@ -3,6 +3,7 @@ package moby
import (
"context"
"io"
"time"
"github.com/docker/api/context/cloud"
@ -51,10 +52,11 @@ func (ms *mobyService) CloudService() cloud.Service {
return nil
}
func (ms *mobyService) List(ctx context.Context) ([]containers.Container, error) {
func (ms *mobyService) List(ctx context.Context, all bool) ([]containers.Container, error) {
css, err := ms.apiClient.ContainerList(ctx, types.ContainerListOptions{
All: false,
All: all,
})
if err != nil {
return []containers.Container{}, err
}
@ -62,8 +64,12 @@ func (ms *mobyService) List(ctx context.Context) ([]containers.Container, error)
var result []containers.Container
for _, container := range css {
result = append(result, containers.Container{
ID: container.ID,
Image: container.Image,
ID: container.ID,
Image: container.Image,
// TODO: `Status` is a human readable string ("Up 24 minutes"),
// we need to return the `State` instead but first we need to
// define an enum on the proto side with all the possible container
// statuses. We also need to add a `Created` property on the gRPC side.
Status: container.Status,
Command: container.Command,
Ports: getPorts(container.Ports),
@ -85,6 +91,11 @@ func (ms *mobyService) Run(ctx context.Context, r containers.ContainerConfig) er
return ms.apiClient.ContainerStart(ctx, create.ID, types.ContainerStartOptions{})
}
func (ms *mobyService) Stop(ctx context.Context, containerName string) error {
timeout := 1 * time.Second
return ms.apiClient.ContainerStop(ctx, containerName, &timeout)
}
func (ms *mobyService) Exec(ctx context.Context, name string, command string, reader io.Reader, writer io.Writer) error {
cec, err := ms.apiClient.ContainerExecCreate(ctx, name, types.ExecConfig{
Cmd: []string{command},

102
server/proxy/containers.go Normal file
View File

@ -0,0 +1,102 @@
package proxy
import (
"context"
"github.com/docker/api/containers"
v1 "github.com/docker/api/containers/v1"
)
// NewContainerAPI creates a proxy container server
func NewContainerAPI() v1.ContainersServer {
return &proxyContainerAPI{}
}
type proxyContainerAPI struct {
}
func portsToGrpc(ports []containers.Port) []*v1.Port {
var result []*v1.Port
for _, port := range ports {
result = append(result, &v1.Port{
ContainerPort: port.ContainerPort,
HostPort: port.HostPort,
HostIp: port.HostIP,
Protocol: port.Protocol,
})
}
return result
}
func (p *proxyContainerAPI) List(ctx context.Context, request *v1.ListRequest) (*v1.ListResponse, error) {
client := Client(ctx)
c, err := client.ContainerService().List(ctx, request.GetAll())
if err != nil {
return &v1.ListResponse{}, nil
}
response := &v1.ListResponse{
Containers: []*v1.Container{},
}
for _, container := range c {
response.Containers = append(response.Containers, &v1.Container{
Id: container.ID,
Image: container.Image,
Command: container.Command,
Status: container.Status,
CpuTime: container.CPUTime,
Labels: container.Labels,
MemoryLimit: container.MemoryLimit,
MemoryUsage: container.MemoryUsage,
PidsCurrent: container.PidsCurrent,
PidsLimit: container.PidsLimit,
Ports: portsToGrpc(container.Ports),
})
}
return response, nil
}
func (p *proxyContainerAPI) Create(ctx context.Context, request *v1.CreateRequest) (*v1.CreateResponse, error) {
client := Client(ctx)
err := client.ContainerService().Run(ctx, containers.ContainerConfig{
ID: request.Id,
Image: request.Image,
})
return &v1.CreateResponse{}, err
}
func (p *proxyContainerAPI) Start(_ context.Context, request *v1.StartRequest) (*v1.StartResponse, error) {
panic("not implemented") // TODO: Implement
}
func (p *proxyContainerAPI) Stop(ctx context.Context, request *v1.StopRequest) (*v1.StopResponse, error) {
c := Client(ctx)
return &v1.StopResponse{}, c.ContainerService().Stop(ctx, request.Id)
}
func (p *proxyContainerAPI) Kill(ctx context.Context, request *v1.KillRequest) (*v1.KillResponse, error) {
c := Client(ctx)
return &v1.KillResponse{}, c.ContainerService().Delete(ctx, request.Id, false)
}
func (p *proxyContainerAPI) Delete(ctx context.Context, request *v1.DeleteRequest) (*v1.DeleteResponse, error) {
err := Client(ctx).ContainerService().Delete(ctx, request.Id, request.Force)
if err != nil {
return &v1.DeleteResponse{}, err
}
return &v1.DeleteResponse{}, nil
}
func (p *proxyContainerAPI) Update(_ context.Context, _ *v1.UpdateRequest) (*v1.UpdateResponse, error) {
panic("not implemented") // TODO: Implement
}
func (p *proxyContainerAPI) Exec(_ context.Context, _ *v1.ExecRequest) (*v1.ExecResponse, error) {
panic("not implemented") // TODO: Implement
}

View File

@ -4,8 +4,6 @@ import (
"context"
"github.com/docker/api/client"
"github.com/docker/api/containers"
v1 "github.com/docker/api/containers/v1"
)
type clientKey struct{}
@ -20,71 +18,3 @@ func Client(ctx context.Context) *client.Client {
c, _ := ctx.Value(clientKey{}).(*client.Client)
return c
}
// NewContainerAPI creates a proxy container server
func NewContainerAPI() v1.ContainersServer {
return &proxyContainerAPI{}
}
type proxyContainerAPI struct{}
func (p *proxyContainerAPI) List(ctx context.Context, _ *v1.ListRequest) (*v1.ListResponse, error) {
client := Client(ctx)
c, err := client.ContainerService().List(ctx)
if err != nil {
return &v1.ListResponse{}, nil
}
response := &v1.ListResponse{
Containers: []*v1.Container{},
}
for _, container := range c {
response.Containers = append(response.Containers, &v1.Container{
Id: container.ID,
Image: container.Image,
})
}
return response, nil
}
func (p *proxyContainerAPI) Create(ctx context.Context, request *v1.CreateRequest) (*v1.CreateResponse, error) {
client := Client(ctx)
err := client.ContainerService().Run(ctx, containers.ContainerConfig{
ID: request.Id,
Image: request.Image,
})
return &v1.CreateResponse{}, err
}
func (p *proxyContainerAPI) Start(_ context.Context, _ *v1.StartRequest) (*v1.StartResponse, error) {
panic("not implemented") // TODO: Implement
}
func (p *proxyContainerAPI) Stop(_ context.Context, _ *v1.StopRequest) (*v1.StopResponse, error) {
panic("not implemented") // TODO: Implement
}
func (p *proxyContainerAPI) Kill(_ context.Context, _ *v1.KillRequest) (*v1.KillResponse, error) {
panic("not implemented") // TODO: Implement
}
func (p *proxyContainerAPI) Delete(ctx context.Context, request *v1.DeleteRequest) (*v1.DeleteResponse, error) {
err := Client(ctx).ContainerService().Delete(ctx, request.Id, request.Force)
if err != nil {
return &v1.DeleteResponse{}, err
}
return &v1.DeleteResponse{}, nil
}
func (p *proxyContainerAPI) Update(_ context.Context, _ *v1.UpdateRequest) (*v1.UpdateResponse, error) {
panic("not implemented") // TODO: Implement
}
func (p *proxyContainerAPI) Exec(_ context.Context, _ *v1.ExecRequest) (*v1.ExecResponse, error) {
panic("not implemented") // TODO: Implement
}

View File

@ -29,7 +29,6 @@ package server
import (
"context"
"errors"
"net"
"strings"
@ -59,7 +58,6 @@ func New() *grpc.Server {
return s
}
//CreateListener creates a listener either on tcp://, or local listener, supporting unix:// for unix socket or npipe:// for named pipes on windows
func CreateListener(address string) (net.Listener, error) {
if strings.HasPrefix(address, "tcp://") {
@ -79,29 +77,33 @@ func stream(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
func unaryMeta(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errors.New("missing metadata")
return handler(ctx, req)
}
key := md[apicontext.Key]
key, ok := md[apicontext.Key]
if !ok {
return handler(ctx, req)
}
if len(key) == 1 {
s, err := store.New()
if err != nil {
return nil, err
}
ctx = store.WithContextStore(ctx, s)
ctx = store.WithContextStore(ctx, s)
ctx = apicontext.WithCurrentContext(ctx, key[0])
c, err := client.New(ctx)
if err != nil {
return nil, err
}
ctx, err = proxy.WithClient(ctx, c)
if err != nil {
return nil, err
}
}
m, err := handler(ctx, req)
return m, err
return handler(ctx, req)
}