mirror of https://github.com/docker/compose.git
Run convergence in service dependency order
Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
parent
5547204f75
commit
adb62e9080
|
@ -79,8 +79,8 @@ func (s *local) Up(ctx context.Context, project *types.Project, detach bool) err
|
|||
}
|
||||
}
|
||||
|
||||
err := inDependencyOrder(ctx, project, func(service types.ServiceConfig) error {
|
||||
return s.ensureService(ctx, project, service)
|
||||
err := inDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
|
||||
return s.ensureService(c, project, service)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -33,6 +33,11 @@ import (
|
|||
"github.com/docker/compose-cli/progress"
|
||||
)
|
||||
|
||||
const (
|
||||
extLifecycle = "x-lifecycle"
|
||||
forceRecreate = "force_recreate"
|
||||
)
|
||||
|
||||
func (s *local) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
|
||||
actual, err := s.containerService.apiClient.ContainerList(ctx, moby.ContainerListOptions{
|
||||
Filters: filters.NewArgs(
|
||||
|
@ -80,10 +85,11 @@ func (s *local) ensureService(ctx context.Context, project *types.Project, servi
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, container := range actual {
|
||||
container := container
|
||||
diverged := container.Labels[configHashLabel] != expected
|
||||
if diverged {
|
||||
if diverged || service.Extensions[extLifecycle] == forceRecreate {
|
||||
eg.Go(func() error {
|
||||
return s.recreateContainer(ctx, project, service, container)
|
||||
})
|
||||
|
@ -184,9 +190,20 @@ func (s *local) recreateContainer(ctx context.Context, project *types.Project, s
|
|||
StatusText: "Recreated",
|
||||
Done: true,
|
||||
})
|
||||
setDependentLifecycle(project, service.Name, forceRecreate)
|
||||
return nil
|
||||
}
|
||||
|
||||
// setDependentLifecycle define the Lifecycle strategy for all services to depend on specified service
|
||||
func setDependentLifecycle(project *types.Project, service string, strategy string) {
|
||||
for i, s := range project.Services {
|
||||
if contains(s.GetDependencies(), service) {
|
||||
s.Extensions[extLifecycle] = strategy
|
||||
project.Services[i] = s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *local) restartContainer(ctx context.Context, service types.ServiceConfig, container moby.Container) error {
|
||||
w := progress.ContextWriter(ctx)
|
||||
w.Event(progress.Event{
|
||||
|
|
|
@ -25,15 +25,18 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func inDependencyOrder(ctx context.Context, project *types.Project, fn func(types.ServiceConfig) error) error {
|
||||
eg, _ := errgroup.WithContext(ctx)
|
||||
func inDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, types.ServiceConfig) error) error {
|
||||
var (
|
||||
scheduled []string
|
||||
ready []string
|
||||
)
|
||||
services := sortByDependency(project.Services)
|
||||
|
||||
results := make(chan string)
|
||||
for len(ready) < len(project.Services) {
|
||||
for _, service := range project.Services {
|
||||
errors := make(chan error)
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for len(ready) < len(services) {
|
||||
for _, service := range services {
|
||||
if contains(scheduled, service.Name) {
|
||||
continue
|
||||
}
|
||||
|
@ -41,9 +44,9 @@ func inDependencyOrder(ctx context.Context, project *types.Project, fn func(type
|
|||
service := service
|
||||
scheduled = append(scheduled, service.Name)
|
||||
eg.Go(func() error {
|
||||
err := fn(service)
|
||||
err := fn(ctx, service)
|
||||
if err != nil {
|
||||
close(results)
|
||||
errors <- err
|
||||
return err
|
||||
}
|
||||
results <- service.Name
|
||||
|
@ -51,11 +54,30 @@ func inDependencyOrder(ctx context.Context, project *types.Project, fn func(type
|
|||
})
|
||||
}
|
||||
}
|
||||
result, ok := <-results
|
||||
if !ok {
|
||||
break
|
||||
select {
|
||||
case result := <-results:
|
||||
ready = append(ready, result)
|
||||
case err := <-errors:
|
||||
return err
|
||||
}
|
||||
ready = append(ready, result)
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
// sortByDependency sort a Service slice so it can be processed in respect to dependency ordering
|
||||
func sortByDependency(services types.Services) types.Services {
|
||||
var sorted types.Services
|
||||
var done []string
|
||||
for len(sorted) < len(services) {
|
||||
for _, s := range services {
|
||||
if contains(done, s.Name) {
|
||||
continue
|
||||
}
|
||||
if containsAll(done, s.GetDependencies()) {
|
||||
sorted = append(sorted, s)
|
||||
done = append(done, s.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return sorted
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ func TestInDependencyOrder(t *testing.T) {
|
|||
},
|
||||
}
|
||||
//nolint:errcheck, unparam
|
||||
go inDependencyOrder(context.TODO(), &project, func(config types.ServiceConfig) error {
|
||||
go inDependencyOrder(context.TODO(), &project, func(ctx context.Context, config types.ServiceConfig) error {
|
||||
order <- config.Name
|
||||
return nil
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue