diff --git a/ironic/common/release_mappings.py b/ironic/common/release_mappings.py
index 374b68f156..fd21b24566 100644
--- a/ironic/common/release_mappings.py
+++ b/ironic/common/release_mappings.py
@@ -103,7 +103,7 @@ RELEASE_MAPPING = {
         'api': '1.43',
         'rpc': '1.44',
         'objects': {
-            'Node': ['1.24', '1.25'],
+            'Node': ['1.25', '1.24'],
             'Conductor': ['1.2'],
             'Chassis': ['1.3'],
             'Port': ['1.8'],
@@ -116,9 +116,9 @@ RELEASE_MAPPING = {
     },
     'master': {
         'api': '1.43',
-        'rpc': '1.44',
+        'rpc': '1.45',
         'objects': {
-            'Node': ['1.24', '1.25', '1.26'],
+            'Node': ['1.26'],
             'Conductor': ['1.2'],
             'Chassis': ['1.3'],
             'Port': ['1.8'],
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 2031782925..5ddab21e4e 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -1,5 +1,3 @@
-# coding=utf-8
-
 # Copyright 2013 Hewlett-Packard Development Company, L.P.
 # Copyright 2013 International Business Machines Corporation
 # All Rights Reserved.
@@ -55,6 +53,7 @@ from oslo_log import log
 import oslo_messaging as messaging
 from oslo_utils import excutils
 from oslo_utils import uuidutils
+from oslo_utils import versionutils
 from six.moves import queue
 
 from ironic.common import driver_factory
@@ -64,6 +63,7 @@ from ironic.common.glance_service import service_utils as glance_utils
 from ironic.common.i18n import _
 from ironic.common import images
 from ironic.common import network
+from ironic.common import release_mappings as versions
 from ironic.common import states
 from ironic.common import swift
 from ironic.conductor import base_manager
@@ -89,6 +89,10 @@ SYNC_EXCLUDED_STATES = (states.DEPLOYWAIT, states.CLEANWAIT, states.ENROLL)
 # agent_version parameter and need updating.
 _SEEN_AGENT_VERSION_DEPRECATIONS = []
 
+# NOTE(rloo) This list is used to keep track of deprecation warnings that
+# have already been issued for deploy drivers that do not use deploy steps.
+_SEEN_NO_DEPLOY_STEP_DEPRECATIONS = []
+
 
 class ConductorManager(base_manager.BaseConductorManager):
     """Ironic Conductor manager main class."""
@@ -96,7 +100,7 @@ class ConductorManager(base_manager.BaseConductorManager):
     # NOTE(rloo): This must be in sync with rpcapi.ConductorAPI's.
     # NOTE(pas-ha): This also must be in sync with
     #               ironic.common.release_mappings.RELEASE_MAPPING['master']
-    RPC_API_VERSION = '1.44'
+    RPC_API_VERSION = '1.45'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -814,6 +818,59 @@ class ConductorManager(base_manager.BaseConductorManager):
                     action=event, node=task.node.uuid,
                     state=task.node.provision_state)
 
+    def _get_node_next_deploy_steps(self, task):
+        return self._get_node_next_steps(task, 'deploy')
+
+    @METRICS.timer('ConductorManager.continue_node_deploy')
+    def continue_node_deploy(self, context, node_id):
+        """RPC method to continue deploying a node.
+
+        This is useful for deploying tasks that are async. When they complete,
+        they call back via RPC, a new worker and lock are set up, and deploying
+        continues. This can also be used to resume deploying on take_over.
+
+        :param context: an admin context.
+        :param node_id: the ID or UUID of a node.
+        :raises: InvalidStateRequested if the node is not in DEPLOYWAIT state
+        :raises: NoFreeConductorWorker when there is no free worker to start
+                 async task
+        :raises: NodeLocked if node is locked by another conductor.
+        :raises: NodeNotFound if the node no longer appears in the database
+
+        """
+        LOG.debug("RPC continue_node_deploy called for node %s.", node_id)
+        with task_manager.acquire(context, node_id, shared=False,
+                                  purpose='continue node deploying') as task:
+            node = task.node
+
+            # FIXME(rloo): This should be states.DEPLOYWAIT, but we're using
+            # this temporarily to get control back to the conductor, to finish
+            # the deployment. Once we split up the deployment into separate
+            # deploy steps and after we've crossed a rolling-upgrade boundary,
+            # we should be able to check for DEPLOYWAIT only.
+            expected_states = [states.DEPLOYWAIT, states.DEPLOYING]
+            if node.provision_state not in expected_states:
+                raise exception.InvalidStateRequested(_(
+                    'Cannot continue deploying on %(node)s. Node is in '
+                    '%(state)s state; should be in one of %(deploy_state)s') %
+                    {'node': node.uuid,
+                     'state': node.provision_state,
+                     'deploy_state': ', '.join(expected_states)})
+
+            next_step_index = self._get_node_next_deploy_steps(task)
+
+            # TODO(rloo): When deprecation period is over and node is in
+            # states.DEPLOYWAIT only, delete the 'if' and always 'resume'.
+            if node.provision_state != states.DEPLOYING:
+                task.process_event('resume')
+
+            task.set_spawn_error_hook(utils.spawn_deploying_error_handler,
+                                      task.node)
+            task.spawn_after(
+                self._spawn_worker,
+                _do_next_deploy_step,
+                task, next_step_index, self.conductor.id)
+
     @METRICS.timer('ConductorManager.do_node_tear_down')
     @messaging.expected_exceptions(exception.NoFreeConductorWorker,
                                    exception.NodeLocked,
@@ -916,50 +973,55 @@ class ConductorManager(base_manager.BaseConductorManager):
         task.process_event('clean')
         self._do_node_clean(task)
 
-    def _get_node_next_clean_steps(self, task, skip_current_step=True):
-        """Get the task's node's next clean steps.
+    def _get_node_next_steps(self, task, step_type,
+                             skip_current_step=True):
+        """Get the task's node's next steps.
 
-        This determines what the next (remaining) clean steps are, and
-        returns the index into the clean steps list that corresponds to the
-        next clean step. The remaining clean steps are determined as follows:
+        This determines what the next (remaining) steps are, and
+        returns the index into the steps list that corresponds to the
+        next step. The remaining steps are determined as follows:
 
-        * If no clean steps have been started yet, all the clean steps
+        * If no steps have been started yet, all the steps
           must be executed
-        * If skip_current_step is False, the remaining clean steps start
-          with the current clean step. Otherwise, the remaining clean steps
-          start with the clean step after the current one.
+        * If skip_current_step is False, the remaining steps start
+          with the current step. Otherwise, the remaining steps
+          start with the step after the current one.
 
-        All the clean steps for an automated or manual cleaning are in
-        node.driver_internal_info['clean_steps']. node.clean_step is the
-        current clean step that was just executed (or None, {} if no steps
-        have been executed yet). node.driver_internal_info['clean_step_index']
-        is the index into the clean steps list (or None, doesn't exist if no
-        steps have been executed yet) and corresponds to node.clean_step.
+        All the steps are in node.driver_internal_info['<step_type>_steps'].
+        node.<step_type>_step is the current step that was just executed
+        (or None, {} if no steps have been executed yet).
+        node.driver_internal_info['<step_type>_step_index'] is the index
+        index into the steps list (or None, doesn't exist if no steps have
+        been executed yet) and corresponds to node.<step_type>_step.
 
         :param task: A TaskManager object
-        :param skip_current_step: True to skip the current clean step; False to
+        :param step_type: The type of steps to process: 'clean' or 'deploy'.
+        :param skip_current_step: True to skip the current step; False to
                                   include it.
-        :returns: index of the next clean step; None if there are no clean
-                  steps to execute.
+        :returns: index of the next step; None if there are none to execute.
 
         """
         node = task.node
-        if not node.clean_step:
+        if not getattr(node, '%s_step' % step_type):
             # first time through, all steps need to be done. Return the
             # index of the first step in the list.
             return 0
 
-        ind = node.driver_internal_info.get('clean_step_index')
+        ind = node.driver_internal_info.get('%s_step_index' % step_type)
         if ind is None:
             return None
 
         if skip_current_step:
             ind += 1
-        if ind >= len(node.driver_internal_info['clean_steps']):
+        if ind >= len(node.driver_internal_info['%s_steps' % step_type]):
             # no steps left to do
             ind = None
         return ind
 
+    def _get_node_next_clean_steps(self, task, skip_current_step=True):
+        return self._get_node_next_steps(task, 'clean',
+                                         skip_current_step=skip_current_step)
+
     @METRICS.timer('ConductorManager.do_node_clean')
     @messaging.expected_exceptions(exception.InvalidParameterValue,
                                    exception.InvalidStateRequested,
@@ -3310,103 +3372,264 @@ def _store_configdrive(node, configdrive):
 
 @METRICS.timer('do_node_deploy')
 @task_manager.require_exclusive_lock
-def do_node_deploy(task, conductor_id, configdrive=None):
+def do_node_deploy(task, conductor_id=None, configdrive=None):
     """Prepare the environment and deploy a node."""
     node = task.node
 
-    def handle_failure(e, task, logmsg, errmsg, traceback=False):
-        args = {'node': task.node.uuid, 'err': e}
-        LOG.error(logmsg, args, exc_info=traceback)
-        # NOTE(deva): there is no need to clear conductor_affinity
-        task.process_event('fail')
-        node.last_error = errmsg % e
+    try:
+        if configdrive:
+            _store_configdrive(node, configdrive)
+    except (exception.SwiftOperationError, exception.ConfigInvalid) as e:
+        with excutils.save_and_reraise_exception():
+            utils.deploying_error_handler(
+                task,
+                ('Error while uploading the configdrive for %(node)s '
+                 'to Swift') % {'node': node.uuid},
+                _('Failed to upload the configdrive to Swift. '
+                  'Error: %s') % e,
+                clean_up=False)
+    except db_exception.DBDataError as e:
+        with excutils.save_and_reraise_exception():
+            # NOTE(hshiina): This error happens when the configdrive is
+            #                too large. Remove the configdrive from the
+            #                object to update DB successfully in handling
+            #                the failure.
+            node.obj_reset_changes()
+            utils.deploying_error_handler(
+                task,
+                ('Error while storing the configdrive for %(node)s into '
+                 'the database: %(err)s') % {'node': node.uuid, 'err': e},
+                _("Failed to store the configdrive in the database. "
+                  "%s") % e,
+                clean_up=False)
+    except Exception as e:
+        with excutils.save_and_reraise_exception():
+            utils.deploying_error_handler(
+                task,
+                ('Unexpected error while preparing the configdrive for '
+                 'node %(node)s') % {'node': node.uuid},
+                _("Failed to prepare the configdrive. Exception: %s") % e,
+                traceback=True, clean_up=False)
 
     try:
-        try:
-            if configdrive:
-                _store_configdrive(node, configdrive)
-        except (exception.SwiftOperationError, exception.ConfigInvalid) as e:
-            with excutils.save_and_reraise_exception():
-                handle_failure(
-                    e, task,
-                    ('Error while uploading the configdrive for '
-                     '%(node)s to Swift'),
-                    _('Failed to upload the configdrive to Swift. '
-                      'Error: %s'))
-        except db_exception.DBDataError as e:
-            with excutils.save_and_reraise_exception():
-                # NOTE(hshiina): This error happens when the configdrive is
-                #                too large. Remove the configdrive from the
-                #                object to update DB successfully in handling
-                #                the failure.
-                node.obj_reset_changes()
-                handle_failure(
-                    e, task,
-                    ('Error while storing the configdrive for %(node)s into '
-                     'the database: %(err)s'),
-                    _("Failed to store the configdrive in the database. : %s"))
-        except Exception as e:
-            with excutils.save_and_reraise_exception():
-                handle_failure(
-                    e, task,
-                    ('Unexpected error while preparing the configdrive for '
-                     'node %(node)s'),
-                    _("Failed to prepare the configdrive. Exception: %s"),
-                    traceback=True)
+        task.driver.deploy.prepare(task)
+    except exception.IronicException as e:
+        with excutils.save_and_reraise_exception():
+            utils.deploying_error_handler(
+                task,
+                ('Error while preparing to deploy to node %(node)s: '
+                 '%(err)s') % {'node': node.uuid, 'err': e},
+                _("Failed to prepare to deploy: %s") % e,
+                clean_up=False)
+    except Exception as e:
+        with excutils.save_and_reraise_exception():
+            utils.deploying_error_handler(
+                task,
+                ('Unexpected error while preparing to deploy to node '
+                 '%(node)s') % {'node': node.uuid},
+                _("Failed to prepare to deploy. Exception: %s") % e,
+                traceback=True, clean_up=False)
 
+    try:
+        # This gets the deploy steps (if any) and puts them in the node's
+        # driver_internal_info['deploy_steps'].
+        utils.set_node_deployment_steps(task)
+    except exception.InstanceDeployFailure as e:
+        with excutils.save_and_reraise_exception():
+            utils.deploying_error_handler(
+                task,
+                'Error while getting deploy steps; cannot deploy to node '
+                '%(node)s. Error: %(err)s' % {'node': node.uuid, 'err': e},
+                _("Cannot get deploy steps; failed to deploy: %s") % e)
+
+    steps = node.driver_internal_info.get('deploy_steps', [])
+
+    new_rpc_version = True
+    release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
+    if release_ver:
+        new_rpc_version = versionutils.is_compatible('1.45',
+                                                     release_ver['rpc'])
+
+    if not steps or not new_rpc_version:
+        # TODO(rloo): This if.. (and the above code wrt rpc version)
+        # can be deleted after the deprecation period when we no
+        # longer support drivers with no deploy steps.
+        # Note that after the deprecation period, there needs to be at least
+        # one deploy step. If none, the deployment fails.
+
+        if steps:
+            info = node.driver_internal_info
+            info.pop('deploy_steps')
+            node.driver_internal_info = info
+            node.save()
+
+        # We go back to using the old way, if:
+        # - out-of-tree driver hasn't yet converted to using deploy steps, or
+        # - we're in the middle of a rolling upgrade. This is to prevent the
+        #   corner case of having new conductors with old conductors, and
+        #   a node is deployed with a new conductor (via deploy steps), but
+        #   after the deploy_wait, the node gets handled by an old conductor.
+        #   To avoid this, we need to wait until all the conductors are new,
+        # signalled by the RPC API version being '1.45'.
+        _old_rest_of_do_node_deploy(task, conductor_id, not steps)
+    else:
+        _do_next_deploy_step(task, 0, conductor_id)
+
+
+def _old_rest_of_do_node_deploy(task, conductor_id, no_deploy_steps):
+    """The rest of the do_node_deploy() if not using deploy steps.
+
+    To support out-of-tree drivers that have not yet migrated to using
+    deploy steps.
+
+    :param no_deploy_steps: Boolean; True if there are no deploy steps.
+    """
+    # TODO(rloo): This method can be deleted after the deprecation period
+    #             for supporting drivers with no deploy steps.
+
+    if no_deploy_steps:
+        global _SEEN_NO_DEPLOY_STEP_DEPRECATIONS
+        deploy_driver_name = task.driver.deploy.__class__.__name__
+        if deploy_driver_name not in _SEEN_NO_DEPLOY_STEP_DEPRECATIONS:
+            LOG.warning('Deploy driver %s does not support deploy steps; this '
+                        'will be required after Stein.', deploy_driver_name)
+            _SEEN_NO_DEPLOY_STEP_DEPRECATIONS.append(deploy_driver_name)
+
+    node = task.node
+    try:
+        new_state = task.driver.deploy.deploy(task)
+    except exception.IronicException as e:
+        with excutils.save_and_reraise_exception():
+            utils.deploying_error_handler(
+                task,
+                ('Error in deploy of node %(node)s: %(err)s' %
+                 {'node': node.uuid, 'err': e}),
+                _("Failed to deploy: %s") % e)
+    except Exception as e:
+        with excutils.save_and_reraise_exception():
+            utils.deploying_error_handler(
+                task,
+                ('Unexpected error while deploying node %(node)s' %
+                 {'node': node.uuid}),
+                _("Failed to deploy. Exception: %s") % e,
+                traceback=True)
+
+    # Update conductor_affinity to reference this conductor's ID
+    # since there may be local persistent state
+    node.conductor_affinity = conductor_id
+
+    # NOTE(deva): Some drivers may return states.DEPLOYWAIT
+    #             eg. if they are waiting for a callback
+    if new_state == states.DEPLOYDONE:
+        task.process_event('done')
+        LOG.info('Successfully deployed node %(node)s with '
+                 'instance %(instance)s.',
+                 {'node': node.uuid, 'instance': node.instance_uuid})
+    elif new_state == states.DEPLOYWAIT:
+        task.process_event('wait')
+    else:
+        LOG.error('Unexpected state %(state)s returned while '
+                  'deploying node %(node)s.',
+                  {'state': new_state, 'node': node.uuid})
+    node.save()
+
+
+@task_manager.require_exclusive_lock
+def _do_next_deploy_step(task, step_index, conductor_id):
+    """Do deployment, starting from the specified deploy step.
+
+    :param task: a TaskManager instance with an exclusive lock
+    :param step_index: The first deploy step in the list to execute. This
+        is the index (from 0) into the list of deploy steps in the node's
+        driver_internal_info['deploy_steps']. Is None if there are no steps
+        to execute.
+    """
+    node = task.node
+    if step_index is None:
+        steps = []
+    else:
+        steps = node.driver_internal_info['deploy_steps'][step_index:]
+
+    LOG.info('Executing %(state)s on node %(node)s, remaining steps: '
+             '%(steps)s', {'node': node.uuid, 'steps': steps,
+                           'state': node.provision_state})
+
+    # Execute each step until we hit an async step or run out of steps
+    for ind, step in enumerate(steps):
+        # Save which step we're about to start so we can restart
+        # if necessary
+        node.deploy_step = step
+        driver_internal_info = node.driver_internal_info
+        driver_internal_info['deploy_step_index'] = step_index + ind
+        node.driver_internal_info = driver_internal_info
+        node.save()
+        interface = getattr(task.driver, step.get('interface'))
+        LOG.info('Executing %(step)s on node %(node)s',
+                 {'step': step, 'node': node.uuid})
         try:
-            task.driver.deploy.prepare(task)
+            result = interface.execute_deploy_step(task, step)
         except exception.IronicException as e:
-            with excutils.save_and_reraise_exception():
-                handle_failure(
-                    e, task,
-                    ('Error while preparing to deploy to node %(node)s: '
-                     '%(err)s'),
-                    _("Failed to prepare to deploy: %s"))
+            log_msg = ('Node %(node)s failed deploy step %(step)s. Error: '
+                       '%(err)s' %
+                       {'node': node.uuid, 'step': node.deploy_step, 'err': e})
+            utils.deploying_error_handler(
+                task, log_msg,
+                _("Failed to deploy: %s") % node.deploy_step)
+            return
         except Exception as e:
-            with excutils.save_and_reraise_exception():
-                handle_failure(
-                    e, task,
-                    ('Unexpected error while preparing to deploy to node '
-                     '%(node)s'),
-                    _("Failed to prepare to deploy. Exception: %s"),
-                    traceback=True)
+            log_msg = ('Node %(node)s failed deploy step %(step)s with '
+                       'unexpected error: %(err)s' %
+                       {'node': node.uuid, 'step': node.deploy_step, 'err': e})
+            utils.deploying_error_handler(
+                task, log_msg,
+                _("Failed to deploy. Exception: %s") % e, traceback=True)
+            return
 
-        try:
-            new_state = task.driver.deploy.deploy(task)
-        except exception.IronicException as e:
-            with excutils.save_and_reraise_exception():
-                handle_failure(
-                    e, task,
-                    'Error in deploy of node %(node)s: %(err)s',
-                    _("Failed to deploy: %s"))
-        except Exception as e:
-            with excutils.save_and_reraise_exception():
-                handle_failure(
-                    e, task,
-                    'Unexpected error while deploying node %(node)s',
-                    _("Failed to deploy. Exception: %s"),
-                    traceback=True)
-
-        # Update conductor_affinity to reference this conductor's ID
-        # since there may be local persistent state
-        node.conductor_affinity = conductor_id
+        if ind == 0:
+            # We've done the very first deploy step.
+            # Update conductor_affinity to reference this conductor's ID
+            # since there may be local persistent state
+            node.conductor_affinity = conductor_id
+            node.save()
 
+        # Check if the step is done or not. The step should return
+        # states.CLEANWAIT if the step is still being executed, or
+        # None if the step is done.
         # NOTE(deva): Some drivers may return states.DEPLOYWAIT
         #             eg. if they are waiting for a callback
-        if new_state == states.DEPLOYDONE:
-            task.process_event('done')
-            LOG.info('Successfully deployed node %(node)s with '
-                     'instance %(instance)s.',
-                     {'node': node.uuid, 'instance': node.instance_uuid})
-        elif new_state == states.DEPLOYWAIT:
+        if result == states.DEPLOYWAIT:
+            # Kill this worker, the async step will make an RPC call to
+            # continue_node_deploy() to continue deploying
+            LOG.info('Deploy step %(step)s on node %(node)s being '
+                     'executed asynchronously, waiting for driver.',
+                     {'node': node.uuid, 'step': step})
             task.process_event('wait')
-        else:
-            LOG.error('Unexpected state %(state)s returned while '
-                      'deploying node %(node)s.',
-                      {'state': new_state, 'node': node.uuid})
-    finally:
-        node.save()
+            return
+        elif result is not None:
+            # NOTE(rloo): This is an internal/dev error; shouldn't happen.
+            log_msg = (_('While executing deploy step %(step)s on node '
+                       '%(node)s, step returned unexpected state: %(val)s')
+                       % {'step': step, 'node': node.uuid, 'val': result})
+            utils.deploying_error_handler(
+                task, log_msg,
+                _("Failed to deploy: %s") % node.deploy_step)
+            return
+
+        LOG.info('Node %(node)s finished deploy step %(step)s',
+                 {'node': node.uuid, 'step': step})
+
+    # Finished executing the steps. Clear deploy_step.
+    node.deploy_step = None
+    driver_internal_info = node.driver_internal_info
+    driver_internal_info['deploy_steps'] = None
+    driver_internal_info.pop('deploy_step_index', None)
+    node.driver_internal_info = driver_internal_info
+    node.save()
+
+    task.process_event('done')
+    LOG.info('Successfully deployed node %(node)s with '
+             'instance %(instance)s.',
+             {'node': node.uuid, 'instance': node.instance_uuid})
 
 
 @task_manager.require_exclusive_lock
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py
index 1894690307..c244a6623c 100644
--- a/ironic/conductor/rpcapi.py
+++ b/ironic/conductor/rpcapi.py
@@ -93,13 +93,14 @@ class ConductorAPI(object):
     |    1.42 - Added optional agent_version to heartbeat
     |    1.43 - Added do_node_rescue, do_node_unrescue and can_send_rescue
     |    1.44 - Added add_node_traits and remove_node_traits.
+    |    1.45 - Added continue_node_deploy
 
     """
 
     # NOTE(rloo): This must be in sync with manager.ConductorManager's.
     # NOTE(pas-ha): This also must be in sync with
     #               ironic.common.release_mappings.RELEASE_MAPPING['master']
-    RPC_API_VERSION = '1.44'
+    RPC_API_VERSION = '1.45'
 
     def __init__(self, topic=None):
         super(ConductorAPI, self).__init__()
@@ -424,6 +425,20 @@ class ConductorAPI(object):
         return cctxt.cast(context, 'continue_node_clean',
                           node_id=node_id)
 
+    def continue_node_deploy(self, context, node_id, topic=None):
+        """Signal to conductor service to start the next deployment action.
+
+        NOTE(rloo): this is an RPC cast, there will be no response or
+        exception raised by the conductor for this RPC.
+
+        :param context: request context.
+        :param node_id: node id or uuid.
+        :param topic: RPC topic. Defaults to self.topic.
+        """
+        cctxt = self.client.prepare(topic=topic or self.topic, version='1.45')
+        return cctxt.cast(context, 'continue_node_deploy',
+                          node_id=node_id)
+
     def validate_driver_interfaces(self, context, node_id, topic=None):
         """Validate the `core` and `standardized` interfaces for drivers.
 
diff --git a/ironic/conductor/utils.py b/ironic/conductor/utils.py
index 8895d0d476..7c68ac1fb6 100644
--- a/ironic/conductor/utils.py
+++ b/ironic/conductor/utils.py
@@ -42,6 +42,20 @@ CLEANING_INTERFACE_PRIORITY = {
     'raid': 1,
 }
 
+DEPLOYING_INTERFACE_PRIORITY = {
+    # When two deploy steps have the same priority, their order is determined
+    # by which interface is implementing the step. The step of the interface
+    # with the highest value here, will be executed first in that case.
+    # TODO(rloo): If we think it makes sense to have the interface priorities
+    # the same for cleaning & deploying, replace the two with one e.g.
+    # 'INTERFACE_PRIORITIES'.
+    'power': 5,
+    'management': 4,
+    'deploy': 3,
+    'bios': 2,
+    'raid': 1,
+}
+
 
 @task_manager.require_exclusive_lock
 def node_set_boot_device(task, device, persistent=False):
@@ -335,27 +349,9 @@ def cleanup_after_timeout(task):
 
     :param task: a TaskManager instance.
     """
-    node = task.node
     msg = (_('Timeout reached while waiting for callback for node %s')
-           % node.uuid)
-    node.last_error = msg
-    LOG.error(msg)
-    node.save()
-
-    error_msg = _('Cleanup failed for node %(node)s after deploy timeout: '
-                  ' %(error)s')
-    try:
-        task.driver.deploy.clean_up(task)
-    except Exception as e:
-        msg = error_msg % {'node': node.uuid, 'error': e}
-        LOG.error(msg)
-        if isinstance(e, exception.IronicException):
-            node.last_error = msg
-        else:
-            node.last_error = _('Deploy timed out, but an unhandled '
-                                'exception was encountered while aborting. '
-                                'More info may be found in the log file.')
-        node.save()
+           % task.node.uuid)
+    deploying_error_handler(task, msg, msg)
 
 
 def provisioning_error_handler(e, node, provision_state,
@@ -441,6 +437,57 @@ def cleaning_error_handler(task, msg, tear_down_cleaning=True,
         task.process_event('fail', target_state=target_state)
 
 
+def deploying_error_handler(task, logmsg, errmsg, traceback=False,
+                            clean_up=True):
+    """Put a failed node in DEPLOYFAIL.
+
+    :param task: the task
+    :param logmsg: message to be logged
+    :param errmsg: message for the user
+    :param traceback: Boolean; True to log a traceback
+    :param clean_up: Boolean; True to clean up
+    """
+    node = task.node
+    LOG.error(logmsg, exc_info=traceback)
+    node.last_error = errmsg
+    node.save()
+
+    cleanup_err = None
+    if clean_up:
+        try:
+            task.driver.deploy.clean_up(task)
+        except Exception as e:
+            msg = ('Cleanup failed for node %(node)s; reason: %(err)s'
+                   % {'node': node.uuid, 'err': e})
+            LOG.exception(msg)
+            if isinstance(e, exception.IronicException):
+                addl = _('Also failed to clean up due to: %s') % e
+            else:
+                addl = _('An unhandled exception was encountered while '
+                         'aborting. More information may be found in the log '
+                         'file.')
+            cleanup_err = _('%(err)s. %(add)s') % {'err': errmsg, 'add': addl}
+
+    node.refresh()
+    if node.provision_state in (
+            states.DEPLOYING,
+            states.DEPLOYWAIT,
+            states.DEPLOYFAIL):
+        # Clear deploy step; we leave the list of deploy steps
+        # in node.driver_internal_info for debugging purposes.
+        node.deploy_step = {}
+        info = node.driver_internal_info
+        info.pop('deploy_step_index', None)
+        node.driver_internal_info = info
+
+    if cleanup_err:
+        node.last_error = cleanup_err
+    node.save()
+
+    # NOTE(deva): there is no need to clear conductor_affinity
+    task.process_event('fail')
+
+
 @task_manager.require_exclusive_lock
 def abort_on_conductor_take_over(task):
     """Set node's state when a task was aborted due to conductor take over.
@@ -537,6 +584,11 @@ def spawn_cleaning_error_handler(e, node):
     _spawn_error_handler(e, node, states.CLEANING)
 
 
+def spawn_deploying_error_handler(e, node):
+    """Handle spawning error for node deploying."""
+    _spawn_error_handler(e, node, states.DEPLOYING)
+
+
 def spawn_rescue_error_handler(e, node):
     """Handle spawning error for node rescue."""
     if isinstance(e, exception.NoFreeConductorWorker):
@@ -569,7 +621,7 @@ def power_state_error_handler(e, node, power_state):
                     {'node': node.uuid, 'power_state': power_state})
 
 
-def _step_key(step):
+def _clean_step_key(step):
     """Sort by priority, then interface priority in event of tie.
 
     :param step: cleaning step dict to get priority for.
@@ -578,6 +630,49 @@ def _step_key(step):
             CLEANING_INTERFACE_PRIORITY[step.get('interface')])
 
 
+def _deploy_step_key(step):
+    """Sort by priority, then interface priority in event of tie.
+
+    :param step: deploy step dict to get priority for.
+    """
+    return (step.get('priority'),
+            DEPLOYING_INTERFACE_PRIORITY[step.get('interface')])
+
+
+def _get_steps(task, interfaces, get_method, enabled=False,
+               sort_step_key=None):
+    """Get steps for task.node.
+
+    :param task: A TaskManager object
+    :param interfaces: A dictionary of (key) interfaces and their
+        (value) priorities. These are the interfaces that will have steps of
+        interest. The priorities are used for deciding the priorities of steps
+        having the same priority.
+    :param get_method: The method used to get the steps from the node's
+        interface; a string.
+    :param enabled: If True, returns only enabled (priority > 0) steps. If
+        False, returns all steps.
+    :param sort_step_key: If set, this is a method (key) used to sort the steps
+        from highest priority to lowest priority. For steps having the same
+        priority, they are sorted from highest interface priority to lowest.
+    :raises: NodeCleaningFailure or InstanceDeployFailure if there was a
+        problem getting the steps.
+    :returns: A list of step dictionaries
+    """
+    # Get steps from each interface
+    steps = list()
+    for interface in interfaces:
+        interface = getattr(task.driver, interface)
+        if interface:
+            interface_steps = [x for x in getattr(interface, get_method)(task)
+                               if not enabled or x['priority'] > 0]
+            steps.extend(interface_steps)
+    if sort_step_key:
+        # Sort the steps from higher priority to lower priority
+        steps = sorted(steps, key=sort_step_key, reverse=True)
+    return steps
+
+
 def _get_cleaning_steps(task, enabled=False, sort=True):
     """Get cleaning steps for task.node.
 
@@ -591,18 +686,31 @@ def _get_cleaning_steps(task, enabled=False, sort=True):
         clean steps.
     :returns: A list of clean step dictionaries
     """
-    # Iterate interfaces and get clean steps from each
-    steps = list()
-    for interface in CLEANING_INTERFACE_PRIORITY:
-        interface = getattr(task.driver, interface)
-        if interface:
-            interface_steps = [x for x in interface.get_clean_steps(task)
-                               if not enabled or x['priority'] > 0]
-            steps.extend(interface_steps)
+    sort_key = None
     if sort:
-        # Sort the steps from higher priority to lower priority
-        steps = sorted(steps, key=_step_key, reverse=True)
-    return steps
+        sort_key = _clean_step_key
+    return _get_steps(task, CLEANING_INTERFACE_PRIORITY, 'get_clean_steps',
+                      enabled=enabled, sort_step_key=sort_key)
+
+
+def _get_deployment_steps(task, enabled=False, sort=True):
+    """Get deployment steps for task.node.
+
+    :param task: A TaskManager object
+    :param enabled: If True, returns only enabled (priority > 0) steps. If
+        False, returns all deploy steps.
+    :param sort: If True, the steps are sorted from highest priority to lowest
+        priority. For steps having the same priority, they are sorted from
+        highest interface priority to lowest.
+    :raises: InstanceDeployFailure if there was a problem getting the
+        deploy steps.
+    :returns: A list of deploy step dictionaries
+    """
+    sort_key = None
+    if sort:
+        sort_key = _deploy_step_key
+    return _get_steps(task, DEPLOYING_INTERFACE_PRIORITY, 'get_deploy_steps',
+                      enabled=enabled, sort_step_key=sort_key)
 
 
 def set_node_cleaning_steps(task):
@@ -643,6 +751,24 @@ def set_node_cleaning_steps(task):
     node.save()
 
 
+def set_node_deployment_steps(task):
+    """Set up the node with deployment step information for deploying.
+
+    Get the deploy steps from the driver.
+
+    :raises: InstanceDeployFailure if there was a problem getting the
+             deployment steps.
+    """
+    node = task.node
+    driver_internal_info = node.driver_internal_info
+    driver_internal_info['deploy_steps'] = _get_deployment_steps(
+        task, enabled=True)
+    node.deploy_step = {}
+    driver_internal_info['deploy_step_index'] = None
+    node.driver_internal_info = driver_internal_info
+    node.save()
+
+
 def _validate_user_clean_steps(task, user_steps):
     """Validate the user-specified clean steps.
 
@@ -843,13 +969,28 @@ def validate_instance_info_traits(node):
         raise exception.InvalidParameterValue(err)
 
 
-def notify_conductor_resume_clean(task):
-    LOG.debug('Sending RPC to conductor to resume cleaning for node %s',
-              task.node.uuid)
+def _notify_conductor_resume_operation(task, operation, method):
+    """Notify the conductor to resume an operation.
+
+    :param task: the task
+    :param operation: the operation, a string
+    :param method: The name of the RPC method, a string
+    """
+    LOG.debug('Sending RPC to conductor to resume %(op)s for node %(node)s',
+              {'op': operation, 'node': task.node.uuid})
     from ironic.conductor import rpcapi
     uuid = task.node.uuid
     rpc = rpcapi.ConductorAPI()
     topic = rpc.get_topic_for(task.node)
     # Need to release the lock to let the conductor take it
     task.release_resources()
-    rpc.continue_node_clean(task.context, uuid, topic=topic)
+    getattr(rpc, method)(task.context, uuid, topic=topic)
+
+
+def notify_conductor_resume_clean(task):
+    _notify_conductor_resume_operation(task, 'cleaning', 'continue_node_clean')
+
+
+def notify_conductor_resume_deploy(task):
+    _notify_conductor_resume_operation(task, 'deploying',
+                                       'continue_node_deploy')
diff --git a/ironic/drivers/base.py b/ironic/drivers/base.py
index 950cabbae4..eb2144a71c 100644
--- a/ironic/drivers/base.py
+++ b/ironic/drivers/base.py
@@ -192,8 +192,8 @@ class BaseInterface(object):
         """
 
     def __new__(cls, *args, **kwargs):
-        # Get the list of clean steps when the interface is initialized by
-        # the conductor. We use __new__ instead of __init___
+        # Get the list of clean steps and deploy steps, when the interface is
+        # initialized by the conductor. We use __new__ instead of __init___
         # to avoid breaking backwards compatibility with all the drivers.
         # We want to return all steps, regardless of priority.
 
@@ -203,6 +203,7 @@ class BaseInterface(object):
         else:
             instance = super_new(cls, *args, **kwargs)
         instance.clean_steps = []
+        instance.deploy_steps = []
         for n, method in inspect.getmembers(instance, inspect.ismethod):
             if getattr(method, '_is_clean_step', False):
                 # Create a CleanStep to represent this method
@@ -212,11 +213,40 @@ class BaseInterface(object):
                         'argsinfo': method._clean_step_argsinfo,
                         'interface': instance.interface_type}
                 instance.clean_steps.append(step)
-        LOG.debug('Found clean steps %(steps)s for interface %(interface)s',
-                  {'steps': instance.clean_steps,
-                   'interface': instance.interface_type})
+            elif getattr(method, '_is_deploy_step', False):
+                # Create a DeployStep to represent this method
+                step = {'step': method.__name__,
+                        'priority': method._deploy_step_priority,
+                        'argsinfo': method._deploy_step_argsinfo,
+                        'interface': instance.interface_type}
+                instance.deploy_steps.append(step)
+        if instance.clean_steps:
+            LOG.debug('Found clean steps %(steps)s for interface '
+                      '%(interface)s',
+                      {'steps': instance.clean_steps,
+                       'interface': instance.interface_type})
+        if instance.deploy_steps:
+            LOG.debug('Found deploy steps %(steps)s for interface '
+                      '%(interface)s',
+                      {'steps': instance.deploy_steps,
+                       'interface': instance.interface_type})
         return instance
 
+    def _execute_step(self, task, step):
+        """Execute the step on task.node.
+
+        A step must take a single positional argument: a TaskManager
+        object. It may take one or more keyword variable arguments.
+
+        :param task: A TaskManager object
+        :param step: The step dictionary representing the step to execute
+        """
+        args = step.get('args')
+        if args is not None:
+            return getattr(self, step['step'])(task, **args)
+        else:
+            return getattr(self, step['step'])(task)
+
     def get_clean_steps(self, task):
         """Get a list of (enabled and disabled) clean steps for the interface.
 
@@ -253,11 +283,46 @@ class BaseInterface(object):
             states.CLEANWAIT if the step will continue to execute
             asynchronously.
         """
-        args = step.get('args')
-        if args is not None:
-            return getattr(self, step['step'])(task, **args)
-        else:
-            return getattr(self, step['step'])(task)
+        return self._execute_step(task, step)
+
+    def get_deploy_steps(self, task):
+        """Get a list of (enabled and disabled) deploy steps for the interface.
+
+        This function will return all deploy steps (both enabled and disabled)
+        for the interface, in an unordered list.
+
+        :param task: A TaskManager object, useful for interfaces overriding
+            this function
+        :raises InstanceDeployFailure: if there is a problem getting the steps
+            from the driver. For example, when a node (using an agent driver)
+            has just been enrolled and the agent isn't alive yet to be queried
+            for the available deploy steps.
+        :returns: A list of deploy step dictionaries
+        """
+        return self.deploy_steps
+
+    def execute_deploy_step(self, task, step):
+        """Execute the deploy step on task.node.
+
+        A deploy step must take a single positional argument: a TaskManager
+        object. It may take one or more keyword variable arguments (for
+        use in the future, when deploy steps can be specified via the API).
+
+        A step can be executed synchronously or asynchronously. A step should
+        return None if the method has completed synchronously or
+        states.DEPLOYWAIT if the step will continue to execute asynchronously.
+        If the step executes asynchronously, it should issue a call to the
+        'continue_node_deploy' RPC, so the conductor can begin the next
+        deploy step.
+
+        :param task: A TaskManager object
+        :param step: The deploy step dictionary representing the step to
+            execute
+        :returns: None if this method has completed synchronously, or
+            states.DEPLOYWAIT if the step will continue to execute
+            asynchronously.
+        """
+        return self._execute_step(task, step)
 
 
 class DeployInterface(BaseInterface):
@@ -1462,3 +1527,59 @@ def clean_step(priority, abortable=False, argsinfo=None):
         func._clean_step_argsinfo = argsinfo
         return func
     return decorator
+
+
+def deploy_step(priority, argsinfo=None):
+    """Decorator for deployment steps.
+
+    Only steps with priorities greater than 0 are used.
+    These steps are ordered by priority from highest value to lowest
+    value. For steps with the same priority, they are ordered by driver
+    interface priority (see conductor.manager.DEPLOYING_INTERFACE_PRIORITY).
+    execute_deploy_step() will be called on each step.
+
+    Decorated deploy steps must take as the only positional argument, a
+    TaskManager object.
+
+    Deploy steps can be either synchronous or asynchronous.  If the step is
+    synchronous, it should return `None` when finished, and the conductor
+    will continue on to the next step. While the deploy step is executing, the
+    node will be in `states.DEPLOYING` provision state. If the step is
+    asynchronous, the step should return `states.DEPLOYWAIT` to the
+    conductor before it starts the asynchronous work.  When the step is
+    complete, the step should make an RPC call to `continue_node_deploy` to
+    move to the next step in deployment. The node will be in
+    `states.DEPLOYWAIT` provision state during the asynchronous work.
+
+    Examples::
+
+        class MyInterface(base.BaseInterface):
+            @base.deploy_step(priority=100)
+            def example_deploying(self, task):
+                # do some deploying
+
+    :param priority: an integer (>=0) priority; used for determining the order
+        in which the step is run in the deployment process.
+    :param argsinfo: a dictionary of keyword arguments where key is the name of
+        the argument and value is a dictionary as follows::
+
+            'description': <description>. Required. This should include
+                           possible values.
+            'required': Boolean. Optional; default is False. True if this
+                        argument is required.  If so, it must be specified in
+                        the deployment request; false if it is optional.
+    :raises InvalidParameterValue: if any of the arguments are invalid
+    """
+    def decorator(func):
+        func._is_deploy_step = True
+        if isinstance(priority, int) and priority >= 0:
+            func._deploy_step_priority = priority
+        else:
+            raise exception.InvalidParameterValue(
+                _('"priority" must be an integer value >= 0, instead of "%s"')
+                % priority)
+
+        _validate_argsinfo(argsinfo)
+        func._deploy_step_argsinfo = argsinfo
+        return func
+    return decorator
diff --git a/ironic/drivers/modules/agent.py b/ironic/drivers/modules/agent.py
index c8c861f8b9..997b231476 100644
--- a/ironic/drivers/modules/agent.py
+++ b/ironic/drivers/modules/agent.py
@@ -401,6 +401,7 @@ class AgentDeploy(AgentDeployMixin, base.DeployInterface):
         validate_image_proxies(node)
 
     @METRICS.timer('AgentDeploy.deploy')
+    @base.deploy_step(priority=100)
     @task_manager.require_exclusive_lock
     def deploy(self, task):
         """Perform a deployment to a node.
@@ -427,7 +428,7 @@ class AgentDeploy(AgentDeployMixin, base.DeployInterface):
             task.driver.boot.prepare_instance(task)
             manager_utils.node_power_action(task, states.POWER_ON)
             LOG.info('Deployment to node %s done', task.node.uuid)
-            return states.DEPLOYDONE
+            return None
 
     @METRICS.timer('AgentDeploy.tear_down')
     @task_manager.require_exclusive_lock
diff --git a/ironic/drivers/modules/agent_base_vendor.py b/ironic/drivers/modules/agent_base_vendor.py
index 4c777c09e6..a332a77afb 100644
--- a/ironic/drivers/modules/agent_base_vendor.py
+++ b/ironic/drivers/modules/agent_base_vendor.py
@@ -655,8 +655,14 @@ class AgentDeployMixin(HeartbeatMixin):
             log_and_raise_deployment_error(task, msg, collect_logs=False,
                                            exc=e)
 
-        task.process_event('done')
-        LOG.info('Deployment to node %s done', task.node.uuid)
+        if not node.deploy_step:
+            # TODO(rloo): delete this 'if' part after deprecation period, when
+            # we expect all (out-of-tree) drivers to support deploy steps.
+            # After which we will always notify_conductor_resume_deploy().
+            task.process_event('done')
+            LOG.info('Deployment to node %s done', task.node.uuid)
+        else:
+            manager_utils.notify_conductor_resume_deploy(task)
 
     @METRICS.timer('AgentDeployMixin.prepare_instance_to_boot')
     def prepare_instance_to_boot(self, task, root_uuid, efi_sys_uuid):
diff --git a/ironic/drivers/modules/ansible/deploy.py b/ironic/drivers/modules/ansible/deploy.py
index b8211b2e80..273effb9ec 100644
--- a/ironic/drivers/modules/ansible/deploy.py
+++ b/ironic/drivers/modules/ansible/deploy.py
@@ -428,6 +428,7 @@ class AnsibleDeploy(agent_base.HeartbeatMixin, base.DeployInterface):
         _run_playbook(node, playbook, extra_vars, key)
 
     @METRICS.timer('AnsibleDeploy.deploy')
+    @base.deploy_step(priority=100)
     @task_manager.require_exclusive_lock
     def deploy(self, task):
         """Perform a deployment to a node."""
@@ -573,6 +574,15 @@ class AnsibleDeploy(agent_base.HeartbeatMixin, base.DeployInterface):
         self.reboot_and_finish_deploy(task)
         task.driver.boot.clean_up_ramdisk(task)
 
+        if not node.deploy_step:
+            # TODO(rloo): delete this 'if' part after deprecation period, when
+            # we expect all (out-of-tree) drivers to support deploy steps.
+            # After which we will always notify_conductor_resume_deploy().
+            task.process_event('done')
+            LOG.info('Deployment to node %s done', task.node.uuid)
+        else:
+            manager_utils.notify_conductor_resume_deploy(task)
+
     @METRICS.timer('AnsibleDeploy.reboot_and_finish_deploy')
     def reboot_and_finish_deploy(self, task):
         wait = CONF.ansible.post_deploy_get_power_state_retry_interval * 1000
@@ -620,6 +630,3 @@ class AnsibleDeploy(agent_base.HeartbeatMixin, base.DeployInterface):
                      'Error: %(error)s') %
                    {'node': node.uuid, 'error': e})
             agent_base.log_and_raise_deployment_error(task, msg)
-
-        task.process_event('done')
-        LOG.info('Deployment to node %s done', task.node.uuid)
diff --git a/ironic/drivers/modules/deploy_utils.py b/ironic/drivers/modules/deploy_utils.py
index a0811d79cd..3b9920c679 100644
--- a/ironic/drivers/modules/deploy_utils.py
+++ b/ironic/drivers/modules/deploy_utils.py
@@ -510,7 +510,7 @@ def set_failed_state(task, msg, collect_logs=True):
     with the given error message. It also powers off the baremetal node.
 
     :param task: a TaskManager instance containing the node to act on.
-    :param msg: the message to set in last_error of the node.
+    :param msg: the message to set in logs and last_error of the node.
     :param collect_logs: Boolean indicating whether to attempt to collect
                          logs from IPA-based ramdisk. Defaults to True.
                          Actual log collection is also affected by
@@ -523,7 +523,7 @@ def set_failed_state(task, msg, collect_logs=True):
         driver_utils.collect_ramdisk_logs(node)
 
     try:
-        task.process_event('fail')
+        manager_utils.deploying_error_handler(task, msg, msg, clean_up=False)
     except exception.InvalidState:
         msg2 = ('Internal error. Node %(node)s in provision state '
                 '"%(state)s" could not transition to a failed state.'
diff --git a/ironic/drivers/modules/fake.py b/ironic/drivers/modules/fake.py
index d588675eb6..680958f024 100644
--- a/ironic/drivers/modules/fake.py
+++ b/ironic/drivers/modules/fake.py
@@ -85,7 +85,7 @@ class FakeBoot(base.BootInterface):
 class FakeDeploy(base.DeployInterface):
     """Class for a fake deployment driver.
 
-    Example imlementation of a deploy interface that uses a
+    Example implementation of a deploy interface that uses a
     separate power interface.
     """
 
@@ -95,8 +95,9 @@ class FakeDeploy(base.DeployInterface):
     def validate(self, task):
         pass
 
+    @base.deploy_step(priority=100)
     def deploy(self, task):
-        return states.DEPLOYDONE
+        return None
 
     def tear_down(self, task):
         return states.DELETED
diff --git a/ironic/drivers/modules/iscsi_deploy.py b/ironic/drivers/modules/iscsi_deploy.py
index 35fb9a08f2..0cbea4cccd 100644
--- a/ironic/drivers/modules/iscsi_deploy.py
+++ b/ironic/drivers/modules/iscsi_deploy.py
@@ -445,6 +445,7 @@ class ISCSIDeploy(AgentDeployMixin, base.DeployInterface):
         validate(task)
 
     @METRICS.timer('ISCSIDeploy.deploy')
+    @base.deploy_step(priority=100)
     @task_manager.require_exclusive_lock
     def deploy(self, task):
         """Start deployment of the task's node.
@@ -474,9 +475,8 @@ class ISCSIDeploy(AgentDeployMixin, base.DeployInterface):
             task.driver.network.configure_tenant_networks(task)
             task.driver.boot.prepare_instance(task)
             manager_utils.node_power_action(task, states.POWER_ON)
-            task.process_event('done')
-            LOG.info('Deployment to node %s done', node.uuid)
-            return states.DEPLOYDONE
+
+            return None
 
     @METRICS.timer('ISCSIDeploy.tear_down')
     @task_manager.require_exclusive_lock
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 26914b3400..a826e342d7 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -1187,8 +1187,13 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
             self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_active_state(self, mock_deploy, mock_iwdi):
-        # This tests manager.do_node_deploy(), the 'else' path of
+    def test_do_node_deploy_rebuild_active_state_old(self, mock_deploy,
+                                                     mock_iwdi):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
+        # This tests manager._old_rest_of_do_node_deploy(), the 'else' path of
         # 'if new_state == states.DEPLOYDONE'. The node's states
         # aren't changed in this case.
         mock_iwdi.return_value = True
@@ -1220,8 +1225,12 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertTrue(node.driver_internal_info['is_whole_disk_image'])
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_active_state_waiting(self, mock_deploy,
-                                                         mock_iwdi):
+    def test_do_node_deploy_rebuild_active_state_waiting_old(self, mock_deploy,
+                                                             mock_iwdi):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         mock_iwdi.return_value = False
         self._start_service()
         mock_deploy.return_value = states.DEPLOYWAIT
@@ -1245,8 +1254,12 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_active_state_done(self, mock_deploy,
-                                                      mock_iwdi):
+    def test_do_node_deploy_rebuild_active_state_done_old(self, mock_deploy,
+                                                          mock_iwdi):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         mock_iwdi.return_value = False
         self._start_service()
         mock_deploy.return_value = states.DEPLOYDONE
@@ -1269,8 +1282,12 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_deployfail_state(self, mock_deploy,
-                                                     mock_iwdi):
+    def test_do_node_deploy_rebuild_deployfail_state_old(self, mock_deploy,
+                                                         mock_iwdi):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         mock_iwdi.return_value = False
         self._start_service()
         mock_deploy.return_value = states.DEPLOYDONE
@@ -1293,7 +1310,12 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test_do_node_deploy_rebuild_error_state(self, mock_deploy, mock_iwdi):
+    def test_do_node_deploy_rebuild_error_state_old(self, mock_deploy,
+                                                    mock_iwdi):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         mock_iwdi.return_value = False
         self._start_service()
         mock_deploy.return_value = states.DEPLOYDONE
@@ -1315,6 +1337,135 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
         mock_iwdi.assert_called_once_with(self.context, node.instance_info)
         self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
 
+    def test_do_node_deploy_rebuild_active_state_error(self, mock_iwdi):
+        # Tests manager.do_node_deploy() & manager._do_next_deploy_step(),
+        # when getting an unexpected state returned from a deploy_step.
+        mock_iwdi.return_value = True
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            mock_deploy.return_value = states.DEPLOYING
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                provision_state=states.ACTIVE,
+                target_provision_state=states.NOSTATE,
+                instance_info={'image_source': uuidutils.generate_uuid(),
+                               'kernel': 'aaaa', 'ramdisk': 'bbbb'},
+                driver_internal_info={'is_whole_disk_image': False})
+
+            self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
+            self._stop_service()
+            node.refresh()
+            self.assertEqual(states.DEPLOYFAIL, node.provision_state)
+            self.assertEqual(states.ACTIVE, node.target_provision_state)
+            self.assertIsNotNone(node.last_error)
+            # Verify reservation has been cleared.
+            self.assertIsNone(node.reservation)
+            mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
+            # Verify instance_info values have been cleared.
+            self.assertNotIn('kernel', node.instance_info)
+            self.assertNotIn('ramdisk', node.instance_info)
+            mock_iwdi.assert_called_once_with(self.context, node.instance_info)
+            # Verify is_whole_disk_image reflects correct value on rebuild.
+            self.assertTrue(node.driver_internal_info['is_whole_disk_image'])
+
+    def test_do_node_deploy_rebuild_active_state_waiting(self, mock_iwdi):
+        mock_iwdi.return_value = False
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            mock_deploy.return_value = states.DEPLOYWAIT
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                provision_state=states.ACTIVE,
+                target_provision_state=states.NOSTATE,
+                instance_info={'image_source': uuidutils.generate_uuid()})
+
+            self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
+            self._stop_service()
+            node.refresh()
+            self.assertEqual(states.DEPLOYWAIT, node.provision_state)
+            self.assertEqual(states.ACTIVE, node.target_provision_state)
+            # last_error should be None.
+            self.assertIsNone(node.last_error)
+            # Verify reservation has been cleared.
+            self.assertIsNone(node.reservation)
+            mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
+            mock_iwdi.assert_called_once_with(self.context, node.instance_info)
+            self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
+
+    def test_do_node_deploy_rebuild_active_state_done(self, mock_iwdi):
+        mock_iwdi.return_value = False
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            mock_deploy.return_value = None
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                provision_state=states.ACTIVE,
+                target_provision_state=states.NOSTATE)
+
+            self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
+            self._stop_service()
+            node.refresh()
+            self.assertEqual(states.ACTIVE, node.provision_state)
+            self.assertEqual(states.NOSTATE, node.target_provision_state)
+            # last_error should be None.
+            self.assertIsNone(node.last_error)
+            # Verify reservation has been cleared.
+            self.assertIsNone(node.reservation)
+            mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
+            mock_iwdi.assert_called_once_with(self.context, node.instance_info)
+            self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
+
+    def test_do_node_deploy_rebuild_deployfail_state(self, mock_iwdi):
+        mock_iwdi.return_value = False
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            mock_deploy.return_value = None
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                provision_state=states.DEPLOYFAIL,
+                target_provision_state=states.NOSTATE)
+
+            self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
+            self._stop_service()
+            node.refresh()
+            self.assertEqual(states.ACTIVE, node.provision_state)
+            self.assertEqual(states.NOSTATE, node.target_provision_state)
+            # last_error should be None.
+            self.assertIsNone(node.last_error)
+            # Verify reservation has been cleared.
+            self.assertIsNone(node.reservation)
+            mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
+            mock_iwdi.assert_called_once_with(self.context, node.instance_info)
+            self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
+
+    def test_do_node_deploy_rebuild_error_state(self, mock_iwdi):
+        mock_iwdi.return_value = False
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            mock_deploy.return_value = None
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                provision_state=states.ERROR,
+                target_provision_state=states.NOSTATE)
+
+            self.service.do_node_deploy(self.context, node.uuid, rebuild=True)
+            self._stop_service()
+            node.refresh()
+            self.assertEqual(states.ACTIVE, node.provision_state)
+            self.assertEqual(states.NOSTATE, node.target_provision_state)
+            # last_error should be None.
+            self.assertIsNone(node.last_error)
+            # Verify reservation has been cleared.
+            self.assertIsNone(node.reservation)
+            mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
+            mock_iwdi.assert_called_once_with(self.context, node.instance_info)
+            self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
+
     def test_do_node_deploy_rebuild_from_available_state(self, mock_iwdi):
         mock_iwdi.return_value = False
         self._start_service()
@@ -1365,6 +1516,109 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
             self.assertFalse(node.driver_internal_info['is_whole_disk_image'])
 
 
+@mgr_utils.mock_record_keepalive
+class ContinueNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
+                                 db_base.DbTestCase):
+    def setUp(self):
+        super(ContinueNodeDeployTestCase, self).setUp()
+        self.deploy_start = {
+            'step': 'deploy_start', 'priority': 50, 'interface': 'deploy'}
+        self.deploy_end = {
+            'step': 'deploy_end', 'priority': 20, 'interface': 'deploy'}
+        self.deploy_steps = [self.deploy_start, self.deploy_end]
+
+    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
+    def test_continue_node_deploy_worker_pool_full(self, mock_spawn):
+        # Test the appropriate exception is raised if the worker pool is full
+        prv_state = states.DEPLOYWAIT
+        tgt_prv_state = states.ACTIVE
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
+                                          provision_state=prv_state,
+                                          target_provision_state=tgt_prv_state,
+                                          last_error=None)
+        self._start_service()
+
+        mock_spawn.side_effect = exception.NoFreeConductorWorker()
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service.continue_node_deploy,
+                          self.context, node.uuid)
+
+    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
+    def test_continue_node_deploy_wrong_state(self, mock_spawn):
+        # Test the appropriate exception is raised if node isn't already
+        # in DEPLOYWAIT state
+        prv_state = states.DEPLOYFAIL
+        tgt_prv_state = states.ACTIVE
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
+                                          provision_state=prv_state,
+                                          target_provision_state=tgt_prv_state,
+                                          last_error=None)
+        self._start_service()
+
+        self.assertRaises(exception.InvalidStateRequested,
+                          self.service.continue_node_deploy,
+                          self.context, node.uuid)
+
+        self._stop_service()
+        node.refresh()
+        # Make sure node wasn't modified
+        self.assertEqual(prv_state, node.provision_state)
+        self.assertEqual(tgt_prv_state, node.target_provision_state)
+        # Verify reservation has been cleared.
+        self.assertIsNone(node.reservation)
+
+    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
+    def test_continue_node_deploy(self, mock_spawn):
+        # test a node can continue deploying via RPC
+        prv_state = states.DEPLOYWAIT
+        tgt_prv_state = states.ACTIVE
+        driver_info = {'deploy_steps': self.deploy_steps,
+                       'deploy_step_index': 0}
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
+                                          provision_state=prv_state,
+                                          target_provision_state=tgt_prv_state,
+                                          last_error=None,
+                                          driver_internal_info=driver_info,
+                                          deploy_step=self.deploy_steps[0])
+        self._start_service()
+        self.service.continue_node_deploy(self.context, node.uuid)
+        self._stop_service()
+        node.refresh()
+        self.assertEqual(states.DEPLOYING, node.provision_state)
+        self.assertEqual(tgt_prv_state, node.target_provision_state)
+        mock_spawn.assert_called_with(manager._do_next_deploy_step, mock.ANY,
+                                      1, mock.ANY)
+
+    @mock.patch.object(task_manager.TaskManager, 'process_event',
+                       autospec=True)
+    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
+    def test_continue_node_deploy_deprecated(self, mock_spawn, mock_event):
+        # TODO(rloo): delete this when we remove support for handling
+        # deploy steps; node will always be in DEPLOYWAIT then.
+
+        # test a node can continue deploying via RPC
+        prv_state = states.DEPLOYING
+        tgt_prv_state = states.ACTIVE
+        driver_info = {'deploy_steps': self.deploy_steps,
+                       'deploy_step_index': 0}
+        self._start_service()
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
+                                          provision_state=prv_state,
+                                          target_provision_state=tgt_prv_state,
+                                          last_error=None,
+                                          driver_internal_info=driver_info,
+                                          deploy_step=self.deploy_steps[0])
+        self.service.continue_node_deploy(self.context, node.uuid)
+        self._stop_service()
+        node.refresh()
+        self.assertEqual(states.DEPLOYING, node.provision_state)
+        self.assertEqual(tgt_prv_state, node.target_provision_state)
+        mock_spawn.assert_called_with(manager._do_next_deploy_step, mock.ANY,
+                                      1, mock.ANY)
+        self.assertFalse(mock_event.called)
+
+
 @mgr_utils.mock_record_keepalive
 class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
@@ -1418,7 +1672,11 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertFalse(mock_deploy.called)
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_driver_raises_error(self, mock_deploy):
+    def test__do_node_deploy_driver_raises_error_old(self, mock_deploy):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         self._start_service()
         # test when driver.deploy.deploy raises an ironic error
         mock_deploy.side_effect = exception.InstanceDeployFailure('test')
@@ -1440,7 +1698,12 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         mock_deploy.assert_called_once_with(mock.ANY)
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_driver_unexpected_exception(self, mock_deploy):
+    def test__do_node_deploy_driver_unexpected_exception_old(self,
+                                                             mock_deploy):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         self._start_service()
         # test when driver.deploy.deploy raises an exception
         mock_deploy.side_effect = RuntimeError('test')
@@ -1461,9 +1724,48 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertIsNotNone(node.last_error)
         mock_deploy.assert_called_once_with(mock.ANY)
 
+    def _test__do_node_deploy_driver_exception(self, exc, unexpected=False):
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            # test when driver.deploy.deploy() raises an exception
+            mock_deploy.side_effect = exc
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                provision_state=states.DEPLOYING,
+                target_provision_state=states.ACTIVE)
+            task = task_manager.TaskManager(self.context, node.uuid)
+
+            manager.do_node_deploy(task, self.service.conductor.id)
+            node.refresh()
+            self.assertEqual(states.DEPLOYFAIL, node.provision_state)
+            # NOTE(deva): failing a deploy does not clear the target state
+            #             any longer. Instead, it is cleared when the instance
+            #             is deleted.
+            self.assertEqual(states.ACTIVE, node.target_provision_state)
+            self.assertIsNotNone(node.last_error)
+            if unexpected:
+                self.assertIn('Exception', node.last_error)
+            else:
+                self.assertNotIn('Exception', node.last_error)
+
+            mock_deploy.assert_called_once_with(mock.ANY, task)
+
+    def test__do_node_deploy_driver_ironic_exception(self):
+        self._test__do_node_deploy_driver_exception(
+            exception.InstanceDeployFailure('test'))
+
+    def test__do_node_deploy_driver_unexpected_exception(self):
+        self._test__do_node_deploy_driver_exception(RuntimeError('test'),
+                                                    unexpected=True)
+
     @mock.patch.object(manager, '_store_configdrive')
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_ok(self, mock_deploy, mock_store):
+    def test__do_node_deploy_ok_old(self, mock_deploy, mock_store):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         self._start_service()
         # test when driver.deploy.deploy returns DEPLOYDONE
         mock_deploy.return_value = states.DEPLOYDONE
@@ -1483,7 +1785,11 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
 
     @mock.patch.object(manager, '_store_configdrive')
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_ok_configdrive(self, mock_deploy, mock_store):
+    def test__do_node_deploy_ok_configdrive_old(self, mock_deploy, mock_store):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         self._start_service()
         # test when driver.deploy.deploy returns DEPLOYDONE
         mock_deploy.return_value = states.DEPLOYDONE
@@ -1502,15 +1808,44 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         mock_deploy.assert_called_once_with(mock.ANY)
         mock_store.assert_called_once_with(task.node, configdrive)
 
+    @mock.patch.object(manager, '_store_configdrive')
+    def _test__do_node_deploy_ok(self, mock_store, configdrive=None):
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            mock_deploy.return_value = None
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                provision_state=states.DEPLOYING,
+                target_provision_state=states.ACTIVE)
+            task = task_manager.TaskManager(self.context, node.uuid)
+
+            manager.do_node_deploy(task, self.service.conductor.id,
+                                   configdrive=configdrive)
+            node.refresh()
+            self.assertEqual(states.ACTIVE, node.provision_state)
+            self.assertEqual(states.NOSTATE, node.target_provision_state)
+            self.assertIsNone(node.last_error)
+            mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
+            if configdrive:
+                mock_store.assert_called_once_with(task.node, configdrive)
+            else:
+                self.assertFalse(mock_store.called)
+
+    def test__do_node_deploy_ok(self):
+        self._test__do_node_deploy_ok()
+
+    def test__do_node_deploy_ok_configdrive(self):
+        configdrive = 'foo'
+        self._test__do_node_deploy_ok(configdrive=configdrive)
+
     @mock.patch.object(swift, 'SwiftAPI')
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_configdrive_swift_error(self, mock_deploy,
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.prepare')
+    def test__do_node_deploy_configdrive_swift_error(self, mock_prepare,
                                                      mock_swift):
         CONF.set_override('configdrive_use_object_store', True,
                           group='deploy')
         self._start_service()
-        # test when driver.deploy.deploy returns DEPLOYDONE
-        mock_deploy.return_value = states.DEPLOYDONE
         node = obj_utils.create_test_node(self.context, driver='fake-hardware',
                                           provision_state=states.DEPLOYING,
                                           target_provision_state=states.ACTIVE)
@@ -1525,10 +1860,10 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertEqual(states.DEPLOYFAIL, node.provision_state)
         self.assertEqual(states.ACTIVE, node.target_provision_state)
         self.assertIsNotNone(node.last_error)
-        self.assertFalse(mock_deploy.called)
+        self.assertFalse(mock_prepare.called)
 
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_configdrive_db_error(self, mock_deploy):
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.prepare')
+    def test__do_node_deploy_configdrive_db_error(self, mock_prepare):
         self._start_service()
         node = obj_utils.create_test_node(self.context, driver='fake-hardware',
                                           provision_state=states.DEPLOYING,
@@ -1539,7 +1874,7 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         with mock.patch.object(dbapi.IMPL, 'update_node') as mock_db:
             db_node = self.dbapi.get_node_by_uuid(node.uuid)
             mock_db.side_effect = [db_exception.DBDataError('DB error'),
-                                   db_node, db_node]
+                                   db_node, db_node, db_node]
             self.assertRaises(db_exception.DBDataError,
                               manager.do_node_deploy, task,
                               self.service.conductor.id,
@@ -1551,17 +1886,22 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
                            'instance_info': expected_instance_info}),
                 mock.call(node.uuid,
                           {'version': mock.ANY,
-                           'provision_state': states.DEPLOYFAIL,
-                           'target_provision_state': states.ACTIVE}),
+                           'last_error': mock.ANY}),
                 mock.call(node.uuid,
                           {'version': mock.ANY,
-                           'last_error': mock.ANY})]
+                           'deploy_step': {},
+                           'driver_internal_info': mock.ANY}),
+                mock.call(node.uuid,
+                          {'version': mock.ANY,
+                           'provision_state': states.DEPLOYFAIL,
+                           'target_provision_state': states.ACTIVE}),
+            ]
             self.assertEqual(expected_calls, mock_db.mock_calls)
-            self.assertFalse(mock_deploy.called)
+            self.assertFalse(mock_prepare.called)
 
     @mock.patch.object(manager, '_store_configdrive')
-    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_configdrive_unexpected_error(self, mock_deploy,
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.prepare')
+    def test__do_node_deploy_configdrive_unexpected_error(self, mock_prepare,
                                                           mock_store):
         self._start_service()
         node = obj_utils.create_test_node(self.context, driver='fake-hardware',
@@ -1578,10 +1918,14 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertEqual(states.DEPLOYFAIL, node.provision_state)
         self.assertEqual(states.ACTIVE, node.target_provision_state)
         self.assertIsNotNone(node.last_error)
-        self.assertFalse(mock_deploy.called)
+        self.assertFalse(mock_prepare.called)
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy')
-    def test__do_node_deploy_ok_2(self, mock_deploy):
+    def test__do_node_deploy_ok_2_old(self, mock_deploy):
+        # TODO(rloo): delete this after the deprecation period for supporting
+        # non deploy_steps.
+        # Mocking FakeDeploy.deploy before starting the service, causes
+        # it not to be a deploy_step.
         # NOTE(rloo): a different way of testing for the same thing as in
         # test__do_node_deploy_ok()
         self._start_service()
@@ -1598,6 +1942,405 @@ class DoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertIsNone(node.last_error)
         mock_deploy.assert_called_once_with(mock.ANY)
 
+    def test__do_node_deploy_ok_2(self):
+        # NOTE(rloo): a different way of testing for the same thing as in
+        # test__do_node_deploy_ok()
+        self._start_service()
+        with mock.patch.object(fake.FakeDeploy,
+                               'deploy', autospec=True) as mock_deploy:
+            # test when driver.deploy.deploy() returns None
+            mock_deploy.return_value = None
+            node = obj_utils.create_test_node(self.context,
+                                              driver='fake-hardware')
+            task = task_manager.TaskManager(self.context, node.uuid)
+            task.process_event('deploy')
+
+            manager.do_node_deploy(task, self.service.conductor.id)
+            node.refresh()
+            self.assertEqual(states.ACTIVE, node.provision_state)
+            self.assertEqual(states.NOSTATE, node.target_provision_state)
+            self.assertIsNone(node.last_error)
+            mock_deploy.assert_called_once_with(mock.ANY, mock.ANY)
+
+    @mock.patch.object(manager, '_do_next_deploy_step', autospec=True)
+    @mock.patch.object(manager, '_old_rest_of_do_node_deploy',
+                       autospec=True)
+    @mock.patch.object(conductor_utils, 'set_node_deployment_steps',
+                       autospec=True)
+    def test_do_node_deploy_deprecated(self, mock_set_steps, mock_old_way,
+                                       mock_deploy_step):
+        # TODO(rloo): no deploy steps; delete this when we remove support
+        # for handling no deploy steps.
+        self._start_service()
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager.do_node_deploy(task, self.service.conductor.id)
+        mock_set_steps.assert_called_once_with(task)
+        mock_old_way.assert_called_once_with(task, self.service.conductor.id,
+                                             True)
+        self.assertFalse(mock_deploy_step.called)
+        self.assertNotIn('deploy_steps', task.node.driver_internal_info)
+
+    @mock.patch.object(manager, '_do_next_deploy_step', autospec=True)
+    @mock.patch.object(manager, '_old_rest_of_do_node_deploy',
+                       autospec=True)
+    @mock.patch.object(conductor_utils, 'set_node_deployment_steps',
+                       autospec=True)
+    def test_do_node_deploy_steps(self, mock_set_steps, mock_old_way,
+                                  mock_deploy_step):
+        # these are not real steps...
+        fake_deploy_steps = ['step1', 'step2']
+
+        def add_steps(task):
+            info = task.node.driver_internal_info
+            info['deploy_steps'] = fake_deploy_steps
+            task.node.driver_internal_info = info
+            task.node.save()
+
+        mock_set_steps.side_effect = add_steps
+        self._start_service()
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager.do_node_deploy(task, self.service.conductor.id)
+        mock_set_steps.assert_called_once_with(task)
+        self.assertFalse(mock_old_way.called)
+        mock_set_steps.assert_called_once_with(task)
+        self.assertEqual(fake_deploy_steps,
+                         task.node.driver_internal_info['deploy_steps'])
+
+    @mock.patch.object(manager, '_do_next_deploy_step', autospec=True)
+    @mock.patch.object(manager, '_old_rest_of_do_node_deploy',
+                       autospec=True)
+    @mock.patch.object(conductor_utils, 'set_node_deployment_steps',
+                       autospec=True)
+    def test_do_node_deploy_steps_old_rpc(self, mock_set_steps, mock_old_way,
+                                          mock_deploy_step):
+        # TODO(rloo): old RPC; delete this when we remove support for drivers
+        # with no deploy steps.
+        CONF.set_override('pin_release_version', '11.0')
+        # these are not real steps...
+        fake_deploy_steps = ['step1', 'step2']
+
+        def add_steps(task):
+            info = task.node.driver_internal_info
+            info['deploy_steps'] = fake_deploy_steps
+            task.node.driver_internal_info = info
+            task.node.save()
+
+        mock_set_steps.side_effect = add_steps
+        self._start_service()
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager.do_node_deploy(task, self.service.conductor.id)
+        mock_set_steps.assert_called_once_with(task)
+        mock_old_way.assert_called_once_with(task, self.service.conductor.id,
+                                             False)
+        self.assertFalse(mock_deploy_step.called)
+        self.assertNotIn('deploy_steps', task.node.driver_internal_info)
+
+    @mock.patch.object(manager, 'LOG', autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy', autospec=True)
+    def test__old_rest_of_do_node_deploy_no_steps(self, mock_deploy, mock_log):
+        # TODO(rloo): no deploy steps; delete this when we remove support
+        # for handling no deploy steps.
+        manager._SEEN_NO_DEPLOY_STEP_DEPRECATIONS = []
+        self._start_service()
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager._old_rest_of_do_node_deploy(task, self.service.conductor.id,
+                                            True)
+        mock_deploy.assert_called_once_with(mock.ANY, task)
+        self.assertTrue(mock_log.warning.called)
+        self.assertEqual(self.service.conductor.id,
+                         task.node.conductor_affinity)
+
+        # Make sure the deprecation warning isn't logged again
+        mock_log.reset_mock()
+        manager._old_rest_of_do_node_deploy(task, self.service.conductor.id,
+                                            True)
+        self.assertFalse(mock_log.warning.called)
+
+    @mock.patch.object(manager, 'LOG', autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.deploy', autospec=True)
+    def test__old_rest_of_do_node_deploy_has_steps(self, mock_deploy,
+                                                   mock_log):
+        # TODO(rloo): has steps but old RPC; delete this when we remove support
+        # for handling no deploy steps.
+        manager._SEEN_NO_DEPLOY_STEP_DEPRECATIONS = []
+        self._start_service()
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager._old_rest_of_do_node_deploy(task, self.service.conductor.id,
+                                            False)
+        mock_deploy.assert_called_once_with(mock.ANY, task)
+        self.assertFalse(mock_log.warning.called)
+        self.assertEqual(self.service.conductor.id,
+                         task.node.conductor_affinity)
+
+
+@mgr_utils.mock_record_keepalive
+class DoNextDeployStepTestCase(mgr_utils.ServiceSetUpMixin,
+                               db_base.DbTestCase):
+    def setUp(self):
+        super(DoNextDeployStepTestCase, self).setUp()
+        self.deploy_start = {
+            'step': 'deploy_start', 'priority': 50, 'interface': 'deploy'}
+        self.deploy_end = {
+            'step': 'deploy_end', 'priority': 20, 'interface': 'deploy'}
+        self.deploy_steps = [self.deploy_start, self.deploy_end]
+
+    @mock.patch.object(manager, 'LOG', autospec=True)
+    def test__do_next_deploy_step_none(self, mock_log):
+        self._start_service()
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware')
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager._do_next_deploy_step(task, None, self.service.conductor.id)
+
+        node.refresh()
+        self.assertEqual(states.ACTIVE, node.provision_state)
+        self.assertEqual(2, mock_log.info.call_count)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_deploy_step',
+                autospec=True)
+    def test__do_next_deploy_step_async(self, mock_execute):
+        driver_internal_info = {'deploy_step_index': None,
+                                'deploy_steps': self.deploy_steps}
+        self._start_service()
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            driver_internal_info=driver_internal_info,
+            deploy_step={})
+        mock_execute.return_value = states.DEPLOYWAIT
+        expected_first_step = node.driver_internal_info['deploy_steps'][0]
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager._do_next_deploy_step(task, 0, self.service.conductor.id)
+
+        node.refresh()
+        self.assertEqual(states.DEPLOYWAIT, node.provision_state)
+        self.assertEqual(states.ACTIVE, node.target_provision_state)
+        self.assertEqual(expected_first_step, node.deploy_step)
+        self.assertEqual(0, node.driver_internal_info['deploy_step_index'])
+        self.assertEqual(self.service.conductor.id, node.conductor_affinity)
+        mock_execute.assert_called_once_with(mock.ANY, task,
+                                             self.deploy_steps[0])
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_deploy_step',
+                autospec=True)
+    def test__do_next_deploy_step_continue_from_last_step(self, mock_execute):
+        # Resume an in-progress deploy after the first async step
+        driver_internal_info = {'deploy_step_index': 0,
+                                'deploy_steps': self.deploy_steps}
+        self._start_service()
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.DEPLOYWAIT,
+            target_provision_state=states.ACTIVE,
+            driver_internal_info=driver_internal_info,
+            deploy_step=self.deploy_steps[0])
+        mock_execute.return_value = states.DEPLOYWAIT
+
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('resume')
+
+        manager._do_next_deploy_step(task, 1, self.service.conductor.id)
+        node.refresh()
+
+        self.assertEqual(states.DEPLOYWAIT, node.provision_state)
+        self.assertEqual(states.ACTIVE, node.target_provision_state)
+        self.assertEqual(self.deploy_steps[1], node.deploy_step)
+        self.assertEqual(1, node.driver_internal_info['deploy_step_index'])
+        mock_execute.assert_called_once_with(mock.ANY, task,
+                                             self.deploy_steps[1])
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_deploy_step')
+    def test__do_next_deploy_step_last_step_done(self, mock_execute):
+        # Resume where last_step is the last deploy step that was executed
+        driver_internal_info = {'deploy_step_index': 1,
+                                'deploy_steps': self.deploy_steps}
+        self._start_service()
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.DEPLOYWAIT,
+            target_provision_state=states.ACTIVE,
+            driver_internal_info=driver_internal_info,
+            deploy_step=self.deploy_steps[1])
+        mock_execute.return_value = None
+
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('resume')
+
+        manager._do_next_deploy_step(task, None, self.service.conductor.id)
+        node.refresh()
+        # Deploying should be complete without calling additional steps
+        self.assertEqual(states.ACTIVE, node.provision_state)
+        self.assertEqual(states.NOSTATE, node.target_provision_state)
+        self.assertEqual({}, node.deploy_step)
+        self.assertNotIn('deploy_step_index', node.driver_internal_info)
+        self.assertIsNone(node.driver_internal_info['deploy_steps'])
+        self.assertFalse(mock_execute.called)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_deploy_step')
+    def test__do_next_deploy_step_all(self, mock_execute):
+        # Run all steps from start to finish (all synchronous)
+        driver_internal_info = {'deploy_step_index': None,
+                                'deploy_steps': self.deploy_steps}
+        self._start_service()
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            driver_internal_info=driver_internal_info,
+            deploy_step={})
+        mock_execute.return_value = None
+
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager._do_next_deploy_step(task, 1, self.service.conductor.id)
+
+        # Deploying should be complete
+        node.refresh()
+        self.assertEqual(states.ACTIVE, node.provision_state)
+        self.assertEqual(states.NOSTATE, node.target_provision_state)
+        self.assertEqual({}, node.deploy_step)
+        self.assertNotIn('deploy_step_index', node.driver_internal_info)
+        self.assertIsNone(node.driver_internal_info['deploy_steps'])
+        mock_execute.assert_has_calls = [mock.call(self.deploy_steps[0]),
+                                         mock.call(self.deploy_steps[1])]
+
+    @mock.patch.object(conductor_utils, 'LOG', autospec=True)
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_deploy_step')
+    def _do_next_deploy_step_execute_fail(self, exc, traceback,
+                                          mock_execute, mock_log):
+        # When a deploy step fails, go to DEPLOYFAIL
+        driver_internal_info = {'deploy_step_index': None,
+                                'deploy_steps': self.deploy_steps}
+        self._start_service()
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            driver_internal_info=driver_internal_info,
+            deploy_step={})
+        mock_execute.side_effect = exc
+
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager._do_next_deploy_step(task, 0, self.service.conductor.id)
+
+        # Make sure we go to DEPLOYFAIL, clear deploy_steps
+        node.refresh()
+        self.assertEqual(states.DEPLOYFAIL, node.provision_state)
+        self.assertEqual(states.ACTIVE, node.target_provision_state)
+        self.assertEqual({}, node.deploy_step)
+        self.assertNotIn('deploy_step_index', node.driver_internal_info)
+        self.assertIsNotNone(node.last_error)
+        self.assertFalse(node.maintenance)
+        mock_execute.assert_called_once_with(mock.ANY, self.deploy_steps[0])
+        mock_log.error.assert_called_once_with(mock.ANY, exc_info=traceback)
+
+    def test_do_next_deploy_step_execute_ironic_exception(self):
+        self._do_next_deploy_step_execute_fail(
+            exception.IronicException('foo'), False)
+
+    def test_do_next_deploy_step_execute_exception(self):
+        self._do_next_deploy_step_execute_fail(Exception('foo'), True)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_deploy_step')
+    def test_do_next_deploy_step_no_steps(self, mock_execute):
+
+        self._start_service()
+        for info in ({'deploy_steps': None, 'deploy_step_index': None},
+                     {'deploy_steps': None}):
+            # Resume where there are no steps, should be a noop
+            node = obj_utils.create_test_node(
+                self.context, driver='fake-hardware',
+                uuid=uuidutils.generate_uuid(),
+                last_error=None,
+                driver_internal_info=info,
+                deploy_step={})
+
+            task = task_manager.TaskManager(self.context, node.uuid)
+            task.process_event('deploy')
+
+            manager._do_next_deploy_step(task, None, self.service.conductor.id)
+
+            # Deploying should be complete without calling additional steps
+            node.refresh()
+            self.assertEqual(states.ACTIVE, node.provision_state)
+            self.assertEqual(states.NOSTATE, node.target_provision_state)
+            self.assertEqual({}, node.deploy_step)
+            self.assertNotIn('deploy_step_index', node.driver_internal_info)
+            self.assertFalse(mock_execute.called)
+            mock_execute.reset_mock()
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.execute_deploy_step')
+    def test_do_next_deploy_step_bad_step_return_value(self, mock_execute):
+        # When a deploy step fails, go to DEPLOYFAIL
+        self._start_service()
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            driver_internal_info={'deploy_steps': self.deploy_steps,
+                                  'deploy_step_index': None},
+            deploy_step={})
+        mock_execute.return_value = "foo"
+
+        task = task_manager.TaskManager(self.context, node.uuid)
+        task.process_event('deploy')
+
+        manager._do_next_deploy_step(task, 0, self.service.conductor.id)
+
+        # Make sure we go to DEPLOYFAIL, clear deploy_steps
+        node.refresh()
+        self.assertEqual(states.DEPLOYFAIL, node.provision_state)
+        self.assertEqual(states.ACTIVE, node.target_provision_state)
+        self.assertEqual({}, node.deploy_step)
+        self.assertNotIn('deploy_step_index', node.driver_internal_info)
+        self.assertIsNotNone(node.last_error)
+        self.assertFalse(node.maintenance)
+        mock_execute.assert_called_once_with(mock.ANY, self.deploy_steps[0])
+
+    def test__get_node_next_deploy_steps(self):
+        driver_internal_info = {'deploy_steps': self.deploy_steps,
+                                'deploy_step_index': 0}
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.DEPLOYWAIT,
+            target_provision_state=states.ACTIVE,
+            driver_internal_info=driver_internal_info,
+            last_error=None,
+            deploy_step=self.deploy_steps[0])
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            step_index = self.service._get_node_next_deploy_steps(task)
+            self.assertEqual(1, step_index)
+
+    def test__get_node_next_deploy_steps_unset_deploy_step(self):
+        driver_internal_info = {'deploy_steps': self.deploy_steps,
+                                'deploy_step_index': None}
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.DEPLOYWAIT,
+            target_provision_state=states.ACTIVE,
+            driver_internal_info=driver_internal_info,
+            last_error=None,
+            deploy_step=None)
+
+        with task_manager.acquire(self.context, node.uuid) as task:
+            step_index = self.service._get_node_next_deploy_steps(task)
+            self.assertEqual(0, step_index)
+
 
 @mgr_utils.mock_record_keepalive
 class CheckTimeoutsTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
diff --git a/ironic/tests/unit/conductor/test_rpcapi.py b/ironic/tests/unit/conductor/test_rpcapi.py
index 28075e7c07..32302848a6 100644
--- a/ironic/tests/unit/conductor/test_rpcapi.py
+++ b/ironic/tests/unit/conductor/test_rpcapi.py
@@ -360,6 +360,12 @@ class RPCAPITestCase(db_base.DbTestCase):
                           version='1.27',
                           node_id=self.fake_node['uuid'])
 
+    def test_continue_node_deploy(self):
+        self._test_rpcapi('continue_node_deploy',
+                          'cast',
+                          version='1.45',
+                          node_id=self.fake_node['uuid'])
+
     def test_get_raid_logical_disk_properties(self):
         self._test_rpcapi('get_raid_logical_disk_properties',
                           'call',
diff --git a/ironic/tests/unit/conductor/test_utils.py b/ironic/tests/unit/conductor/test_utils.py
index 428012e8a9..e5341986ba 100644
--- a/ironic/tests/unit/conductor/test_utils.py
+++ b/ironic/tests/unit/conductor/test_utils.py
@@ -18,6 +18,7 @@ from ironic.common import boot_modes
 from ironic.common import exception
 from ironic.common import network
 from ironic.common import states
+from ironic.conductor import rpcapi
 from ironic.conductor import task_manager
 from ironic.conductor import utils as conductor_utils
 from ironic.drivers import base as drivers_base
@@ -874,22 +875,27 @@ class NodeSoftPowerActionTestCase(db_base.DbTestCase):
         self.assertIsNone(node['last_error'])
 
 
-class CleanupAfterTimeoutTestCase(tests_base.TestCase):
+class DeployingErrorHandlerTestCase(tests_base.TestCase):
     def setUp(self):
-        super(CleanupAfterTimeoutTestCase, self).setUp()
+        super(DeployingErrorHandlerTestCase, self).setUp()
         self.task = mock.Mock(spec=task_manager.TaskManager)
         self.task.context = self.context
         self.task.driver = mock.Mock(spec_set=['deploy'])
         self.task.shared = False
         self.task.node = mock.Mock(spec_set=objects.Node)
         self.node = self.task.node
+        self.node.provision_state = states.DEPLOYING
+        self.node.last_error = None
+        self.node.deploy_step = None
+        self.node.driver_internal_info = {}
+        self.logmsg = "log message"
+        self.errmsg = "err message"
 
-    def test_cleanup_after_timeout(self):
+    @mock.patch.object(conductor_utils, 'deploying_error_handler',
+                       autospec=True)
+    def test_cleanup_after_timeout(self, mock_handler):
         conductor_utils.cleanup_after_timeout(self.task)
-
-        self.node.save.assert_called_once_with()
-        self.task.driver.deploy.clean_up.assert_called_once_with(self.task)
-        self.assertIn('Timeout reached', self.node.last_error)
+        mock_handler.assert_called_once_with(self.task, mock.ANY, mock.ANY)
 
     def test_cleanup_after_timeout_shared_lock(self):
         self.task.shared = True
@@ -898,25 +904,162 @@ class CleanupAfterTimeoutTestCase(tests_base.TestCase):
                           conductor_utils.cleanup_after_timeout,
                           self.task)
 
-    def test_cleanup_after_timeout_cleanup_ironic_exception(self):
-        clean_up_mock = self.task.driver.deploy.clean_up
-        clean_up_mock.side_effect = exception.IronicException('moocow')
+    def test_deploying_error_handler(self):
+        conductor_utils.deploying_error_handler(self.task, self.logmsg,
+                                                self.errmsg)
 
-        conductor_utils.cleanup_after_timeout(self.task)
+        self.assertEqual([mock.call()] * 2, self.node.save.call_args_list)
+        self.task.driver.deploy.clean_up.assert_called_once_with(self.task)
+        self.assertEqual(self.errmsg, self.node.last_error)
+        self.assertEqual({}, self.node.deploy_step)
+        self.assertNotIn('deploy_step_index', self.node.driver_internal_info)
+        self.task.process_event.assert_called_once_with('fail')
+
+    def _test_deploying_error_handler_cleanup(self, exc, expected_str):
+        clean_up_mock = self.task.driver.deploy.clean_up
+        clean_up_mock.side_effect = exc
+
+        conductor_utils.deploying_error_handler(self.task, self.logmsg,
+                                                self.errmsg)
 
         self.task.driver.deploy.clean_up.assert_called_once_with(self.task)
         self.assertEqual([mock.call()] * 2, self.node.save.call_args_list)
-        self.assertIn('moocow', self.node.last_error)
+        self.assertIn(expected_str, self.node.last_error)
+        self.assertEqual({}, self.node.deploy_step)
+        self.assertNotIn('deploy_step_index', self.node.driver_internal_info)
+        self.task.process_event.assert_called_once_with('fail')
 
-    def test_cleanup_after_timeout_cleanup_random_exception(self):
-        clean_up_mock = self.task.driver.deploy.clean_up
-        clean_up_mock.side_effect = Exception('moocow')
+    def test_deploying_error_handler_cleanup_ironic_exception(self):
+        self._test_deploying_error_handler_cleanup(
+            exception.IronicException('moocow'), 'moocow')
 
-        conductor_utils.cleanup_after_timeout(self.task)
+    def test_deploying_error_handler_cleanup_random_exception(self):
+        self._test_deploying_error_handler_cleanup(
+            Exception('moocow'), 'unhandled exception')
 
-        self.task.driver.deploy.clean_up.assert_called_once_with(self.task)
+    def test_deploying_error_handler_no_cleanup(self):
+        conductor_utils.deploying_error_handler(
+            self.task, self.logmsg, self.errmsg, clean_up=False)
+
+        self.assertFalse(self.task.driver.deploy.clean_up.called)
         self.assertEqual([mock.call()] * 2, self.node.save.call_args_list)
-        self.assertIn('Deploy timed out', self.node.last_error)
+        self.assertEqual(self.errmsg, self.node.last_error)
+        self.assertEqual({}, self.node.deploy_step)
+        self.assertNotIn('deploy_step_index', self.node.driver_internal_info)
+        self.task.process_event.assert_called_once_with('fail')
+
+    def test_deploying_error_handler_not_deploy(self):
+        # Not in a deploy state
+        self.node.provision_state = states.AVAILABLE
+        self.node.driver_internal_info['deploy_step_index'] = 2
+
+        conductor_utils.deploying_error_handler(
+            self.task, self.logmsg, self.errmsg, clean_up=False)
+
+        self.assertEqual([mock.call()] * 2, self.node.save.call_args_list)
+        self.assertEqual(self.errmsg, self.node.last_error)
+        self.assertIsNone(self.node.deploy_step)
+        self.assertIn('deploy_step_index', self.node.driver_internal_info)
+        self.task.process_event.assert_called_once_with('fail')
+
+
+class NodeDeployStepsTestCase(db_base.DbTestCase):
+    def setUp(self):
+        super(NodeDeployStepsTestCase, self).setUp()
+
+        self.deploy_start = {
+            'step': 'deploy_start', 'priority': 50, 'interface': 'deploy'}
+        self.power_one = {
+            'step': 'power_one', 'priority': 40, 'interface': 'power'}
+        self.deploy_middle = {
+            'step': 'deploy_middle', 'priority': 40, 'interface': 'deploy'}
+        self.deploy_end = {
+            'step': 'deploy_end', 'priority': 20, 'interface': 'deploy'}
+        self.power_disable = {
+            'step': 'power_disable', 'priority': 0, 'interface': 'power'}
+        # enabled steps
+        self.deploy_steps = [self.deploy_start, self.power_one,
+                             self.deploy_middle, self.deploy_end]
+        self.node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware')
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps')
+    def test__get_deployment_steps(self, mock_mgt_steps, mock_power_steps,
+                                   mock_deploy_steps):
+        # Test getting deploy steps, with one driver returning None, two
+        # conflicting priorities, and asserting they are ordered properly.
+
+        mock_power_steps.return_value = [self.power_disable, self.power_one]
+        mock_deploy_steps.return_value = [
+            self.deploy_start, self.deploy_middle, self.deploy_end]
+
+        expected = self.deploy_steps + [self.power_disable]
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            steps = conductor_utils._get_deployment_steps(task, enabled=False)
+
+            self.assertEqual(expected, steps)
+            mock_mgt_steps.assert_called_once_with(task)
+            mock_power_steps.assert_called_once_with(task)
+            mock_deploy_steps.assert_called_once_with(task)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps')
+    def test__get_deploy_steps_unsorted(self, mock_mgt_steps, mock_power_steps,
+                                        mock_deploy_steps):
+
+        mock_deploy_steps.return_value = [self.deploy_end,
+                                          self.deploy_start,
+                                          self.deploy_middle]
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            steps = conductor_utils._get_deployment_steps(task, enabled=False,
+                                                          sort=False)
+            self.assertEqual(mock_deploy_steps.return_value, steps)
+            mock_mgt_steps.assert_called_once_with(task)
+            mock_power_steps.assert_called_once_with(task)
+            mock_deploy_steps.assert_called_once_with(task)
+
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.get_deploy_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakePower.get_deploy_steps')
+    @mock.patch('ironic.drivers.modules.fake.FakeManagement.get_deploy_steps')
+    def test__get_deployment_steps_only_enabled(
+            self, mock_mgt_steps, mock_power_steps, mock_deploy_steps):
+        # Test getting only deploy steps, with one driver returning None, two
+        # conflicting priorities, and asserting they are ordered properly.
+        # Should discard zero-priority deploy step.
+
+        mock_power_steps.return_value = [self.power_one, self.power_disable]
+        mock_deploy_steps.return_value = [self.deploy_end,
+                                          self.deploy_middle,
+                                          self.deploy_start]
+
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=True) as task:
+            steps = conductor_utils._get_deployment_steps(task, enabled=True)
+
+            self.assertEqual(self.deploy_steps, steps)
+            mock_mgt_steps.assert_called_once_with(task)
+            mock_power_steps.assert_called_once_with(task)
+            mock_deploy_steps.assert_called_once_with(task)
+
+    @mock.patch.object(conductor_utils, '_get_deployment_steps')
+    def test_set_node_deployment_steps(self, mock_steps):
+        mock_steps.return_value = self.deploy_steps
+
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            conductor_utils.set_node_deployment_steps(task)
+            self.node.refresh()
+            self.assertEqual(self.deploy_steps,
+                             self.node.driver_internal_info['deploy_steps'])
+            self.assertEqual({}, self.node.deploy_step)
+            self.assertIsNone(
+                self.node.driver_internal_info['deploy_step_index'])
+            mock_steps.assert_called_once_with(task, enabled=True)
 
 
 class NodeCleaningStepsTestCase(db_base.DbTestCase):
@@ -1298,6 +1441,21 @@ class ErrorHandlersTestCase(tests_base.TestCase):
         self.assertFalse(self.node.save.called)
         self.assertFalse(log_mock.warning.called)
 
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_spawn_deploying_error_handler_no_worker(self, log_mock):
+        exc = exception.NoFreeConductorWorker()
+        conductor_utils.spawn_deploying_error_handler(exc, self.node)
+        self.node.save.assert_called_once_with()
+        self.assertIn('No free conductor workers', self.node.last_error)
+        self.assertTrue(log_mock.warning.called)
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_spawn_deploying_error_handler_other_error(self, log_mock):
+        exc = Exception('foo')
+        conductor_utils.spawn_deploying_error_handler(exc, self.node)
+        self.assertFalse(self.node.save.called)
+        self.assertFalse(log_mock.warning.called)
+
     @mock.patch.object(conductor_utils, 'LOG')
     def test_spawn_rescue_error_handler_no_worker(self, log_mock):
         exc = exception.NoFreeConductorWorker()
@@ -1798,6 +1956,37 @@ class MiscTestCase(db_base.DbTestCase):
     def test_remove_node_rescue_password_save_false(self):
         self._test_remove_node_rescue_password(save=False)
 
+    @mock.patch.object(rpcapi.ConductorAPI, 'continue_node_deploy',
+                       autospec=True)
+    @mock.patch.object(rpcapi.ConductorAPI, 'get_topic_for', autospec=True)
+    def test__notify_conductor_resume_operation(self, mock_topic,
+                                                mock_rpc_call):
+        mock_topic.return_value = 'topic'
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            conductor_utils._notify_conductor_resume_operation(
+                task, 'deploying', 'continue_node_deploy')
+            mock_rpc_call.assert_called_once_with(
+                mock.ANY, task.context, self.node.uuid, topic='topic')
+
+    @mock.patch.object(conductor_utils, '_notify_conductor_resume_operation',
+                       autospec=True)
+    def test_notify_conductor_resume_clean(self, mock_resume):
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            conductor_utils.notify_conductor_resume_clean(task)
+            mock_resume.assert_called_once_with(
+                task, 'cleaning', 'continue_node_clean')
+
+    @mock.patch.object(conductor_utils, '_notify_conductor_resume_operation',
+                       autospec=True)
+    def test_notify_conductor_resume_deploy(self, mock_resume):
+        with task_manager.acquire(
+                self.context, self.node.uuid, shared=False) as task:
+            conductor_utils.notify_conductor_resume_deploy(task)
+            mock_resume.assert_called_once_with(
+                task, 'deploying', 'continue_node_deploy')
+
 
 class ValidateInstanceInfoTraitsTestCase(tests_base.TestCase):
 
diff --git a/ironic/tests/unit/drivers/modules/ansible/test_deploy.py b/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
index 7fd4c141ce..0e559708a2 100644
--- a/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
+++ b/ironic/tests/unit/drivers/modules/ansible/test_deploy.py
@@ -874,8 +874,13 @@ class TestAnsibleDeploy(AnsibleDeployTestCaseBase):
             self.assertEqual(states.ACTIVE, task.node.target_provision_state)
             self.assertEqual(states.DEPLOYING, task.node.provision_state)
 
+    @mock.patch.object(utils, 'notify_conductor_resume_deploy', autospec=True)
     @mock.patch.object(utils, 'node_set_boot_device', autospec=True)
-    def test_reboot_to_instance(self, bootdev_mock):
+    def test_reboot_to_instance(self, bootdev_mock, resume_mock):
+        self.node.provision_state = states.DEPLOYING
+        self.node.deploy_step = {
+            'step': 'deploy', 'priority': 100, 'interface': 'deploy'}
+        self.node.save()
         with task_manager.acquire(self.context, self.node.uuid) as task:
             with mock.patch.object(self.driver, 'reboot_and_finish_deploy',
                                    autospec=True):
@@ -883,6 +888,30 @@ class TestAnsibleDeploy(AnsibleDeployTestCaseBase):
                 self.driver.reboot_to_instance(task)
                 bootdev_mock.assert_called_once_with(task, 'disk',
                                                      persistent=True)
+                resume_mock.assert_called_once_with(task)
+                self.driver.reboot_and_finish_deploy.assert_called_once_with(
+                    task)
+                task.driver.boot.clean_up_ramdisk.assert_called_once_with(
+                    task)
+
+    @mock.patch.object(utils, 'notify_conductor_resume_deploy', autospec=True)
+    @mock.patch.object(utils, 'node_set_boot_device', autospec=True)
+    def test_reboot_to_instance_deprecated(self, bootdev_mock, resume_mock):
+        # TODO(rloo): no deploy steps; delete this when we remove support
+        # for handling no deploy steps.
+        self.node.provision_state = states.DEPLOYING
+        self.node.deploy_step = None
+        self.node.save()
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            with mock.patch.object(self.driver, 'reboot_and_finish_deploy',
+                                   autospec=True):
+                task.driver.boot = mock.Mock()
+                task.process_event = mock.Mock()
+                self.driver.reboot_to_instance(task)
+                bootdev_mock.assert_called_once_with(task, 'disk',
+                                                     persistent=True)
+                self.assertFalse(resume_mock.called)
+                task.process_event.assert_called_once_with('done')
                 self.driver.reboot_and_finish_deploy.assert_called_once_with(
                     task)
                 task.driver.boot.clean_up_ramdisk.assert_called_once_with(
diff --git a/ironic/tests/unit/drivers/modules/test_agent.py b/ironic/tests/unit/drivers/modules/test_agent.py
index 91b81527ce..ed290ea1cf 100644
--- a/ironic/tests/unit/drivers/modules/test_agent.py
+++ b/ironic/tests/unit/drivers/modules/test_agent.py
@@ -290,11 +290,13 @@ class TestAgentDeploy(db_base.DbTestCase):
                                                      mock_pxe_instance):
         mock_write.return_value = False
         self.node.provision_state = states.DEPLOYING
+        self.node.deploy_step = {
+            'step': 'deploy', 'priority': 50, 'interface': 'deploy'}
         self.node.save()
         with task_manager.acquire(
                 self.context, self.node['uuid'], shared=False) as task:
             driver_return = self.driver.deploy(task)
-            self.assertEqual(driver_return, states.DEPLOYDONE)
+            self.assertIsNone(driver_return)
             self.assertTrue(mock_pxe_instance.called)
 
     @mock.patch.object(noop_storage.NoopStorage, 'detach_volumes',
diff --git a/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py b/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
index 90c9fe5546..98c91d6b21 100644
--- a/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
+++ b/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
@@ -425,6 +425,8 @@ class AgentRescueTests(AgentDeployMixinBaseTest):
 
 
 class AgentDeployMixinTest(AgentDeployMixinBaseTest):
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
     @mock.patch.object(time, 'sleep', lambda seconds: None)
     @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
@@ -434,10 +436,45 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
                        spec=types.FunctionType)
     def test_reboot_and_finish_deploy(
             self, power_off_mock, get_power_state_mock,
-            node_power_action_mock, mock_collect):
+            node_power_action_mock, collect_mock, resume_mock):
         cfg.CONF.set_override('deploy_logs_collect', 'always', 'agent')
         self.node.provision_state = states.DEPLOYING
         self.node.target_provision_state = states.ACTIVE
+        self.node.deploy_step = {
+            'step': 'deploy', 'priority': 50, 'interface': 'deploy'}
+        self.node.save()
+        with task_manager.acquire(self.context, self.node.uuid,
+                                  shared=True) as task:
+            get_power_state_mock.side_effect = [states.POWER_ON,
+                                                states.POWER_OFF]
+            self.deploy.reboot_and_finish_deploy(task)
+            power_off_mock.assert_called_once_with(task.node)
+            self.assertEqual(2, get_power_state_mock.call_count)
+            node_power_action_mock.assert_called_once_with(
+                task, states.POWER_ON)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
+            self.assertEqual(states.ACTIVE, task.node.target_provision_state)
+            collect_mock.assert_called_once_with(task.node)
+            resume_mock.assert_called_once_with(task)
+
+    @mock.patch.object(manager_utils, 'notify_conductor_resume_deploy',
+                       autospec=True)
+    @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
+    @mock.patch.object(time, 'sleep', lambda seconds: None)
+    @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
+    @mock.patch.object(fake.FakePower, 'get_power_state',
+                       spec=types.FunctionType)
+    @mock.patch.object(agent_client.AgentClient, 'power_off',
+                       spec=types.FunctionType)
+    def test_reboot_and_finish_deploy_deprecated(
+            self, power_off_mock, get_power_state_mock,
+            node_power_action_mock, collect_mock, resume_mock):
+        # TODO(rloo): no deploy steps; delete this when we remove support
+        # for handling no deploy steps.
+        cfg.CONF.set_override('deploy_logs_collect', 'always', 'agent')
+        self.node.provision_state = states.DEPLOYING
+        self.node.target_provision_state = states.ACTIVE
+        self.node.deploy_step = None
         self.node.save()
         with task_manager.acquire(self.context, self.node.uuid,
                                   shared=True) as task:
@@ -450,7 +487,8 @@ class AgentDeployMixinTest(AgentDeployMixinBaseTest):
                 task, states.POWER_ON)
             self.assertEqual(states.ACTIVE, task.node.provision_state)
             self.assertEqual(states.NOSTATE, task.node.target_provision_state)
-            mock_collect.assert_called_once_with(task.node)
+            collect_mock.assert_called_once_with(task.node)
+            self.assertFalse(resume_mock.called)
 
     @mock.patch.object(driver_utils, 'collect_ramdisk_logs', autospec=True)
     @mock.patch.object(time, 'sleep', lambda seconds: None)
diff --git a/ironic/tests/unit/drivers/modules/test_deploy_utils.py b/ironic/tests/unit/drivers/modules/test_deploy_utils.py
index 021ef33f77..6bf0543c49 100644
--- a/ironic/tests/unit/drivers/modules/test_deploy_utils.py
+++ b/ironic/tests/unit/drivers/modules/test_deploy_utils.py
@@ -1077,14 +1077,13 @@ class OtherFunctionTestCase(db_base.DbTestCase):
 
     @mock.patch.object(utils, 'LOG', autospec=True)
     @mock.patch.object(manager_utils, 'node_power_action', autospec=True)
-    @mock.patch.object(task_manager.TaskManager, 'process_event',
-                       autospec=True)
-    def _test_set_failed_state(self, mock_event, mock_power, mock_log,
+    @mock.patch.object(manager_utils, 'deploying_error_handler', autospec=True)
+    def _test_set_failed_state(self, mock_error, mock_power, mock_log,
                                event_value=None, power_value=None,
                                log_calls=None, poweroff=True,
                                collect_logs=True):
         err_msg = 'some failure'
-        mock_event.side_effect = event_value
+        mock_error.side_effect = event_value
         mock_power.side_effect = power_value
         with task_manager.acquire(self.context, self.node.uuid,
                                   shared=False) as task:
@@ -1093,7 +1092,8 @@ class OtherFunctionTestCase(db_base.DbTestCase):
             else:
                 utils.set_failed_state(task, err_msg,
                                        collect_logs=collect_logs)
-            mock_event.assert_called_once_with(task, 'fail')
+            mock_error.assert_called_once_with(task, err_msg, err_msg,
+                                               clean_up=False)
             if poweroff:
                 mock_power.assert_called_once_with(task, states.POWER_OFF)
             else:
diff --git a/ironic/tests/unit/drivers/modules/test_iscsi_deploy.py b/ironic/tests/unit/drivers/modules/test_iscsi_deploy.py
index a7d1366de5..995ba0aad9 100644
--- a/ironic/tests/unit/drivers/modules/test_iscsi_deploy.py
+++ b/ironic/tests/unit/drivers/modules/test_iscsi_deploy.py
@@ -739,17 +739,20 @@ class ISCSIDeployTestCase(db_base.DbTestCase):
                                                     mock_write):
         mock_write.return_value = False
         self.node.provision_state = states.DEPLOYING
+        self.node.deploy_step = {
+            'step': 'deploy', 'priority': 50, 'interface': 'deploy'}
         self.node.save()
         with task_manager.acquire(self.context,
                                   self.node.uuid, shared=False) as task:
-            state = task.driver.deploy.deploy(task)
-            self.assertEqual(state, states.DEPLOYDONE)
+            ret = task.driver.deploy.deploy(task)
+            self.assertIsNone(ret)
             self.assertFalse(mock_cache_instance_image.called)
             self.assertFalse(mock_check_image_size.called)
             mock_remove_network.assert_called_once_with(mock.ANY, task)
             mock_tenant_network.assert_called_once_with(mock.ANY, task)
             mock_prepare_instance.assert_called_once_with(mock.ANY, task)
             self.assertEqual(2, mock_node_power_action.call_count)
+            self.assertEqual(states.DEPLOYING, task.node.provision_state)
 
     @mock.patch.object(noop_storage.NoopStorage, 'detach_volumes',
                        autospec=True)
diff --git a/ironic/tests/unit/drivers/test_base.py b/ironic/tests/unit/drivers/test_base.py
index 83f80ddb18..d3f7367655 100644
--- a/ironic/tests/unit/drivers/test_base.py
+++ b/ironic/tests/unit/drivers/test_base.py
@@ -354,6 +354,127 @@ class CleanStepTestCase(base.TestCase):
         method_args_mock.assert_called_once_with(task_mock, **args)
 
 
+class DeployStepTestCase(base.TestCase):
+    def test_get_and_execute_deploy_steps(self):
+        # Create a fake Driver class, create some deploy steps, make sure
+        # they are listed correctly, and attempt to execute one of them
+
+        method_mock = mock.MagicMock(spec_set=[])
+        method_args_mock = mock.MagicMock(spec_set=[])
+        task_mock = mock.MagicMock(spec_set=[])
+
+        class BaseTestClass(driver_base.BaseInterface):
+            def get_properties(self):
+                return {}
+
+            def validate(self, task):
+                pass
+
+        class TestClass(BaseTestClass):
+            interface_type = 'test'
+
+            @driver_base.deploy_step(priority=0)
+            def deploy_zero(self, task):
+                pass
+
+            @driver_base.deploy_step(priority=10)
+            def deploy_ten(self, task):
+                method_mock(task)
+
+            def not_deploy_method(self, task):
+                pass
+
+        class TestClass2(BaseTestClass):
+            interface_type = 'test2'
+
+            @driver_base.deploy_step(priority=0)
+            def deploy_zero2(self, task):
+                pass
+
+            @driver_base.deploy_step(priority=20)
+            def deploy_twenty(self, task):
+                method_mock(task)
+
+            def not_deploy_method2(self, task):
+                pass
+
+        class TestClass3(BaseTestClass):
+            interface_type = 'test3'
+
+            @driver_base.deploy_step(priority=0, argsinfo={
+                                     'arg1': {'description': 'desc1',
+                                              'required': True}})
+            def deploy_zero3(self, task, **kwargs):
+                method_args_mock(task, **kwargs)
+
+            @driver_base.deploy_step(priority=15, argsinfo={
+                                     'arg10': {'description': 'desc10'}})
+            def deploy_fifteen(self, task, **kwargs):
+                pass
+
+            def not_deploy_method3(self, task):
+                pass
+
+        obj = TestClass()
+        obj2 = TestClass2()
+        obj3 = TestClass3()
+
+        self.assertEqual(2, len(obj.get_deploy_steps(task_mock)))
+        # Ensure the steps look correct
+        self.assertEqual(10, obj.get_deploy_steps(task_mock)[0]['priority'])
+        self.assertEqual('test', obj.get_deploy_steps(
+            task_mock)[0]['interface'])
+        self.assertEqual('deploy_ten', obj.get_deploy_steps(
+            task_mock)[0]['step'])
+        self.assertEqual(0, obj.get_deploy_steps(task_mock)[1]['priority'])
+        self.assertEqual('test', obj.get_deploy_steps(
+            task_mock)[1]['interface'])
+        self.assertEqual('deploy_zero', obj.get_deploy_steps(
+            task_mock)[1]['step'])
+
+        # Ensure the second obj has different deploy steps
+        self.assertEqual(2, len(obj2.get_deploy_steps(task_mock)))
+        # Ensure the steps look correct
+        self.assertEqual(20, obj2.get_deploy_steps(task_mock)[0]['priority'])
+        self.assertEqual('test2', obj2.get_deploy_steps(
+            task_mock)[0]['interface'])
+        self.assertEqual('deploy_twenty', obj2.get_deploy_steps(
+            task_mock)[0]['step'])
+        self.assertEqual(0, obj2.get_deploy_steps(task_mock)[1]['priority'])
+        self.assertEqual('test2', obj2.get_deploy_steps(
+            task_mock)[1]['interface'])
+        self.assertEqual('deploy_zero2', obj2.get_deploy_steps(
+            task_mock)[1]['step'])
+        self.assertIsNone(obj2.get_deploy_steps(task_mock)[0]['argsinfo'])
+
+        # Ensure the third obj has different deploy steps
+        self.assertEqual(2, len(obj3.get_deploy_steps(task_mock)))
+        self.assertEqual(15, obj3.get_deploy_steps(task_mock)[0]['priority'])
+        self.assertEqual('test3', obj3.get_deploy_steps(
+            task_mock)[0]['interface'])
+        self.assertEqual('deploy_fifteen', obj3.get_deploy_steps(
+            task_mock)[0]['step'])
+        self.assertEqual({'arg10': {'description': 'desc10'}},
+                         obj3.get_deploy_steps(task_mock)[0]['argsinfo'])
+        self.assertEqual(0, obj3.get_deploy_steps(task_mock)[1]['priority'])
+        self.assertEqual(obj3.interface_type, obj3.get_deploy_steps(
+            task_mock)[1]['interface'])
+        self.assertEqual('deploy_zero3', obj3.get_deploy_steps(
+            task_mock)[1]['step'])
+        self.assertEqual({'arg1': {'description': 'desc1', 'required': True}},
+                         obj3.get_deploy_steps(task_mock)[1]['argsinfo'])
+
+        # Ensure we can execute the function.
+        obj.execute_deploy_step(task_mock, obj.get_deploy_steps(task_mock)[0])
+        method_mock.assert_called_once_with(task_mock)
+
+        args = {'arg1': 'val1'}
+        deploy_step = {'interface': 'test3', 'step': 'deploy_zero3',
+                       'args': args}
+        obj3.execute_deploy_step(task_mock, deploy_step)
+        method_args_mock.assert_called_once_with(task_mock, **args)
+
+
 class MyRAIDInterface(driver_base.RAIDInterface):
 
     def create_configuration(self, task):
diff --git a/releasenotes/notes/deploy_steps-243b341cf742f7cc.yaml b/releasenotes/notes/deploy_steps-243b341cf742f7cc.yaml
new file mode 100644
index 0000000000..80f6bb76a4
--- /dev/null
+++ b/releasenotes/notes/deploy_steps-243b341cf742f7cc.yaml
@@ -0,0 +1,13 @@
+---
+features:
+  - |
+    The `framework for deployment steps
+    <https://specs.openstack.org/openstack/ironic-specs/specs/approved/deployment-steps-framework.html>`_
+    is in place. All in-tree drivers (DeployInterfaces) have one (big) deploy
+    step; the conductor executes this step when deploying a node.
+deprecations:
+  - |
+    All drivers must implement their deployment process using `deploy steps`.
+    Out-of-tree drivers without deploy steps will be supported until the T* release.
+    For more details, see
+    `story 1753128 <https://storyboard.openstack.org/#!/story/1753128>`_.