compose/compose/network.py

335 lines
12 KiB
Python

from __future__ import absolute_import
from __future__ import unicode_literals
import logging
import re
from collections import OrderedDict
from docker.errors import NotFound
from docker.types import IPAMConfig
from docker.types import IPAMPool
from docker.utils import version_gte
from docker.utils import version_lt
from . import __version__
from .config import ConfigurationError
from .const import LABEL_NETWORK
from .const import LABEL_PROJECT
from .const import LABEL_VERSION
# Module-level logger shared by everything defined in this file.
log = logging.getLogger(__name__)

# Driver option keys that check_remote_network_config() skips when it compares
# the options of an existing network with the locally declared ones.
OPTS_EXCEPTIONS = [
    'com.docker.network.driver.overlay.vxlanid_list',
    'com.docker.network.windowsshim.hnsid',
    'com.docker.network.windowsshim.networkname',
]
class Network(object):
    """A single project network and the client calls that manage it.

    The instance keeps the declared settings (driver, options, IPAM, labels,
    flags) and uses ``client`` to inspect, create and remove the network.
    """

    def __init__(self, client, project, name, driver=None, driver_opts=None,
                 ipam=None, external=False, internal=False, enable_ipv6=False,
                 labels=None, custom_name=False):
        self.client = client
        self.project = project
        self.name = name

        # Driver-related settings.
        self.driver = driver
        self.driver_opts = driver_opts
        # Stored already converted by create_ipam_config_from_dict().
        self.ipam = create_ipam_config_from_dict(ipam)

        # Behavioural flags.
        self.external = external
        self.internal = internal
        self.enable_ipv6 = enable_ipv6

        self.labels = labels
        self.custom_name = custom_name
        # None means "not determined yet"; see _set_legacy_flag().
        self.legacy = None

    def ensure(self):
        """Make sure the network is available.

        External networks are only looked up, never created. Any other
        network is validated against its definition when it already exists
        and created when inspecting it raises NotFound.
        """
        if self.external:
            self._verify_external()
            return

        self._set_legacy_flag()
        try:
            remote = self.inspect(legacy=self.legacy)
            check_remote_network_config(remote, self)
        except NotFound:
            self._create()

    def _verify_external(self):
        """Check that an external network exists.

        Raises ConfigurationError when the network cannot be found.
        """
        if self.driver == 'overlay':
            # Swarm nodes do not register overlay networks that were
            # created on a different node unless they're in use.
            # See docker/compose#4399
            return

        try:
            self.inspect()
        except NotFound:
            raise ConfigurationError(
                'Network {name} declared as external, but could'
                ' not be found. Please create the network manually'
                ' using `{command} {name}` and try again.'.format(
                    name=self.full_name,
                    command='docker network create'
                )
            )
        else:
            log.debug(
                'Network {0} declared as external. No new '
                'network will be created.'.format(self.name)
            )

    def _create(self):
        """Log the creation and create the network through the client."""
        if self.driver:
            driver_name = 'driver "{}"'.format(self.driver)
        else:
            driver_name = 'the default driver'

        log.info(
            'Creating network "{}" with {}'.format(self.full_name, driver_name)
        )

        self.client.create_network(
            name=self.full_name,
            driver=self.driver,
            options=self.driver_opts,
            ipam=self.ipam,
            internal=self.internal,
            enable_ipv6=self.enable_ipv6,
            labels=self._labels,
            # Only sent as True from API version 1.24 on, otherwise None.
            attachable=version_gte(self.client._version, '1.24') or None,
            check_duplicate=True,
        )

    def remove(self):
        """Remove the network unless it is declared external."""
        if not self.external:
            log.info("Removing network {}".format(self.true_name))
            self.client.remove_network(self.true_name)
            return

        log.info("Network %s is external, skipping", self.true_name)

    def inspect(self, legacy=False):
        """Return the client's inspection data for this network.

        With ``legacy`` set, the lookup uses ``legacy_full_name`` instead of
        ``full_name``.
        """
        target = self.legacy_full_name if legacy else self.full_name
        return self.client.inspect_network(target)

    @property
    def legacy_full_name(self):
        """Name built from the project with ``_`` and ``-`` removed."""
        if self.custom_name:
            return self.name

        squashed_project = re.sub(r'[_-]', '', self.project)
        return '{0}_{1}'.format(squashed_project, self.name)

    @property
    def full_name(self):
        """Custom name as-is, otherwise ``<project>_<name>``."""
        if self.custom_name:
            return self.name

        return '{0}_{1}'.format(self.project, self.name)

    @property
    def true_name(self):
        """Legacy name if the legacy flag is set, else ``full_name``."""
        self._set_legacy_flag()
        return self.legacy_full_name if self.legacy else self.full_name

    @property
    def _labels(self):
        """User labels plus the project/network/version labels.

        Returns None when the client API version is older than 1.23.
        """
        if version_lt(self.client._version, '1.23'):
            return None

        # Work on a copy so the user-supplied mapping is never mutated.
        merged = self.labels.copy() if self.labels else {}
        merged.update({
            LABEL_PROJECT: self.project,
            LABEL_NETWORK: self.name,
            LABEL_VERSION: __version__,
        })
        return merged

    def _set_legacy_flag(self):
        """Determine once whether a network exists under the legacy name."""
        if self.legacy is None:
            try:
                self.legacy = self.inspect(legacy=True) is not None
            except NotFound:
                self.legacy = False
def create_ipam_config_from_dict(ipam_dict):
    """Build an ``IPAMConfig`` from a dict with lowercase keys.

    Reads ``driver`` (falling back to ``'default'``), ``config`` (a list of
    pool dicts with ``subnet``, ``ip_range``, ``gateway`` and
    ``aux_addresses``) and ``options``. Returns None for a missing or empty
    ``ipam_dict``.
    """
    if not ipam_dict:
        return None

    pools = []
    for pool in ipam_dict.get('config', []):
        pools.append(
            IPAMPool(
                subnet=pool.get('subnet'),
                iprange=pool.get('ip_range'),
                gateway=pool.get('gateway'),
                aux_addresses=pool.get('aux_addresses'),
            )
        )

    return IPAMConfig(
        driver=ipam_dict.get('driver') or 'default',
        pool_configs=pools,
        options=ipam_dict.get('options'),
    )
class NetworkConfigChangedError(ConfigurationError):
    """Raised when an existing network no longer matches its definition."""

    def __init__(self, net_name, property_name):
        # net_name: name of the affected network.
        # property_name: human-readable name of the setting that differs.
        message = 'Network "{}" needs to be recreated - {} has changed'.format(
            net_name, property_name
        )
        super(NetworkConfigChangedError, self).__init__(message)
def check_remote_ipam_config(remote, local):
    """Compare the IPAM settings of an existing network with the local ones.

    ``remote`` is the inspection mapping of the existing network (its
    ``IPAM`` entry is read). ``local`` is an object exposing ``ipam`` and
    ``true_name``.

    ``local.ipam`` may be either the already converted form with ``Driver`` /
    ``Config`` / ``Options`` keys (which is what ``Network.__init__`` stores)
    or a raw dict with lowercase keys, which is converted here.

    Raises NetworkConfigChangedError naming the first setting that differs.
    """
    remote_ipam = remote.get('IPAM') or {}

    local_ipam = local.ipam
    if not local_ipam:
        # Nothing declared locally, so there is nothing to compare against.
        return
    if 'Driver' not in local_ipam and 'Config' not in local_ipam:
        # Raw lowercase-key dict: convert it exactly once. Converting an
        # already converted config would silently drop every pool.
        local_ipam = create_ipam_config_from_dict(local_ipam)

    local_driver = local_ipam.get('Driver')
    if local_driver and local_driver != remote_ipam.get('Driver'):
        raise NetworkConfigChangedError(local.true_name, 'IPAM driver')

    def subnet_of(pool):
        # A missing subnet sorts first instead of breaking the comparison.
        return pool.get('Subnet') or ''

    local_configs = local_ipam.get('Config') or []
    if local_configs:
        remote_configs = remote_ipam.get('Config') or []
        if len(local_configs) != len(remote_configs):
            raise NetworkConfigChangedError(local.true_name, 'IPAM configs')

        # Pair the pools by subnet so declaration order does not matter.
        pairs = zip(
            sorted(local_configs, key=subnet_of),
            sorted(remote_configs, key=subnet_of),
        )
        for lc, rc in pairs:
            if lc.get('Subnet') != rc.get('Subnet'):
                raise NetworkConfigChangedError(local.true_name, 'IPAM config subnet')
            # A gateway is only enforced when one was declared locally.
            if lc.get('Gateway') is not None and lc.get('Gateway') != rc.get('Gateway'):
                raise NetworkConfigChangedError(local.true_name, 'IPAM config gateway')
            if lc.get('IPRange') != rc.get('IPRange'):
                raise NetworkConfigChangedError(local.true_name, 'IPAM config ip_range')
            # Either side may omit the auxiliary addresses entirely. They are
            # compared order-insensitively (for mappings: by their keys).
            local_aux = sorted(lc.get('AuxiliaryAddresses') or {})
            remote_aux = sorted(rc.get('AuxiliaryAddresses') or {})
            if local_aux != remote_aux:
                raise NetworkConfigChangedError(local.true_name, 'IPAM config aux_addresses')

    remote_opts = remote_ipam.get('Options') or {}
    local_opts = local_ipam.get('Options') or {}
    for k in set.union(set(remote_opts.keys()), set(local_opts.keys())):
        if remote_opts.get(k) != local_opts.get(k):
            raise NetworkConfigChangedError(local.true_name, 'IPAM option "{}"'.format(k))
def check_remote_network_config(remote, local):
    """Verify that an existing network still matches its local definition.

    ``remote`` is the inspection mapping of the existing network. ``local``
    is an object exposing ``driver``, ``driver_opts``, ``ipam``,
    ``internal``, ``enable_ipv6``, ``labels`` and ``true_name``.

    Raises NetworkConfigChangedError for a differing driver, driver option,
    IPAM setting, ``internal`` flag or ``enable_ipv6`` flag. Differing
    user-specified labels only produce a warning.
    """
    if local.driver and remote.get('Driver') != local.driver:
        raise NetworkConfigChangedError(local.true_name, 'driver')

    local_opts = local.driver_opts or {}
    remote_opts = remote.get('Options') or {}
    for k in set.union(set(remote_opts.keys()), set(local_opts.keys())):
        if k in OPTS_EXCEPTIONS:
            continue
        if remote_opts.get(k) != local_opts.get(k):
            raise NetworkConfigChangedError(local.true_name, 'option "{}"'.format(k))

    if local.ipam is not None:
        check_remote_ipam_config(remote, local)

    # A flag left as None locally is not compared.
    if local.internal is not None and local.internal != remote.get('Internal', False):
        raise NetworkConfigChangedError(local.true_name, 'internal')
    if local.enable_ipv6 is not None and local.enable_ipv6 != remote.get('EnableIPv6', False):
        raise NetworkConfigChangedError(local.true_name, 'enable_ipv6')

    local_labels = local.labels or {}
    # The key may be present with a null value, in which case a .get()
    # default would not apply; normalise both cases to an empty mapping.
    remote_labels = remote.get('Labels') or {}
    for k in set.union(set(remote_labels.keys()), set(local_labels.keys())):
        if k.startswith('com.docker.'):  # We are only interested in user-specified labels
            continue
        if remote_labels.get(k) != local_labels.get(k):
            log.warning(
                'Network {}: label "{}" has changed. It may need to be'
                ' recreated.'.format(local.true_name, k)
            )
def build_networks(name, config_data, client):
    """Create a ``Network`` for every entry in ``config_data.networks``.

    ``name`` is used as the project of each network. The result maps the
    configuration key of each network to its ``Network`` object and always
    contains a ``'default'`` entry.
    """
    declared = config_data.networks or {}

    networks = {}
    for network_name, data in declared.items():
        networks[network_name] = Network(
            client=client,
            project=name,
            # An explicit ``name`` entry replaces the configuration key.
            name=data.get('name', network_name),
            driver=data.get('driver'),
            driver_opts=data.get('driver_opts'),
            ipam=data.get('ipam'),
            external=bool(data.get('external', False)),
            internal=data.get('internal'),
            enable_ipv6=data.get('enable_ipv6'),
            labels=data.get('labels'),
            custom_name=data.get('name') is not None,
        )

    if 'default' not in networks:
        networks['default'] = Network(client, name, 'default')

    return networks
class ProjectNetworks(object):
    """The set of networks used by a project's services.

    ``networks`` maps network names to objects providing ``ensure()``,
    ``remove()`` and ``true_name``. When ``use_networking`` is false,
    ``initialize()`` and ``remove()`` do nothing.
    """

    def __init__(self, networks, use_networking):
        self.networks = networks or {}
        self.use_networking = use_networking

    @classmethod
    def from_services(cls, services, networks, use_networking):
        """Build an instance holding only the networks services refer to.

        Logs a warning listing defined networks (other than ``default``)
        that no service uses.
        """
        # Accept None like __init__ does instead of failing on .get().
        networks = networks or {}
        service_networks = {
            network: networks.get(network)
            for service in services
            for network in get_network_names_for_service(service)
        }
        unused = set(networks) - set(service_networks) - {'default'}
        if unused:
            # Sorted so the message is stable from run to run.
            log.warning(
                "Some networks were defined but are not used by any service: "
                "{}".format(", ".join(sorted(unused))))
        return cls(service_networks, use_networking)

    def remove(self):
        """Remove every network; a missing network only logs a warning."""
        if not self.use_networking:
            return
        for network in self.networks.values():
            try:
                network.remove()
            except NotFound:
                log.warning("Network %s not found.", network.true_name)

    def initialize(self):
        """Call ``ensure()`` on every network."""
        if not self.use_networking:
            return
        for network in self.networks.values():
            network.ensure()
def get_network_defs_for_service(service_dict):
    """Return ``{network name: per-service network config}`` for a service.

    A service with ``network_mode`` set yields an empty dict. A service
    without a ``networks`` entry is attached to ``default``. Missing
    per-network configs become empty dicts.
    """
    if 'network_mode' in service_dict:
        return {}

    declared = service_dict.get('networks', {'default': None})
    return {net: (config or {}) for net, config in declared.items()}
def get_network_names_for_service(service_dict):
    """Return the keys of ``get_network_defs_for_service(service_dict)``."""
    network_defs = get_network_defs_for_service(service_dict)
    return network_defs.keys()
def get_networks(service_dict, network_definitions):
    """Return an ordered ``{true network name: config}`` for a service.

    Every network the service refers to must be present in
    ``network_definitions``; otherwise ConfigurationError is raised. If any
    config sets a ``priority``, entries are ordered by descending priority
    (a missing priority counts as 0); otherwise they are ordered by name.
    """
    resolved = {}
    for net_name, net_config in get_network_defs_for_service(service_dict).items():
        definition = network_definitions.get(net_name)
        if not definition:
            raise ConfigurationError(
                'Service "{}" uses an undefined network "{}"'
                .format(service_dict['name'], net_name))
        resolved[definition.true_name] = net_config

    def by_priority(item):
        return item[1].get('priority') or 0

    def by_name(item):
        return item[0]

    priorities = [config.get('priority') for config in resolved.values()]
    if any(priorities):
        ordered = sorted(resolved.items(), key=by_priority, reverse=True)
    else:
        # Ensure Compose will pick a consistent primary network if no
        # priority is set
        ordered = sorted(resolved.items(), key=by_name)

    return OrderedDict(ordered)