diff --git a/cmd/compose/compose.go b/cmd/compose/compose.go index 024d6589d..ae0d13f1b 100644 --- a/cmd/compose/compose.go +++ b/cmd/compose/compose.go @@ -263,10 +263,11 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command { opts := projectOptions{} var ( - ansi string - noAnsi bool - verbose bool - version bool + ansi string + noAnsi bool + verbose bool + version bool + parallel int ) c := &cobra.Command{ Short: "Docker Compose", @@ -325,6 +326,9 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command { opts.ProjectDir = opts.WorkDir fmt.Fprint(os.Stderr, aec.Apply("option '--workdir' is DEPRECATED at root level! Please use '--project-directory' instead.\n", aec.RedF)) } + if parallel > 0 { + backend.MaxConcurrency(parallel) + } return nil }, } @@ -370,6 +374,7 @@ func RootCommand(dockerCli command.Cli, backend api.Service) *cobra.Command { ) c.Flags().StringVar(&ansi, "ansi", "auto", `Control when to print ANSI control characters ("never"|"always"|"auto")`) + c.Flags().IntVar(¶llel, "parallel", -1, `Control max parallelism, -1 for unlimited`) c.Flags().BoolVarP(&version, "version", "v", false, "Show the Docker Compose version information") c.Flags().MarkHidden("version") //nolint:errcheck c.Flags().BoolVar(&noAnsi, "no-ansi", false, `Do not print ANSI control characters (DEPRECATED)`) diff --git a/docs/reference/compose.md b/docs/reference/compose.md index d512fb2ac..27960628c 100644 --- a/docs/reference/compose.md +++ b/docs/reference/compose.md @@ -42,6 +42,7 @@ Docker Compose | `--compatibility` | | | Run compose in backward compatibility mode | | `--env-file` | `string` | | Specify an alternate environment file. | | `-f`, `--file` | `stringArray` | | Compose configuration files | +| `--parallel` | `int` | `-1` | Control max parallelism, -1 for unlimited | | `--profile` | `stringArray` | | Specify a profile to enable | | `--project-directory` | `string` | | Specify an alternate working directory (default: the path of the, first specified, Compose file) | diff --git a/docs/reference/docker_compose.yaml b/docs/reference/docker_compose.yaml index c0512717c..9709ef880 100644 --- a/docs/reference/docker_compose.yaml +++ b/docs/reference/docker_compose.yaml @@ -208,6 +208,16 @@ options: experimentalcli: false kubernetes: false swarm: false + - option: parallel + value_type: int + default_value: "-1" + description: Control max parallelism, -1 for unlimited + deprecated: false + hidden: false + experimental: false + experimentalcli: false + kubernetes: false + swarm: false - option: profile value_type: stringArray default_value: '[]' diff --git a/pkg/api/api.go b/pkg/api/api.go index 5096e5920..d02a90e48 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -75,6 +75,8 @@ type Service interface { Port(ctx context.Context, projectName string, service string, port uint16, options PortOptions) (string, int, error) // Images executes the equivalent of a `compose images` Images(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error) + // MaxConcurrency defines upper limit for concurrent operations against engine API + MaxConcurrency(parallel int) } // BuildOptions group options of the Build API diff --git a/pkg/api/proxy.go b/pkg/api/proxy.go index aa2364b74..1ed80b26c 100644 --- a/pkg/api/proxy.go +++ b/pkg/api/proxy.go @@ -50,6 +50,7 @@ type ServiceProxy struct { EventsFn func(ctx context.Context, project string, options EventsOptions) error PortFn func(ctx context.Context, project string, service string, port uint16, options PortOptions) (string, int, error) ImagesFn func(ctx context.Context, projectName string, options ImagesOptions) ([]ImageSummary, error) + MaxConcurrencyFn func(parallel int) interceptors []Interceptor } @@ -87,6 +88,7 @@ func (s *ServiceProxy) WithService(service Service) *ServiceProxy { s.EventsFn = service.Events s.PortFn = service.Port s.ImagesFn = service.Images + s.MaxConcurrencyFn = service.MaxConcurrency return s } @@ -308,3 +310,7 @@ func (s *ServiceProxy) Images(ctx context.Context, project string, options Image } return s.ImagesFn(ctx, project, options) } + +func (s *ServiceProxy) MaxConcurrency(i int) { + s.MaxConcurrencyFn(i) +} diff --git a/pkg/compose/compose.go b/pkg/compose/compose.go index fb664cea0..adc282303 100644 --- a/pkg/compose/compose.go +++ b/pkg/compose/compose.go @@ -41,12 +41,14 @@ import ( // NewComposeService create a local implementation of the compose.Service API func NewComposeService(dockerCli command.Cli) api.Service { return &composeService{ - dockerCli: dockerCli, + dockerCli: dockerCli, + maxConcurrency: -1, } } type composeService struct { - dockerCli command.Cli + dockerCli command.Cli + maxConcurrency int } func (s *composeService) apiClient() client.APIClient { @@ -57,6 +59,10 @@ func (s *composeService) configFile() *configfile.ConfigFile { return s.dockerCli.ConfigFile() } +func (s *composeService) MaxConcurrency(i int) { + s.maxConcurrency = i +} + func (s *composeService) stdout() *streams.Out { return s.dockerCli.Out() } diff --git a/pkg/compose/pull.go b/pkg/compose/pull.go index 1a79edc7b..50a029f0c 100644 --- a/pkg/compose/pull.go +++ b/pkg/compose/pull.go @@ -63,6 +63,7 @@ func (s *composeService) pull(ctx context.Context, project *types.Project, opts w := progress.ContextWriter(ctx) eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(s.maxConcurrency) var mustBuild []string @@ -279,6 +280,7 @@ func (s *composeService) pullRequiredImages(ctx context.Context, project *types. return progress.Run(ctx, func(ctx context.Context) error { w := progress.ContextWriter(ctx) eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(s.maxConcurrency) pulledImages := make([]string, len(needPull)) for i, service := range needPull { i, service := i, service diff --git a/pkg/compose/push.go b/pkg/compose/push.go index 65309afe3..08f9f5edc 100644 --- a/pkg/compose/push.go +++ b/pkg/compose/push.go @@ -47,6 +47,7 @@ func (s *composeService) Push(ctx context.Context, project *types.Project, optio func (s *composeService) push(ctx context.Context, project *types.Project, options api.PushOptions) error { eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(s.maxConcurrency) info, err := s.apiClient().Info(ctx) if err != nil { diff --git a/pkg/mocks/mock_docker_compose_api.go b/pkg/mocks/mock_docker_compose_api.go index 6569ccc64..3046ae427 100644 --- a/pkg/mocks/mock_docker_compose_api.go +++ b/pkg/mocks/mock_docker_compose_api.go @@ -194,6 +194,18 @@ func (mr *MockServiceMockRecorder) Logs(ctx, projectName, consumer, options inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Logs", reflect.TypeOf((*MockService)(nil).Logs), ctx, projectName, consumer, options) } +// MaxConcurrency mocks base method. +func (m *MockService) MaxConcurrency(parallel int) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "MaxConcurrency", parallel) +} + +// MaxConcurrency indicates an expected call of MaxConcurrency. +func (mr *MockServiceMockRecorder) MaxConcurrency(parallel interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxConcurrency", reflect.TypeOf((*MockService)(nil).MaxConcurrency), parallel) +} + // Pause mocks base method. func (m *MockService) Pause(ctx context.Context, projectName string, options api.PauseOptions) error { m.ctrl.T.Helper()