Merge pull request #1901 from mat007/fix-races

Fix races
This commit is contained in:
Ulysses Souza 2021-07-07 11:11:11 -03:00 committed by GitHub
commit ed0b123b75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 50 additions and 39 deletions

View File

@ -285,8 +285,7 @@ func (kc KubeClient) MapPortsToLocalhost(ctx context.Context, opts PortMappingOp
eg, ctx := errgroup.WithContext(ctx)
for serviceName, servicePorts := range opts.Services {
serviceName := serviceName
servicePorts := servicePorts
serviceName, servicePorts := serviceName, servicePorts
pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
if err != nil {
return err

View File

@ -55,6 +55,19 @@ const (
type convergence struct {
service *composeService
observedState map[string]Containers
stateMutex sync.Mutex
}
func (c *convergence) getObservedState(serviceName string) Containers {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.observedState[serviceName]
}
func (c *convergence) setObservedState(serviceName string, containers Containers) {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
c.observedState[serviceName] = containers
}
func newConvergence(services []string, state Containers, s *composeService) *convergence {
@ -97,7 +110,7 @@ var mu sync.Mutex
// updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
func (c *convergence) updateProject(project *types.Project, service string) {
containers := c.observedState[service]
containers := c.getObservedState(service)
container := containers[0]
// operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
@ -148,7 +161,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
if err != nil {
return err
}
containers := c.observedState[service.Name]
containers := c.getObservedState(service.Name)
actual := len(containers)
updated := make(Containers, expected)
@ -157,6 +170,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
for i, container := range containers {
if i > expected {
// Scale Down
container := container
eg.Go(func() error {
err := c.service.apiClient.ContainerStop(ctx, container.ID, timeout)
if err != nil {
@ -178,7 +192,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
name := getContainerProgressName(container)
diverged := container.Labels[api.ConfigHashLabel] != configHash
if diverged || recreate == api.RecreateForce || service.Extensions[extLifecycle] == forceRecreate {
i := i
i, container := i, container
eg.Go(func() error {
recreated, err := c.service.recreateContainer(ctx, project, service, container, inherit, timeout)
updated[i] = recreated
@ -197,6 +211,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
case ContainerExited:
w.Event(progress.CreatedEvent(name))
default:
container := container
eg.Go(func() error {
return c.service.startContainer(ctx, container)
})
@ -212,16 +227,17 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
// Scale UP
number := next + i
name := getContainerName(project.Name, service, number)
i := i
eg.Go(func() error {
container, err := c.service.createContainer(ctx, project, service, name, number, false, true)
updated[actual+i-1] = container
updated[actual+i] = container
return err
})
continue
}
err = eg.Wait()
c.observedState[service.Name] = updated
c.setObservedState(service.Name, updated)
return err
}
@ -542,11 +558,11 @@ func (s *composeService) startService(ctx context.Context, project *types.Projec
w := progress.ContextWriter(ctx)
eg, ctx := errgroup.WithContext(ctx)
for _, c := range containers {
container := c
for _, container := range containers {
if container.State == ContainerRunning {
continue
}
container := container
eg.Go(func() error {
eventName := getContainerProgressName(container)
w.Event(progress.StartingEvent(eventName))

View File

@ -81,9 +81,8 @@ func (s *composeService) Copy(ctx context.Context, project *types.Project, opts
}
g := errgroup.Group{}
for i := range containers {
containerID := containers[i].ID
for _, container := range containers {
containerID := container.ID
g.Go(func() error {
switch direction {
case fromService:

View File

@ -91,22 +91,22 @@ func visit(ctx context.Context, project *types.Project, traversalConfig graphTra
// Note: this could be `graph.walk` or whatever
func run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, traversalConfig graphTraversalConfig, fn func(context.Context, string) error) error {
for _, node := range nodes {
n := node
// Don't start this service yet if all of its children have
// not been started yet.
if len(traversalConfig.filterAdjacentByStatusFn(graph, n.Service, traversalConfig.adjacentServiceStatusToSkip)) != 0 {
if len(traversalConfig.filterAdjacentByStatusFn(graph, node.Service, traversalConfig.adjacentServiceStatusToSkip)) != 0 {
continue
}
node := node
eg.Go(func() error {
err := fn(ctx, n.Service)
err := fn(ctx, node.Service)
if err != nil {
return err
}
graph.UpdateStatus(n.Service, traversalConfig.targetServiceStatus)
graph.UpdateStatus(node.Service, traversalConfig.targetServiceStatus)
return run(ctx, graph, eg, traversalConfig.adjacentNodesFn(n), traversalConfig, fn)
return run(ctx, graph, eg, traversalConfig.adjacentNodesFn(node), traversalConfig, fn)
})
}

View File

@ -217,17 +217,17 @@ func (s *composeService) stopContainers(ctx context.Context, w progress.Writer,
func (s *composeService) removeContainers(ctx context.Context, w progress.Writer, containers []moby.Container, timeout *time.Duration, volumes bool) error {
eg, _ := errgroup.WithContext(ctx)
for _, container := range containers {
toDelete := container
container := container
eg.Go(func() error {
eventName := getContainerProgressName(toDelete)
eventName := getContainerProgressName(container)
w.Event(progress.StoppingEvent(eventName))
err := s.stopContainers(ctx, w, []moby.Container{toDelete}, timeout)
err := s.stopContainers(ctx, w, []moby.Container{container}, timeout)
if err != nil {
w.Event(progress.ErrorMessageEvent(eventName, "Error while Stopping"))
return err
}
w.Event(progress.RemovingEvent(eventName))
err = s.apiClient.ContainerRemove(ctx, toDelete.ID, moby.ContainerRemoveOptions{
err = s.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{
Force: true,
RemoveVolumes: volumes,
})

View File

@ -29,20 +29,20 @@ import (
)
func (s *composeService) Logs(ctx context.Context, projectName string, consumer api.LogConsumer, options api.LogOptions) error {
list, err := s.getContainers(ctx, projectName, oneOffExclude, true, options.Services...)
containers, err := s.getContainers(ctx, projectName, oneOffExclude, true, options.Services...)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for _, c := range list {
c := c
for _, c := range containers {
service := c.Labels[api.ServiceLabel]
container, err := s.apiClient.ContainerInspect(ctx, c.ID)
if err != nil {
return err
}
name := getContainerNameWithoutProject(c)
eg.Go(func() error {
r, err := s.apiClient.ContainerLogs(ctx, container.ID, types.ContainerLogsOptions{
ShowStdout: true,
@ -58,7 +58,6 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
}
defer r.Close() // nolint errcheck
name := getContainerNameWithoutProject(c)
w := utils.GetWriter(func(line string) {
consumer.Log(name, service, line)
})

View File

@ -38,9 +38,8 @@ func (s *composeService) Ps(ctx context.Context, projectName string, options api
summary := make([]api.ContainerSummary, len(containers))
eg, ctx := errgroup.WithContext(ctx)
for i, c := range containers {
container := c
i := i
for i, container := range containers {
i, container := i, container
eg.Go(func() error {
var publishers []api.PortPublisher
sort.Slice(container.Ports, func(i, j int) bool {

View File

@ -58,8 +58,8 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts
w := progress.ContextWriter(ctx)
eg, ctx := errgroup.WithContext(ctx)
for _, srv := range project.Services {
service := srv
for _, service := range project.Services {
service := service
if service.Image == "" {
w.Event(progress.Event{
ID: service.Name,

View File

@ -74,12 +74,12 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt
func (s *composeService) remove(ctx context.Context, containers Containers, options api.RemoveOptions) error {
w := progress.ContextWriter(ctx)
eg, ctx := errgroup.WithContext(ctx)
for _, c := range containers {
c := c
for _, container := range containers {
container := container
eg.Go(func() error {
eventName := getContainerProgressName(c)
eventName := getContainerProgressName(container)
w.Event(progress.RemovingEvent(eventName))
err := s.apiClient.ContainerRemove(ctx, c.ID, moby.ContainerRemoveOptions{
err := s.apiClient.ContainerRemove(ctx, container.ID, moby.ContainerRemoveOptions{
RemoveVolumes: options.Volumes,
Force: options.Force,
})

View File

@ -49,8 +49,8 @@ func (s *composeService) restart(ctx context.Context, project *types.Project, op
return nil
}
eg, ctx := errgroup.WithContext(ctx)
for _, c := range observedState.filter(isService(service)) {
container := c
for _, container := range observedState.filter(isService(service)) {
container := container
eg.Go(func() error {
eventName := getContainerProgressName(container)
w.Event(progress.RestartingEvent(eventName))

View File

@ -34,9 +34,8 @@ func (s *composeService) Top(ctx context.Context, projectName string, services [
}
summary := make([]api.ContainerProcSummary, len(containers))
eg, ctx := errgroup.WithContext(ctx)
for i, c := range containers {
container := c
i := i
for i, container := range containers {
i, container := i, container
eg.Go(func() error {
topContent, err := s.apiClient.ContainerTop(ctx, container.ID, []string{})
if err != nil {