Merge pull request #4428 from edsrzf/parallel-pull

Pull services in parallel
Commit 5ff3037aa8 by Joffrey F, 2017-03-01 14:38:22 -08:00, committed via GitHub.
5 changed files with 85 additions and 22 deletions.

compose/cli/main.py

@@ -602,10 +602,12 @@ class TopLevelCommand(object):
         Options:
             --ignore-pull-failures  Pull what it can and ignores images with pull failures.
+            --parallel             Pull multiple images in parallel.
         """
         self.project.pull(
             service_names=options['SERVICE'],
-            ignore_pull_failures=options.get('--ignore-pull-failures')
+            ignore_pull_failures=options.get('--ignore-pull-failures'),
+            parallel_pull=options.get('--parallel')
         )

     def push(self, options):
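Usage note: with this change, docker-compose pull --parallel pulls the images for all selected services concurrently (capped at five at a time, per Project.pull below), while docker-compose pull without the flag keeps the previous sequential behavior.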

compose/parallel.py

@@ -4,6 +4,7 @@ from __future__ import unicode_literals
 import logging
 import operator
 import sys
+from threading import Semaphore
 from threading import Thread

 from docker.errors import APIError
@@ -25,7 +26,7 @@ log = logging.getLogger(__name__)
 STOP = object()


-def parallel_execute(objects, func, get_name, msg, get_deps=None):
+def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None):
     """Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
@@ -39,7 +40,7 @@ def parallel_execute(objects, func, get_name, msg, get_deps=None):
     for obj in objects:
         writer.initialize(get_name(obj))

-    events = parallel_execute_iter(objects, func, get_deps)
+    events = parallel_execute_iter(objects, func, get_deps, limit)

     errors = {}
     results = []
@@ -96,7 +97,15 @@ class State(object):
         return set(self.objects) - self.started - self.finished - self.failed


-def parallel_execute_iter(objects, func, get_deps):
+class NoLimit(object):
+    def __enter__(self):
+        pass
+
+    def __exit__(self, *ex):
+        pass
+
+
+def parallel_execute_iter(objects, func, get_deps, limit):
     """
     Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
@@ -115,11 +124,16 @@ def parallel_execute_iter(objects, func, get_deps):
     if get_deps is None:
         get_deps = _no_deps

+    if limit is None:
+        limiter = NoLimit()
+    else:
+        limiter = Semaphore(limit)
+
     results = Queue()
     state = State(objects)

     while True:
-        feed_queue(objects, func, get_deps, results, state)
+        feed_queue(objects, func, get_deps, results, state, limiter)

         try:
             event = results.get(timeout=0.1)
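Both branches above yield an object usable in a with-statement: threading.Semaphore acquires a slot on entry and releases it on exit, while NoLimit does nothing. A minimal self-contained sketch of that pattern (the make_limiter helper and worker function are illustrative additions, not part of the commit):

from threading import Semaphore, Thread
import time

class NoLimit(object):
    def __enter__(self):
        pass

    def __exit__(self, *ex):
        pass

def make_limiter(limit):
    # None means "no limit", mirroring the selection logic in the diff.
    return NoLimit() if limit is None else Semaphore(limit)

def worker(n, limiter):
    with limiter:  # at most `limit` threads are inside this block at once
        time.sleep(0.1)
        print('finished', n)

limiter = make_limiter(2)
threads = [Thread(target=worker, args=(n, limiter)) for n in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()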
@@ -143,19 +157,20 @@ def parallel_execute_iter(objects, func, get_deps):
         yield event


-def producer(obj, func, results):
+def producer(obj, func, results, limiter):
     """
     The entry point for a producer thread which runs func on a single object.
     Places a tuple on the results queue once func has either returned or raised.
     """
-    try:
-        result = func(obj)
-        results.put((obj, result, None))
-    except Exception as e:
-        results.put((obj, None, e))
+    with limiter:
+        try:
+            result = func(obj)
+            results.put((obj, result, None))
+        except Exception as e:
+            results.put((obj, None, e))


-def feed_queue(objects, func, get_deps, results, state):
+def feed_queue(objects, func, get_deps, results, state, limiter):
     """
     Starts producer threads for any objects which are ready to be processed
     (i.e. they have no dependencies which haven't been successfully processed).
@@ -179,7 +194,7 @@ def feed_queue(objects, func, get_deps, results, state):
             ) for dep, ready_check in deps
         ):
             log.debug('Starting producer thread for {}'.format(obj))
-            t = Thread(target=producer, args=(obj, func, results))
+            t = Thread(target=producer, args=(obj, func, results, limiter))
             t.daemon = True
             t.start()
             state.started.add(obj)
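Putting producer and feed_queue together: every started thread first blocks on the limiter, runs func, and reports an (object, result, exception) tuple on a shared queue that the main loop drains. A self-contained sketch of that flow, assuming Python 3 (f and the literal values are illustrative):

from queue import Queue
from threading import Semaphore, Thread

def producer(obj, func, results, limiter):
    # Same shape as the diff: hold a semaphore slot while func runs,
    # and report success or failure on the shared results queue.
    with limiter:
        try:
            results.put((obj, func(obj), None))
        except Exception as e:
            results.put((obj, None, e))

results = Queue()
limiter = Semaphore(2)  # at most two producers run func concurrently

def f(n):
    if n == 3:
        raise ValueError('boom')
    return n * n

threads = [Thread(target=producer, args=(n, f, results, limiter)) for n in range(5)]
for t in threads:
    t.start()
for _ in threads:
    print(results.get())  # (obj, result, None) or (obj, None, exception)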
@@ -201,7 +216,7 @@ class UpstreamError(Exception):


 class ParallelStreamWriter(object):
     """Write out messages for operations happening in parallel.

-    Each operation has it's own line, and ANSI code characters are used
+    Each operation has its own line, and ANSI code characters are used
     to jump to the correct line, and write over the line.
     """

compose/project.py

@@ -454,9 +454,22 @@ class Project(object):

         return plans

-    def pull(self, service_names=None, ignore_pull_failures=False):
-        for service in self.get_services(service_names, include_deps=False):
-            service.pull(ignore_pull_failures)
+    def pull(self, service_names=None, ignore_pull_failures=False, parallel_pull=False):
+        services = self.get_services(service_names, include_deps=False)
+
+        if parallel_pull:
+            def pull_service(service):
+                service.pull(ignore_pull_failures, True)
+
+            parallel.parallel_execute(
+                services,
+                pull_service,
+                operator.attrgetter('name'),
+                'Pulling',
+                limit=5)
+        else:
+            for service in services:
+                service.pull(ignore_pull_failures)

     def push(self, service_names=None, ignore_push_failures=False):
         for service in self.get_services(service_names, include_deps=False):
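So the parallel branch fans the services out through parallel_execute, labels each progress line by service name via operator.attrgetter('name'), and caps concurrency at five pulls. A rough stand-alone equivalent of that call shape, with multiprocessing.pool.ThreadPool standing in for compose's parallel_execute and a namedtuple standing in for Service (all names here are illustrative):

import operator
from collections import namedtuple
from multiprocessing.pool import ThreadPool

Service = namedtuple('Service', 'name')
services = [Service('web'), Service('db'), Service('cache')]
get_name = operator.attrgetter('name')  # same accessor the diff passes

def pull_service(service):
    # Stand-in for service.pull(ignore_pull_failures, True)
    print('Pulling', get_name(service))

pool = ThreadPool(5)  # analogous to limit=5 above
pool.map(pull_service, services)
pool.close()
pool.join()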

compose/service.py

@@ -2,6 +2,7 @@ from __future__ import absolute_import
 from __future__ import unicode_literals

 import logging
+import os
 import re
 import sys

 from collections import namedtuple
@@ -897,17 +898,23 @@ class Service(object):

         return any(has_host_port(binding) for binding in self.options.get('ports', []))

-    def pull(self, ignore_pull_failures=False):
+    def pull(self, ignore_pull_failures=False, silent=False):
         if 'image' not in self.options:
             return

         repo, tag, separator = parse_repository_tag(self.options['image'])
         tag = tag or 'latest'
-        log.info('Pulling %s (%s%s%s)...' % (self.name, repo, separator, tag))
+        if not silent:
+            log.info('Pulling %s (%s%s%s)...' % (self.name, repo, separator, tag))
         try:
             output = self.client.pull(repo, tag=tag, stream=True)
-            return progress_stream.get_digest_from_pull(
-                stream_output(output, sys.stdout))
+            if silent:
+                with open(os.devnull, 'w') as devnull:
+                    return progress_stream.get_digest_from_pull(
+                        stream_output(output, devnull))
+            else:
+                return progress_stream.get_digest_from_pull(
+                    stream_output(output, sys.stdout))
         except (StreamOutputError, NotFound) as e:
             if not ignore_pull_failures:
                 raise
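In the silent case the pull stream is still fully consumed, so the digest can be extracted, but progress is written to os.devnull instead of stdout. A minimal illustration of the same redirection (this toy stream_output only mimics compose's):

import os
import sys

def stream_output(chunks, stream):
    # Toy stand-in for compose's stream_output: write progress to `stream`.
    for chunk in chunks:
        stream.write(chunk + '\n')

def pull(silent=False):
    output = ['layer 1: downloading', 'layer 1: pull complete']
    if silent:
        with open(os.devnull, 'w') as devnull:
            stream_output(output, devnull)  # progress is discarded
    else:
        stream_output(output, sys.stdout)

pull(silent=True)   # prints nothing
pull(silent=False)  # prints both progress lines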

tests/unit/parallel_test.py

@@ -1,6 +1,8 @@
 from __future__ import absolute_import
 from __future__ import unicode_literals

+from threading import Lock
+
 import six
 from docker.errors import APIError
@@ -40,6 +42,30 @@ def test_parallel_execute():
     assert errors == {}


+def test_parallel_execute_with_limit():
+    limit = 1
+    tasks = 20
+    lock = Lock()
+
+    def f(obj):
+        locked = lock.acquire(False)
+        # we should always get the lock because we're the only thread running
+        assert locked
+        lock.release()
+        return None
+
+    results, errors = parallel_execute(
+        objects=list(range(tasks)),
+        func=f,
+        get_name=six.text_type,
+        msg="Testing",
+        limit=limit,
+    )
+
+    assert results == tasks * [None]
+    assert errors == {}
+
+
 def test_parallel_execute_with_deps():
     log = []
@@ -82,7 +108,7 @@ def test_parallel_execute_with_upstream_errors():
     events = [
         (obj, result, type(exception))
         for obj, result, exception
-        in parallel_execute_iter(objects, process, get_deps)
+        in parallel_execute_iter(objects, process, get_deps, None)
     ]

     assert (cache, None, type(None)) in events