diff --git a/pkg/compose/dependencies.go b/pkg/compose/dependencies.go index 8a146ebf1..dc09fe0d8 100644 --- a/pkg/compose/dependencies.go +++ b/pkg/compose/dependencies.go @@ -97,19 +97,37 @@ func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn fu } func (t *graphTraversal) visit(ctx context.Context, g *Graph) error { - nodes := t.extremityNodesFn(g) + expect := len(g.Vertices) + if expect == 0 { + return nil + } eg, ctx := errgroup.WithContext(ctx) if t.maxConcurrency > 0 { - eg.SetLimit(t.maxConcurrency) + eg.SetLimit(t.maxConcurrency + 1) } - t.run(ctx, g, eg, nodes) + nodeCh := make(chan *Vertex) + eg.Go(func() error { + for node := range nodeCh { + expect-- + if expect == 0 { + close(nodeCh) + return nil + } + t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh) + } + return nil + }) - return eg.Wait() + nodes := t.extremityNodesFn(g) + t.run(ctx, g, eg, nodes, nodeCh) + + err := eg.Wait() + return err } // Note: this could be `graph.walk` or whatever -func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex) { +func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, nodeCh chan *Vertex) { for _, node := range nodes { // Don't start this service yet if all of its children have // not been started yet. @@ -125,14 +143,11 @@ func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Gro eg.Go(func() error { err := t.visitorFn(ctx, node.Service) - if err != nil { - return err + if err == nil { + graph.UpdateStatus(node.Key, t.targetServiceStatus) } - - graph.UpdateStatus(node.Key, t.targetServiceStatus) - - t.run(ctx, graph, eg, t.adjacentNodesFn(node)) - return nil + nodeCh <- node + return err }) } }