From cf782a3dbbe82ccabce8cddfd89ae6b00d6b50ac Mon Sep 17 00:00:00 2001 From: Drew Romanyk Date: Thu, 9 Nov 2017 17:53:27 -0600 Subject: [PATCH] Implement subnet config validation (fixes #4552) Signed-off-by: Drew Romanyk --- compose/config/config_schema_v3.5.json | 2 +- compose/config/validation.py | 30 +++++++++- tests/unit/config/config_test.py | 82 ++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 2 deletions(-) diff --git a/compose/config/config_schema_v3.5.json b/compose/config/config_schema_v3.5.json index 5400cd99f..c3ac559ee 100644 --- a/compose/config/config_schema_v3.5.json +++ b/compose/config/config_schema_v3.5.json @@ -419,7 +419,7 @@ "items": { "type": "object", "properties": { - "subnet": {"type": "string"} + "subnet": {"type": "string", "format": "subnet_ip_address"} }, "additionalProperties": false } diff --git a/compose/config/validation.py b/compose/config/validation.py index 8247cf150..a8061a5a4 100644 --- a/compose/config/validation.py +++ b/compose/config/validation.py @@ -5,6 +5,7 @@ import json import logging import os import re +import socket import sys import six @@ -43,6 +44,9 @@ DOCKER_CONFIG_HINTS = { VALID_NAME_CHARS = '[a-zA-Z0-9\._\-]' VALID_EXPOSE_FORMAT = r'^\d+(\-\d+)?(\/[a-zA-Z]+)?$' +VALID_IPV4_FORMAT = r'^(\d{1,3}.){3}\d{1,3}$' +VALID_IPV4_CIDR_FORMAT = r'^(\d|[1-2]\d|3[0-2])$' +VALID_IPV6_CIDR_FORMAT = r'^(\d|[1-9]\d|1[0-1]\d|12[0-8])$' @FormatChecker.cls_checks(format="ports", raises=ValidationError) @@ -64,6 +68,30 @@ def format_expose(instance): return True +@FormatChecker.cls_checks("subnet_ip_address", raises=ValidationError) +def format_subnet_ip_address(instance): + if isinstance(instance, six.string_types): + if '/' not in instance: + raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'") + + ip_address, cidr = instance.split('/') + + if re.match(VALID_IPV4_FORMAT, ip_address): + if not (re.match(VALID_IPV4_CIDR_FORMAT, cidr) and + all(0 <= int(component) <= 255 for component in ip_address.split("."))): + raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'") + elif re.match(VALID_IPV6_CIDR_FORMAT, cidr) and hasattr(socket, "inet_pton"): + try: + if not (socket.inet_pton(socket.AF_INET6, ip_address)): + raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'") + except socket.error as e: + raise ValidationError(six.text_type(e)) + else: + raise ValidationError("should be of the format 'IP_ADDRESS/CIDR'") + + return True + + def match_named_volumes(service_dict, project_volumes): service_volumes = service_dict.get('volumes', []) for volume_spec in service_volumes: @@ -391,7 +419,7 @@ def process_config_schema_errors(error): def validate_against_config_schema(config_file): schema = load_jsonschema(config_file) - format_checker = FormatChecker(["ports", "expose"]) + format_checker = FormatChecker(["ports", "expose", "subnet_ip_address"]) validator = Draft4Validator( schema, resolver=RefResolver(get_resolver_path(), schema), diff --git a/tests/unit/config/config_test.py b/tests/unit/config/config_test.py index a758154c0..819d8f5be 100644 --- a/tests/unit/config/config_test.py +++ b/tests/unit/config/config_test.py @@ -2846,6 +2846,88 @@ class PortsTest(unittest.TestCase): ) +class SubnetTest(unittest.TestCase): + INVALID_SUBNET_TYPES = [ + None, + False, + 10, + ] + + INVALID_SUBNET_MAPPINGS = [ + "", + "192.168.0.1/sdfsdfs", + "192.168.0.1/", + "192.168.0.1/33", + "192.168.0.1/01", + "192.168.0.1", + "fe80:0000:0000:0000:0204:61ff:fe9d:f156/sdfsdfs", + "fe80:0000:0000:0000:0204:61ff:fe9d:f156/", + "fe80:0000:0000:0000:0204:61ff:fe9d:f156/129", + "fe80:0000:0000:0000:0204:61ff:fe9d:f156/01", + "fe80:0000:0000:0000:0204:61ff:fe9d:f156", + ] + + ILLEGAL_SUBNET_MAPPINGS = [ + "ge80:0000:0000:0000:0204:61ff:fe9d:f156/128" + ] + + VALID_SUBNET_MAPPINGS = [ + "192.168.0.1/0", + "192.168.0.1/32", + "fe80:0000:0000:0000:0204:61ff:fe9d:f156/0", + "fe80:0000:0000:0000:0204:61ff:fe9d:f156/128", + ] + + def test_config_invalid_subnet_type_validation(self): + for invalid_subnet in self.INVALID_SUBNET_TYPES: + with pytest.raises(ConfigurationError) as exc: + self.check_config(invalid_subnet) + + assert "contains an invalid type" in exc.value.msg + + def test_config_invalid_subnet_format_validation(self): + for invalid_subnet in self.INVALID_SUBNET_MAPPINGS: + with pytest.raises(ConfigurationError) as exc: + self.check_config(invalid_subnet) + + assert "should be of the format 'IP_ADDRESS/CIDR'" in exc.value.msg + + def test_config_illegal_subnet_type_validation(self): + for invalid_subnet in self.ILLEGAL_SUBNET_MAPPINGS: + with pytest.raises(ConfigurationError) as exc: + self.check_config(invalid_subnet) + + assert "illegal IP address string" in exc.value.msg + + def test_config_valid_subnet_format_validation(self): + for valid_subnet in self.VALID_SUBNET_MAPPINGS: + self.check_config(valid_subnet) + + def check_config(self, subnet): + config.load( + build_config_details({ + 'version': '3.5', + 'services': { + 'web': { + 'image': 'busybox' + } + }, + 'networks': { + 'default': { + 'ipam': { + 'config': [ + { + 'subnet': subnet + } + ], + 'driver': 'default' + } + } + } + }) + ) + + class InterpolationTest(unittest.TestCase): @mock.patch.dict(os.environ)