Expand depends_on to allow different conditions (service_start, service_healthy)

Rework "up" and "start" to wait on conditional state of dependent services
Add integration tests

Signed-off-by: Joffrey F <joffrey@docker.com>
This commit is contained in:
Joffrey F 2016-12-19 20:20:03 -08:00
parent f6edd610f3
commit 04394b1d0a
10 changed files with 228 additions and 18 deletions

View File

@ -649,6 +649,8 @@ def process_service(service_config):
if 'sysctls' in service_dict: if 'sysctls' in service_dict:
service_dict['sysctls'] = build_string_dict(parse_sysctls(service_dict['sysctls'])) service_dict['sysctls'] = build_string_dict(parse_sysctls(service_dict['sysctls']))
service_dict = process_depends_on(service_dict)
for field in ['dns', 'dns_search', 'tmpfs']: for field in ['dns', 'dns_search', 'tmpfs']:
if field in service_dict: if field in service_dict:
service_dict[field] = to_list(service_dict[field]) service_dict[field] = to_list(service_dict[field])
@ -658,6 +660,14 @@ def process_service(service_config):
return service_dict return service_dict
def process_depends_on(service_dict):
if 'depends_on' in service_dict and not isinstance(service_dict['depends_on'], dict):
service_dict['depends_on'] = dict([
(svc, {'condition': 'service_started'}) for svc in service_dict['depends_on']
])
return service_dict
def process_healthcheck(service_dict, service_name): def process_healthcheck(service_dict, service_name):
if 'healthcheck' not in service_dict: if 'healthcheck' not in service_dict:
return service_dict return service_dict
@ -665,7 +675,7 @@ def process_healthcheck(service_dict, service_name):
hc = {} hc = {}
raw = service_dict['healthcheck'] raw = service_dict['healthcheck']
if raw.get('disable'): if raw.get('disable') or raw.get('disabled'):
if len(raw) > 1: if len(raw) > 1:
raise ConfigurationError( raise ConfigurationError(
'Service "{}" defines an invalid healthcheck: ' 'Service "{}" defines an invalid healthcheck: '

View File

@ -180,11 +180,13 @@ def validate_links(service_config, service_names):
def validate_depends_on(service_config, service_names): def validate_depends_on(service_config, service_names):
for dependency in service_config.config.get('depends_on', []): deps = service_config.config.get('depends_on', {})
for dependency in deps.keys():
if dependency not in service_names: if dependency not in service_names:
raise ConfigurationError( raise ConfigurationError(
"Service '{s.name}' depends on service '{dep}' which is " "Service '{s.name}' depends on service '{dep}' which is "
"undefined.".format(s=service_config, dep=dependency)) "undefined.".format(s=service_config, dep=dependency)
)
def get_unsupported_config_msg(path, error_key): def get_unsupported_config_msg(path, error_key):
@ -201,7 +203,7 @@ def anglicize_json_type(json_type):
def is_service_dict_schema(schema_id): def is_service_dict_schema(schema_id):
return schema_id in ('config_schema_v1.json', '#/properties/services') return schema_id in ('config_schema_v1.json', '#/properties/services')
def handle_error_for_schema_with_id(error, path): def handle_error_for_schema_with_id(error, path):

View File

@ -10,3 +10,24 @@ class OperationFailedError(Exception):
class StreamParseError(RuntimeError): class StreamParseError(RuntimeError):
def __init__(self, reason): def __init__(self, reason):
self.msg = reason self.msg = reason
class HealthCheckException(Exception):
def __init__(self, reason):
self.msg = reason
class HealthCheckFailed(HealthCheckException):
def __init__(self, container_id):
super(HealthCheckFailed, self).__init__(
'Container "{}" is unhealthy.'.format(container_id)
)
class NoHealthCheckConfigured(HealthCheckException):
def __init__(self, service_name):
super(NoHealthCheckConfigured, self).__init__(
'Service "{}" is missing a healthcheck configuration'.format(
service_name
)
)

View File

@ -165,13 +165,14 @@ def feed_queue(objects, func, get_deps, results, state):
for obj in pending: for obj in pending:
deps = get_deps(obj) deps = get_deps(obj)
if any(dep in state.failed for dep in deps): if any(dep[0] in state.failed for dep in deps):
log.debug('{} has upstream errors - not processing'.format(obj)) log.debug('{} has upstream errors - not processing'.format(obj))
results.put((obj, None, UpstreamError())) results.put((obj, None, UpstreamError()))
state.failed.add(obj) state.failed.add(obj)
elif all( elif all(
dep not in objects or dep in state.finished dep not in objects or (
for dep in deps dep in state.finished and (not ready_check or ready_check(dep))
) for dep, ready_check in deps
): ):
log.debug('Starting producer thread for {}'.format(obj)) log.debug('Starting producer thread for {}'.format(obj))
t = Thread(target=producer, args=(obj, func, results)) t = Thread(target=producer, args=(obj, func, results))

View File

@ -227,7 +227,10 @@ class Project(object):
services = self.get_services(service_names) services = self.get_services(service_names)
def get_deps(service): def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()} return {
(self.get_service(dep), config)
for dep, config in service.get_dependency_configs().items()
}
parallel.parallel_execute( parallel.parallel_execute(
services, services,
@ -243,7 +246,7 @@ class Project(object):
def get_deps(container): def get_deps(container):
# actually returning inversed dependencies # actually returning inversed dependencies
return {other for other in containers return {(other, None) for other in containers
if container.service in if container.service in
self.get_service(other.service).get_dependency_names()} self.get_service(other.service).get_dependency_names()}
@ -394,7 +397,10 @@ class Project(object):
) )
def get_deps(service): def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()} return {
(self.get_service(dep), config)
for dep, config in service.get_dependency_configs().items()
}
results, errors = parallel.parallel_execute( results, errors = parallel.parallel_execute(
services, services,

View File

@ -28,6 +28,8 @@ from .const import LABEL_PROJECT
from .const import LABEL_SERVICE from .const import LABEL_SERVICE
from .const import LABEL_VERSION from .const import LABEL_VERSION
from .container import Container from .container import Container
from .errors import HealthCheckFailed
from .errors import NoHealthCheckConfigured
from .errors import OperationFailedError from .errors import OperationFailedError
from .parallel import parallel_execute from .parallel import parallel_execute
from .parallel import parallel_start from .parallel import parallel_start
@ -69,6 +71,9 @@ DOCKER_START_KEYS = [
'volumes_from', 'volumes_from',
] ]
CONDITION_STARTED = 'service_started'
CONDITION_HEALTHY = 'service_healthy'
class BuildError(Exception): class BuildError(Exception):
def __init__(self, service, reason): def __init__(self, service, reason):
@ -533,10 +538,38 @@ class Service(object):
def get_dependency_names(self): def get_dependency_names(self):
net_name = self.network_mode.service_name net_name = self.network_mode.service_name
return (self.get_linked_service_names() + return (
self.get_volumes_from_names() + self.get_linked_service_names() +
([net_name] if net_name else []) + self.get_volumes_from_names() +
self.options.get('depends_on', [])) ([net_name] if net_name else []) +
list(self.options.get('depends_on', {}).keys())
)
def get_dependency_configs(self):
net_name = self.network_mode.service_name
configs = dict(
[(name, None) for name in self.get_linked_service_names()]
)
configs.update(dict(
[(name, None) for name in self.get_volumes_from_names()]
))
configs.update({net_name: None} if net_name else {})
configs.update(self.options.get('depends_on', {}))
for svc, config in self.options.get('depends_on', {}).items():
if config['condition'] == CONDITION_STARTED:
configs[svc] = lambda s: True
elif config['condition'] == CONDITION_HEALTHY:
configs[svc] = lambda s: s.is_healthy()
else:
# The config schema already prevents this, but it might be
# bypassed if Compose is called programmatically.
raise ValueError(
'depends_on condition "{}" is invalid.'.format(
config['condition']
)
)
return configs
def get_linked_service_names(self): def get_linked_service_names(self):
return [service.name for (service, _) in self.links] return [service.name for (service, _) in self.links]
@ -871,6 +904,24 @@ class Service(object):
else: else:
log.error(six.text_type(e)) log.error(six.text_type(e))
def is_healthy(self):
""" Check that all containers for this service report healthy.
Returns false if at least one healthcheck is pending.
If an unhealthy container is detected, raise a HealthCheckFailed
exception.
"""
result = True
for ctnr in self.containers():
ctnr.inspect()
status = ctnr.get('State.Health.Status')
if status is None:
raise NoHealthCheckConfigured(self.name)
elif status == 'starting':
result = False
elif status == 'unhealthy':
raise HealthCheckFailed(ctnr.short_id)
return result
def short_id_alias_exists(container, network): def short_id_alias_exists(container, network):
aliases = container.get( aliases = container.get(

View File

@ -928,7 +928,7 @@ class CLITestCase(DockerClientTestCase):
assert foo_container.get('HostConfig.NetworkMode') == \ assert foo_container.get('HostConfig.NetworkMode') == \
'container:{}'.format(bar_container.id) 'container:{}'.format(bar_container.id)
@v3_only() @v2_1_only()
def test_up_with_healthcheck(self): def test_up_with_healthcheck(self):
def wait_on_health_status(container, status): def wait_on_health_status(container, status):
def condition(): def condition():

View File

@ -19,6 +19,8 @@ from compose.config.types import VolumeSpec
from compose.const import LABEL_PROJECT from compose.const import LABEL_PROJECT
from compose.const import LABEL_SERVICE from compose.const import LABEL_SERVICE
from compose.container import Container from compose.container import Container
from compose.errors import HealthCheckFailed
from compose.errors import NoHealthCheckConfigured
from compose.project import Project from compose.project import Project
from compose.project import ProjectError from compose.project import ProjectError
from compose.service import ConvergenceStrategy from compose.service import ConvergenceStrategy
@ -1375,3 +1377,115 @@ class ProjectTest(DockerClientTestCase):
ctnr for ctnr in project._labeled_containers() ctnr for ctnr in project._labeled_containers()
if ctnr.labels.get(LABEL_SERVICE) == 'service1' if ctnr.labels.get(LABEL_SERVICE) == 'service1'
]) == 0 ]) == 0
@v2_1_only()
def test_project_up_healthy_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'test': 'exit 0',
'retries': 1,
'timeout': '10s',
'interval': '0.1s'
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
project.up()
containers = project.containers()
assert len(containers) == 2
svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
assert svc1.is_healthy()
@v2_1_only()
def test_project_up_unhealthy_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'test': 'exit 1',
'retries': 1,
'timeout': '10s',
'interval': '0.1s'
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
with pytest.raises(HealthCheckFailed):
project.up()
containers = project.containers()
assert len(containers) == 1
svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
with pytest.raises(HealthCheckFailed):
svc1.is_healthy()
@v2_1_only()
def test_project_up_no_healthcheck_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'disabled': True
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
with pytest.raises(NoHealthCheckConfigured):
project.up()
containers = project.containers()
assert len(containers) == 1
svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
with pytest.raises(NoHealthCheckConfigured):
svc1.is_healthy()

View File

@ -920,7 +920,10 @@ class ConfigTest(unittest.TestCase):
'build': {'context': os.path.abspath('/')}, 'build': {'context': os.path.abspath('/')},
'image': 'example/web', 'image': 'example/web',
'volumes': [VolumeSpec.parse('/home/user/project:/code')], 'volumes': [VolumeSpec.parse('/home/user/project:/code')],
'depends_on': ['db', 'other'], 'depends_on': {
'db': {'condition': 'container_start'},
'other': {'condition': 'container_start'},
},
}, },
{ {
'name': 'db', 'name': 'db',
@ -3055,7 +3058,9 @@ class ExtendsTest(unittest.TestCase):
image: example image: example
""") """)
services = load_from_filename(str(tmpdir.join('docker-compose.yml'))) services = load_from_filename(str(tmpdir.join('docker-compose.yml')))
assert service_sort(services)[2]['depends_on'] == ['other'] assert service_sort(services)[2]['depends_on'] == {
'other': {'condition': 'container_start'}
}
@pytest.mark.xfail(IS_WINDOWS_PLATFORM, reason='paths use slash') @pytest.mark.xfail(IS_WINDOWS_PLATFORM, reason='paths use slash')

View File

@ -25,7 +25,7 @@ deps = {
def get_deps(obj): def get_deps(obj):
return deps[obj] return [(dep, None) for dep in deps[obj]]
def test_parallel_execute(): def test_parallel_execute():