Add some unit tests for new bundle and push commands.

Signed-off-by: Daniel Nephin <dnephin@docker.com>
This commit is contained in:
Daniel Nephin 2016-06-14 16:55:48 -04:00 committed by Aanand Prasad
parent 33cc601176
commit db02c9f537
3 changed files with 270 additions and 20 deletions

View File

@ -57,17 +57,7 @@ class MissingDigests(Exception):
def serialize_bundle(config, image_digests):
if config.networks:
log.warn("Unsupported top level key 'networks' - ignoring")
if config.volumes:
log.warn("Unsupported top level key 'volumes' - ignoring")
return json.dumps(
to_bundle(config, image_digests),
indent=2,
sort_keys=True,
)
return json.dumps(to_bundle(config, image_digests), indent=2, sort_keys=True)
def get_image_digests(project, allow_fetch=False):
@ -99,7 +89,7 @@ def get_image_digest(service, allow_fetch=False):
"required to generate a proper image digest for the bundle. Specify "
"an image repo and tag with the 'image' option.".format(s=service))
separator = parse_repository_tag(service.options['image'])[2]
_, _, separator = parse_repository_tag(service.options['image'])
# Compose file already uses a digest, no lookup required
if separator == '@':
return service.options['image']
@ -143,24 +133,32 @@ def fetch_image_digest(service):
if not digest:
raise ValueError("Failed to get digest for %s" % service.name)
repo = parse_repository_tag(service.options['image'])[0]
repo, _, _ = parse_repository_tag(service.options['image'])
identifier = '{repo}@{digest}'.format(repo=repo, digest=digest)
# Pull by digest so that image['RepoDigests'] is populated for next time
# and we don't have to pull/push again
service.client.pull(identifier)
log.info("Stored digest for {}".format(service.image_name))
# only do this is RepoTags isn't already populated
image = service.image()
if not image['RepoDigests']:
# Pull by digest so that image['RepoDigests'] is populated for next time
# and we don't have to pull/push again
service.client.pull(identifier)
log.info("Stored digest for {}".format(service.image_name))
return identifier
def to_bundle(config, image_digests):
if config.networks:
log.warn("Unsupported top level key 'networks' - ignoring")
if config.volumes:
log.warn("Unsupported top level key 'volumes' - ignoring")
config = denormalize_config(config)
return {
'version': VERSION,
'services': {
'Version': VERSION,
'Services': {
name: convert_service_to_bundle(
name,
service_dict,

232
tests/unit/bundle_test.py Normal file
View File

@ -0,0 +1,232 @@
from __future__ import absolute_import
from __future__ import unicode_literals
import docker
import mock
import pytest
from compose import bundle
from compose import service
from compose.cli.errors import UserError
from compose.config.config import Config
@pytest.fixture
def mock_service():
return mock.create_autospec(
service.Service,
client=mock.create_autospec(docker.Client),
options={})
def test_get_image_digest_exists(mock_service):
mock_service.options['image'] = 'abcd'
mock_service.image.return_value = {'RepoDigests': ['digest1']}
digest = bundle.get_image_digest(mock_service)
assert digest == 'digest1'
def test_get_image_digest_image_uses_digest(mock_service):
mock_service.options['image'] = image_id = 'redis@sha256:digest'
digest = bundle.get_image_digest(mock_service)
assert digest == image_id
assert not mock_service.image.called
def test_get_image_digest_no_image(mock_service):
with pytest.raises(UserError) as exc:
bundle.get_image_digest(service.Service(name='theservice'))
assert "doesn't define an image tag" in exc.exconly()
def test_fetch_image_digest_for_image_with_saved_digest(mock_service):
mock_service.options['image'] = image_id = 'abcd'
mock_service.pull.return_value = expected = 'sha256:thedigest'
mock_service.image.return_value = {'RepoDigests': ['digest1']}
digest = bundle.fetch_image_digest(mock_service)
assert digest == image_id + '@' + expected
mock_service.pull.assert_called_once_with()
assert not mock_service.push.called
assert not mock_service.client.pull.called
def test_fetch_image_digest_for_image(mock_service):
mock_service.options['image'] = image_id = 'abcd'
mock_service.pull.return_value = expected = 'sha256:thedigest'
mock_service.image.return_value = {'RepoDigests': []}
digest = bundle.fetch_image_digest(mock_service)
assert digest == image_id + '@' + expected
mock_service.pull.assert_called_once_with()
assert not mock_service.push.called
mock_service.client.pull.assert_called_once_with(digest)
def test_fetch_image_digest_for_build(mock_service):
mock_service.options['build'] = '.'
mock_service.options['image'] = image_id = 'abcd'
mock_service.push.return_value = expected = 'sha256:thedigest'
mock_service.image.return_value = {'RepoDigests': ['digest1']}
digest = bundle.fetch_image_digest(mock_service)
assert digest == image_id + '@' + expected
mock_service.push.assert_called_once_with()
assert not mock_service.pull.called
assert not mock_service.client.pull.called
def test_to_bundle():
image_digests = {'a': 'aaaa', 'b': 'bbbb'}
services = [
{'name': 'a', 'build': '.', },
{'name': 'b', 'build': './b'},
]
config = Config(
version=2,
services=services,
volumes={'special': {}},
networks={'extra': {}})
with mock.patch('compose.bundle.log.warn', autospec=True) as mock_log:
output = bundle.to_bundle(config, image_digests)
assert mock_log.mock_calls == [
mock.call("Unsupported top level key 'networks' - ignoring"),
mock.call("Unsupported top level key 'volumes' - ignoring"),
]
assert output == {
'Version': '0.1',
'Services': {
'a': {'Image': 'aaaa', 'Networks': ['default']},
'b': {'Image': 'bbbb', 'Networks': ['default']},
}
}
def test_convert_service_to_bundle():
name = 'theservice'
image_digest = 'thedigest'
service_dict = {
'ports': ['80'],
'expose': ['1234'],
'networks': {'extra': {}},
'command': 'foo',
'entrypoint': 'entry',
'environment': {'BAZ': 'ENV'},
'build': '.',
'working_dir': '/tmp',
'user': 'root',
'labels': {'FOO': 'LABEL'},
'privileged': True,
}
with mock.patch('compose.bundle.log.warn', autospec=True) as mock_log:
config = bundle.convert_service_to_bundle(name, service_dict, image_digest)
mock_log.assert_called_once_with(
"Unsupported key 'privileged' in services.theservice - ignoring")
assert config == {
'Image': image_digest,
'Ports': [
{'Protocol': 'tcp', 'Port': 80},
{'Protocol': 'tcp', 'Port': 1234},
],
'Networks': ['extra'],
'Command': ['entry', 'foo'],
'Env': ['BAZ=ENV'],
'WorkingDir': '/tmp',
'User': 'root',
'Labels': {'FOO': 'LABEL'},
}
def test_set_command_and_args_none():
config = {}
bundle.set_command_and_args(config, [], [])
assert config == {}
def test_set_command_and_args_from_command():
config = {}
bundle.set_command_and_args(config, [], "echo ok")
assert config == {'Args': ['echo', 'ok']}
def test_set_command_and_args_from_entrypoint():
config = {}
bundle.set_command_and_args(config, "echo entry", [])
assert config == {'Command': ['echo', 'entry']}
def test_set_command_and_args_from_both():
config = {}
bundle.set_command_and_args(config, "echo entry", ["extra", "arg"])
assert config == {'Command': ['echo', 'entry', "extra", "arg"]}
def test_make_service_networks_default():
name = 'theservice'
service_dict = {}
with mock.patch('compose.bundle.log.warn', autospec=True) as mock_log:
networks = bundle.make_service_networks(name, service_dict)
assert not mock_log.called
assert networks == ['default']
def test_make_service_networks():
name = 'theservice'
service_dict = {
'networks': {
'foo': {
'aliases': ['one', 'two'],
},
'bar': {}
},
}
with mock.patch('compose.bundle.log.warn', autospec=True) as mock_log:
networks = bundle.make_service_networks(name, service_dict)
mock_log.assert_called_once_with(
"Unsupported key 'aliases' in services.theservice.networks.foo - ignoring")
assert sorted(networks) == sorted(service_dict['networks'])
def test_make_port_specs():
service_dict = {
'expose': ['80', '500/udp'],
'ports': [
'400:80',
'222',
'127.0.0.1:8001:8001',
'127.0.0.1:5000-5001:3000-3001'],
}
port_specs = bundle.make_port_specs(service_dict)
assert port_specs == [
{'Protocol': 'tcp', 'Port': 80},
{'Protocol': 'tcp', 'Port': 222},
{'Protocol': 'tcp', 'Port': 8001},
{'Protocol': 'tcp', 'Port': 3000},
{'Protocol': 'tcp', 'Port': 3001},
{'Protocol': 'udp', 'Port': 500},
]
def test_make_port_spec_with_protocol():
port_spec = bundle.make_port_spec("5000/udp")
assert port_spec == {'Protocol': 'udp', 'Port': 5000}
def test_make_port_spec_default_protocol():
port_spec = bundle.make_port_spec("50000")
assert port_spec == {'Protocol': 'tcp', 'Port': 50000}

View File

@ -65,3 +65,23 @@ class ProgressStreamTestCase(unittest.TestCase):
events = progress_stream.stream_output(events, output)
self.assertTrue(len(output.getvalue()) > 0)
def test_get_digest_from_push():
digest = "sha256:abcd"
events = [
{"status": "..."},
{"status": "..."},
{"progressDetail": {}, "aux": {"Digest": digest}},
]
assert progress_stream.get_digest_from_push(events) == digest
def test_get_digest_from_pull():
digest = "sha256:abcd"
events = [
{"status": "..."},
{"status": "..."},
{"status": "Digest: %s" % digest},
]
assert progress_stream.get_digest_from_pull(events) == digest