From 48166a79c7dd484e082c09594c4abe44cf75da20 Mon Sep 17 00:00:00 2001 From: Shea Rozmiarek Date: Fri, 8 Dec 2017 00:34:22 -0600 Subject: [PATCH] Add COMPOSE_PARALLEL_LIMIT to restrict global number of parallel operations Signed-off-by: Shea Rozmiarek --- compose/const.py | 1 + compose/parallel.py | 16 +++++++++++++++- tests/unit/parallel_test.py | 27 +++++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/compose/const.py b/compose/const.py index 2ac08b89a..6e5902cad 100644 --- a/compose/const.py +++ b/compose/const.py @@ -18,6 +18,7 @@ LABEL_VERSION = 'com.docker.compose.version' LABEL_VOLUME = 'com.docker.compose.volume' LABEL_CONFIG_HASH = 'com.docker.compose.config-hash' NANOCPUS_SCALE = 1000000000 +PARALLEL_LIMIT = 64 SECRETS_PATH = '/run/secrets' diff --git a/compose/parallel.py b/compose/parallel.py index f271561ff..4f881c8f1 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -15,6 +15,8 @@ from six.moves.queue import Queue from compose.cli.colors import green from compose.cli.colors import red from compose.cli.signals import ShutdownException +from compose.config.environment import Environment +from compose.const import PARALLEL_LIMIT from compose.errors import HealthCheckFailed from compose.errors import NoHealthCheckConfigured from compose.errors import OperationFailedError @@ -26,6 +28,18 @@ log = logging.getLogger(__name__) STOP = object() +def get_configured_limit(): + limit = Environment.from_command_line({'COMPOSE_PARALLEL_LIMIT': None})['COMPOSE_PARALLEL_LIMIT'] + if limit: + limit = int(limit) + else: + limit = PARALLEL_LIMIT + return limit + + +global_limiter = Semaphore(get_configured_limit()) + + def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, parent_objects=None): """Runs func on objects in parallel while ensuring that func is ran on object only after it is ran on all its dependencies. @@ -173,7 +187,7 @@ def producer(obj, func, results, limiter): The entry point for a producer thread which runs func on a single object. Places a tuple on the results queue once func has either returned or raised. """ - with limiter: + with limiter, global_limiter: try: result = func(obj) results.put((obj, result, None)) diff --git a/tests/unit/parallel_test.py b/tests/unit/parallel_test.py index 3a60f01a6..7aabed17b 100644 --- a/tests/unit/parallel_test.py +++ b/tests/unit/parallel_test.py @@ -1,11 +1,13 @@ from __future__ import absolute_import from __future__ import unicode_literals +import os from threading import Lock import six from docker.errors import APIError +from compose.parallel import get_configured_limit from compose.parallel import parallel_execute from compose.parallel import parallel_execute_iter from compose.parallel import ParallelStreamWriter @@ -67,6 +69,31 @@ def test_parallel_execute_with_limit(): assert errors == {} +def test_parallel_execute_with_global_limit(): + os.environ['COMPOSE_PARALLEL_LIMIT'] = '1' + tasks = 20 + lock = Lock() + + assert get_configured_limit() == 1 + + def f(obj): + locked = lock.acquire(False) + # we should always get the lock because we're the only thread running + assert locked + lock.release() + return None + + results, errors = parallel_execute( + objects=list(range(tasks)), + func=f, + get_name=six.text_type, + msg="Testing", + ) + + assert results == tasks * [None] + assert errors == {} + + def test_parallel_execute_with_deps(): log = []