Add exec command

Signed-off-by: aiordache <anca.iordache@docker.com>
This commit is contained in:
aiordache 2021-04-23 12:54:51 +02:00
parent 7fd3c6f5cb
commit d8db079af3
2 changed files with 108 additions and 28 deletions

View File

@ -31,10 +31,12 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward" "k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy" "k8s.io/client-go/transport/spdy"
) )
@ -43,6 +45,7 @@ type KubeClient struct {
client *kubernetes.Clientset client *kubernetes.Clientset
namespace string namespace string
config *rest.Config config *rest.Config
ioStreams genericclioptions.IOStreams
} }
// NewKubeClient new kubernetes client // NewKubeClient new kubernetes client
@ -54,7 +57,7 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
clientset, err := kubernetes.NewForConfig(restConfig) clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed creating clientset. Error: %+v", err)
} }
namespace, _, err := config.ToRawKubeConfigLoader().Namespace() namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
@ -66,9 +69,83 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
client: clientset, client: clientset,
namespace: namespace, namespace: namespace,
config: restConfig, config: restConfig,
ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
}, nil }, nil
} }
// GetContainers get containers for a given compose project
func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) {
pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
})
if err != nil {
return nil, err
}
if pods == nil {
return nil, nil
}
var pod corev1.Pod
for _, p := range pods.Items {
service := p.Labels[compose.ServiceTag]
if service == serviceName {
pod = p
break
}
}
return &pod, nil
}
// Exec executes a command in a container
func (kc KubeClient) Exec(ctx context.Context, projectName string, opts compose.RunOptions) error {
pod, err := kc.GetPod(ctx, projectName, opts.Service)
if err != nil || pod == nil {
return err
}
if len(pod.Spec.Containers) == 0 {
return fmt.Errorf("no containers running in pod %s", pod.Name)
}
// get first container in the pod
container := &pod.Spec.Containers[0]
containerName := container.Name
req := kc.client.CoreV1().RESTClient().Post().
Resource("pods").
Name(pod.Name).
Namespace(kc.namespace).
SubResource("exec")
option := &corev1.PodExecOptions{
Container: containerName,
Command: opts.Command,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: opts.Tty,
}
if opts.Reader == nil {
option.Stdin = false
}
scheme := runtime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil {
return fmt.Errorf("error adding to scheme: %v", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(option, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL())
if err != nil {
return err
}
return exec.Stream(remotecommand.StreamOptions{
Stdin: opts.Reader,
Stdout: opts.Writer,
Stderr: opts.Writer,
Tty: opts.Tty,
})
}
// GetContainers get containers for a given compose project // GetContainers get containers for a given compose project
func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) { func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) {
fieldSelector := "" fieldSelector := ""
@ -178,9 +255,13 @@ func (kc KubeClient) MapPorts(ctx context.Context, opts PortMappingOptions) erro
for serviceName, servicePorts := range opts.Services { for serviceName, servicePorts := range opts.Services {
serviceName = serviceName serviceName = serviceName
servicePorts = servicePorts servicePorts = servicePorts
pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
if err != nil {
return err
}
eg.Go(func() error { eg.Go(func() error {
req := kc.client.RESTClient().Post().Resource("services").Namespace(kc.namespace).Name(serviceName).SubResource("portforward") req := kc.client.RESTClient().Post().Resource("pods").Namespace(kc.namespace).Name(pod.Name).SubResource("portforward") //fmt.Sprintf("service/%s", serviceName)).SubResource("portforward")
transport, upgrader, err := spdy.RoundTripperFor(kc.config) transport, upgrader, err := spdy.RoundTripperFor(kc.config)
if err != nil { if err != nil {
return err return err

View File

@ -122,8 +122,8 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
w.Event(progress.NewEvent(pod, state, message)) w.Event(progress.NewEvent(pod, state, message))
}, },
}) })
return err //return err
/*
if err != nil { if err != nil {
return err return err
} }
@ -150,7 +150,6 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
}) })
} }
return nil return nil
*/
} }
@ -311,7 +310,7 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt
// Exec executes a command in a running service container // Exec executes a command in a running service container
func (s *composeService) Exec(ctx context.Context, project *types.Project, opts compose.RunOptions) (int, error) { func (s *composeService) Exec(ctx context.Context, project *types.Project, opts compose.RunOptions) (int, error) {
return 0, errdefs.ErrNotImplemented return 0, s.client.Exec(ctx, project.Name, opts)
} }
func (s *composeService) Pause(ctx context.Context, project string, options compose.PauseOptions) error { func (s *composeService) Pause(ctx context.Context, project string, options compose.PauseOptions) error {