mirror of
https://github.com/docker/compose.git
synced 2025-04-08 17:05:13 +02:00
Merge pull request #924 from docker/dependecy_order
This commit is contained in:
commit
b3f406f410
@ -79,8 +79,8 @@ func (s *local) Up(ctx context.Context, project *types.Project, detach bool) err
|
||||
}
|
||||
}
|
||||
|
||||
err := inDependencyOrder(ctx, project, func(service types.ServiceConfig) error {
|
||||
return s.ensureService(ctx, project, service)
|
||||
err := inDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error {
|
||||
return s.ensureService(c, project, service)
|
||||
})
|
||||
return err
|
||||
}
|
||||
@ -415,10 +415,10 @@ func getContainerCreateOptions(p *types.Project, s types.ServiceConfig, number i
|
||||
MacAddress: s.MacAddress,
|
||||
Labels: labels,
|
||||
StopSignal: s.StopSignal,
|
||||
// Env: s.Environment, FIXME conversion
|
||||
// Healthcheck: s.HealthCheck, FIXME conversion
|
||||
Env: toMobyEnv(s.Environment),
|
||||
Healthcheck: toMobyHealthCheck(s.HealthCheck),
|
||||
// Volumes: // FIXME unclear to me the overlap with HostConfig.Mounts
|
||||
// StopTimeout: s.StopGracePeriod FIXME conversion
|
||||
StopTimeout: toSeconds(s.StopGracePeriod),
|
||||
}
|
||||
|
||||
mountOptions := buildContainerMountOptions(p, s, inherit)
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/compose-spec/compose-go/types"
|
||||
moby "github.com/docker/docker/api/types"
|
||||
@ -33,7 +34,17 @@ import (
|
||||
"github.com/docker/compose-cli/progress"
|
||||
)
|
||||
|
||||
const (
|
||||
extLifecycle = "x-lifecycle"
|
||||
forceRecreate = "force_recreate"
|
||||
)
|
||||
|
||||
func (s *local) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
|
||||
err := s.waitDependencies(ctx, project, service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
actual, err := s.containerService.apiClient.ContainerList(ctx, moby.ContainerListOptions{
|
||||
Filters: filters.NewArgs(
|
||||
filters.Arg("label", fmt.Sprintf("%s=%s", projectLabel, project.Name)),
|
||||
@ -80,10 +91,11 @@ func (s *local) ensureService(ctx context.Context, project *types.Project, servi
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, container := range actual {
|
||||
container := container
|
||||
diverged := container.Labels[configHashLabel] != expected
|
||||
if diverged {
|
||||
if diverged || service.Extensions[extLifecycle] == forceRecreate {
|
||||
eg.Go(func() error {
|
||||
return s.recreateContainer(ctx, project, service, container)
|
||||
})
|
||||
@ -102,6 +114,30 @@ func (s *local) ensureService(ctx context.Context, project *types.Project, servi
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (s *local) waitDependencies(ctx context.Context, project *types.Project, service types.ServiceConfig) error {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for dep, config := range service.DependsOn {
|
||||
switch config.Condition {
|
||||
case "service_healthy":
|
||||
eg.Go(func() error {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
<-ticker.C
|
||||
healthy, err := s.isServiceHealthy(ctx, project, dep)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if healthy {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func nextContainerNumber(containers []moby.Container) (int, error) {
|
||||
max := 0
|
||||
for _, c := range containers {
|
||||
@ -184,9 +220,23 @@ func (s *local) recreateContainer(ctx context.Context, project *types.Project, s
|
||||
StatusText: "Recreated",
|
||||
Done: true,
|
||||
})
|
||||
setDependentLifecycle(project, service.Name, forceRecreate)
|
||||
return nil
|
||||
}
|
||||
|
||||
// setDependentLifecycle define the Lifecycle strategy for all services to depend on specified service
|
||||
func setDependentLifecycle(project *types.Project, service string, strategy string) {
|
||||
for i, s := range project.Services {
|
||||
if contains(s.GetDependencies(), service) {
|
||||
if s.Extensions == nil {
|
||||
s.Extensions = map[string]interface{}{}
|
||||
}
|
||||
s.Extensions[extLifecycle] = strategy
|
||||
project.Services[i] = s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *local) restartContainer(ctx context.Context, service types.ServiceConfig, container moby.Container) error {
|
||||
w := progress.ContextWriter(ctx)
|
||||
w.Event(progress.Event{
|
||||
@ -240,3 +290,33 @@ func (s *local) connectContainerToNetwork(ctx context.Context, id string, servic
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *local) isServiceHealthy(ctx context.Context, project *types.Project, service string) (bool, error) {
|
||||
containers, err := s.containerService.apiClient.ContainerList(ctx, moby.ContainerListOptions{
|
||||
Filters: filters.NewArgs(
|
||||
filters.Arg("label", fmt.Sprintf("%s=%s", projectLabel, project.Name)),
|
||||
filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, service)),
|
||||
),
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, c := range containers {
|
||||
container, err := s.containerService.apiClient.ContainerInspect(ctx, c.ID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if container.State == nil || container.State.Health == nil {
|
||||
return false, fmt.Errorf("container for service %q has no healthcheck configured", service)
|
||||
}
|
||||
switch container.State.Health.Status {
|
||||
case "starting":
|
||||
return false, nil
|
||||
case "unhealthy":
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
|
||||
}
|
||||
|
@ -23,7 +23,9 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
compose "github.com/compose-spec/compose-go/types"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/go-connections/nat"
|
||||
@ -93,6 +95,57 @@ func toPorts(ports []types.Port) []containers.Port {
|
||||
return result
|
||||
}
|
||||
|
||||
func toMobyEnv(environment compose.MappingWithEquals) []string {
|
||||
var env []string
|
||||
for k, v := range environment {
|
||||
if v == nil {
|
||||
env = append(env, k)
|
||||
} else {
|
||||
env = append(env, fmt.Sprintf("%s=%s", k, *v))
|
||||
}
|
||||
}
|
||||
return env
|
||||
}
|
||||
|
||||
func toMobyHealthCheck(check *compose.HealthCheckConfig) *container.HealthConfig {
|
||||
if check == nil {
|
||||
return nil
|
||||
}
|
||||
var (
|
||||
interval time.Duration
|
||||
timeout time.Duration
|
||||
period time.Duration
|
||||
retries int
|
||||
)
|
||||
if check.Interval != nil {
|
||||
interval = time.Duration(*check.Interval)
|
||||
}
|
||||
if check.Timeout != nil {
|
||||
timeout = time.Duration(*check.Timeout)
|
||||
}
|
||||
if check.StartPeriod != nil {
|
||||
period = time.Duration(*check.StartPeriod)
|
||||
}
|
||||
if check.Retries != nil {
|
||||
retries = int(*check.Retries)
|
||||
}
|
||||
return &container.HealthConfig{
|
||||
Test: check.Test,
|
||||
Interval: interval,
|
||||
Timeout: timeout,
|
||||
StartPeriod: period,
|
||||
Retries: retries,
|
||||
}
|
||||
}
|
||||
|
||||
func toSeconds(d *compose.Duration) *int {
|
||||
if d == nil {
|
||||
return nil
|
||||
}
|
||||
s := int(time.Duration(*d).Seconds())
|
||||
return &s
|
||||
}
|
||||
|
||||
func fromPorts(ports []containers.Port) (map[nat.Port]struct{}, map[nat.Port][]nat.PortBinding, error) {
|
||||
var (
|
||||
exposedPorts = make(map[nat.Port]struct{}, len(ports))
|
||||
|
@ -25,37 +25,89 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func inDependencyOrder(ctx context.Context, project *types.Project, fn func(types.ServiceConfig) error) error {
|
||||
eg, _ := errgroup.WithContext(ctx)
|
||||
var (
|
||||
scheduled []string
|
||||
ready []string
|
||||
)
|
||||
func inDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, types.ServiceConfig) error) error {
|
||||
graph := buildDependencyGraph(project.Services)
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
results := make(chan string)
|
||||
for len(ready) < len(project.Services) {
|
||||
for _, service := range project.Services {
|
||||
if contains(scheduled, service.Name) {
|
||||
continue
|
||||
}
|
||||
if containsAll(ready, service.GetDependencies()) {
|
||||
service := service
|
||||
scheduled = append(scheduled, service.Name)
|
||||
eg.Go(func() error {
|
||||
err := fn(service)
|
||||
if err != nil {
|
||||
close(results)
|
||||
return err
|
||||
}
|
||||
results <- service.Name
|
||||
return nil
|
||||
})
|
||||
}
|
||||
errors := make(chan error)
|
||||
for len(graph) > 0 {
|
||||
for _, n := range graph.independents() {
|
||||
service := n.service
|
||||
eg.Go(func() error {
|
||||
err := fn(ctx, service)
|
||||
if err != nil {
|
||||
errors <- err
|
||||
return err
|
||||
}
|
||||
results <- service.Name
|
||||
return nil
|
||||
})
|
||||
}
|
||||
result, ok := <-results
|
||||
if !ok {
|
||||
break
|
||||
select {
|
||||
case result := <-results:
|
||||
graph.resolved(result)
|
||||
case err := <-errors:
|
||||
return err
|
||||
}
|
||||
ready = append(ready, result)
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
type dependencyGraph map[string]node
|
||||
|
||||
type node struct {
|
||||
service types.ServiceConfig
|
||||
dependencies []string
|
||||
dependent []string
|
||||
}
|
||||
|
||||
func (graph dependencyGraph) independents() []node {
|
||||
var nodes []node
|
||||
for _, node := range graph {
|
||||
if len(node.dependencies) == 0 {
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func (graph dependencyGraph) resolved(result string) {
|
||||
for _, parent := range graph[result].dependent {
|
||||
node := graph[parent]
|
||||
node.dependencies = remove(node.dependencies, result)
|
||||
graph[parent] = node
|
||||
}
|
||||
delete(graph, result)
|
||||
}
|
||||
|
||||
func buildDependencyGraph(services types.Services) dependencyGraph {
|
||||
graph := dependencyGraph{}
|
||||
for _, s := range services {
|
||||
graph[s.Name] = node{
|
||||
service: s,
|
||||
}
|
||||
}
|
||||
|
||||
for _, s := range services {
|
||||
node := graph[s.Name]
|
||||
for _, name := range s.GetDependencies() {
|
||||
dependency := graph[name]
|
||||
node.dependencies = append(node.dependencies, name)
|
||||
dependency.dependent = append(dependency.dependent, s.Name)
|
||||
graph[name] = dependency
|
||||
}
|
||||
graph[s.Name] = node
|
||||
}
|
||||
return graph
|
||||
}
|
||||
|
||||
func remove(slice []string, item string) []string {
|
||||
var s []string
|
||||
for _, i := range slice {
|
||||
if i != item {
|
||||
s = append(s, i)
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ func TestInDependencyOrder(t *testing.T) {
|
||||
},
|
||||
}
|
||||
//nolint:errcheck, unparam
|
||||
go inDependencyOrder(context.TODO(), &project, func(config types.ServiceConfig) error {
|
||||
go inDependencyOrder(context.TODO(), &project, func(ctx context.Context, config types.ServiceConfig) error {
|
||||
order <- config.Name
|
||||
return nil
|
||||
})
|
||||
|
@ -40,12 +40,3 @@ func contains(slice []string, item string) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func containsAll(slice []string, items []string) bool {
|
||||
for _, i := range items {
|
||||
if !contains(slice, i) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user