mirror of
				https://github.com/docker/compose.git
				synced 2025-10-31 11:14:02 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			168 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			168 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import functools
 | |
| import os
 | |
| 
 | |
| import pytest
 | |
| from docker.errors import APIError
 | |
| from docker.utils import version_lt
 | |
| 
 | |
| from .. import unittest
 | |
| from ..helpers import BUSYBOX_IMAGE_WITH_TAG
 | |
| from compose.cli.docker_client import docker_client
 | |
| from compose.config.config import resolve_environment
 | |
| from compose.config.environment import Environment
 | |
| from compose.const import API_VERSIONS
 | |
| from compose.const import COMPOSE_SPEC as VERSION
 | |
| from compose.const import COMPOSEFILE_V1 as V1
 | |
| from compose.const import LABEL_PROJECT
 | |
| from compose.progress_stream import stream_output
 | |
| from compose.service import Service
 | |
| 
 | |
| SWARM_SKIP_CONTAINERS_ALL = os.environ.get('SWARM_SKIP_CONTAINERS_ALL', '0') != '0'
 | |
| SWARM_SKIP_CPU_SHARES = os.environ.get('SWARM_SKIP_CPU_SHARES', '0') != '0'
 | |
| SWARM_SKIP_RM_VOLUMES = os.environ.get('SWARM_SKIP_RM_VOLUMES', '0') != '0'
 | |
| SWARM_ASSUME_MULTINODE = os.environ.get('SWARM_ASSUME_MULTINODE', '0') != '0'
 | |
| 
 | |
| 
 | |
| def pull_busybox(client):
 | |
|     client.pull(BUSYBOX_IMAGE_WITH_TAG, stream=False)
 | |
| 
 | |
| 
 | |
| def get_links(container):
 | |
|     links = container.get('HostConfig.Links') or []
 | |
| 
 | |
|     def format_link(link):
 | |
|         _, alias = link.split(':')
 | |
|         return alias.split('/')[-1]
 | |
| 
 | |
|     return [format_link(link) for link in links]
 | |
| 
 | |
| 
 | |
| def engine_max_version():
 | |
|     if 'DOCKER_VERSION' not in os.environ:
 | |
|         return VERSION
 | |
|     version = os.environ['DOCKER_VERSION'].partition('-')[0]
 | |
|     if version_lt(version, '1.10'):
 | |
|         return V1
 | |
|     return VERSION
 | |
| 
 | |
| 
 | |
| def min_version_skip(version):
 | |
|     return pytest.mark.skipif(
 | |
|         engine_max_version() < version,
 | |
|         reason="Engine version %s is too low" % version
 | |
|     )
 | |
| 
 | |
| 
 | |
| class DockerClientTestCase(unittest.TestCase):
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         version = API_VERSIONS[engine_max_version()]
 | |
|         cls.client = docker_client(Environment(), version)
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         del cls.client
 | |
| 
 | |
|     def tearDown(self):
 | |
|         for c in self.client.containers(
 | |
|                 all=True,
 | |
|                 filters={'label': '%s=composetest' % LABEL_PROJECT}):
 | |
|             self.client.remove_container(c['Id'], force=True)
 | |
| 
 | |
|         for i in self.client.images(
 | |
|                 filters={'label': 'com.docker.compose.test_image'}):
 | |
|             try:
 | |
|                 self.client.remove_image(i, force=True)
 | |
|             except APIError as e:
 | |
|                 if e.is_server_error():
 | |
|                     pass
 | |
| 
 | |
|         volumes = self.client.volumes().get('Volumes') or []
 | |
|         for v in volumes:
 | |
|             if 'composetest_' in v['Name']:
 | |
|                 self.client.remove_volume(v['Name'])
 | |
| 
 | |
|         networks = self.client.networks()
 | |
|         for n in networks:
 | |
|             if 'composetest_' in n['Name']:
 | |
|                 self.client.remove_network(n['Name'])
 | |
| 
 | |
|     def create_service(self, name, **kwargs):
 | |
|         if 'image' not in kwargs and 'build' not in kwargs:
 | |
|             kwargs['image'] = BUSYBOX_IMAGE_WITH_TAG
 | |
| 
 | |
|         if 'command' not in kwargs:
 | |
|             kwargs['command'] = ["top"]
 | |
| 
 | |
|         kwargs['environment'] = resolve_environment(
 | |
|             kwargs, Environment.from_env_file(None)
 | |
|         )
 | |
|         labels = dict(kwargs.setdefault('labels', {}))
 | |
|         labels['com.docker.compose.test-name'] = self.id()
 | |
| 
 | |
|         return Service(name, client=self.client, project='composetest', **kwargs)
 | |
| 
 | |
|     def check_build(self, *args, **kwargs):
 | |
|         kwargs.setdefault('rm', True)
 | |
|         build_output = self.client.build(*args, **kwargs)
 | |
|         with open(os.devnull, 'w') as devnull:
 | |
|             for event in stream_output(build_output, devnull):
 | |
|                 pass
 | |
| 
 | |
|     def require_api_version(self, minimum):
 | |
|         api_version = self.client.version()['ApiVersion']
 | |
|         if version_lt(api_version, minimum):
 | |
|             pytest.skip("API version is too low ({} < {})".format(api_version, minimum))
 | |
| 
 | |
|     def get_volume_data(self, volume_name):
 | |
|         if not is_cluster(self.client):
 | |
|             return self.client.inspect_volume(volume_name)
 | |
| 
 | |
|         volumes = self.client.volumes(filters={'name': volume_name})['Volumes']
 | |
|         assert len(volumes) > 0
 | |
|         return self.client.inspect_volume(volumes[0]['Name'])
 | |
| 
 | |
| 
 | |
| def if_runtime_available(runtime):
 | |
|     def decorator(f):
 | |
|         @functools.wraps(f)
 | |
|         def wrapper(self, *args, **kwargs):
 | |
|             if runtime not in self.client.info().get('Runtimes', {}):
 | |
|                 return pytest.skip("This daemon does not support the '{}'' runtime".format(runtime))
 | |
|             return f(self, *args, **kwargs)
 | |
|         return wrapper
 | |
| 
 | |
|     return decorator
 | |
| 
 | |
| 
 | |
| def is_cluster(client):
 | |
|     if SWARM_ASSUME_MULTINODE:
 | |
|         return True
 | |
| 
 | |
|     def get_nodes_number():
 | |
|         try:
 | |
|             return len(client.nodes())
 | |
|         except APIError:
 | |
|             # If the Engine is not part of a Swarm, the SDK will raise
 | |
|             # an APIError
 | |
|             return 0
 | |
| 
 | |
|     if not hasattr(is_cluster, 'nodes') or is_cluster.nodes is None:
 | |
|         # Only make the API call if the value hasn't been cached yet
 | |
|         is_cluster.nodes = get_nodes_number()
 | |
| 
 | |
|     return is_cluster.nodes > 1
 | |
| 
 | |
| 
 | |
| def no_cluster(reason):
 | |
|     def decorator(f):
 | |
|         @functools.wraps(f)
 | |
|         def wrapper(self, *args, **kwargs):
 | |
|             if is_cluster(self.client):
 | |
|                 pytest.skip("Test will not be run in cluster mode: %s" % reason)
 | |
|                 return
 | |
|             return f(self, *args, **kwargs)
 | |
|         return wrapper
 | |
| 
 | |
|     return decorator
 |