Merge pull request #4742 from shin-/2496-new_scale

docker-compose up --scale, scale in config file
This commit is contained in:
Joffrey F 2017-04-27 15:52:31 -07:00 committed by GitHub
commit fd699c5923
9 changed files with 285 additions and 125 deletions

View File

@ -26,6 +26,7 @@ from ..config import resolve_build_args
from ..config.environment import Environment from ..config.environment import Environment
from ..config.serialize import serialize_config from ..config.serialize import serialize_config
from ..config.types import VolumeSpec from ..config.types import VolumeSpec
from ..const import COMPOSEFILE_V2_2 as V2_2
from ..const import IS_WINDOWS_PLATFORM from ..const import IS_WINDOWS_PLATFORM
from ..errors import StreamParseError from ..errors import StreamParseError
from ..progress_stream import StreamOutputError from ..progress_stream import StreamOutputError
@ -763,6 +764,9 @@ class TopLevelCommand(object):
$ docker-compose scale web=2 worker=3 $ docker-compose scale web=2 worker=3
This command is deprecated. Use the up command with the `--scale` flag
instead.
Usage: scale [options] [SERVICE=NUM...] Usage: scale [options] [SERVICE=NUM...]
Options: Options:
@ -771,15 +775,18 @@ class TopLevelCommand(object):
""" """
timeout = timeout_from_opts(options) timeout = timeout_from_opts(options)
for s in options['SERVICE=NUM']: if self.project.config_version == V2_2:
if '=' not in s: raise UserError(
raise UserError('Arguments to scale should be in the form service=num') 'The scale command is incompatible with the v2.2 format. '
service_name, num = s.split('=', 1) 'Use the up command with the --scale flag instead.'
try: )
num = int(num) else:
except ValueError: log.warn(
raise UserError('Number of containers for service "%s" is not a ' 'The scale command is deprecated. '
'number' % service_name) 'Use the up command with the --scale flag instead.'
)
for service_name, num in parse_scale_args(options['SERVICE=NUM']).items():
self.project.get_service(service_name).scale(num, timeout=timeout) self.project.get_service(service_name).scale(num, timeout=timeout)
def start(self, options): def start(self, options):
@ -875,7 +882,7 @@ class TopLevelCommand(object):
If you want to force Compose to stop and recreate all containers, use the If you want to force Compose to stop and recreate all containers, use the
`--force-recreate` flag. `--force-recreate` flag.
Usage: up [options] [SERVICE...] Usage: up [options] [--scale SERVICE=NUM...] [SERVICE...]
Options: Options:
-d Detached mode: Run containers in the background, -d Detached mode: Run containers in the background,
@ -898,7 +905,9 @@ class TopLevelCommand(object):
--remove-orphans Remove containers for services not --remove-orphans Remove containers for services not
defined in the Compose file defined in the Compose file
--exit-code-from SERVICE Return the exit code of the selected service container. --exit-code-from SERVICE Return the exit code of the selected service container.
Requires --abort-on-container-exit. Implies --abort-on-container-exit.
--scale SERVICE=NUM Scale SERVICE to NUM instances. Overrides the `scale`
setting in the Compose file if present.
""" """
start_deps = not options['--no-deps'] start_deps = not options['--no-deps']
exit_value_from = exitval_from_opts(options, self.project) exit_value_from = exitval_from_opts(options, self.project)
@ -919,7 +928,9 @@ class TopLevelCommand(object):
do_build=build_action_from_opts(options), do_build=build_action_from_opts(options),
timeout=timeout, timeout=timeout,
detached=detached, detached=detached,
remove_orphans=remove_orphans) remove_orphans=remove_orphans,
scale_override=parse_scale_args(options['--scale']),
)
if detached: if detached:
return return
@ -1238,3 +1249,19 @@ def call_docker(args):
log.debug(" ".join(map(pipes.quote, args))) log.debug(" ".join(map(pipes.quote, args)))
return subprocess.call(args) return subprocess.call(args)
def parse_scale_args(options):
res = {}
for s in options:
if '=' not in s:
raise UserError('Arguments to scale should be in the form service=num')
service_name, num = s.split('=', 1)
try:
num = int(num)
except ValueError:
raise UserError(
'Number of containers for service "%s" is not a number' % service_name
)
res[service_name] = num
return res

View File

@ -222,6 +222,7 @@
"privileged": {"type": "boolean"}, "privileged": {"type": "boolean"},
"read_only": {"type": "boolean"}, "read_only": {"type": "boolean"},
"restart": {"type": "string"}, "restart": {"type": "string"},
"scale": {"type": "integer"},
"security_opt": {"type": "array", "items": {"type": "string"}, "uniqueItems": true}, "security_opt": {"type": "array", "items": {"type": "string"}, "uniqueItems": true},
"shm_size": {"type": ["number", "string"]}, "shm_size": {"type": ["number", "string"]},
"sysctls": {"$ref": "#/definitions/list_or_dict"}, "sysctls": {"$ref": "#/definitions/list_or_dict"},

View File

@ -260,10 +260,6 @@ def parallel_remove(containers, options):
parallel_operation(stopped_containers, 'remove', options, 'Removing') parallel_operation(stopped_containers, 'remove', options, 'Removing')
def parallel_start(containers, options):
parallel_operation(containers, 'start', options, 'Starting')
def parallel_pause(containers, options): def parallel_pause(containers, options):
parallel_operation(containers, 'pause', options, 'Pausing') parallel_operation(containers, 'pause', options, 'Pausing')

View File

@ -57,12 +57,13 @@ class Project(object):
""" """
A collection of services. A collection of services.
""" """
def __init__(self, name, services, client, networks=None, volumes=None): def __init__(self, name, services, client, networks=None, volumes=None, config_version=None):
self.name = name self.name = name
self.services = services self.services = services
self.client = client self.client = client
self.volumes = volumes or ProjectVolumes({}) self.volumes = volumes or ProjectVolumes({})
self.networks = networks or ProjectNetworks({}, False) self.networks = networks or ProjectNetworks({}, False)
self.config_version = config_version
def labels(self, one_off=OneOffFilter.exclude): def labels(self, one_off=OneOffFilter.exclude):
labels = ['{0}={1}'.format(LABEL_PROJECT, self.name)] labels = ['{0}={1}'.format(LABEL_PROJECT, self.name)]
@ -82,7 +83,7 @@ class Project(object):
networks, networks,
use_networking) use_networking)
volumes = ProjectVolumes.from_config(name, config_data, client) volumes = ProjectVolumes.from_config(name, config_data, client)
project = cls(name, [], client, project_networks, volumes) project = cls(name, [], client, project_networks, volumes, config_data.version)
for service_dict in config_data.services: for service_dict in config_data.services:
service_dict = dict(service_dict) service_dict = dict(service_dict)
@ -380,13 +381,17 @@ class Project(object):
do_build=BuildAction.none, do_build=BuildAction.none,
timeout=None, timeout=None,
detached=False, detached=False,
remove_orphans=False): remove_orphans=False,
scale_override=None):
warn_for_swarm_mode(self.client) warn_for_swarm_mode(self.client)
self.initialize() self.initialize()
self.find_orphan_containers(remove_orphans) self.find_orphan_containers(remove_orphans)
if scale_override is None:
scale_override = {}
services = self.get_services_without_duplicate( services = self.get_services_without_duplicate(
service_names, service_names,
include_deps=start_deps) include_deps=start_deps)
@ -399,7 +404,8 @@ class Project(object):
return service.execute_convergence_plan( return service.execute_convergence_plan(
plans[service.name], plans[service.name],
timeout=timeout, timeout=timeout,
detached=detached detached=detached,
scale_override=scale_override.get(service.name)
) )
def get_deps(service): def get_deps(service):
@ -589,10 +595,13 @@ def get_secrets(service, service_secrets, secret_defs):
continue continue
if secret.uid or secret.gid or secret.mode: if secret.uid or secret.gid or secret.mode:
log.warn("Service \"{service}\" uses secret \"{secret}\" with uid, " log.warn(
"Service \"{service}\" uses secret \"{secret}\" with uid, "
"gid, or mode. These fields are not supported by this " "gid, or mode. These fields are not supported by this "
"implementation of the Compose file".format( "implementation of the Compose file".format(
service=service, secret=secret.source)) service=service, secret=secret.source
)
)
secrets.append({'secret': secret, 'file': secret_def.get('file')}) secrets.append({'secret': secret, 'file': secret_def.get('file')})

View File

@ -39,7 +39,6 @@ from .errors import HealthCheckFailed
from .errors import NoHealthCheckConfigured from .errors import NoHealthCheckConfigured
from .errors import OperationFailedError from .errors import OperationFailedError
from .parallel import parallel_execute from .parallel import parallel_execute
from .parallel import parallel_start
from .progress_stream import stream_output from .progress_stream import stream_output
from .progress_stream import StreamOutputError from .progress_stream import StreamOutputError
from .utils import json_hash from .utils import json_hash
@ -149,6 +148,7 @@ class Service(object):
network_mode=None, network_mode=None,
networks=None, networks=None,
secrets=None, secrets=None,
scale=None,
**options **options
): ):
self.name = name self.name = name
@ -160,6 +160,7 @@ class Service(object):
self.network_mode = network_mode or NetworkMode(None) self.network_mode = network_mode or NetworkMode(None)
self.networks = networks or {} self.networks = networks or {}
self.secrets = secrets or [] self.secrets = secrets or []
self.scale_num = scale or 1
self.options = options self.options = options
def __repr__(self): def __repr__(self):
@ -190,16 +191,7 @@ class Service(object):
self.start_container_if_stopped(c, **options) self.start_container_if_stopped(c, **options)
return containers return containers
def scale(self, desired_num, timeout=None): def show_scale_warnings(self, desired_num):
"""
Adjusts the number of containers to the specified number and ensures
they are running.
- creates containers until there are at least `desired_num`
- stops containers until there are at most `desired_num` running
- starts containers until there are at least `desired_num` running
- removes all stopped containers
"""
if self.custom_container_name and desired_num > 1: if self.custom_container_name and desired_num > 1:
log.warn('The "%s" service is using the custom container name "%s". ' log.warn('The "%s" service is using the custom container name "%s". '
'Docker requires each container to have a unique name. ' 'Docker requires each container to have a unique name. '
@ -211,14 +203,18 @@ class Service(object):
'for this service are created on a single host, the port will clash.' 'for this service are created on a single host, the port will clash.'
% self.name) % self.name)
def create_and_start(service, number): def scale(self, desired_num, timeout=None):
container = service.create_container(number=number, quiet=True) """
service.start_container(container) Adjusts the number of containers to the specified number and ensures
return container they are running.
def stop_and_remove(container): - creates containers until there are at least `desired_num`
container.stop(timeout=self.stop_timeout(timeout)) - stops containers until there are at most `desired_num` running
container.remove() - starts containers until there are at least `desired_num` running
- removes all stopped containers
"""
self.show_scale_warnings(desired_num)
running_containers = self.containers(stopped=False) running_containers = self.containers(stopped=False)
num_running = len(running_containers) num_running = len(running_containers)
@ -229,11 +225,10 @@ class Service(object):
return return
if desired_num > num_running: if desired_num > num_running:
# we need to start/create until we have desired_num
all_containers = self.containers(stopped=True) all_containers = self.containers(stopped=True)
if num_running != len(all_containers): if num_running != len(all_containers):
# we have some stopped containers, let's start them up again # we have some stopped containers, check for divergences
stopped_containers = [ stopped_containers = [
c for c in all_containers if not c.is_running c for c in all_containers if not c.is_running
] ]
@ -242,38 +237,14 @@ class Service(object):
divergent_containers = [ divergent_containers = [
c for c in stopped_containers if self._containers_have_diverged([c]) c for c in stopped_containers if self._containers_have_diverged([c])
] ]
stopped_containers = sorted(
set(stopped_containers) - set(divergent_containers),
key=attrgetter('number')
)
for c in divergent_containers: for c in divergent_containers:
c.remove() c.remove()
num_stopped = len(stopped_containers) all_containers = list(set(all_containers) - set(divergent_containers))
if num_stopped + num_running > desired_num: sorted_containers = sorted(all_containers, key=attrgetter('number'))
num_to_start = desired_num - num_running self._execute_convergence_start(
containers_to_start = stopped_containers[:num_to_start] sorted_containers, desired_num, timeout, True, True
else:
containers_to_start = stopped_containers
parallel_start(containers_to_start, {})
num_running += len(containers_to_start)
num_to_create = desired_num - num_running
next_number = self._next_container_number()
container_numbers = [
number for number in range(
next_number, next_number + num_to_create
)
]
parallel_execute(
container_numbers,
lambda n: create_and_start(service=self, number=n),
lambda n: self.get_container_name(n),
"Creating and starting"
) )
if desired_num < num_running: if desired_num < num_running:
@ -283,12 +254,7 @@ class Service(object):
running_containers, running_containers,
key=attrgetter('number')) key=attrgetter('number'))
parallel_execute( self._downscale(sorted_running_containers[-num_to_stop:], timeout)
sorted_running_containers[-num_to_stop:],
stop_and_remove,
lambda c: c.name,
"Stopping and removing",
)
def create_container(self, def create_container(self,
one_off=False, one_off=False,
@ -401,50 +367,119 @@ class Service(object):
return has_diverged return has_diverged
def execute_convergence_plan(self, def _execute_convergence_create(self, scale, detached, start):
plan, i = self._next_container_number()
timeout=None,
detached=False,
start=True):
(action, containers) = plan
should_attach_logs = not detached
if action == 'create': def create_and_start(service, n):
container = self.create_container() container = service.create_container(number=n)
if not detached:
if should_attach_logs:
container.attach_log_stream() container.attach_log_stream()
if start: if start:
self.start_container(container) self.start_container(container)
return container
return [container] containers, errors = parallel_execute(
range(i, i + scale),
elif action == 'recreate': lambda n: create_and_start(self, n),
return [ lambda n: self.get_container_name(n),
self.recreate_container( "Creating"
container,
timeout=timeout,
attach_logs=should_attach_logs,
start_new_container=start
) )
for container in containers for error in errors.values():
] raise OperationFailedError(error)
elif action == 'start':
if start:
for container in containers:
self.start_container_if_stopped(container, attach_logs=should_attach_logs)
return containers return containers
elif action == 'noop': def _execute_convergence_recreate(self, containers, scale, timeout, detached, start):
if len(containers) > scale:
self._downscale(containers[scale:], timeout)
containers = containers[:scale]
def recreate(container):
return self.recreate_container(
container, timeout=timeout, attach_logs=not detached,
start_new_container=start
)
containers, errors = parallel_execute(
containers,
recreate,
lambda c: c.name,
"Recreating"
)
for error in errors.values():
raise OperationFailedError(error)
if len(containers) < scale:
containers.extend(self._execute_convergence_create(
scale - len(containers), detached, start
))
return containers
def _execute_convergence_start(self, containers, scale, timeout, detached, start):
if len(containers) > scale:
self._downscale(containers[scale:], timeout)
containers = containers[:scale]
if start:
_, errors = parallel_execute(
containers,
lambda c: self.start_container_if_stopped(c, attach_logs=not detached),
lambda c: c.name,
"Starting"
)
for error in errors.values():
raise OperationFailedError(error)
if len(containers) < scale:
containers.extend(self._execute_convergence_create(
scale - len(containers), detached, start
))
return containers
def _downscale(self, containers, timeout=None):
def stop_and_remove(container):
container.stop(timeout=self.stop_timeout(timeout))
container.remove()
parallel_execute(
containers,
stop_and_remove,
lambda c: c.name,
"Stopping and removing",
)
def execute_convergence_plan(self, plan, timeout=None, detached=False,
start=True, scale_override=None):
(action, containers) = plan
scale = scale_override if scale_override is not None else self.scale_num
containers = sorted(containers, key=attrgetter('number'))
self.show_scale_warnings(scale)
if action == 'create':
return self._execute_convergence_create(
scale, detached, start
)
if action == 'recreate':
return self._execute_convergence_recreate(
containers, scale, timeout, detached, start
)
if action == 'start':
return self._execute_convergence_start(
containers, scale, timeout, detached, start
)
if action == 'noop':
if scale != len(containers):
return self._execute_convergence_start(
containers, scale, timeout, detached, start
)
for c in containers: for c in containers:
log.info("%s is up-to-date" % c.name) log.info("%s is up-to-date" % c.name)
return containers return containers
else:
raise Exception("Invalid action: {}".format(action)) raise Exception("Invalid action: {}".format(action))
def recreate_container( def recreate_container(

View File

@ -151,7 +151,7 @@ class CLITestCase(DockerClientTestCase):
def test_help(self): def test_help(self):
self.base_dir = 'tests/fixtures/no-composefile' self.base_dir = 'tests/fixtures/no-composefile'
result = self.dispatch(['help', 'up'], returncode=0) result = self.dispatch(['help', 'up'], returncode=0)
assert 'Usage: up [options] [SERVICE...]' in result.stdout assert 'Usage: up [options] [--scale SERVICE=NUM...] [SERVICE...]' in result.stdout
# Prevent tearDown from trying to create a project # Prevent tearDown from trying to create a project
self.base_dir = None self.base_dir = None
@ -1866,6 +1866,59 @@ class CLITestCase(DockerClientTestCase):
self.assertEqual(len(project.get_service('simple').containers()), 0) self.assertEqual(len(project.get_service('simple').containers()), 0)
self.assertEqual(len(project.get_service('another').containers()), 0) self.assertEqual(len(project.get_service('another').containers()), 0)
def test_scale_v2_2(self):
self.base_dir = 'tests/fixtures/scale'
result = self.dispatch(['scale', 'web=1'], returncode=1)
assert 'incompatible with the v2.2 format' in result.stderr
def test_up_scale_scale_up(self):
self.base_dir = 'tests/fixtures/scale'
project = self.project
self.dispatch(['up', '-d'])
assert len(project.get_service('web').containers()) == 2
assert len(project.get_service('db').containers()) == 1
self.dispatch(['up', '-d', '--scale', 'web=3'])
assert len(project.get_service('web').containers()) == 3
assert len(project.get_service('db').containers()) == 1
def test_up_scale_scale_down(self):
self.base_dir = 'tests/fixtures/scale'
project = self.project
self.dispatch(['up', '-d'])
assert len(project.get_service('web').containers()) == 2
assert len(project.get_service('db').containers()) == 1
self.dispatch(['up', '-d', '--scale', 'web=1'])
assert len(project.get_service('web').containers()) == 1
assert len(project.get_service('db').containers()) == 1
def test_up_scale_reset(self):
self.base_dir = 'tests/fixtures/scale'
project = self.project
self.dispatch(['up', '-d', '--scale', 'web=3', '--scale', 'db=3'])
assert len(project.get_service('web').containers()) == 3
assert len(project.get_service('db').containers()) == 3
self.dispatch(['up', '-d'])
assert len(project.get_service('web').containers()) == 2
assert len(project.get_service('db').containers()) == 1
def test_up_scale_to_zero(self):
self.base_dir = 'tests/fixtures/scale'
project = self.project
self.dispatch(['up', '-d'])
assert len(project.get_service('web').containers()) == 2
assert len(project.get_service('db').containers()) == 1
self.dispatch(['up', '-d', '--scale', 'web=0', '--scale', 'db=0'])
assert len(project.get_service('web').containers()) == 0
assert len(project.get_service('db').containers()) == 0
def test_port(self): def test_port(self):
self.base_dir = 'tests/fixtures/ports-composefile' self.base_dir = 'tests/fixtures/ports-composefile'
self.dispatch(['up', '-d'], None) self.dispatch(['up', '-d'], None)

View File

@ -0,0 +1,9 @@
version: '2.2'
services:
web:
image: busybox
command: top
scale: 2
db:
image: busybox
command: top

View File

@ -19,6 +19,7 @@ from compose.config.types import VolumeFromSpec
from compose.config.types import VolumeSpec from compose.config.types import VolumeSpec
from compose.const import COMPOSEFILE_V2_0 as V2_0 from compose.const import COMPOSEFILE_V2_0 as V2_0
from compose.const import COMPOSEFILE_V2_1 as V2_1 from compose.const import COMPOSEFILE_V2_1 as V2_1
from compose.const import COMPOSEFILE_V2_2 as V2_2
from compose.const import COMPOSEFILE_V3_1 as V3_1 from compose.const import COMPOSEFILE_V3_1 as V3_1
from compose.const import LABEL_PROJECT from compose.const import LABEL_PROJECT
from compose.const import LABEL_SERVICE from compose.const import LABEL_SERVICE
@ -582,12 +583,12 @@ class ProjectTest(DockerClientTestCase):
self.assertEqual(len(service.containers()), 3) self.assertEqual(len(service.containers()), 3)
project.up() project.up()
service = project.get_service('web') service = project.get_service('web')
self.assertEqual(len(service.containers()), 3) self.assertEqual(len(service.containers()), 1)
service.scale(1) service.scale(1)
self.assertEqual(len(service.containers()), 1) self.assertEqual(len(service.containers()), 1)
project.up() project.up(scale_override={'web': 3})
service = project.get_service('web') service = project.get_service('web')
self.assertEqual(len(service.containers()), 1) self.assertEqual(len(service.containers()), 3)
# does scale=0 ,makes any sense? after recreating at least 1 container is running # does scale=0 ,makes any sense? after recreating at least 1 container is running
service.scale(0) service.scale(0)
project.up() project.up()
@ -1155,6 +1156,33 @@ class ProjectTest(DockerClientTestCase):
containers = project.containers() containers = project.containers()
self.assertEqual(len(containers), 1) self.assertEqual(len(containers), 1)
def test_project_up_config_scale(self):
config_data = build_config(
version=V2_2,
services=[{
'name': 'web',
'image': 'busybox:latest',
'command': 'top',
'scale': 3
}]
)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
project.up()
assert len(project.containers()) == 3
project.up(scale_override={'web': 2})
assert len(project.containers()) == 2
project.up(scale_override={'web': 4})
assert len(project.containers()) == 4
project.stop()
project.up()
assert len(project.containers()) == 3
@v2_only() @v2_only()
def test_initialize_volumes(self): def test_initialize_volumes(self):
vol_name = '{0:x}'.format(random.getrandbits(32)) vol_name = '{0:x}'.format(random.getrandbits(32))

View File

@ -26,6 +26,7 @@ from compose.const import LABEL_PROJECT
from compose.const import LABEL_SERVICE from compose.const import LABEL_SERVICE
from compose.const import LABEL_VERSION from compose.const import LABEL_VERSION
from compose.container import Container from compose.container import Container
from compose.errors import OperationFailedError
from compose.project import OneOffFilter from compose.project import OneOffFilter
from compose.service import ConvergencePlan from compose.service import ConvergencePlan
from compose.service import ConvergenceStrategy from compose.service import ConvergenceStrategy
@ -777,15 +778,15 @@ class ServiceTest(DockerClientTestCase):
message="testing", message="testing",
response={}, response={},
explanation="Boom")): explanation="Boom")):
with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr: with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr:
with pytest.raises(OperationFailedError):
service.scale(3) service.scale(3)
self.assertEqual(len(service.containers()), 1) assert len(service.containers()) == 1
self.assertTrue(service.containers()[0].is_running) assert service.containers()[0].is_running
self.assertIn( assert (
"ERROR: for composetest_web_2 Cannot create container for service web: Boom", "ERROR: for composetest_web_2 Cannot create container for service"
mock_stderr.getvalue() " web: Boom" in mock_stderr.getvalue()
) )
def test_scale_with_unexpected_exception(self): def test_scale_with_unexpected_exception(self):
@ -837,6 +838,7 @@ class ServiceTest(DockerClientTestCase):
service = self.create_service('app', container_name='custom-container') service = self.create_service('app', container_name='custom-container')
self.assertEqual(service.custom_container_name, 'custom-container') self.assertEqual(service.custom_container_name, 'custom-container')
with pytest.raises(OperationFailedError):
service.scale(3) service.scale(3)
captured_output = mock_log.warn.call_args[0][0] captured_output = mock_log.warn.call_args[0][0]