diff --git a/kube/client/client.go b/kube/client/client.go index 0b9a241b8..40f7ee736 100644 --- a/kube/client/client.go +++ b/kube/client/client.go @@ -31,10 +31,12 @@ import ( "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/transport/spdy" ) @@ -43,6 +45,7 @@ type KubeClient struct { client *kubernetes.Clientset namespace string config *rest.Config + ioStreams genericclioptions.IOStreams } // NewKubeClient new kubernetes client @@ -54,7 +57,7 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { - return nil, err + return nil, fmt.Errorf("failed creating clientset. Error: %+v", err) } namespace, _, err := config.ToRawKubeConfigLoader().Namespace() @@ -66,9 +69,83 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro client: clientset, namespace: namespace, config: restConfig, + ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr}, }, nil } +// GetContainers get containers for a given compose project +func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) { + pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName), + }) + if err != nil { + return nil, err + } + if pods == nil { + return nil, nil + } + var pod corev1.Pod + for _, p := range pods.Items { + service := p.Labels[compose.ServiceTag] + if service == serviceName { + pod = p + break + } + } + return &pod, nil +} + +// Exec executes a command in a container +func (kc KubeClient) Exec(ctx context.Context, projectName string, opts compose.RunOptions) error { + pod, err := kc.GetPod(ctx, projectName, opts.Service) + if err != nil || pod == nil { + return err + } + if len(pod.Spec.Containers) == 0 { + return fmt.Errorf("no containers running in pod %s", pod.Name) + } + // get first container in the pod + container := &pod.Spec.Containers[0] + containerName := container.Name + + req := kc.client.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(pod.Name). + Namespace(kc.namespace). + SubResource("exec") + + option := &corev1.PodExecOptions{ + Container: containerName, + Command: opts.Command, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: opts.Tty, + } + + if opts.Reader == nil { + option.Stdin = false + } + + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + return fmt.Errorf("error adding to scheme: %v", err) + } + parameterCodec := runtime.NewParameterCodec(scheme) + req.VersionedParams(option, parameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL()) + if err != nil { + return err + } + return exec.Stream(remotecommand.StreamOptions{ + Stdin: opts.Reader, + Stdout: opts.Writer, + Stderr: opts.Writer, + Tty: opts.Tty, + }) +} + // GetContainers get containers for a given compose project func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) { fieldSelector := "" @@ -178,9 +255,13 @@ func (kc KubeClient) MapPorts(ctx context.Context, opts PortMappingOptions) erro for serviceName, servicePorts := range opts.Services { serviceName = serviceName servicePorts = servicePorts + pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName) + if err != nil { + return err + } eg.Go(func() error { - req := kc.client.RESTClient().Post().Resource("services").Namespace(kc.namespace).Name(serviceName).SubResource("portforward") + req := kc.client.RESTClient().Post().Resource("pods").Namespace(kc.namespace).Name(pod.Name).SubResource("portforward") //fmt.Sprintf("service/%s", serviceName)).SubResource("portforward") transport, upgrader, err := spdy.RoundTripperFor(kc.config) if err != nil { return err diff --git a/kube/compose.go b/kube/compose.go index c3c5974de..61fe57e3b 100644 --- a/kube/compose.go +++ b/kube/compose.go @@ -122,35 +122,34 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options w.Event(progress.NewEvent(pod, state, message)) }, }) - return err - /* - if err != nil { - return err - } + //return err - // check if there is a port mapping - services := map[string]client.Ports{} + if err != nil { + return err + } - for _, s := range project.Services { - if len(s.Ports) > 0 { - services[s.Name] = client.Ports{} - for _, p := range s.Ports { - services[s.Name] = append(services[s.Name], compose.PortPublisher{ - TargetPort: int(p.Target), - PublishedPort: int(p.Published), - Protocol: p.Protocol, - }) - } + // check if there is a port mapping + services := map[string]client.Ports{} + + for _, s := range project.Services { + if len(s.Ports) > 0 { + services[s.Name] = client.Ports{} + for _, p := range s.Ports { + services[s.Name] = append(services[s.Name], compose.PortPublisher{ + TargetPort: int(p.Target), + PublishedPort: int(p.Published), + Protocol: p.Protocol, + }) } } - if len(services) > 0 { - return s.client.MapPorts(ctx, client.PortMappingOptions{ - ProjectName: project.Name, - Services: services, - }) - } - return nil - */ + } + if len(services) > 0 { + return s.client.MapPorts(ctx, client.PortMappingOptions{ + ProjectName: project.Name, + Services: services, + }) + } + return nil } @@ -311,7 +310,7 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt // Exec executes a command in a running service container func (s *composeService) Exec(ctx context.Context, project *types.Project, opts compose.RunOptions) (int, error) { - return 0, errdefs.ErrNotImplemented + return 0, s.client.Exec(ctx, project.Name, opts) } func (s *composeService) Pause(ctx context.Context, project string, options compose.PauseOptions) error {