Move parallel_execute to a new module.

Signed-off-by: Daniel Nephin <dnephin@docker.com>
Daniel Nephin 2015-11-23 11:34:48 -05:00
parent 64447879d2
commit b4edf0c454
5 changed files with 146 additions and 144 deletions
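In practice the move only changes where callers import the helpers from; the behaviour is unchanged. A rough sketch of the switch for an external caller (the `project` variable and the timeout option are illustrative, not part of this diff):

    # before this commit
    from compose.container import parallel_stop
    # after this commit
    from compose.parallel import parallel_stop

    parallel_stop(project.containers(), {'timeout': 10})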

compose/container.py

@@ -1,7 +1,6 @@
from __future__ import absolute_import
from __future__ import unicode_literals
import operator
from functools import reduce
import six
@@ -9,7 +8,6 @@ import six
from .const import LABEL_CONTAINER_NUMBER
from .const import LABEL_PROJECT
from .const import LABEL_SERVICE
from compose.utils import parallel_execute
class Container(object):
@@ -252,40 +250,3 @@ def get_container_name(container):
    # ps
    shortest_name = min(container['Names'], key=lambda n: len(n.split('/')))
    return shortest_name.split('/')[-1]
def parallel_operation(containers, operation, options, message):
    parallel_execute(
        containers,
        operator.methodcaller(operation, **options),
        operator.attrgetter('name'),
        message)


def parallel_remove(containers, options):
    stopped_containers = [c for c in containers if not c.is_running]
    parallel_operation(stopped_containers, 'remove', options, 'Removing')


def parallel_stop(containers, options):
    parallel_operation(containers, 'stop', options, 'Stopping')


def parallel_start(containers, options):
    parallel_operation(containers, 'start', options, 'Starting')


def parallel_pause(containers, options):
    parallel_operation(containers, 'pause', options, 'Pausing')


def parallel_unpause(containers, options):
    parallel_operation(containers, 'unpause', options, 'Unpausing')


def parallel_kill(containers, options):
    parallel_operation(containers, 'kill', options, 'Killing')


def parallel_restart(containers, options):
    parallel_operation(containers, 'restart', options, 'Restarting')
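The operator-based indirection in these removed helpers is just a compact way to call a named method on each container and to read its name; a minimal sketch of what the two callables do (the timeout value is made up):

    import operator

    stop = operator.methodcaller('stop', timeout=10)
    name_of = operator.attrgetter('name')

    # stop(container) is equivalent to container.stop(timeout=10)
    # name_of(container) is equivalent to container.name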

compose/parallel.py (new file, 135 lines added)

@@ -0,0 +1,135 @@
from __future__ import absolute_import
from __future__ import unicode_literals
import operator
import sys
from threading import Thread
from docker.errors import APIError
from six.moves.queue import Empty
from six.moves.queue import Queue
from compose.utils import get_output_stream
def perform_operation(func, arg, callback, index):
    try:
        callback((index, func(arg)))
    except Exception as e:
        callback((index, e))


def parallel_execute(objects, func, index_func, msg):
    """For a given list of objects, call the callable passing in the first
    object we give it.
    """
    objects = list(objects)
    stream = get_output_stream(sys.stdout)
    writer = ParallelStreamWriter(stream, msg)
    for obj in objects:
        writer.initialize(index_func(obj))

    q = Queue()

    # TODO: limit the number of threads #1828
    for obj in objects:
        t = Thread(
            target=perform_operation,
            args=(func, obj, q.put, index_func(obj)))
        t.daemon = True
        t.start()

    done = 0
    errors = {}

    while done < len(objects):
        try:
            msg_index, result = q.get(timeout=1)
        except Empty:
            continue

        if isinstance(result, APIError):
            errors[msg_index] = "error", result.explanation
            writer.write(msg_index, 'error')
        elif isinstance(result, Exception):
            errors[msg_index] = "unexpected_exception", result
        else:
            writer.write(msg_index, 'done')
        done += 1

    if not errors:
        return

    stream.write("\n")
    for msg_index, (result, error) in errors.items():
        stream.write("ERROR: for {} {} \n".format(msg_index, error))
        if result == 'unexpected_exception':
            raise error
class ParallelStreamWriter(object):
    """Write out messages for operations happening in parallel.

    Each operation has its own line, and ANSI code characters are used
    to jump to the correct line, and write over the line.
    """

    def __init__(self, stream, msg):
        self.stream = stream
        self.msg = msg
        self.lines = []

    def initialize(self, obj_index):
        self.lines.append(obj_index)
        self.stream.write("{} {} ... \r\n".format(self.msg, obj_index))
        self.stream.flush()

    def write(self, obj_index, status):
        position = self.lines.index(obj_index)
        diff = len(self.lines) - position
        # move up
        self.stream.write("%c[%dA" % (27, diff))
        # erase
        self.stream.write("%c[2K\r" % 27)
        self.stream.write("{} {} ... {}\r".format(self.msg, obj_index, status))
        # move back down
        self.stream.write("%c[%dB" % (27, diff))
        self.stream.flush()
def parallel_operation(containers, operation, options, message):
    parallel_execute(
        containers,
        operator.methodcaller(operation, **options),
        operator.attrgetter('name'),
        message)


def parallel_remove(containers, options):
    stopped_containers = [c for c in containers if not c.is_running]
    parallel_operation(stopped_containers, 'remove', options, 'Removing')


def parallel_stop(containers, options):
    parallel_operation(containers, 'stop', options, 'Stopping')


def parallel_start(containers, options):
    parallel_operation(containers, 'start', options, 'Starting')


def parallel_pause(containers, options):
    parallel_operation(containers, 'pause', options, 'Pausing')


def parallel_unpause(containers, options):
    parallel_operation(containers, 'unpause', options, 'Unpausing')


def parallel_kill(containers, options):
    parallel_operation(containers, 'kill', options, 'Killing')


def parallel_restart(containers, options):
    parallel_operation(containers, 'restart', options, 'Restarting')
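Taken together, parallel_execute runs one thread per object, ParallelStreamWriter keeps a status line per object up to date via ANSI cursor movements, and errors are split into APIError (reported and summarised after the loop) versus anything else (re-raised at the end). A hedged sketch of driving the function directly, outside the parallel_* wrappers (the container list and the timeout are placeholders):

    import operator
    from compose.parallel import parallel_execute

    containers = project.containers()  # hypothetical list of Container objects
    parallel_execute(
        containers,
        operator.methodcaller('stop', timeout=10),
        operator.attrgetter('name'),
        'Stopping')

Each container gets its own "Stopping <name> ..." line, which is rewritten in place to "... done" or "... error" as results arrive; any APIError explanation is printed as an "ERROR: for <name>" summary once all threads have reported back.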

compose/project.py

@@ -7,7 +7,7 @@ from functools import reduce
from docker.errors import APIError
from docker.errors import NotFound
from . import container
from . import parallel
from .config import ConfigurationError
from .config import get_service_name_from_net
from .const import DEFAULT_TIMEOUT
@@ -241,22 +241,22 @@ class Project(object):
            service.start(**options)

    def stop(self, service_names=None, **options):
        container.parallel_stop(self.containers(service_names), options)
        parallel.parallel_stop(self.containers(service_names), options)

    def pause(self, service_names=None, **options):
        container.parallel_pause(reversed(self.containers(service_names)), options)
        parallel.parallel_pause(reversed(self.containers(service_names)), options)

    def unpause(self, service_names=None, **options):
        container.parallel_unpause(self.containers(service_names), options)
        parallel.parallel_unpause(self.containers(service_names), options)

    def kill(self, service_names=None, **options):
        container.parallel_kill(self.containers(service_names), options)
        parallel.parallel_kill(self.containers(service_names), options)

    def remove_stopped(self, service_names=None, **options):
        container.parallel_remove(self.containers(service_names, stopped=True), options)
        parallel.parallel_remove(self.containers(service_names, stopped=True), options)

    def restart(self, service_names=None, **options):
        container.parallel_restart(self.containers(service_names, stopped=True), options)
        parallel.parallel_restart(self.containers(service_names, stopped=True), options)

    def build(self, service_names=None, no_cache=False, pull=False, force_rm=False):
        for service in self.get_services(service_names):

compose/service.py

@@ -28,14 +28,14 @@ from .const import LABEL_PROJECT
from .const import LABEL_SERVICE
from .const import LABEL_VERSION
from .container import Container
from .container import parallel_remove
from .container import parallel_start
from .container import parallel_stop
from .legacy import check_for_legacy_containers
from .parallel import parallel_execute
from .parallel import parallel_remove
from .parallel import parallel_start
from .parallel import parallel_stop
from .progress_stream import stream_output
from .progress_stream import StreamOutputError
from .utils import json_hash
from .utils import parallel_execute
log = logging.getLogger(__name__)

compose/utils.py

@@ -2,107 +2,13 @@ import codecs
import hashlib
import json
import json.decoder
import logging
import sys
from threading import Thread
import six
from docker.errors import APIError
from six.moves.queue import Empty
from six.moves.queue import Queue
log = logging.getLogger(__name__)
json_decoder = json.JSONDecoder()
def perform_operation(func, arg, callback, index):
    try:
        callback((index, func(arg)))
    except Exception as e:
        callback((index, e))


def parallel_execute(objects, func, index_func, msg):
    """For a given list of objects, call the callable passing in the first
    object we give it.
    """
    objects = list(objects)
    stream = get_output_stream(sys.stdout)
    writer = ParallelStreamWriter(stream, msg)
    for obj in objects:
        writer.initialize(index_func(obj))

    q = Queue()

    # TODO: limit the number of threads #1828
    for obj in objects:
        t = Thread(
            target=perform_operation,
            args=(func, obj, q.put, index_func(obj)))
        t.daemon = True
        t.start()

    done = 0
    errors = {}

    while done < len(objects):
        try:
            msg_index, result = q.get(timeout=1)
        except Empty:
            continue

        if isinstance(result, APIError):
            errors[msg_index] = "error", result.explanation
            writer.write(msg_index, 'error')
        elif isinstance(result, Exception):
            errors[msg_index] = "unexpected_exception", result
        else:
            writer.write(msg_index, 'done')
        done += 1

    if not errors:
        return

    stream.write("\n")
    for msg_index, (result, error) in errors.items():
        stream.write("ERROR: for {} {} \n".format(msg_index, error))
        if result == 'unexpected_exception':
            raise error
class ParallelStreamWriter(object):
    """Write out messages for operations happening in parallel.

    Each operation has its own line, and ANSI code characters are used
    to jump to the correct line, and write over the line.
    """

    def __init__(self, stream, msg):
        self.stream = stream
        self.msg = msg
        self.lines = []

    def initialize(self, obj_index):
        self.lines.append(obj_index)
        self.stream.write("{} {} ... \r\n".format(self.msg, obj_index))
        self.stream.flush()

    def write(self, obj_index, status):
        position = self.lines.index(obj_index)
        diff = len(self.lines) - position
        # move up
        self.stream.write("%c[%dA" % (27, diff))
        # erase
        self.stream.write("%c[2K\r" % 27)
        self.stream.write("{} {} ... {}\r".format(self.msg, obj_index, status))
        # move back down
        self.stream.write("%c[%dB" % (27, diff))
        self.stream.flush()
def get_output_stream(stream):
    if six.PY3:
        return stream