Refactor setup_queue()

- Stop sharing set objects across threads
- Use a second queue to signal when producer threads are done
- Use a single consumer thread to check dependencies and kick off new
  producers

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
This commit is contained in:
Aanand Prasad 2016-04-08 12:58:19 +01:00 committed by Daniel Nephin
parent b865f35f17
commit cdef2b5e3b
2 changed files with 46 additions and 21 deletions

View File

@ -1,6 +1,7 @@
from __future__ import absolute_import
from __future__ import unicode_literals
import logging
import operator
import sys
from threading import Thread
@ -14,6 +15,9 @@ from compose.cli.signals import ShutdownException
from compose.utils import get_output_stream
log = logging.getLogger(__name__)
def parallel_execute(objects, func, get_name, msg, get_deps=None):
"""Runs func on objects in parallel while ensuring that func is
ran on object only after it is ran on all its dependencies.
@ -73,35 +77,53 @@ def setup_queue(objects, func, get_deps, get_name):
get_deps = _no_deps
results = Queue()
started = set() # objects being processed
finished = set() # objects which have been processed
output = Queue()
def do_op(obj):
def consumer():
started = set() # objects being processed
finished = set() # objects which have been processed
def ready(obj):
"""
Returns true if obj is ready to be processed:
- all dependencies have been processed
- obj is not already being processed
"""
return obj not in started and all(
dep not in objects or dep in finished
for dep in get_deps(obj)
)
while len(finished) < len(objects):
for obj in filter(ready, objects):
log.debug('Starting producer thread for {}'.format(obj))
t = Thread(target=producer, args=(obj,))
t.daemon = True
t.start()
started.add(obj)
try:
event = results.get(timeout=1)
except Empty:
continue
obj = event[0]
log.debug('Finished processing: {}'.format(obj))
finished.add(obj)
output.put(event)
def producer(obj):
try:
result = func(obj)
results.put((obj, result, None))
except Exception as e:
results.put((obj, None, e))
finished.add(obj)
feed()
t = Thread(target=consumer)
t.daemon = True
t.start()
def ready(obj):
# Is object ready for performing operation
return obj not in started and all(
dep not in objects or dep in finished
for dep in get_deps(obj)
)
def feed():
for obj in filter(ready, objects):
started.add(obj)
t = Thread(target=do_op, args=(obj,))
t.daemon = True
t.start()
feed()
return results
return output
class ParallelStreamWriter(object):

View File

@ -135,6 +135,9 @@ class Service(object):
self.networks = networks or {}
self.options = options
def __repr__(self):
return '<Service: {}>'.format(self.name)
def containers(self, stopped=False, one_off=False, filters={}):
filters.update({'label': self.labels(one_off=one_off)})