Merge pull request #1379 from aiordache/kube_hack

Kube backend updates from Hack days
This commit is contained in:
Anca Iordache 2021-03-04 18:17:06 +01:00 committed by GitHub
commit b3025ca4fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 272 additions and 40 deletions

View File

@ -116,7 +116,7 @@ func runConvert(ctx context.Context, opts convertOptions, services []string) err
}
var out io.Writer = os.Stdout
if opts.Output != "" {
if opts.Output != "" && len(json) > 0 {
file, err := os.Create(opts.Output)
if err != nil {
return err

View File

@ -22,6 +22,7 @@ import (
"context"
"fmt"
"io"
"time"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/utils"
@ -83,16 +84,6 @@ func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all
return result, nil
}
func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary {
return compose.ContainerSummary{
ID: pod.GetObjectMeta().GetName(),
Name: pod.GetObjectMeta().GetName(),
Service: pod.GetObjectMeta().GetLabels()[compose.ServiceTag],
State: string(pod.Status.Phase),
Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag],
}
}
// GetLogs retrieves pod logs
func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer compose.LogConsumer, follow bool) error {
pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
@ -111,13 +102,62 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
eg.Go(func() error {
r, err := request.Stream(ctx)
defer r.Close() // nolint errcheck
if err != nil {
return err
}
defer r.Close() // nolint errcheck
_, err = io.Copy(w, r)
return err
})
}
return eg.Wait()
}
// WaitForPodState blocks until pods reach desired state
func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error {
var timeout time.Duration = time.Minute
if opts.Timeout != nil {
timeout = *opts.Timeout
}
errch := make(chan error, 1)
done := make(chan bool)
go func() {
for {
time.Sleep(500 * time.Millisecond)
pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, opts.ProjectName),
})
if err != nil {
errch <- err
}
stateReached, servicePods, err := checkPodsState(opts.Services, pods.Items, opts.Status)
if err != nil {
errch <- err
}
if opts.Log != nil {
for p, m := range servicePods {
opts.Log(p, stateReached, m)
}
}
if stateReached {
done <- true
}
}
}()
select {
case <-time.After(timeout):
return fmt.Errorf("timeout: pods did not reach expected state")
case err := <-errch:
if err != nil {
return err
}
case <-done:
return nil
}
return nil
}

77
kube/client/utils.go Normal file
View File

@ -0,0 +1,77 @@
// +build kube
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"fmt"
"time"
"github.com/docker/compose-cli/api/compose"
"github.com/docker/compose-cli/utils"
corev1 "k8s.io/api/core/v1"
)
func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary {
return compose.ContainerSummary{
ID: pod.GetObjectMeta().GetName(),
Name: pod.GetObjectMeta().GetName(),
Service: pod.GetObjectMeta().GetLabels()[compose.ServiceTag],
State: string(pod.Status.Phase),
Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag],
}
}
func checkPodsState(services []string, pods []corev1.Pod, status string) (bool, map[string]string, error) {
servicePods := map[string]string{}
stateReached := true
for _, pod := range pods {
service := pod.Labels[compose.ServiceTag]
if len(services) > 0 && !utils.StringContains(services, service) {
continue
}
servicePods[service] = pod.Status.Message
if status == compose.REMOVING {
continue
}
if pod.Status.Phase == corev1.PodFailed {
return false, servicePods, fmt.Errorf(pod.Status.Reason)
}
if status == compose.RUNNING && pod.Status.Phase != corev1.PodRunning {
stateReached = false
}
}
if status == compose.REMOVING && len(servicePods) > 0 {
stateReached = false
}
return stateReached, servicePods, nil
}
// LogFunc defines a custom logger function (progress writer events)
type LogFunc func(pod string, stateReached bool, message string)
// WaitForStatusOptions hold the state pods should reach
type WaitForStatusOptions struct {
ProjectName string
Services []string
Status string
Timeout *time.Duration
Log LogFunc
}

View File

@ -89,9 +89,23 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
message := fmt.Sprintf(format, v...)
w.Event(progress.NewEvent(eventName, progress.Done, message))
})
if err != nil {
return err
}
w.Event(progress.NewEvent(eventName, progress.Done, ""))
return err
return s.client.WaitForPodState(ctx, client.WaitForStatusOptions{
ProjectName: project.Name,
Services: project.ServiceNames(),
Status: compose.RUNNING,
Log: func(pod string, stateReached bool, message string) {
state := progress.Done
if !stateReached {
state = progress.Working
}
w.Event(progress.NewEvent(pod, state, message))
},
})
}
// Down executes the equivalent to a `compose down`
@ -113,9 +127,35 @@ func (s *composeService) Down(ctx context.Context, projectName string, options c
w.Event(progress.NewEvent(eventName, progress.Working, message))
}
err := s.sdk.Uninstall(projectName, logger)
w.Event(progress.NewEvent(eventName, progress.Done, ""))
if err != nil {
return err
}
return err
events := []string{}
err = s.client.WaitForPodState(ctx, client.WaitForStatusOptions{
ProjectName: projectName,
Services: nil,
Status: compose.REMOVING,
Timeout: options.Timeout,
Log: func(pod string, stateReached bool, message string) {
state := progress.Done
if !stateReached {
state = progress.Working
}
w.Event(progress.NewEvent(pod, state, message))
if !utils.StringContains(events, pod) {
events = append(events, pod)
}
},
})
if err != nil {
return err
}
for _, e := range events {
w.Event(progress.NewEvent(e, progress.Done, ""))
}
w.Event(progress.NewEvent(eventName, progress.Done, ""))
return nil
}
// List executes the equivalent to a `docker stack ls`
@ -175,8 +215,8 @@ func (s *composeService) Convert(ctx context.Context, project *types.Project, op
}
if options.Output != "" {
fullpath, err := helm.SaveChart(chart, options.Output)
return []byte(fullpath), err
_, err := helm.SaveChart(chart, options.Output)
return nil, err
}
buff := []byte{}

View File

@ -83,7 +83,7 @@ func TestComposeUp(t *testing.T) {
getServiceRegx := func(service string) string {
// match output with random hash / spaces like:
// db-698f4dd798-jd9gw db Running
return fmt.Sprintf("%s-.*\\s+%s\\s+Pending\\s+", service, service)
return fmt.Sprintf("%s-.*\\s+%s\\s+Running\\s+", service, service)
}
res := c.RunDockerCmd("compose", "-p", projectName, "ps", "--all")
testify.Regexp(t, getServiceRegx("db"), res.Stdout())
@ -93,10 +93,11 @@ func TestComposeUp(t *testing.T) {
assert.Equal(t, len(Lines(res.Stdout())), 4, res.Stdout())
})
t.Run("compose ps hides non running containers", func(t *testing.T) {
// to be revisited
/*t.Run("compose ps hides non running containers", func(t *testing.T) {
res := c.RunDockerCmd("compose", "-p", projectName, "ps")
assert.Equal(t, len(Lines(res.Stdout())), 1, res.Stdout())
})
})*/
t.Run("check running project", func(t *testing.T) {
// Docker Desktop kube cluster automatically exposes ports on the host, this is not the case with kind on Desktop,

View File

@ -42,6 +42,17 @@ const (
func MapToKubernetesObjects(project *types.Project) (map[string]runtime.Object, error) {
objects := map[string]runtime.Object{}
secrets, err := toSecretSpecs(project)
if err != nil {
return nil, err
}
if len(secrets) > 0 {
for _, secret := range secrets {
name := secret.Name[len(project.Name)+1:]
objects[fmt.Sprintf("%s-secret.yaml", name)] = &secret
}
}
for _, service := range project.Services {
svcObject := mapToService(project, service)
if svcObject != nil {

58
kube/resources/secrets.go Normal file
View File

@ -0,0 +1,58 @@
// +build kube
/*
Copyright 2020 Docker Compose CLI authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"io/ioutil"
"strings"
"github.com/compose-spec/compose-go/types"
corev1 "k8s.io/api/core/v1"
)
func toSecretSpecs(project *types.Project) ([]corev1.Secret, error) {
var secrets []corev1.Secret
for _, s := range project.Secrets {
if s.External.External {
continue
}
name := strings.ReplaceAll(s.Name, "_", "-")
// load secret file content
sensitiveData, err := ioutil.ReadFile(s.File)
if err != nil {
return nil, err
}
readOnly := true
secret := corev1.Secret{}
secret.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret"))
secret.Name = name
secret.Type = "compose"
secret.Data = map[string][]byte{
name: sensitiveData,
}
secret.Immutable = &readOnly
secrets = append(secrets, secret)
}
return secrets, nil
}

View File

@ -84,17 +84,11 @@ func toVolumeSpecs(project *types.Project, s types.ServiceConfig) ([]volumeSpec,
})
}
for i, s := range s.Secrets {
name := fmt.Sprintf("secret-%d", i)
for _, s := range s.Secrets {
name := fmt.Sprintf("%s-%s", project.Name, s.Source)
target := path.Join("/run/secrets", or(s.Target, path.Join(s.Source, s.Source)))
target := path.Join("/run/secrets", or(s.Target, s.Source))
subPath := name
readOnly := true
specs = append(specs, volumeSpec{
source: secretVolume(s, project.Secrets[name], subPath),
mount: volumeMount(name, target, readOnly, subPath),
})
specs = append(specs, secretMount(name, target))
}
for i, c := range s.Configs {
@ -178,18 +172,29 @@ func defaultMode(mode *uint32) *int32 {
return defaultMode
}
func secretVolume(config types.ServiceSecretConfig, topLevelConfig types.SecretConfig, subPath string) *apiv1.VolumeSource {
return &apiv1.VolumeSource{
Secret: &apiv1.SecretVolumeSource{
SecretName: config.Source,
Items: []apiv1.KeyToPath{
{
Key: toKey(topLevelConfig.File),
Path: subPath,
Mode: defaultMode(config.Mode),
func secretMount(name, target string) volumeSpec {
readOnly := true
filename := filepath.Base(target)
dir := filepath.Dir(target)
return volumeSpec{
source: &apiv1.VolumeSource{
Secret: &apiv1.SecretVolumeSource{
SecretName: name,
Items: []apiv1.KeyToPath{
{
Key: name,
Path: filename,
},
},
},
},
mount: apiv1.VolumeMount{
Name: filename,
MountPath: dir,
ReadOnly: readOnly,
},
}
}