Merge pull request #4267 from shin-/3754-depends-on-healthcheck

Allow service dependencies to wait on healthy containers
This commit is contained in:
Joffrey F 2017-01-04 13:00:19 -08:00 committed by GitHub
commit be88bb0e6c
11 changed files with 273 additions and 18 deletions

View File

@ -649,6 +649,8 @@ def process_service(service_config):
if 'sysctls' in service_dict:
service_dict['sysctls'] = build_string_dict(parse_sysctls(service_dict['sysctls']))
service_dict = process_depends_on(service_dict)
for field in ['dns', 'dns_search', 'tmpfs']:
if field in service_dict:
service_dict[field] = to_list(service_dict[field])
@ -658,6 +660,14 @@ def process_service(service_config):
return service_dict
def process_depends_on(service_dict):
    """Normalize the ``depends_on`` option of a service to its mapping form.

    The short list syntax (``depends_on: [db, cache]``) is expanded so that
    every listed service maps to ``{'condition': 'service_started'}``. A
    value that is already a dict is left untouched, and so is a service
    without a ``depends_on`` key.

    The dict is updated in place and also returned, so callers may rebind it.
    """
    if 'depends_on' not in service_dict:
        return service_dict

    depends_on = service_dict['depends_on']
    if not isinstance(depends_on, dict):
        # Each dependency gets its own condition dict (no shared object), so
        # later mutation of one entry cannot leak into the others.
        service_dict['depends_on'] = {
            svc: {'condition': 'service_started'} for svc in depends_on
        }
    return service_dict
def process_healthcheck(service_dict, service_name):
if 'healthcheck' not in service_dict:
return service_dict
@ -665,7 +675,7 @@ def process_healthcheck(service_dict, service_name):
hc = {}
raw = service_dict['healthcheck']
if raw.get('disable'):
if raw.get('disable') or raw.get('disabled'):
if len(raw) > 1:
raise ConfigurationError(
'Service "{}" defines an invalid healthcheck: '

View File

@ -77,7 +77,28 @@
"cpu_shares": {"type": ["number", "string"]},
"cpu_quota": {"type": ["number", "string"]},
"cpuset": {"type": "string"},
"depends_on": {"$ref": "#/definitions/list_of_strings"},
"depends_on": {
"oneOf": [
{"$ref": "#/definitions/list_of_strings"},
{
"type": "object",
"additionalProperties": false,
"patternProperties": {
"^[a-zA-Z0-9._-]+$": {
"type": "object",
"additionalProperties": false,
"properties": {
"condition": {
"type": "string",
"enum": ["service_started", "service_healthy"]
}
},
"required": ["condition"]
}
}
}
]
},
"devices": {"type": "array", "items": {"type": "string"}, "uniqueItems": true},
"dns": {"$ref": "#/definitions/string_or_list"},
"dns_search": {"$ref": "#/definitions/string_or_list"},
@ -120,6 +141,7 @@
"external_links": {"type": "array", "items": {"type": "string"}, "uniqueItems": true},
"extra_hosts": {"$ref": "#/definitions/list_or_dict"},
"healthcheck": {"$ref": "#/definitions/healthcheck"},
"hostname": {"type": "string"},
"image": {"type": "string"},
"ipc": {"type": "string"},
@ -231,6 +253,24 @@
"additionalProperties": false
},
"healthcheck": {
"id": "#/definitions/healthcheck",
"type": "object",
"additionalProperties": false,
"properties": {
"disabled": {"type": "boolean"},
"interval": {"type": "string"},
"retries": {"type": "number"},
"test": {
"oneOf": [
{"type": "string"},
{"type": "array", "items": {"type": "string"}}
]
},
"timeout": {"type": "string"}
}
},
"network": {
"id": "#/definitions/network",
"type": "object",

View File

@ -180,11 +180,13 @@ def validate_links(service_config, service_names):
def validate_depends_on(service_config, service_names):
for dependency in service_config.config.get('depends_on', []):
deps = service_config.config.get('depends_on', {})
for dependency in deps.keys():
if dependency not in service_names:
raise ConfigurationError(
"Service '{s.name}' depends on service '{dep}' which is "
"undefined.".format(s=service_config, dep=dependency))
"undefined.".format(s=service_config, dep=dependency)
)
def get_unsupported_config_msg(path, error_key):
@ -201,7 +203,7 @@ def anglicize_json_type(json_type):
def is_service_dict_schema(schema_id):
return schema_id in ('config_schema_v1.json', '#/properties/services')
return schema_id in ('config_schema_v1.json', '#/properties/services')
def handle_error_for_schema_with_id(error, path):

View File

@ -10,3 +10,24 @@ class OperationFailedError(Exception):
class StreamParseError(RuntimeError):
    """Error carrying the reason a stream could not be parsed.

    The reason is kept on ``msg`` and is also handed to the base class so
    that ``str(exc)``, ``exc.args`` and tracebacks are not empty.
    """

    def __init__(self, reason):
        super(StreamParseError, self).__init__(reason)
        self.msg = reason
class HealthCheckException(Exception):
    """Base class for healthcheck-related errors.

    The human-readable description is kept on ``msg`` and is also passed to
    ``Exception.__init__`` so that ``str(exc)``, ``exc.args`` and tracebacks
    carry it instead of being empty.
    """

    def __init__(self, reason):
        super(HealthCheckException, self).__init__(reason)
        self.msg = reason


class HealthCheckFailed(HealthCheckException):
    """Signals that the container identified by ``container_id`` is unhealthy."""

    def __init__(self, container_id):
        super(HealthCheckFailed, self).__init__(
            'Container "{}" is unhealthy.'.format(container_id)
        )


class NoHealthCheckConfigured(HealthCheckException):
    """Signals that the service ``service_name`` has no healthcheck configuration."""

    def __init__(self, service_name):
        super(NoHealthCheckConfigured, self).__init__(
            'Service "{}" is missing a healthcheck configuration'.format(
                service_name
            )
        )

View File

@ -165,13 +165,14 @@ def feed_queue(objects, func, get_deps, results, state):
for obj in pending:
deps = get_deps(obj)
if any(dep in state.failed for dep in deps):
if any(dep[0] in state.failed for dep in deps):
log.debug('{} has upstream errors - not processing'.format(obj))
results.put((obj, None, UpstreamError()))
state.failed.add(obj)
elif all(
dep not in objects or dep in state.finished
for dep in deps
dep not in objects or (
dep in state.finished and (not ready_check or ready_check(dep))
) for dep, ready_check in deps
):
log.debug('Starting producer thread for {}'.format(obj))
t = Thread(target=producer, args=(obj, func, results))

View File

@ -227,7 +227,10 @@ class Project(object):
services = self.get_services(service_names)
def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()}
return {
(self.get_service(dep), config)
for dep, config in service.get_dependency_configs().items()
}
parallel.parallel_execute(
services,
@ -243,7 +246,7 @@ class Project(object):
def get_deps(container):
# actually returning inversed dependencies
return {other for other in containers
return {(other, None) for other in containers
if container.service in
self.get_service(other.service).get_dependency_names()}
@ -394,7 +397,10 @@ class Project(object):
)
def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()}
return {
(self.get_service(dep), config)
for dep, config in service.get_dependency_configs().items()
}
results, errors = parallel.parallel_execute(
services,

View File

@ -28,6 +28,8 @@ from .const import LABEL_PROJECT
from .const import LABEL_SERVICE
from .const import LABEL_VERSION
from .container import Container
from .errors import HealthCheckFailed
from .errors import NoHealthCheckConfigured
from .errors import OperationFailedError
from .parallel import parallel_execute
from .parallel import parallel_start
@ -69,6 +71,9 @@ DOCKER_START_KEYS = [
'volumes_from',
]
CONDITION_STARTED = 'service_started'
CONDITION_HEALTHY = 'service_healthy'
class BuildError(Exception):
def __init__(self, service, reason):
@ -533,10 +538,38 @@ class Service(object):
def get_dependency_names(self):
net_name = self.network_mode.service_name
return (self.get_linked_service_names() +
self.get_volumes_from_names() +
([net_name] if net_name else []) +
self.options.get('depends_on', []))
return (
self.get_linked_service_names() +
self.get_volumes_from_names() +
([net_name] if net_name else []) +
list(self.options.get('depends_on', {}).keys())
)
def get_dependency_configs(self):
    """Map the name of every dependency to its readiness check.

    Names coming from links, ``volumes_from`` and a service network mode map
    to ``None`` (no readiness check). Entries of the ``depends_on`` option
    map to a one-argument callable chosen from the entry's ``condition``:

    * ``CONDITION_STARTED`` -- a callable that always returns ``True``;
    * ``CONDITION_HEALTHY`` -- a callable returning ``is_healthy()`` of its
      argument.

    A ``depends_on`` entry replaces a ``None`` recorded earlier for the same
    service name.

    Raises:
        ValueError: if a ``depends_on`` entry carries an unknown condition.
    """
    net_name = self.network_mode.service_name

    configs = {name: None for name in self.get_linked_service_names()}
    configs.update({name: None for name in self.get_volumes_from_names()})
    if net_name:
        configs[net_name] = None

    # Every depends_on entry is either assigned a callable below or aborts
    # the whole call, so the raw condition dicts never need to be stored.
    for svc, config in self.options.get('depends_on', {}).items():
        condition = config['condition']
        if condition == CONDITION_STARTED:
            configs[svc] = lambda s: True
        elif condition == CONDITION_HEALTHY:
            configs[svc] = lambda s: s.is_healthy()
        else:
            # The config schema already prevents this, but it might be
            # bypassed if Compose is called programmatically.
            raise ValueError(
                'depends_on condition "{}" is invalid.'.format(condition)
            )

    return configs
def get_linked_service_names(self):
    """Return the ``name`` of each service in ``self.links``, in order.

    Every link is a 2-tuple whose first item is the linked service; the
    second item is not used here.
    """
    return [linked_service.name for linked_service, _unused in self.links]
@ -871,6 +904,24 @@ class Service(object):
else:
log.error(six.text_type(e))
def is_healthy(self):
    """Tell whether every container of this service reports healthy.

    Containers are examined in the order returned by ``self.containers()``.

    Returns:
        ``False`` if at least one container's health status is still
        ``starting``; ``True`` otherwise (including when the service has no
        containers).

    Raises:
        NoHealthCheckConfigured: a container exposes no health status.
        HealthCheckFailed: a container's health status is ``unhealthy``.
    """
    all_ready = True
    for container in self.containers():
        # presumably refreshes the container's cached state before the
        # status is read -- confirm against Container.inspect
        container.inspect()
        health = container.get('State.Health.Status')

        if health is None:
            raise NoHealthCheckConfigured(self.name)
        if health == 'unhealthy':
            raise HealthCheckFailed(container.short_id)
        if health == 'starting':
            # Keep scanning: a later container may still be unhealthy or
            # lack a healthcheck, which must be reported as an error.
            all_ready = False
    return all_ready
def short_id_alias_exists(container, network):
aliases = container.get(

View File

@ -32,6 +32,11 @@ exe = EXE(pyz,
'compose/config/config_schema_v2.1.json',
'DATA'
),
(
'compose/config/config_schema_v3.0.json',
'compose/config/config_schema_v3.0.json',
'DATA'
),
(
'compose/GITSHA',
'compose/GITSHA',

View File

@ -19,6 +19,8 @@ from compose.config.types import VolumeSpec
from compose.const import LABEL_PROJECT
from compose.const import LABEL_SERVICE
from compose.container import Container
from compose.errors import HealthCheckFailed
from compose.errors import NoHealthCheckConfigured
from compose.project import Project
from compose.project import ProjectError
from compose.service import ConvergenceStrategy
@ -1375,3 +1377,115 @@ class ProjectTest(DockerClientTestCase):
ctnr for ctnr in project._labeled_containers()
if ctnr.labels.get(LABEL_SERVICE) == 'service1'
]) == 0
# Happy path for `depends_on` with condition `service_healthy`:
# svc1 declares a healthcheck whose test command is `exit 0`, and svc2
# depends on svc1 being healthy. After `project.up()` the test expects two
# containers, `svc1` listed among svc2's dependency names, and
# `svc1.is_healthy()` to be truthy.
@v2_1_only()
def test_project_up_healthy_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'test': 'exit 0',
'retries': 1,
'timeout': '10s',
'interval': '0.1s'
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
project.up()
containers = project.containers()
assert len(containers) == 2
svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
assert svc1.is_healthy()
# Failure path for `depends_on` with condition `service_healthy`:
# svc1's healthcheck test command is `exit 1`, and svc2 depends on svc1
# being healthy. `project.up()` is expected to raise HealthCheckFailed.
# Afterwards exactly one container is expected (presumably svc1's, since
# svc2 should never have been started), and calling `svc1.is_healthy()`
# directly must raise HealthCheckFailed as well.
@v2_1_only()
def test_project_up_unhealthy_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'test': 'exit 1',
'retries': 1,
'timeout': '10s',
'interval': '0.1s'
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
with pytest.raises(HealthCheckFailed):
project.up()
containers = project.containers()
assert len(containers) == 1
svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
with pytest.raises(HealthCheckFailed):
svc1.is_healthy()
# Misconfiguration path for `depends_on` with condition `service_healthy`:
# svc1's healthcheck is configured with `disabled: True`, yet svc2 depends
# on svc1 being healthy. `project.up()` is expected to raise
# NoHealthCheckConfigured. Afterwards exactly one container is expected
# (presumably svc1's), and calling `svc1.is_healthy()` directly must raise
# NoHealthCheckConfigured as well.
@v2_1_only()
def test_project_up_no_healthcheck_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'disabled': True
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
with pytest.raises(NoHealthCheckConfigured):
project.up()
containers = project.containers()
assert len(containers) == 1
svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
with pytest.raises(NoHealthCheckConfigured):
svc1.is_healthy()

View File

@ -920,7 +920,10 @@ class ConfigTest(unittest.TestCase):
'build': {'context': os.path.abspath('/')},
'image': 'example/web',
'volumes': [VolumeSpec.parse('/home/user/project:/code')],
'depends_on': ['db', 'other'],
'depends_on': {
'db': {'condition': 'service_started'},
'other': {'condition': 'service_started'},
},
},
{
'name': 'db',
@ -3055,7 +3058,9 @@ class ExtendsTest(unittest.TestCase):
image: example
""")
services = load_from_filename(str(tmpdir.join('docker-compose.yml')))
assert service_sort(services)[2]['depends_on'] == ['other']
assert service_sort(services)[2]['depends_on'] == {
'other': {'condition': 'service_started'}
}
@pytest.mark.xfail(IS_WINDOWS_PLATFORM, reason='paths use slash')

View File

@ -25,7 +25,7 @@ deps = {
def get_deps(obj):
return deps[obj]
return [(dep, None) for dep in deps[obj]]
def test_parallel_execute():