Merge pull request #1734 from mnowster/1653-scale-in-parallel

Parallelise scale
Aanand Prasad 2015-07-21 16:02:04 +01:00
commit dc62279d02
3 changed files with 117 additions and 56 deletions

View File

@@ -198,15 +198,30 @@ class Project(object):
             service.start(**options)
 
     def stop(self, service_names=None, **options):
-        parallel_execute("stop", self.containers(service_names), "Stopping", "Stopped", **options)
+        parallel_execute(
+            objects=self.containers(service_names),
+            obj_callable=lambda c: c.stop(**options),
+            msg_index=lambda c: c.name,
+            msg="Stopping"
+        )
 
     def kill(self, service_names=None, **options):
-        parallel_execute("kill", self.containers(service_names), "Killing", "Killed", **options)
+        parallel_execute(
+            objects=self.containers(service_names),
+            obj_callable=lambda c: c.kill(**options),
+            msg_index=lambda c: c.name,
+            msg="Killing"
+        )
 
     def remove_stopped(self, service_names=None, **options):
         all_containers = self.containers(service_names, stopped=True)
         stopped_containers = [c for c in all_containers if not c.is_running]
-        parallel_execute("remove", stopped_containers, "Removing", "Removed", **options)
+        parallel_execute(
+            objects=stopped_containers,
+            obj_callable=lambda c: c.remove(**options),
+            msg_index=lambda c: c.name,
+            msg="Removing"
+        )
 
     def restart(self, service_names=None, **options):
         for service in self.get_services(service_names):
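All three Project methods above now funnel into the same keyword-style call: the objects to act on, a callable to run once per object, a callable that produces the label for the progress line, and the verb to display. A minimal sketch of that call shape, assuming the helper is importable as compose.utils.parallel_execute (as the .utils import in the next file suggests) and using a made-up FakeContainer stand-in rather than Compose's Container class:

from compose.utils import parallel_execute


class FakeContainer(object):
    # Stand-in object: only needs the attributes the keyword arguments touch.
    def __init__(self, name):
        self.name = name

    def stop(self, **options):
        print("stopped", self.name, options)


containers = [FakeContainer("web_1"), FakeContainer("web_2")]

parallel_execute(
    objects=containers,                         # what to act on
    obj_callable=lambda c: c.stop(timeout=10),  # the per-object action
    msg_index=lambda c: c.name,                 # label shown on the progress line
    msg="Stopping"                              # verb displayed while running
)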

View File

@@ -24,7 +24,7 @@ from .const import (
 from .container import Container
 from .legacy import check_for_legacy_containers
 from .progress_stream import stream_output, StreamOutputError
-from .utils import json_hash
+from .utils import json_hash, parallel_execute
 
 
 log = logging.getLogger(__name__)
@@ -168,42 +168,82 @@ class Service(object):
                      'for this service are created on a single host, the port will clash.'
                      % self.name)
 
-        # Create enough containers
-        containers = self.containers(stopped=True)
-        while len(containers) < desired_num:
-            containers.append(self.create_container())
-
-        running_containers = []
-        stopped_containers = []
-        for c in containers:
-            if c.is_running:
-                running_containers.append(c)
-            else:
-                stopped_containers.append(c)
-
-        running_containers.sort(key=lambda c: c.number)
-        stopped_containers.sort(key=lambda c: c.number)
-
-        # Stop containers
-        while len(running_containers) > desired_num:
-            c = running_containers.pop()
-            log.info("Stopping %s..." % c.name)
-            c.stop(timeout=timeout)
-            stopped_containers.append(c)
-
-        # Start containers
-        while len(running_containers) < desired_num:
-            c = stopped_containers.pop(0)
-            log.info("Starting %s..." % c.name)
-            self.start_container(c)
-            running_containers.append(c)
+        def create_and_start(service, number):
+            container = service.create_container(number=number, quiet=True)
+            container.start()
+            return container
+
+        running_containers = self.containers(stopped=False)
+        num_running = len(running_containers)
+
+        if desired_num == num_running:
+            # do nothing as we already have the desired number
+            log.info('Desired container number already achieved')
+            return
+
+        if desired_num > num_running:
+            # we need to start/create until we have desired_num
+            all_containers = self.containers(stopped=True)
+
+            if num_running != len(all_containers):
+                # we have some stopped containers, let's start them up again
+                stopped_containers = sorted([c for c in all_containers if not c.is_running], key=attrgetter('number'))
+
+                num_stopped = len(stopped_containers)
+
+                if num_stopped + num_running > desired_num:
+                    num_to_start = desired_num - num_running
+                    containers_to_start = stopped_containers[:num_to_start]
+                else:
+                    containers_to_start = stopped_containers
+
+                parallel_execute(
+                    objects=containers_to_start,
+                    obj_callable=lambda c: c.start(),
+                    msg_index=lambda c: c.name,
+                    msg="Starting"
+                )
+
+                num_running += len(containers_to_start)
+
+            num_to_create = desired_num - num_running
+            next_number = self._next_container_number()
+            container_numbers = [
+                number for number in range(
+                    next_number, next_number + num_to_create
+                )
+            ]
+
+            parallel_execute(
+                objects=container_numbers,
+                obj_callable=lambda n: create_and_start(service=self, number=n),
+                msg_index=lambda n: n,
+                msg="Creating and starting"
+            )
+
+        if desired_num < num_running:
+            num_to_stop = num_running - desired_num
+            sorted_running_containers = sorted(running_containers, key=attrgetter('number'))
+            containers_to_stop = sorted_running_containers[-num_to_stop:]
+
+            parallel_execute(
+                objects=containers_to_stop,
+                obj_callable=lambda c: c.stop(timeout=timeout),
+                msg_index=lambda c: c.name,
+                msg="Stopping"
+            )
 
         self.remove_stopped()
 
     def remove_stopped(self, **options):
-        for c in self.containers(stopped=True):
-            if not c.is_running:
-                log.info("Removing %s..." % c.name)
-                c.remove(**options)
+        containers = [c for c in self.containers(stopped=True) if not c.is_running]
+
+        parallel_execute(
+            objects=containers,
+            obj_callable=lambda c: c.remove(**options),
+            msg_index=lambda c: c.name,
+            msg="Removing"
+        )
 
     def create_container(self,
                          one_off=False,
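The reworked scale() reconciles the current container count with the requested one in three branches: equal counts are a no-op, a shortfall is met by restarting stopped containers first (lowest numbers first) and then creating fresh ones starting at _next_container_number(), and a surplus is trimmed by stopping the highest-numbered running containers. The counting logic can be sketched on its own with plain numbers instead of containers; reconcile() and its return tuple are illustrative names, not Compose code:

def reconcile(desired_num, running_numbers, stopped_numbers, next_number):
    """Return (numbers_to_start, numbers_to_create, numbers_to_stop)."""
    num_running = len(running_numbers)

    if desired_num == num_running:
        # Desired container number already achieved.
        return [], [], []

    if desired_num > num_running:
        # Reuse stopped containers first, lowest numbers first.
        to_start = sorted(stopped_numbers)[:desired_num - num_running]
        # Create the remainder with fresh, consecutive numbers.
        num_to_create = desired_num - num_running - len(to_start)
        to_create = list(range(next_number, next_number + num_to_create))
        return to_start, to_create, []

    # Too many running: stop the highest-numbered ones.
    num_to_stop = num_running - desired_num
    return [], [], sorted(running_numbers)[-num_to_stop:]


# 2 running, 1 stopped, scale to 5: start the stopped one, create two more.
print(reconcile(5, running_numbers=[1, 2], stopped_numbers=[3], next_number=4))
# -> ([3], [4, 5], [])

# 4 running, scale to 2: stop the two highest-numbered containers.
print(reconcile(2, running_numbers=[1, 2, 3, 4], stopped_numbers=[], next_number=5))
# -> ([], [], [3, 4])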

View File

@@ -12,69 +12,75 @@ from threading import Thread
 log = logging.getLogger(__name__)
 
 
-def parallel_execute(command, containers, doing_msg, done_msg, **options):
+def parallel_execute(objects, obj_callable, msg_index, msg):
     """
-    Execute a given command upon a list of containers in parallel.
+    For a given list of objects, call the callable passing in the first
+    object we give it.
     """
     stream = codecs.getwriter('utf-8')(sys.stdout)
     lines = []
     errors = {}
 
-    for container in containers:
-        write_out_msg(stream, lines, container.name, doing_msg)
+    for obj in objects:
+        write_out_msg(stream, lines, msg_index(obj), msg)
 
     q = Queue()
 
-    def container_command_execute(container, command, **options):
+    def inner_execute_function(an_callable, parameter, msg_index):
         try:
-            getattr(container, command)(**options)
+            result = an_callable(parameter)
         except APIError as e:
-            errors[container.name] = e.explanation
-        q.put(container)
+            errors[msg_index] = e.explanation
+            result = "error"
+        q.put((msg_index, result))
 
-    for container in containers:
+    for an_object in objects:
         t = Thread(
-            target=container_command_execute,
-            args=(container, command),
-            kwargs=options,
+            target=inner_execute_function,
+            args=(obj_callable, an_object, msg_index(an_object)),
         )
         t.daemon = True
         t.start()
 
     done = 0
+    total_to_execute = len(objects)
 
-    while done < len(containers):
+    while done < total_to_execute:
         try:
-            container = q.get(timeout=1)
-            write_out_msg(stream, lines, container.name, done_msg)
+            msg_index, result = q.get(timeout=1)
+            if result == 'error':
+                write_out_msg(stream, lines, msg_index, msg, status='error')
+            else:
+                write_out_msg(stream, lines, msg_index, msg)
             done += 1
         except Empty:
             pass
 
     if errors:
-        for container in errors:
-            stream.write("ERROR: for {} {} \n".format(container, errors[container]))
+        for error in errors:
+            stream.write("ERROR: for {} {} \n".format(error, errors[error]))
 
 
-def write_out_msg(stream, lines, container_name, msg):
+def write_out_msg(stream, lines, msg_index, msg, status="done"):
     """
     Using special ANSI code characters we can write out the msg over the top of
     a previous status message, if it exists.
     """
-    if container_name in lines:
-        position = lines.index(container_name)
+    obj_index = msg_index
+    if msg_index in lines:
+        position = lines.index(obj_index)
         diff = len(lines) - position
         # move up
         stream.write("%c[%dA" % (27, diff))
         # erase
         stream.write("%c[2K\r" % 27)
-        stream.write("{}: {} \n".format(container_name, msg))
+        stream.write("{} {}... {}\n".format(msg, obj_index, status))
         # move back down
         stream.write("%c[%dB" % (27, diff))
     else:
         diff = 0
-        lines.append(container_name)
-        stream.write("{}: {}... \r\n".format(container_name, msg))
+        lines.append(obj_index)
+        stream.write("{} {}... \r\n".format(msg, obj_index))
     stream.flush()
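With the container-specific signature gone, parallel_execute can drive any per-object action and report progress per label as each worker thread finishes. A small standalone sketch, assuming the helper is importable as compose.utils.parallel_execute; slow_upper and the word list are illustrative only:

import time

from compose.utils import parallel_execute


def slow_upper(word):
    time.sleep(1)          # stand-in for a slow Docker API call
    return word.upper()


parallel_execute(
    objects=["web", "db", "cache"],
    obj_callable=slow_upper,       # runs once per object, each in its own thread
    msg_index=lambda w: w,         # label printed next to the progress message
    msg="Upcasing"
)

Each object first gets an "Upcasing <label>..." line, which is rewritten in place to "Upcasing <label>... done" (or "... error") as its thread reports back through the queue. The in-place rewrite in write_out_msg uses standard ANSI escape sequences: "%c[%dA" % (27, n) emits ESC[nA to move the cursor up n lines, ESC[2K erases that line so the finished status can overwrite the earlier one, and ESC[nB moves the cursor back down.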