Add optional limit to the number of parallel operations

Signed-off-by: Evan Shaw <evan@vendhq.com>
Evan Shaw 2017-02-13 16:15:49 +13:00
parent 21529169ad
commit 05aa8c7285
3 changed files with 31 additions and 15 deletions

compose/parallel.py

@@ -4,6 +4,7 @@ from __future__ import unicode_literals
 import logging
 import operator
 import sys
+from threading import Semaphore
 from threading import Thread

 from docker.errors import APIError
@@ -23,7 +24,7 @@ log = logging.getLogger(__name__)
 STOP = object()


-def parallel_execute(objects, func, get_name, msg, get_deps=None):
+def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
     """Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
@@ -37,7 +38,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
     for obj in objects:
         writer.initialize(get_name(obj))

-    events = parallel_execute_iter(objects, func, get_deps)
+    events = parallel_execute_iter(objects, func, get_deps, limit)

     errors = {}
     results = []
@@ -94,7 +95,15 @@ class State(object):
         return set(self.objects) - self.started - self.finished - self.failed


-def parallel_execute_iter(objects, func, get_deps):
+class NoLimit(object):
+    def __enter__(self):
+        pass
+
+    def __exit__(self, *ex):
+        pass
+
+
+def parallel_execute_iter(objects, func, get_deps, limit):
     """
     Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
@@ -113,11 +122,16 @@ def parallel_execute_iter(objects, func, get_deps):
     if get_deps is None:
         get_deps = _no_deps

+    if limit is None:
+        limiter = NoLimit()
+    else:
+        limiter = Semaphore(limit)
+
     results = Queue()
     state = State(objects)

     while True:
-        feed_queue(objects, func, get_deps, results, state)
+        feed_queue(objects, func, get_deps, results, state, limiter)

         try:
             event = results.get(timeout=0.1)
@@ -141,19 +155,20 @@ def parallel_execute_iter(objects, func, get_deps):
         yield event


-def producer(obj, func, results):
+def producer(obj, func, results, limiter):
     """
     The entry point for a producer thread which runs func on a single object.
     Places a tuple on the results queue once func has either returned or raised.
     """
-    try:
-        result = func(obj)
-        results.put((obj, result, None))
-    except Exception as e:
-        results.put((obj, None, e))
+    with limiter:
+        try:
+            result = func(obj)
+            results.put((obj, result, None))
+        except Exception as e:
+            results.put((obj, None, e))


-def feed_queue(objects, func, get_deps, results, state):
+def feed_queue(objects, func, get_deps, results, state, limiter):
     """
     Starts producer threads for any objects which are ready to be processed
     (i.e. they have no dependencies which haven't been successfully processed).
@@ -177,7 +192,7 @@ def feed_queue(objects, func, get_deps, results, state):
                 ) for dep, ready_check in deps
             ):
                 log.debug('Starting producer thread for {}'.format(obj))
-                t = Thread(target=producer, args=(obj, func, results))
+                t = Thread(target=producer, args=(obj, func, results, limiter))
                 t.daemon = True
                 t.start()
                 state.started.add(obj)
@@ -199,7 +214,7 @@ class UpstreamError(Exception):

 class ParallelStreamWriter(object):
     """Write out messages for operations happening in parallel.

-    Each operation has it's own line, and ANSI code characters are used
+    Each operation has its own line, and ANSI code characters are used
     to jump to the correct line, and write over the line.
     """

compose/project.py

@@ -463,7 +463,8 @@ class Project(object):
             services,
             pull_service,
             operator.attrgetter('name'),
-            'Pulling')
+            'Pulling',
+            limit=5)

     def push(self, service_names=None, ignore_push_failures=False):
         for service in self.get_services(service_names, include_deps=False):
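
Note: pull now caps itself at five concurrent operations via the new keyword.
For illustration, a hedged, self-contained sketch of the call shape; Service
and pull_service below are stand-ins for the real compose objects, and the
sketch assumes the compose package from this commit is importable:

    import operator
    from collections import namedtuple

    from compose.parallel import parallel_execute

    Service = namedtuple('Service', 'name')
    services = [Service('db'), Service('web'), Service('cache')]

    def pull_service(service):
        # Stand-in for the real per-service pull work.
        print('pulling', service.name)

    parallel_execute(
        services,                     # objects to process
        pull_service,                 # func, run once per object in a thread
        operator.attrgetter('name'),  # get_name, used for the progress line
        'Pulling',                    # msg shown while operations run
        limit=5,                      # None (the default) means unlimited
    )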

tests/unit/parallel_test.py

@@ -82,7 +82,7 @@ def test_parallel_execute_with_upstream_errors():
     events = [
         (obj, result, type(exception))
         for obj, result, exception
-        in parallel_execute_iter(objects, process, get_deps)
+        in parallel_execute_iter(objects, process, get_deps, None)
     ]

     assert (cache, None, type(None)) in events
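
Note: the updated test only threads the new argument through as None. A sketch
of a possible follow-up test, illustrative and not part of this commit, that
checks the limit is actually honored by recording the high-water mark of
concurrently running producers:

    import threading
    import time

    from compose.parallel import parallel_execute_iter

    def test_parallel_execute_iter_respects_limit():
        lock = threading.Lock()
        running = [0]
        max_running = [0]

        def process(obj):
            # Track how many producers are inside func at the same time.
            with lock:
                running[0] += 1
                max_running[0] = max(max_running[0], running[0])
            time.sleep(0.05)
            with lock:
                running[0] -= 1

        # Ten independent objects, no deps, capped at three at a time.
        list(parallel_execute_iter(list(range(10)), process, None, limit=3))
        assert max_running[0] <= 3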