mirror of https://github.com/docker/compose.git
Serialize access to observed state
This should fix concurrent accesses to the map. Signed-off-by: Mathieu Champlon <mathieu.champlon@docker.com>
This commit is contained in:
parent
918fe00f3e
commit
9317ffc0ab
|
@ -55,6 +55,19 @@ const (
|
||||||
type convergence struct {
|
type convergence struct {
|
||||||
service *composeService
|
service *composeService
|
||||||
observedState map[string]Containers
|
observedState map[string]Containers
|
||||||
|
stateMutex sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *convergence) getObservedState(serviceName string) Containers {
|
||||||
|
c.stateMutex.Lock()
|
||||||
|
defer c.stateMutex.Unlock()
|
||||||
|
return c.observedState[serviceName]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *convergence) setObservedState(serviceName string, containers Containers) {
|
||||||
|
c.stateMutex.Lock()
|
||||||
|
defer c.stateMutex.Unlock()
|
||||||
|
c.observedState[serviceName] = containers
|
||||||
}
|
}
|
||||||
|
|
||||||
func newConvergence(services []string, state Containers, s *composeService) *convergence {
|
func newConvergence(services []string, state Containers, s *composeService) *convergence {
|
||||||
|
@ -97,7 +110,7 @@ var mu sync.Mutex
|
||||||
|
|
||||||
// updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
|
// updateProject updates project after service converged, so dependent services relying on `service:xx` can refer to actual containers.
|
||||||
func (c *convergence) updateProject(project *types.Project, service string) {
|
func (c *convergence) updateProject(project *types.Project, service string) {
|
||||||
containers := c.observedState[service]
|
containers := c.getObservedState(service)
|
||||||
container := containers[0]
|
container := containers[0]
|
||||||
|
|
||||||
// operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
|
// operation is protected by a Mutex so that we can safely update project.Services while running concurrent convergence on services
|
||||||
|
@ -148,7 +161,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
containers := c.observedState[service.Name]
|
containers := c.getObservedState(service.Name)
|
||||||
actual := len(containers)
|
actual := len(containers)
|
||||||
updated := make(Containers, expected)
|
updated := make(Containers, expected)
|
||||||
|
|
||||||
|
@ -224,7 +237,7 @@ func (c *convergence) ensureService(ctx context.Context, project *types.Project,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = eg.Wait()
|
err = eg.Wait()
|
||||||
c.observedState[service.Name] = updated
|
c.setObservedState(service.Name, updated)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue