From 48928811df928cf0a7906ff552ca4922e9a39936 Mon Sep 17 00:00:00 2001 From: aiordache Date: Tue, 23 Feb 2021 12:46:55 +0100 Subject: [PATCH] Wait for pods to be running/terminated on compose up/down Signed-off-by: aiordache --- kube/client/client.go | 38 ++++++++++++++++++---------------- kube/client/utils.go | 16 ++++++++------- kube/compose.go | 48 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 73 insertions(+), 29 deletions(-) diff --git a/kube/client/client.go b/kube/client/client.go index 2a9b75bb2..5af1c6fd9 100644 --- a/kube/client/client.go +++ b/kube/client/client.go @@ -116,17 +116,16 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer } // WaitForRunningPodState blocks until pods are in running state -func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, services []string, status string, timeout int) error { - var t time.Duration = 60 - - if timeout > 0 { - t = time.Duration(timeout) * time.Second +func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error { + var timeout time.Duration = time.Duration(60) * time.Second + if opts.Timeout > 0 { + timeout = time.Duration(opts.Timeout) * time.Second } - selector := fmt.Sprintf("%s=%s", compose.ProjectTag, projectName) + selector := fmt.Sprintf("%s=%s", compose.ProjectTag, opts.ProjectName) waitingForPhase := corev1.PodRunning - switch status { + switch opts.Status { case compose.STARTING: waitingForPhase = corev1.PodPending case compose.UNKNOWN: @@ -135,7 +134,7 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, se errch := make(chan error, 1) done := make(chan bool) - + status := opts.Status go func() { for { time.Sleep(500 * time.Millisecond) @@ -147,28 +146,31 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, se errch <- err } - servicePods := 0 + servicePods := map[string]string{} stateReached := true for _, pod := range pods.Items { - + service := pod.Labels[compose.ServiceTag] + if opts.Services == nil || utils.StringContains(opts.Services, service) { + servicePods[service] = pod.Status.Message + } if status == compose.REMOVING { - if contains(services, pod.Labels[compose.ServiceTag]) { - servicePods = servicePods + 1 - } continue } if pod.Status.Phase != waitingForPhase { stateReached = false - } } - if status == compose.REMOVING { - if servicePods > 0 { + if len(servicePods) > 0 { stateReached = false } } + if opts.Log != nil { + for p, m := range servicePods { + opts.Log(p, stateReached, m) + } + } if stateReached { done <- true @@ -177,8 +179,8 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, se }() select { - case <-time.After(t): - return fmt.Errorf("timeout: pods did not reach expected state.") + case <-time.After(timeout): + return fmt.Errorf("timeout: pods did not reach expected state") case err := <-errch: if err != nil { return err diff --git a/kube/client/utils.go b/kube/client/utils.go index ccc21810b..aad7ff40d 100644 --- a/kube/client/utils.go +++ b/kube/client/utils.go @@ -33,11 +33,13 @@ func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary { } } -func contains(slice []string, item string) bool { - for _, v := range slice { - if v == item { - return true - } - } - return false +type LogFunc func(pod string, stateReached bool, message string) + +// ServiceStatus hold status about a service +type WaitForStatusOptions struct { + ProjectName string + Services []string + Status string + Timeout int + Log LogFunc } diff --git a/kube/compose.go b/kube/compose.go index 02981b3f4..1a17b17d3 100644 --- a/kube/compose.go +++ b/kube/compose.go @@ -92,9 +92,21 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options w.Event(progress.NewEvent(eventName, progress.Done, "")) - eventName = "Wait for pods to be running" + logF := func(pod string, stateReached bool, message string) { + state := progress.Done + if !stateReached { + state = progress.Working + } + w.Event(progress.NewEvent(pod, state, message)) + } - return s.client.WaitForPodState(ctx, project.Name, project.ServiceNames(), compose.RUNNING, 10) + return s.client.WaitForPodState(ctx, client.WaitForStatusOptions{ + ProjectName: project.Name, + Services: project.ServiceNames(), + Status: compose.RUNNING, + Timeout: 60, + Log: logF, + }) } // Down executes the equivalent to a `compose down` @@ -116,9 +128,37 @@ func (s *composeService) Down(ctx context.Context, projectName string, options c w.Event(progress.NewEvent(eventName, progress.Working, message)) } err := s.sdk.Uninstall(projectName, logger) - w.Event(progress.NewEvent(eventName, progress.Done, "")) + if err != nil { + return err + } - return err + events := []string{} + logF := func(pod string, stateReached bool, message string) { + state := progress.Done + if !stateReached { + state = progress.Working + } + w.Event(progress.NewEvent(pod, state, message)) + if !utils.StringContains(events, pod) { + events = append(events, pod) + } + } + + err = s.client.WaitForPodState(ctx, client.WaitForStatusOptions{ + ProjectName: projectName, + Services: nil, + Status: compose.REMOVING, + Timeout: 60, + Log: logF, + }) + if err != nil { + return err + } + for _, e := range events { + w.Event(progress.NewEvent(e, progress.Done, "")) + } + w.Event(progress.NewEvent(eventName, progress.Done, "")) + return nil } // List executes the equivalent to a `docker stack ls`