diff --git a/compose/cli/main.py b/compose/cli/main.py
index 8dae737b8..557dc6367 100644
--- a/compose/cli/main.py
+++ b/compose/cli/main.py
@@ -13,7 +13,7 @@ import dockerpty
 from .. import __version__
 from .. import migration
 from ..project import NoSuchService, ConfigurationError
-from ..service import BuildError, CannotBeScaledError
+from ..service import BuildError, CannotBeScaledError, NeedsBuildError
 from ..config import parse_environment
 from .command import Command
 from .docopt_command import NoSuchCommand
@@ -47,6 +47,9 @@ def main():
     except BuildError as e:
         log.error("Service '%s' failed to build: %s" % (e.service.name, e.reason))
         sys.exit(1)
+    except NeedsBuildError as e:
+        log.error("Service '%s' needs to be built, but --no-build was passed." % e.service.name)
+        sys.exit(1)
 
 
 def setup_logging():
@@ -297,9 +300,8 @@ class TopLevelCommand(Command):
                 project.up(
                     service_names=deps,
                     start_deps=True,
-                    recreate=False,
+                    allow_recreate=False,
                     insecure_registry=insecure_registry,
-                    detach=options['-d']
                 )
 
         tty = True
@@ -441,6 +443,8 @@ class TopLevelCommand(Command):
                                    print new container names.
             --no-color             Produce monochrome output.
             --no-deps              Don't start linked services.
+            --x-smart-recreate     Only recreate containers whose configuration or
+                                   image needs to be updated. (EXPERIMENTAL)
             --no-recreate          If containers already exist, don't recreate them.
             --no-build             Don't build an image, even if it's missing
             -t, --timeout TIMEOUT  When attached, use this timeout in seconds
@@ -453,15 +457,16 @@ class TopLevelCommand(Command):
         monochrome = options['--no-color']
 
         start_deps = not options['--no-deps']
-        recreate = not options['--no-recreate']
+        allow_recreate = not options['--no-recreate']
+        smart_recreate = options['--x-smart-recreate']
         service_names = options['SERVICE']
 
         project.up(
             service_names=service_names,
             start_deps=start_deps,
-            recreate=recreate,
+            allow_recreate=allow_recreate,
+            smart_recreate=smart_recreate,
             insecure_registry=insecure_registry,
-            detach=detached,
             do_build=not options['--no-build'],
         )
 
diff --git a/compose/const.py b/compose/const.py
index 0714a6dbf..f76fb572c 100644
--- a/compose/const.py
+++ b/compose/const.py
@@ -4,3 +4,4 @@ LABEL_ONE_OFF = 'com.docker.compose.oneoff'
 LABEL_PROJECT = 'com.docker.compose.project'
 LABEL_SERVICE = 'com.docker.compose.service'
 LABEL_VERSION = 'com.docker.compose.version'
+LABEL_CONFIG_HASH = 'com.docker.compose.config-hash'
diff --git a/compose/container.py b/compose/container.py
index 3e462088f..719514971 100644
--- a/compose/container.py
+++ b/compose/container.py
@@ -179,13 +179,16 @@ class Container(object):
         return self.client.attach_socket(self.id, **kwargs)
 
     def __repr__(self):
-        return '<Container: %s>' % self.name
+        return '<Container: %s (%s)>' % (self.name, self.id[:6])
 
     def __eq__(self, other):
         if type(self) != type(other):
             return False
         return self.id == other.id
 
+    def __hash__(self):
+        return self.id.__hash__()
+
 
 def get_container_name(container):
     if not container.get('Name') and not container.get('Names'):
diff --git a/compose/project.py b/compose/project.py
index 8ca144813..c37175ae0 100644
--- a/compose/project.py
+++ b/compose/project.py
@@ -207,24 +207,59 @@ class Project(object):
     def up(self,
            service_names=None,
            start_deps=True,
-           recreate=True,
+           allow_recreate=True,
+           smart_recreate=False,
            insecure_registry=False,
-           detach=False,
            do_build=True):
-        running_containers = []
-        for service in self.get_services(service_names, include_deps=start_deps):
-            if recreate:
-                create_func = service.recreate_containers
+
+        services = self.get_services(service_names, include_deps=start_deps)
+
+        plans = self._get_convergence_plans(
+            services,
+            allow_recreate=allow_recreate,
+            smart_recreate=smart_recreate,
+        )
+
+        return [
+            container
+            for service in services
+            for container in service.execute_convergence_plan(
+                plans[service.name],
+                insecure_registry=insecure_registry,
+                do_build=do_build,
+            )
+        ]
+
+    def _get_convergence_plans(self,
+                               services,
+                               allow_recreate=True,
+                               smart_recreate=False):
+
+        plans = {}
+
+        for service in services:
+            updated_dependencies = [
+                name
+                for name in service.get_dependency_names()
+                if name in plans
+                and plans[name].action == 'recreate'
+            ]
+
+            if updated_dependencies:
+                log.debug(
+                    '%s has not changed but its dependencies (%s) have, so recreating',
+                    service.name, ", ".join(updated_dependencies),
+                )
+                plan = service.recreate_plan()
             else:
-                create_func = service.start_or_create_containers
+                plan = service.convergence_plan(
+                    allow_recreate=allow_recreate,
+                    smart_recreate=smart_recreate,
+                )
 
-            for container in create_func(
-                    insecure_registry=insecure_registry,
-                    detach=detach,
-                    do_build=do_build):
-                running_containers.append(container)
+            plans[service.name] = plan
 
-        return running_containers
+        return plans
 
     def pull(self, service_names=None, insecure_registry=False):
         for service in self.get_services(service_names, include_deps=True):
@@ -252,10 +287,7 @@ class Project(object):
         return containers
 
     def _inject_deps(self, acc, service):
-        net_name = service.get_net_name()
-        dep_names = (service.get_linked_names() +
-                     service.get_volumes_from_names() +
-                     ([net_name] if net_name else []))
+        dep_names = service.get_dependency_names()
 
         if len(dep_names) > 0:
             dep_services = self.get_services(
diff --git a/compose/service.py b/compose/service.py
index dc34a9bc2..0c03648c4 100644
--- a/compose/service.py
+++ b/compose/service.py
@@ -18,9 +18,11 @@ from .const import (
     LABEL_PROJECT,
     LABEL_SERVICE,
     LABEL_VERSION,
+    LABEL_CONFIG_HASH,
 )
 from .container import Container, get_container_name
 from .progress_stream import stream_output, StreamOutputError
+from .utils import json_hash
 
 log = logging.getLogger(__name__)
 
@@ -59,12 +61,20 @@ class ConfigError(ValueError):
     pass
 
 
+class NeedsBuildError(Exception):
+    def __init__(self, service):
+        self.service = service
+
+
 VolumeSpec = namedtuple('VolumeSpec', 'external internal mode')
 
 
 ServiceName = namedtuple('ServiceName', 'project service number')
 
 
+ConvergencePlan = namedtuple('ConvergencePlan', 'action containers')
+
+
 class Service(object):
     def __init__(self, name, client=None, project='default', links=None, external_links=None, volumes_from=None, net=None, **options):
         if not re.match('^%s+$' % VALID_NAME_CHARS, name):
@@ -147,7 +157,7 @@ class Service(object):
         # Create enough containers
         containers = self.containers(stopped=True)
         while len(containers) < desired_num:
-            containers.append(self.create_container(detach=True))
+            containers.append(self.create_container())
 
         running_containers = []
         stopped_containers = []
@@ -192,6 +202,11 @@ class Service(object):
         Create a container for this service. If the image doesn't exist, attempt to pull
         it.
         """
+        self.ensure_image_exists(
+            do_build=do_build,
+            insecure_registry=insecure_registry,
+        )
+
         container_options = self._get_container_create_options(
             override_options,
             number or self._next_container_number(one_off=one_off),
@@ -199,42 +214,142 @@ class Service(object):
             previous_container=previous_container,
         )
 
-        if (do_build and
-                self.can_be_built() and
-                not self.client.images(name=self.full_name)):
-            self.build()
+        return Container.create(self.client, **container_options)
 
+    def ensure_image_exists(self,
+                            do_build=True,
+                            insecure_registry=False):
+
+        if self.image():
+            return
+
+        if self.can_be_built():
+            if do_build:
+                self.build()
+            else:
+                raise NeedsBuildError(self)
+        else:
+            self.pull(insecure_registry=insecure_registry)
+
+    def image(self):
         try:
-            return Container.create(self.client, **container_options)
+            return self.client.inspect_image(self.image_name)
         except APIError as e:
             if e.response.status_code == 404 and e.explanation and 'No such image' in str(e.explanation):
-                self.pull(insecure_registry=insecure_registry)
-                return Container.create(self.client, **container_options)
-            raise
+                return None
+            else:
+                raise
 
-    def recreate_containers(self, insecure_registry=False, do_build=True, **override_options):
+    @property
+    def image_name(self):
+        if self.can_be_built():
+            return self.full_name
+        else:
+            return self.options['image']
+
+    def converge(self,
+                 allow_recreate=True,
+                 smart_recreate=False,
+                 insecure_registry=False,
+                 do_build=True):
         """
         If a container for this service doesn't exist, create and start one. If there are
         any, stop them, create+start new ones, and remove the old containers.
         """
+        plan = self.convergence_plan(
+            allow_recreate=allow_recreate,
+            smart_recreate=smart_recreate,
+        )
+
+        return self.execute_convergence_plan(
+            plan,
+            insecure_registry=insecure_registry,
+            do_build=do_build,
+        )
+
+    def convergence_plan(self,
+                         allow_recreate=True,
+                         smart_recreate=False):
+
         containers = self.containers(stopped=True)
+
         if not containers:
+            return ConvergencePlan('create', [])
+
+        if smart_recreate and not self._containers_have_diverged(containers):
+            stopped = [c for c in containers if not c.is_running]
+
+            if stopped:
+                return ConvergencePlan('start', stopped)
+
+            return ConvergencePlan('noop', containers)
+
+        if not allow_recreate:
+            return ConvergencePlan('start', containers)
+
+        return ConvergencePlan('recreate', containers)
+
+    def recreate_plan(self):
+        containers = self.containers(stopped=True)
+        return ConvergencePlan('recreate', containers)
+
+    def _containers_have_diverged(self, containers):
+        config_hash = self.config_hash()
+        has_diverged = False
+
+        for c in containers:
+            container_config_hash = c.labels.get(LABEL_CONFIG_HASH, None)
+            if container_config_hash != config_hash:
+                log.debug(
+                    '%s has diverged: %s != %s',
+                    c.name, container_config_hash, config_hash,
+                )
+                has_diverged = True
+
+        return has_diverged
+
+    def execute_convergence_plan(self,
+                                 plan,
+                                 insecure_registry=False,
+                                 do_build=True):
+        (action, containers) = plan
+
+        if action == 'create':
             container = self.create_container(
                 insecure_registry=insecure_registry,
                 do_build=do_build,
-                **override_options)
+            )
             self.start_container(container)
+
             return [container]
 
-        return [
-            self.recreate_container(
-                c,
-                insecure_registry=insecure_registry,
-                **override_options)
-            for c in containers
-        ]
+        elif action == 'recreate':
+            return [
+                self.recreate_container(
+                    c,
+                    insecure_registry=insecure_registry,
+                )
+                for c in containers
+            ]
 
-    def recreate_container(self, container, **override_options):
+        elif action == 'start':
+            for c in containers:
+                self.start_container_if_stopped(c)
+
+            return containers
+
+        elif action == 'noop':
+            for c in containers:
+                log.info("%s is up-to-date" % c.name)
+
+            return containers
+
+        else:
+            raise Exception("Invalid action: {}".format(action))
+
+    def recreate_container(self,
+                           container,
+                           insecure_registry=False):
         """Recreate a container.
 
         The original container is renamed to a temporary name so that data
@@ -257,16 +372,12 @@ class Service(object):
             container.id,
             '%s_%s' % (container.short_id, container.name))
 
-        override_options = dict(
-            override_options,
-            environment=merge_environment(
-                override_options.get('environment'),
-                {'affinity:container': '=' + container.id}))
         new_container = self.create_container(
+            insecure_registry=insecure_registry,
             do_build=False,
             previous_container=container,
             number=container.labels.get(LABEL_CONTAINER_NUMBER),
-            **override_options)
+        )
         self.start_container(new_container)
         container.remove()
         return new_container
@@ -285,20 +396,33 @@ class Service(object):
     def start_or_create_containers(
             self,
             insecure_registry=False,
-            detach=False,
             do_build=True):
         containers = self.containers(stopped=True)
 
         if not containers:
             new_container = self.create_container(
                 insecure_registry=insecure_registry,
-                detach=detach,
                 do_build=do_build,
             )
             return [self.start_container(new_container)]
         else:
             return [self.start_container_if_stopped(c) for c in containers]
 
+    def config_hash(self):
+        return json_hash(self.config_dict())
+
+    def config_dict(self):
+        return {
+            'options': self.options,
+            'image_id': self.image()['Id'],
+        }
+
+    def get_dependency_names(self):
+        net_name = self.get_net_name()
+        return (self.get_linked_names() +
+                self.get_volumes_from_names() +
+                ([net_name] if net_name else []))
+
     def get_linked_names(self):
         return [s.name for (s, _) in self.links]
 
@@ -386,6 +510,9 @@ class Service(object):
             number,
             one_off=False,
             previous_container=None):
+
+        add_config_hash = (not one_off and not override_options)
+
         container_options = dict(
             (k, self.options[k])
             for k in DOCKER_CONFIG_KEYS if k in self.options)
@@ -393,6 +520,16 @@ class Service(object):
 
         container_options['name'] = self.get_container_name(number, one_off)
 
+        if add_config_hash:
+            config_hash = self.config_hash()
+            if 'labels' not in container_options:
+                container_options['labels'] = {}
+            container_options['labels'][LABEL_CONFIG_HASH] = config_hash
+            log.debug("Added config hash: %s" % config_hash)
+
+        if 'detach' not in container_options:
+            container_options['detach'] = True
+
         # If a qualified hostname was given, split it into an
         # unqualified hostname and a domainname unless domainname
         # was also given explicitly. This matches the behavior of
@@ -429,8 +566,10 @@ class Service(object):
             self.options.get('environment'),
             override_options.get('environment'))
 
-        if self.can_be_built():
-            container_options['image'] = self.full_name
+        if previous_container:
+            container_options['environment']['affinity:container'] = ('=' + previous_container.id)
+
+        container_options['image'] = self.image_name
 
         container_options['labels'] = build_container_labels(
             container_options.get('labels', {}),
@@ -498,7 +637,7 @@ class Service(object):
 
         build_output = self.client.build(
             path=path,
-            tag=self.full_name,
+            tag=self.image_name,
             stream=True,
             rm=True,
             nocache=no_cache,
diff --git a/compose/state.py b/compose/state.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/compose/utils.py b/compose/utils.py
new file mode 100644
index 000000000..d441a2dae
--- /dev/null
+++ b/compose/utils.py
@@ -0,0 +1,9 @@
+import json
+import hashlib
+
+
+def json_hash(obj):
+    dump = json.dumps(obj, sort_keys=True)
+    h = hashlib.sha256()
+    h.update(dump)
+    return h.hexdigest()
diff --git a/tests/integration/project_test.py b/tests/integration/project_test.py
index 00d156b37..b6dcecbc6 100644
--- a/tests/integration/project_test.py
+++ b/tests/integration/project_test.py
@@ -185,7 +185,7 @@ class ProjectTest(DockerClientTestCase):
         old_db_id = project.containers()[0].id
         db_volume_path = project.containers()[0].inspect()['Volumes']['/var/db']
 
-        project.up(recreate=False)
+        project.up(allow_recreate=False)
         self.assertEqual(len(project.containers()), 2)
 
         db_container = [c for c in project.containers() if 'db' in c.name][0]
@@ -204,7 +204,7 @@ class ProjectTest(DockerClientTestCase):
         self.assertEqual(len(project.containers()), 0)
 
         project.up(['db'])
-        project.stop()
+        project.kill()
 
         old_containers = project.containers(stopped=True)
 
@@ -212,10 +212,11 @@ class ProjectTest(DockerClientTestCase):
         old_db_id = old_containers[0].id
         db_volume_path = old_containers[0].inspect()['Volumes']['/var/db']
 
-        project.up(recreate=False)
+        project.up(allow_recreate=False)
 
         new_containers = project.containers(stopped=True)
         self.assertEqual(len(new_containers), 2)
+        self.assertEqual([c.is_running for c in new_containers], [True, True])
 
         db_container = [c for c in new_containers if 'db' in c.name][0]
         self.assertEqual(db_container.id, old_db_id)
diff --git a/tests/integration/service_test.py b/tests/integration/service_test.py
index 47c826ec5..26f02d4a9 100644
--- a/tests/integration/service_test.py
+++ b/tests/integration/service_test.py
@@ -238,7 +238,7 @@ class ServiceTest(DockerClientTestCase):
         self.assertIn(volume_container_2.id,
                       host_container.get('HostConfig.VolumesFrom'))
 
-    def test_recreate_containers(self):
+    def test_converge(self):
         service = self.create_service(
             'db',
             environment={'FOO': '1'},
@@ -258,7 +258,7 @@ class ServiceTest(DockerClientTestCase):
         num_containers_before = len(self.client.containers(all=True))
 
         service.options['environment']['FOO'] = '2'
-        new_container, = service.recreate_containers()
+        new_container = service.converge()[0]
 
         self.assertEqual(new_container.get('Config.Entrypoint'), ['sleep'])
         self.assertEqual(new_container.get('Config.Cmd'), ['300'])
@@ -275,7 +275,7 @@ class ServiceTest(DockerClientTestCase):
                           self.client.inspect_container,
                           old_container.id)
 
-    def test_recreate_containers_when_containers_are_stopped(self):
+    def test_converge_when_containers_are_stopped(self):
         service = self.create_service(
             'db',
             environment={'FOO': '1'},
@@ -285,10 +285,10 @@ class ServiceTest(DockerClientTestCase):
         )
         service.create_container()
         self.assertEqual(len(service.containers(stopped=True)), 1)
-        service.recreate_containers()
+        service.converge()
         self.assertEqual(len(service.containers(stopped=True)), 1)
 
-    def test_recreate_containers_with_image_declared_volume(self):
+    def test_converge_with_image_declared_volume(self):
         service = Service(
             project='composetest',
             name='db',
@@ -300,7 +300,7 @@ class ServiceTest(DockerClientTestCase):
         self.assertEqual(old_container.get('Volumes').keys(), ['/data'])
         volume_path = old_container.get('Volumes')['/data']
 
-        new_container = service.recreate_containers()[0]
+        new_container = service.converge()[0]
         self.assertEqual(new_container.get('Volumes').keys(), ['/data'])
         self.assertEqual(new_container.get('Volumes')['/data'], volume_path)
 
@@ -651,8 +651,19 @@ class ServiceTest(DockerClientTestCase):
         expected = dict(labels_dict, **compose_labels)
 
         service = self.create_service('web', labels=labels_dict)
-        labels = create_and_start_container(service).labels
-        self.assertEqual(labels, expected)
+        labels = create_and_start_container(service).labels.items()
+        for pair in expected.items():
+            self.assertIn(pair, labels)
+
+        service.kill()
+        service.remove_stopped()
+
+        labels_list = ["%s=%s" % pair for pair in labels_dict.items()]
+
+        service = self.create_service('web', labels=labels_list)
+        labels = create_and_start_container(service).labels.items()
+        for pair in expected.items():
+            self.assertIn(pair, labels)
 
     def test_empty_labels(self):
         labels_list = ['foo', 'bar']
diff --git a/tests/integration/state_test.py b/tests/integration/state_test.py
new file mode 100644
index 000000000..3c0b2530f
--- /dev/null
+++ b/tests/integration/state_test.py
@@ -0,0 +1,263 @@
+from __future__ import unicode_literals
+import tempfile
+import shutil
+import os
+
+from compose import config
+from compose.project import Project
+from compose.const import LABEL_CONFIG_HASH
+
+from .testcases import DockerClientTestCase
+
+
+class ProjectTestCase(DockerClientTestCase):
+    def run_up(self, cfg, **kwargs):
+        if 'smart_recreate' not in kwargs:
+            kwargs['smart_recreate'] = True
+
+        project = self.make_project(cfg)
+        project.up(**kwargs)
+        return set(project.containers(stopped=True))
+
+    def make_project(self, cfg):
+        return Project.from_dicts(
+            name='composetest',
+            client=self.client,
+            service_dicts=config.from_dictionary(cfg),
+        )
+
+
+class BasicProjectTest(ProjectTestCase):
+    def setUp(self):
+        super(BasicProjectTest, self).setUp()
+
+        self.cfg = {
+            'db': {'image': 'busybox:latest'},
+            'web': {'image': 'busybox:latest'},
+        }
+
+    def test_no_change(self):
+        old_containers = self.run_up(self.cfg)
+        self.assertEqual(len(old_containers), 2)
+
+        new_containers = self.run_up(self.cfg)
+        self.assertEqual(len(new_containers), 2)
+
+        self.assertEqual(old_containers, new_containers)
+
+    def test_partial_change(self):
+        old_containers = self.run_up(self.cfg)
+        old_db = [c for c in old_containers if c.name_without_project == 'db_1'][0]
+        old_web = [c for c in old_containers if c.name_without_project == 'web_1'][0]
+
+        self.cfg['web']['command'] = '/bin/true'
+
+        new_containers = self.run_up(self.cfg)
+        self.assertEqual(len(new_containers), 2)
+
+        preserved = list(old_containers & new_containers)
+        self.assertEqual(preserved, [old_db])
+
+        removed = list(old_containers - new_containers)
+        self.assertEqual(removed, [old_web])
+
+        created = list(new_containers - old_containers)
+        self.assertEqual(len(created), 1)
+        self.assertEqual(created[0].name_without_project, 'web_1')
+        self.assertEqual(created[0].get('Config.Cmd'), ['/bin/true'])
+
+    def test_all_change(self):
+        old_containers = self.run_up(self.cfg)
+        self.assertEqual(len(old_containers), 2)
+
+        self.cfg['web']['command'] = '/bin/true'
+        self.cfg['db']['command'] = '/bin/true'
+
+        new_containers = self.run_up(self.cfg)
+        self.assertEqual(len(new_containers), 2)
+
+        unchanged = old_containers & new_containers
+        self.assertEqual(len(unchanged), 0)
+
+        new = new_containers - old_containers
+        self.assertEqual(len(new), 2)
+
+
+class ProjectWithDependenciesTest(ProjectTestCase):
+    def setUp(self):
+        super(ProjectWithDependenciesTest, self).setUp()
+
+        self.cfg = {
+            'db': {
+                'image': 'busybox:latest',
+                'command': 'tail -f /dev/null',
+            },
+            'web': {
+                'image': 'busybox:latest',
+                'command': 'tail -f /dev/null',
+                'links': ['db'],
+            },
+            'nginx': {
+                'image': 'busybox:latest',
+                'command': 'tail -f /dev/null',
+                'links': ['web'],
+            },
+        }
+
+    def test_up(self):
+        containers = self.run_up(self.cfg)
+        self.assertEqual(
+            set(c.name_without_project for c in containers),
+            set(['db_1', 'web_1', 'nginx_1']),
+        )
+
+    def test_change_leaf(self):
+        old_containers = self.run_up(self.cfg)
+
+        self.cfg['nginx']['environment'] = {'NEW_VAR': '1'}
+        new_containers = self.run_up(self.cfg)
+
+        self.assertEqual(
+            set(c.name_without_project for c in new_containers - old_containers),
+            set(['nginx_1']),
+        )
+
+    def test_change_middle(self):
+        old_containers = self.run_up(self.cfg)
+
+        self.cfg['web']['environment'] = {'NEW_VAR': '1'}
+        new_containers = self.run_up(self.cfg)
+
+        self.assertEqual(
+            set(c.name_without_project for c in new_containers - old_containers),
+            set(['web_1', 'nginx_1']),
+        )
+
+    def test_change_root(self):
+        old_containers = self.run_up(self.cfg)
+
+        self.cfg['db']['environment'] = {'NEW_VAR': '1'}
+        new_containers = self.run_up(self.cfg)
+
+        self.assertEqual(
+            set(c.name_without_project for c in new_containers - old_containers),
+            set(['db_1', 'web_1', 'nginx_1']),
+        )
+
+    def test_change_root_no_recreate(self):
+        old_containers = self.run_up(self.cfg)
+
+        self.cfg['db']['environment'] = {'NEW_VAR': '1'}
+        new_containers = self.run_up(self.cfg, allow_recreate=False)
+
+        self.assertEqual(new_containers - old_containers, set())
+
+
+class ServiceStateTest(DockerClientTestCase):
+    def test_trigger_create(self):
+        web = self.create_service('web')
+        self.assertEqual(('create', []), web.convergence_plan(smart_recreate=True))
+
+    def test_trigger_noop(self):
+        web = self.create_service('web')
+        container = web.create_container()
+        web.start()
+
+        web = self.create_service('web')
+        self.assertEqual(('noop', [container]), web.convergence_plan(smart_recreate=True))
+
+    def test_trigger_start(self):
+        options = dict(command=["/bin/sleep", "300"])
+
+        web = self.create_service('web', **options)
+        web.scale(2)
+
+        containers = web.containers(stopped=True)
+        containers[0].stop()
+        containers[0].inspect()
+
+        self.assertEqual([c.is_running for c in containers], [False, True])
+
+        web = self.create_service('web', **options)
+        self.assertEqual(
+            ('start', containers[0:1]),
+            web.convergence_plan(smart_recreate=True),
+        )
+
+    def test_trigger_recreate_with_config_change(self):
+        web = self.create_service('web', command=["/bin/sleep", "300"])
+        container = web.create_container()
+
+        web = self.create_service('web', command=["/bin/sleep", "400"])
+        self.assertEqual(('recreate', [container]), web.convergence_plan(smart_recreate=True))
+
+    def test_trigger_recreate_with_image_change(self):
+        repo = 'composetest_myimage'
+        tag = 'latest'
+        image = '{}:{}'.format(repo, tag)
+
+        image_id = self.client.images(name='busybox')[0]['Id']
+        self.client.tag(image_id, repository=repo, tag=tag)
+
+        try:
+            web = self.create_service('web', image=image)
+            container = web.create_container()
+
+            # update the image
+            c = self.client.create_container(image, ['touch', '/hello.txt'])
+            self.client.commit(c, repository=repo, tag=tag)
+            self.client.remove_container(c)
+
+            web = self.create_service('web', image=image)
+            self.assertEqual(('recreate', [container]), web.convergence_plan(smart_recreate=True))
+
+        finally:
+            self.client.remove_image(image)
+
+    def test_trigger_recreate_with_build(self):
+        context = tempfile.mkdtemp()
+
+        try:
+            dockerfile = os.path.join(context, 'Dockerfile')
+
+            with open(dockerfile, 'w') as f:
+                f.write('FROM busybox\n')
+
+            web = self.create_service('web', build=context)
+            container = web.create_container()
+
+            with open(dockerfile, 'w') as f:
+                f.write('FROM busybox\nCMD echo hello world\n')
+            web.build()
+
+            web = self.create_service('web', build=context)
+            self.assertEqual(('recreate', [container]), web.convergence_plan(smart_recreate=True))
+        finally:
+            shutil.rmtree(context)
+
+
+class ConfigHashTest(DockerClientTestCase):
+    def test_no_config_hash_when_one_off(self):
+        web = self.create_service('web')
+        container = web.create_container(one_off=True)
+        self.assertNotIn(LABEL_CONFIG_HASH, container.labels)
+
+    def test_no_config_hash_when_overriding_options(self):
+        web = self.create_service('web')
+        container = web.create_container(environment={'FOO': '1'})
+        self.assertNotIn(LABEL_CONFIG_HASH, container.labels)
+
+    def test_config_hash_with_custom_labels(self):
+        web = self.create_service('web', labels={'foo': '1'})
+        container = web.converge()[0]
+        self.assertIn(LABEL_CONFIG_HASH, container.labels)
+        self.assertIn('foo', container.labels)
+
+    def test_config_hash_sticks_around(self):
+        web = self.create_service('web', command=["/bin/sleep", "300"])
+        container = web.converge()[0]
+        self.assertIn(LABEL_CONFIG_HASH, container.labels)
+
+        web = self.create_service('web', command=["/bin/sleep", "400"])
+        container = web.converge()[0]
+        self.assertIn(LABEL_CONFIG_HASH, container.labels)
diff --git a/tests/unit/service_test.py b/tests/unit/service_test.py
index fa252062c..add48086d 100644
--- a/tests/unit/service_test.py
+++ b/tests/unit/service_test.py
@@ -5,14 +5,13 @@ from .. import unittest
 import mock
 
 import docker
-from requests import Response
 
 from compose.service import Service
 from compose.container import Container
 from compose.const import LABEL_SERVICE, LABEL_PROJECT, LABEL_ONE_OFF
 from compose.service import (
-    APIError,
     ConfigError,
+    NeedsBuildError,
     build_port_bindings,
     build_volume_binding,
     get_container_data_volumes,
@@ -223,36 +222,28 @@ class ServiceTest(unittest.TestCase):
             insecure_registry=False,
             stream=True)
 
-    @mock.patch('compose.service.Container', autospec=True)
-    @mock.patch('compose.service.log', autospec=True)
-    def test_create_container_from_insecure_registry(
-            self,
-            mock_log,
-            mock_container):
+    def test_create_container_from_insecure_registry(self):
         service = Service('foo', client=self.mock_client, image='someimage:sometag')
-        mock_response = mock.Mock(Response)
-        mock_response.status_code = 404
-        mock_response.reason = "Not Found"
-        mock_container.create.side_effect = APIError(
-            'Mock error', mock_response, "No such image")
+        images = []
 
-        # We expect the APIError because our service requires a
-        # non-existent image.
-        with self.assertRaises(APIError):
-            service.create_container(insecure_registry=True)
+        def pull(repo, tag=None, insecure_registry=False, **kwargs):
+            self.assertEqual('someimage', repo)
+            self.assertEqual('sometag', tag)
+            self.assertTrue(insecure_registry)
+            images.append({'Id': 'abc123'})
+            return []
 
-        self.mock_client.pull.assert_called_once_with(
-            'someimage',
-            tag='sometag',
-            insecure_registry=True,
-            stream=True)
-        mock_log.info.assert_called_with(
-            'Pulling foo (someimage:sometag)...')
+        service.image = lambda: images[0] if images else None
+        self.mock_client.pull = pull
+
+        service.create_container(insecure_registry=True)
+        self.assertEqual(1, len(images))
 
     @mock.patch('compose.service.Container', autospec=True)
     def test_recreate_container(self, _):
         mock_container = mock.create_autospec(Container)
         service = Service('foo', client=self.mock_client, image='someimage')
+        service.image = lambda: {'Id': 'abc123'}
         new_container = service.recreate_container(mock_container)
 
         mock_container.stop.assert_called_once_with()
@@ -273,36 +264,45 @@ class ServiceTest(unittest.TestCase):
 
     @mock.patch('compose.service.Container', autospec=True)
     def test_create_container_latest_is_used_when_no_tag_specified(self, mock_container):
-        mock_container.create.side_effect = APIError(
-            "oops",
-            mock.Mock(status_code=404),
-            "No such image")
         service = Service('foo', client=self.mock_client, image='someimage')
-        with self.assertRaises(APIError):
-            service.create_container()
-        self.mock_client.pull.assert_called_once_with(
-            'someimage',
-            tag='latest',
-            insecure_registry=False,
-            stream=True)
+        images = []
+
+        def pull(repo, tag=None, **kwargs):
+            self.assertEqual('someimage', repo)
+            self.assertEqual('latest', tag)
+            images.append({'Id': 'abc123'})
+            return []
+
+        service.image = lambda: images[0] if images else None
+        self.mock_client.pull = pull
+
+        service.create_container()
+        self.assertEqual(1, len(images))
 
     def test_create_container_with_build(self):
-        self.mock_client.images.return_value = []
         service = Service('foo', client=self.mock_client, build='.')
-        service.build = mock.create_autospec(service.build)
-        service.create_container(do_build=True)
 
-        self.mock_client.images.assert_called_once_with(name=service.full_name)
-        service.build.assert_called_once_with()
+        images = []
+        service.image = lambda *args, **kwargs: images[0] if images else None
+        service.build = lambda: images.append({'Id': 'abc123'})
+
+        service.create_container(do_build=True)
+        self.assertEqual(1, len(images))
 
     def test_create_container_no_build(self):
-        self.mock_client.images.return_value = []
         service = Service('foo', client=self.mock_client, build='.')
-        service.create_container(do_build=False)
+        service.image = lambda: {'Id': 'abc123'}
 
-        self.assertFalse(self.mock_client.images.called)
+        service.create_container(do_build=False)
         self.assertFalse(self.mock_client.build.called)
 
+    def test_create_container_no_build_but_needs_build(self):
+        service = Service('foo', client=self.mock_client, build='.')
+        service.image = lambda: None
+
+        with self.assertRaises(NeedsBuildError):
+            service.create_container(do_build=False)
+
 
 class ServiceVolumesTest(unittest.TestCase):