diff --git a/compose/cli/main.py b/compose/cli/main.py index 9b40599cf..37e299d94 100644 --- a/compose/cli/main.py +++ b/compose/cli/main.py @@ -26,6 +26,7 @@ from ..config import resolve_build_args from ..config.environment import Environment from ..config.serialize import serialize_config from ..config.types import VolumeSpec +from ..const import COMPOSEFILE_V2_2 as V2_2 from ..const import IS_WINDOWS_PLATFORM from ..errors import StreamParseError from ..progress_stream import StreamOutputError @@ -763,6 +764,9 @@ class TopLevelCommand(object): $ docker-compose scale web=2 worker=3 + This command is deprecated. Use the up command with the `--scale` flag + instead. + Usage: scale [options] [SERVICE=NUM...] Options: @@ -771,15 +775,18 @@ class TopLevelCommand(object): """ timeout = timeout_from_opts(options) - for s in options['SERVICE=NUM']: - if '=' not in s: - raise UserError('Arguments to scale should be in the form service=num') - service_name, num = s.split('=', 1) - try: - num = int(num) - except ValueError: - raise UserError('Number of containers for service "%s" is not a ' - 'number' % service_name) + if self.project.config_version == V2_2: + raise UserError( + 'The scale command is incompatible with the v2.2 format. ' + 'Use the up command with the --scale flag instead.' + ) + else: + log.warn( + 'The scale command is deprecated. ' + 'Use the up command with the --scale flag instead.' + ) + + for service_name, num in parse_scale_args(options['SERVICE=NUM']).items(): self.project.get_service(service_name).scale(num, timeout=timeout) def start(self, options): @@ -875,7 +882,7 @@ class TopLevelCommand(object): If you want to force Compose to stop and recreate all containers, use the `--force-recreate` flag. - Usage: up [options] [SERVICE...] + Usage: up [options] [--scale SERVICE=NUM...] [SERVICE...] Options: -d Detached mode: Run containers in the background, @@ -898,7 +905,9 @@ class TopLevelCommand(object): --remove-orphans Remove containers for services not defined in the Compose file --exit-code-from SERVICE Return the exit code of the selected service container. - Requires --abort-on-container-exit. + Implies --abort-on-container-exit. + --scale SERVICE=NUM Scale SERVICE to NUM instances. Overrides the `scale` + setting in the Compose file if present. """ start_deps = not options['--no-deps'] exit_value_from = exitval_from_opts(options, self.project) @@ -919,7 +928,9 @@ class TopLevelCommand(object): do_build=build_action_from_opts(options), timeout=timeout, detached=detached, - remove_orphans=remove_orphans) + remove_orphans=remove_orphans, + scale_override=parse_scale_args(options['--scale']), + ) if detached: return @@ -1238,3 +1249,19 @@ def call_docker(args): log.debug(" ".join(map(pipes.quote, args))) return subprocess.call(args) + + +def parse_scale_args(options): + res = {} + for s in options: + if '=' not in s: + raise UserError('Arguments to scale should be in the form service=num') + service_name, num = s.split('=', 1) + try: + num = int(num) + except ValueError: + raise UserError( + 'Number of containers for service "%s" is not a number' % service_name + ) + res[service_name] = num + return res diff --git a/compose/config/config_schema_v2.2.json b/compose/config/config_schema_v2.2.json index 3c4844a50..a178fccc4 100644 --- a/compose/config/config_schema_v2.2.json +++ b/compose/config/config_schema_v2.2.json @@ -222,6 +222,7 @@ "privileged": {"type": "boolean"}, "read_only": {"type": "boolean"}, "restart": {"type": "string"}, + "scale": {"type": "integer"}, "security_opt": {"type": "array", "items": {"type": "string"}, "uniqueItems": true}, "shm_size": {"type": ["number", "string"]}, "sysctls": {"$ref": "#/definitions/list_or_dict"}, diff --git a/compose/parallel.py b/compose/parallel.py index fde723f33..34fef71db 100644 --- a/compose/parallel.py +++ b/compose/parallel.py @@ -260,10 +260,6 @@ def parallel_remove(containers, options): parallel_operation(stopped_containers, 'remove', options, 'Removing') -def parallel_start(containers, options): - parallel_operation(containers, 'start', options, 'Starting') - - def parallel_pause(containers, options): parallel_operation(containers, 'pause', options, 'Pausing') diff --git a/compose/project.py b/compose/project.py index a75d71efc..e80b10455 100644 --- a/compose/project.py +++ b/compose/project.py @@ -57,12 +57,13 @@ class Project(object): """ A collection of services. """ - def __init__(self, name, services, client, networks=None, volumes=None): + def __init__(self, name, services, client, networks=None, volumes=None, config_version=None): self.name = name self.services = services self.client = client self.volumes = volumes or ProjectVolumes({}) self.networks = networks or ProjectNetworks({}, False) + self.config_version = config_version def labels(self, one_off=OneOffFilter.exclude): labels = ['{0}={1}'.format(LABEL_PROJECT, self.name)] @@ -82,7 +83,7 @@ class Project(object): networks, use_networking) volumes = ProjectVolumes.from_config(name, config_data, client) - project = cls(name, [], client, project_networks, volumes) + project = cls(name, [], client, project_networks, volumes, config_data.version) for service_dict in config_data.services: service_dict = dict(service_dict) @@ -380,13 +381,17 @@ class Project(object): do_build=BuildAction.none, timeout=None, detached=False, - remove_orphans=False): + remove_orphans=False, + scale_override=None): warn_for_swarm_mode(self.client) self.initialize() self.find_orphan_containers(remove_orphans) + if scale_override is None: + scale_override = {} + services = self.get_services_without_duplicate( service_names, include_deps=start_deps) @@ -399,7 +404,8 @@ class Project(object): return service.execute_convergence_plan( plans[service.name], timeout=timeout, - detached=detached + detached=detached, + scale_override=scale_override.get(service.name) ) def get_deps(service): @@ -589,10 +595,13 @@ def get_secrets(service, service_secrets, secret_defs): continue if secret.uid or secret.gid or secret.mode: - log.warn("Service \"{service}\" uses secret \"{secret}\" with uid, " - "gid, or mode. These fields are not supported by this " - "implementation of the Compose file".format( - service=service, secret=secret.source)) + log.warn( + "Service \"{service}\" uses secret \"{secret}\" with uid, " + "gid, or mode. These fields are not supported by this " + "implementation of the Compose file".format( + service=service, secret=secret.source + ) + ) secrets.append({'secret': secret, 'file': secret_def.get('file')}) diff --git a/compose/service.py b/compose/service.py index 52218872a..8699372ed 100644 --- a/compose/service.py +++ b/compose/service.py @@ -39,7 +39,6 @@ from .errors import HealthCheckFailed from .errors import NoHealthCheckConfigured from .errors import OperationFailedError from .parallel import parallel_execute -from .parallel import parallel_start from .progress_stream import stream_output from .progress_stream import StreamOutputError from .utils import json_hash @@ -149,6 +148,7 @@ class Service(object): network_mode=None, networks=None, secrets=None, + scale=None, **options ): self.name = name @@ -160,6 +160,7 @@ class Service(object): self.network_mode = network_mode or NetworkMode(None) self.networks = networks or {} self.secrets = secrets or [] + self.scale_num = scale or 1 self.options = options def __repr__(self): @@ -190,16 +191,7 @@ class Service(object): self.start_container_if_stopped(c, **options) return containers - def scale(self, desired_num, timeout=None): - """ - Adjusts the number of containers to the specified number and ensures - they are running. - - - creates containers until there are at least `desired_num` - - stops containers until there are at most `desired_num` running - - starts containers until there are at least `desired_num` running - - removes all stopped containers - """ + def show_scale_warnings(self, desired_num): if self.custom_container_name and desired_num > 1: log.warn('The "%s" service is using the custom container name "%s". ' 'Docker requires each container to have a unique name. ' @@ -211,14 +203,18 @@ class Service(object): 'for this service are created on a single host, the port will clash.' % self.name) - def create_and_start(service, number): - container = service.create_container(number=number, quiet=True) - service.start_container(container) - return container + def scale(self, desired_num, timeout=None): + """ + Adjusts the number of containers to the specified number and ensures + they are running. - def stop_and_remove(container): - container.stop(timeout=self.stop_timeout(timeout)) - container.remove() + - creates containers until there are at least `desired_num` + - stops containers until there are at most `desired_num` running + - starts containers until there are at least `desired_num` running + - removes all stopped containers + """ + + self.show_scale_warnings(desired_num) running_containers = self.containers(stopped=False) num_running = len(running_containers) @@ -229,11 +225,10 @@ class Service(object): return if desired_num > num_running: - # we need to start/create until we have desired_num all_containers = self.containers(stopped=True) if num_running != len(all_containers): - # we have some stopped containers, let's start them up again + # we have some stopped containers, check for divergences stopped_containers = [ c for c in all_containers if not c.is_running ] @@ -242,38 +237,14 @@ class Service(object): divergent_containers = [ c for c in stopped_containers if self._containers_have_diverged([c]) ] - stopped_containers = sorted( - set(stopped_containers) - set(divergent_containers), - key=attrgetter('number') - ) for c in divergent_containers: c.remove() - num_stopped = len(stopped_containers) + all_containers = list(set(all_containers) - set(divergent_containers)) - if num_stopped + num_running > desired_num: - num_to_start = desired_num - num_running - containers_to_start = stopped_containers[:num_to_start] - else: - containers_to_start = stopped_containers - - parallel_start(containers_to_start, {}) - - num_running += len(containers_to_start) - - num_to_create = desired_num - num_running - next_number = self._next_container_number() - container_numbers = [ - number for number in range( - next_number, next_number + num_to_create - ) - ] - - parallel_execute( - container_numbers, - lambda n: create_and_start(service=self, number=n), - lambda n: self.get_container_name(n), - "Creating and starting" + sorted_containers = sorted(all_containers, key=attrgetter('number')) + self._execute_convergence_start( + sorted_containers, desired_num, timeout, True, True ) if desired_num < num_running: @@ -283,12 +254,7 @@ class Service(object): running_containers, key=attrgetter('number')) - parallel_execute( - sorted_running_containers[-num_to_stop:], - stop_and_remove, - lambda c: c.name, - "Stopping and removing", - ) + self._downscale(sorted_running_containers[-num_to_stop:], timeout) def create_container(self, one_off=False, @@ -401,51 +367,120 @@ class Service(object): return has_diverged - def execute_convergence_plan(self, - plan, - timeout=None, - detached=False, - start=True): - (action, containers) = plan - should_attach_logs = not detached + def _execute_convergence_create(self, scale, detached, start): + i = self._next_container_number() - if action == 'create': - container = self.create_container() + def create_and_start(service, n): + container = service.create_container(number=n) + if not detached: + container.attach_log_stream() + if start: + self.start_container(container) + return container - if should_attach_logs: - container.attach_log_stream() - - if start: - self.start_container(container) - - return [container] - - elif action == 'recreate': - return [ - self.recreate_container( - container, - timeout=timeout, - attach_logs=should_attach_logs, - start_new_container=start - ) - for container in containers - ] - - elif action == 'start': - if start: - for container in containers: - self.start_container_if_stopped(container, attach_logs=should_attach_logs) + containers, errors = parallel_execute( + range(i, i + scale), + lambda n: create_and_start(self, n), + lambda n: self.get_container_name(n), + "Creating" + ) + for error in errors.values(): + raise OperationFailedError(error) return containers - elif action == 'noop': + def _execute_convergence_recreate(self, containers, scale, timeout, detached, start): + if len(containers) > scale: + self._downscale(containers[scale:], timeout) + containers = containers[:scale] + + def recreate(container): + return self.recreate_container( + container, timeout=timeout, attach_logs=not detached, + start_new_container=start + ) + containers, errors = parallel_execute( + containers, + recreate, + lambda c: c.name, + "Recreating" + ) + for error in errors.values(): + raise OperationFailedError(error) + + if len(containers) < scale: + containers.extend(self._execute_convergence_create( + scale - len(containers), detached, start + )) + return containers + + def _execute_convergence_start(self, containers, scale, timeout, detached, start): + if len(containers) > scale: + self._downscale(containers[scale:], timeout) + containers = containers[:scale] + if start: + _, errors = parallel_execute( + containers, + lambda c: self.start_container_if_stopped(c, attach_logs=not detached), + lambda c: c.name, + "Starting" + ) + + for error in errors.values(): + raise OperationFailedError(error) + + if len(containers) < scale: + containers.extend(self._execute_convergence_create( + scale - len(containers), detached, start + )) + return containers + + def _downscale(self, containers, timeout=None): + def stop_and_remove(container): + container.stop(timeout=self.stop_timeout(timeout)) + container.remove() + + parallel_execute( + containers, + stop_and_remove, + lambda c: c.name, + "Stopping and removing", + ) + + def execute_convergence_plan(self, plan, timeout=None, detached=False, + start=True, scale_override=None): + (action, containers) = plan + scale = scale_override if scale_override is not None else self.scale_num + containers = sorted(containers, key=attrgetter('number')) + + self.show_scale_warnings(scale) + + if action == 'create': + return self._execute_convergence_create( + scale, detached, start + ) + + if action == 'recreate': + return self._execute_convergence_recreate( + containers, scale, timeout, detached, start + ) + + if action == 'start': + return self._execute_convergence_start( + containers, scale, timeout, detached, start + ) + + if action == 'noop': + if scale != len(containers): + return self._execute_convergence_start( + containers, scale, timeout, detached, start + ) for c in containers: log.info("%s is up-to-date" % c.name) return containers - else: - raise Exception("Invalid action: {}".format(action)) + raise Exception("Invalid action: {}".format(action)) def recreate_container( self, diff --git a/tests/acceptance/cli_test.py b/tests/acceptance/cli_test.py index bfc963402..75b15ae65 100644 --- a/tests/acceptance/cli_test.py +++ b/tests/acceptance/cli_test.py @@ -151,7 +151,7 @@ class CLITestCase(DockerClientTestCase): def test_help(self): self.base_dir = 'tests/fixtures/no-composefile' result = self.dispatch(['help', 'up'], returncode=0) - assert 'Usage: up [options] [SERVICE...]' in result.stdout + assert 'Usage: up [options] [--scale SERVICE=NUM...] [SERVICE...]' in result.stdout # Prevent tearDown from trying to create a project self.base_dir = None @@ -1866,6 +1866,59 @@ class CLITestCase(DockerClientTestCase): self.assertEqual(len(project.get_service('simple').containers()), 0) self.assertEqual(len(project.get_service('another').containers()), 0) + def test_scale_v2_2(self): + self.base_dir = 'tests/fixtures/scale' + result = self.dispatch(['scale', 'web=1'], returncode=1) + assert 'incompatible with the v2.2 format' in result.stderr + + def test_up_scale_scale_up(self): + self.base_dir = 'tests/fixtures/scale' + project = self.project + + self.dispatch(['up', '-d']) + assert len(project.get_service('web').containers()) == 2 + assert len(project.get_service('db').containers()) == 1 + + self.dispatch(['up', '-d', '--scale', 'web=3']) + assert len(project.get_service('web').containers()) == 3 + assert len(project.get_service('db').containers()) == 1 + + def test_up_scale_scale_down(self): + self.base_dir = 'tests/fixtures/scale' + project = self.project + + self.dispatch(['up', '-d']) + assert len(project.get_service('web').containers()) == 2 + assert len(project.get_service('db').containers()) == 1 + + self.dispatch(['up', '-d', '--scale', 'web=1']) + assert len(project.get_service('web').containers()) == 1 + assert len(project.get_service('db').containers()) == 1 + + def test_up_scale_reset(self): + self.base_dir = 'tests/fixtures/scale' + project = self.project + + self.dispatch(['up', '-d', '--scale', 'web=3', '--scale', 'db=3']) + assert len(project.get_service('web').containers()) == 3 + assert len(project.get_service('db').containers()) == 3 + + self.dispatch(['up', '-d']) + assert len(project.get_service('web').containers()) == 2 + assert len(project.get_service('db').containers()) == 1 + + def test_up_scale_to_zero(self): + self.base_dir = 'tests/fixtures/scale' + project = self.project + + self.dispatch(['up', '-d']) + assert len(project.get_service('web').containers()) == 2 + assert len(project.get_service('db').containers()) == 1 + + self.dispatch(['up', '-d', '--scale', 'web=0', '--scale', 'db=0']) + assert len(project.get_service('web').containers()) == 0 + assert len(project.get_service('db').containers()) == 0 + def test_port(self): self.base_dir = 'tests/fixtures/ports-composefile' self.dispatch(['up', '-d'], None) diff --git a/tests/fixtures/scale/docker-compose.yml b/tests/fixtures/scale/docker-compose.yml new file mode 100644 index 000000000..a0d3b771f --- /dev/null +++ b/tests/fixtures/scale/docker-compose.yml @@ -0,0 +1,9 @@ +version: '2.2' +services: + web: + image: busybox + command: top + scale: 2 + db: + image: busybox + command: top diff --git a/tests/integration/project_test.py b/tests/integration/project_test.py index 2a29b1b6d..69f06b75c 100644 --- a/tests/integration/project_test.py +++ b/tests/integration/project_test.py @@ -19,6 +19,7 @@ from compose.config.types import VolumeFromSpec from compose.config.types import VolumeSpec from compose.const import COMPOSEFILE_V2_0 as V2_0 from compose.const import COMPOSEFILE_V2_1 as V2_1 +from compose.const import COMPOSEFILE_V2_2 as V2_2 from compose.const import COMPOSEFILE_V3_1 as V3_1 from compose.const import LABEL_PROJECT from compose.const import LABEL_SERVICE @@ -582,12 +583,12 @@ class ProjectTest(DockerClientTestCase): self.assertEqual(len(service.containers()), 3) project.up() service = project.get_service('web') - self.assertEqual(len(service.containers()), 3) + self.assertEqual(len(service.containers()), 1) service.scale(1) self.assertEqual(len(service.containers()), 1) - project.up() + project.up(scale_override={'web': 3}) service = project.get_service('web') - self.assertEqual(len(service.containers()), 1) + self.assertEqual(len(service.containers()), 3) # does scale=0 ,makes any sense? after recreating at least 1 container is running service.scale(0) project.up() @@ -1155,6 +1156,33 @@ class ProjectTest(DockerClientTestCase): containers = project.containers() self.assertEqual(len(containers), 1) + def test_project_up_config_scale(self): + config_data = build_config( + version=V2_2, + services=[{ + 'name': 'web', + 'image': 'busybox:latest', + 'command': 'top', + 'scale': 3 + }] + ) + + project = Project.from_config( + name='composetest', config_data=config_data, client=self.client + ) + project.up() + assert len(project.containers()) == 3 + + project.up(scale_override={'web': 2}) + assert len(project.containers()) == 2 + + project.up(scale_override={'web': 4}) + assert len(project.containers()) == 4 + + project.stop() + project.up() + assert len(project.containers()) == 3 + @v2_only() def test_initialize_volumes(self): vol_name = '{0:x}'.format(random.getrandbits(32)) diff --git a/tests/integration/service_test.py b/tests/integration/service_test.py index 636071755..87549c506 100644 --- a/tests/integration/service_test.py +++ b/tests/integration/service_test.py @@ -26,6 +26,7 @@ from compose.const import LABEL_PROJECT from compose.const import LABEL_SERVICE from compose.const import LABEL_VERSION from compose.container import Container +from compose.errors import OperationFailedError from compose.project import OneOffFilter from compose.service import ConvergencePlan from compose.service import ConvergenceStrategy @@ -777,15 +778,15 @@ class ServiceTest(DockerClientTestCase): message="testing", response={}, explanation="Boom")): - with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr: - service.scale(3) + with pytest.raises(OperationFailedError): + service.scale(3) - self.assertEqual(len(service.containers()), 1) - self.assertTrue(service.containers()[0].is_running) - self.assertIn( - "ERROR: for composetest_web_2 Cannot create container for service web: Boom", - mock_stderr.getvalue() + assert len(service.containers()) == 1 + assert service.containers()[0].is_running + assert ( + "ERROR: for composetest_web_2 Cannot create container for service" + " web: Boom" in mock_stderr.getvalue() ) def test_scale_with_unexpected_exception(self): @@ -837,7 +838,8 @@ class ServiceTest(DockerClientTestCase): service = self.create_service('app', container_name='custom-container') self.assertEqual(service.custom_container_name, 'custom-container') - service.scale(3) + with pytest.raises(OperationFailedError): + service.scale(3) captured_output = mock_log.warn.call_args[0][0]