diff --git a/doc/source/contributor/webapi-version-history.rst b/doc/source/contributor/webapi-version-history.rst
index e5bf5f750f..fcb20ba188 100644
--- a/doc/source/contributor/webapi-version-history.rst
+++ b/doc/source/contributor/webapi-version-history.rst
@@ -2,6 +2,13 @@
 REST API Version History
 ========================
 
+1.72 (Wallaby, 17.0)
+----------------------
+
+Add support for ``agent_status`` and ``agent_status_message`` to /v1/heartbeat.
+These fields are used for external installation tools, such as Anaconda, to
+report back status.
+
 1.71 (Wallaby, 17.0)
 ----------------------
 
diff --git a/ironic/api/controllers/v1/ramdisk.py b/ironic/api/controllers/v1/ramdisk.py
index 46cc9fa530..2b26d35fc1 100644
--- a/ironic/api/controllers/v1/ramdisk.py
+++ b/ironic/api/controllers/v1/ramdisk.py
@@ -35,6 +35,7 @@ LOG = log.getLogger(__name__)
 
 _LOOKUP_RETURN_FIELDS = ['uuid', 'properties', 'instance_info',
                          'driver_internal_info']
+AGENT_VALID_STATES = ['start', 'end', 'error']
 
 
 def config(token):
@@ -158,9 +159,11 @@ class HeartbeatController(rest.RestController):
     @method.expose(status_code=http_client.ACCEPTED)
     @args.validate(node_ident=args.uuid_or_name, callback_url=args.string,
                    agent_version=args.string, agent_token=args.string,
-                   agent_verify_ca=args.string)
+                   agent_verify_ca=args.string, agent_status=args.string,
+                   agent_status_message=args.string)
     def post(self, node_ident, callback_url, agent_version=None,
-             agent_token=None, agent_verify_ca=None):
+             agent_token=None, agent_verify_ca=None, agent_status=None,
+             agent_status_message=None):
         """Process a heartbeat from the deploy ramdisk.
 
         :param node_ident: the UUID or logical name of a node.
@@ -172,6 +175,11 @@ class HeartbeatController(rest.RestController):
             assumed.
         :param agent_token: randomly generated validation token.
         :param agent_verify_ca: TLS certificate to use to connect to the agent.
+        :param agent_status: Current status of the heartbeating agent. Used by
+            anaconda ramdisk to send status back to Ironic. The valid states
+            are 'start', 'end', 'error'
+        :param agent_status_message: Optional status message describing current
+            agent_status
         :raises: NodeNotFound if node with provided UUID or name was not found.
         :raises: InvalidUuidOrName if node_ident is not valid name or UUID.
         :raises: NoValidHost if RPC topic for node could not be retrieved.
@@ -185,6 +193,13 @@ class HeartbeatController(rest.RestController):
             raise exception.InvalidParameterValue(
                 _('Field "agent_version" not recognised'))
 
+        if ((agent_status or agent_status_message)
+                and not api_utils.allow_status_in_heartbeat()):
+            raise exception.InvalidParameterValue(
+                _('Fields "agent_status" and "agent_status_message" '
+                  'not recognised.')
+            )
+
         api_utils.check_policy('baremetal:node:ipa_heartbeat')
 
         if (agent_verify_ca is not None
@@ -213,6 +228,17 @@ class HeartbeatController(rest.RestController):
             raise exception.InvalidParameterValue(
                 _('Agent token is required for heartbeat processing.'))
 
+        if agent_status is not None and agent_status not in AGENT_VALID_STATES:
+            valid_states = ','.join(AGENT_VALID_STATES)
+            LOG.error('Agent heartbeat received for node %(node)s '
+                      'has an invalid agent status: %(agent_status)s. '
+                      'Valid states are %(valid_states)s ',
+                      {'node': node_ident, 'agent_status': agent_status,
+                       'valid_states': valid_states})
+            msg = (_('Agent status is invalid. Valid states are %s.') %
+                   valid_states)
+            raise exception.InvalidParameterValue(msg)
+
         try:
             topic = api.request.rpcapi.get_topic_for(rpc_node)
         except exception.NoValidHost as e:
@@ -221,4 +247,5 @@ class HeartbeatController(rest.RestController):
 
         api.request.rpcapi.heartbeat(
             api.request.context, rpc_node.uuid, callback_url,
-            agent_version, agent_token, agent_verify_ca, topic=topic)
+            agent_version, agent_token, agent_verify_ca, agent_status,
+            agent_status_message, topic=topic)
diff --git a/ironic/api/controllers/v1/utils.py b/ironic/api/controllers/v1/utils.py
index 0e4b77ec2e..65ce7fda58 100644
--- a/ironic/api/controllers/v1/utils.py
+++ b/ironic/api/controllers/v1/utils.py
@@ -1884,6 +1884,11 @@ def allow_deploy_steps():
     return api.request.version.minor >= versions.MINOR_69_DEPLOY_STEPS
 
 
+def allow_status_in_heartbeat():
+    """Check if heartbeat accepts agent_status and agent_status_message."""
+    return api.request.version.minor >= versions.MINOR_72_HEARTBEAT_STATUS
+
+
 def check_allow_deploy_steps(target, deploy_steps):
     """Check if deploy steps are allowed"""
 
diff --git a/ironic/api/controllers/v1/versions.py b/ironic/api/controllers/v1/versions.py
index caaa94924a..12c15fe837 100644
--- a/ironic/api/controllers/v1/versions.py
+++ b/ironic/api/controllers/v1/versions.py
@@ -109,6 +109,7 @@ BASE_VERSION = 1
 # v1.69: Add deploy_steps to provisioning
 # v1.70: Add disable_ramdisk to manual cleaning.
 # v1.71: Add signifier for Scope based roles.
+# v1.72: Add agent_status and agent_status_message to /v1/heartbeat
 
 MINOR_0_JUNO = 0
 MINOR_1_INITIAL_VERSION = 1
@@ -182,6 +183,7 @@ MINOR_68_HEARTBEAT_VERIFY_CA = 68
 MINOR_69_DEPLOY_STEPS = 69
 MINOR_70_CLEAN_DISABLE_RAMDISK = 70
 MINOR_71_RBAC_SCOPES = 71
+MINOR_72_HEARTBEAT_STATUS = 72
 
 # When adding another version, update:
 # - MINOR_MAX_VERSION
@@ -189,7 +191,7 @@ MINOR_71_RBAC_SCOPES = 71
 #   explanation of what changed in the new version
 # - common/release_mappings.py, RELEASE_MAPPING['master']['api']
 
-MINOR_MAX_VERSION = MINOR_71_RBAC_SCOPES
+MINOR_MAX_VERSION = MINOR_72_HEARTBEAT_STATUS
 
 # String representations of the minor and maximum versions
 _MIN_VERSION_STRING = '{}.{}'.format(BASE_VERSION, MINOR_1_INITIAL_VERSION)
diff --git a/ironic/common/release_mappings.py b/ironic/common/release_mappings.py
index d0bc46637e..37b5f11108 100644
--- a/ironic/common/release_mappings.py
+++ b/ironic/common/release_mappings.py
@@ -302,8 +302,8 @@ RELEASE_MAPPING = {
         }
     },
     '17.0': {
-        'api': '1.71',
-        'rpc': '1.53',
+        'api': '1.72',
+        'rpc': '1.54',
         'objects': {
             'Allocation': ['1.1'],
             'Node': ['1.35'],
@@ -320,8 +320,8 @@ RELEASE_MAPPING = {
         }
     },
     'master': {
-        'api': '1.71',
-        'rpc': '1.53',
+        'api': '1.72',
+        'rpc': '1.54',
         'objects': {
             'Allocation': ['1.1'],
             'Node': ['1.35'],
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index aafdd41def..c26b10930f 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -91,7 +91,7 @@ class ConductorManager(base_manager.BaseConductorManager):
     # NOTE(rloo): This must be in sync with rpcapi.ConductorAPI's.
     # NOTE(pas-ha): This also must be in sync with
     #               ironic.common.release_mappings.RELEASE_MAPPING['master']
-    RPC_API_VERSION = '1.53'
+    RPC_API_VERSION = '1.54'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -3034,7 +3034,8 @@ class ConductorManager(base_manager.BaseConductorManager):
     @messaging.expected_exceptions(exception.InvalidParameterValue)
     @messaging.expected_exceptions(exception.NoFreeConductorWorker)
     def heartbeat(self, context, node_id, callback_url, agent_version=None,
-                  agent_token=None, agent_verify_ca=None):
+                  agent_token=None, agent_verify_ca=None, agent_status=None,
+                  agent_status_message=None):
         """Process a heartbeat from the ramdisk.
 
         :param context: request context.
@@ -3048,13 +3049,18 @@ class ConductorManager(base_manager.BaseConductorManager):
             agent_version, in these cases assume agent v3.0.0 (the last release
             before sending agent_version was introduced).
         :param agent_token: randomly generated validation token.
+        :param agent_status: Status of the heartbeating agent. Agent status is
+            one of 'start', 'end', error'
+        :param agent_status_message: Message describing agent's status
         :param agent_verify_ca: TLS certificate for the agent.
         :raises: NoFreeConductorWorker if there are no conductors to process
             this heartbeat request.
         """
         LOG.debug('RPC heartbeat called for node %s', node_id)
 
-        if agent_version is None:
+        # Do not raise exception if version is missing when agent is
+        # anaconda ramdisk.
+        if agent_version is None and agent_status is None:
             LOG.error('Node %s transmitted no version information which '
                       'indicates the agent is incompatible with the ironic '
                       'services and must be upgraded.', node_id)
@@ -3091,7 +3097,8 @@ class ConductorManager(base_manager.BaseConductorManager):
 
             task.spawn_after(
                 self._spawn_worker, task.driver.deploy.heartbeat,
-                task, callback_url, agent_version, agent_verify_ca)
+                task, callback_url, agent_version, agent_verify_ca,
+                agent_status, agent_status_message)
 
     @METRICS.timer('ConductorManager.vif_list')
     @messaging.expected_exceptions(exception.NetworkError,
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py
index 66b206cc14..d05f75228a 100644
--- a/ironic/conductor/rpcapi.py
+++ b/ironic/conductor/rpcapi.py
@@ -106,13 +106,14 @@ class ConductorAPI(object):
     |    1.51 - Added agent_verify_ca to heartbeat.
     |    1.52 - Added deploy steps argument to provisioning
     |    1.53 - Added disable_ramdisk to do_node_clean.
-
+    |    1.54 - Added optional agent_status and agent_status_message to
+                heartbeat
     """
 
     # NOTE(rloo): This must be in sync with manager.ConductorManager's.
     # NOTE(pas-ha): This also must be in sync with
     #               ironic.common.release_mappings.RELEASE_MAPPING['master']
-    RPC_API_VERSION = '1.53'
+    RPC_API_VERSION = '1.54'
 
     def __init__(self, topic=None):
         super(ConductorAPI, self).__init__()
@@ -920,7 +921,8 @@ class ConductorAPI(object):
                           node_id=node_id, clean_steps=clean_steps, **params)
 
     def heartbeat(self, context, node_id, callback_url, agent_version,
-                  agent_token=None, agent_verify_ca=None, topic=None):
+                  agent_token=None, agent_verify_ca=None, agent_status=None,
+                  agent_status_message=None, topic=None):
         """Process a node heartbeat.
 
         :param context: request context.
@@ -930,6 +932,9 @@ class ConductorAPI(object):
         :param agent_token: randomly generated validation token.
         :param agent_version: the version of the agent that is heartbeating
         :param agent_verify_ca: TLS certificate for the agent.
+        :param agent_status: The status of the agent that is heartbeating
+        :param agent_status_message: Optional message describing the agent
+            status
         :raises: InvalidParameterValue if an invalid agent token is received.
         """
         new_kws = {}
@@ -943,6 +948,10 @@ class ConductorAPI(object):
         if self.client.can_send_version('1.51'):
             version = '1.51'
             new_kws['agent_verify_ca'] = agent_verify_ca
+        if self.client.can_send_version('1.54'):
+            version = '1.54'
+            new_kws['agent_status'] = agent_status
+            new_kws['agent_status_message'] = agent_status_message
         cctxt = self.client.prepare(topic=topic or self.topic, version=version)
         return cctxt.call(context, 'heartbeat', node_id=node_id,
                           callback_url=callback_url, **new_kws)
diff --git a/ironic/drivers/base.py b/ironic/drivers/base.py
index ebeff2de5a..2b1700e382 100644
--- a/ironic/drivers/base.py
+++ b/ironic/drivers/base.py
@@ -478,13 +478,16 @@ class DeployInterface(BaseInterface):
         pass
 
     def heartbeat(self, task, callback_url, agent_version,
-                  agent_verify_ca=None):
+                  agent_verify_ca=None, agent_status=None,
+                  agent_status_message=None):
         """Record a heartbeat for the node.
 
         :param task: A TaskManager instance containing the node to act on.
         :param callback_url: a URL to use to call to the ramdisk.
         :param agent_version: The version of the agent that is heartbeating
         :param agent_verify_ca: TLS certificate for the agent.
+        :param agent_status: Status of the heartbeating agent
+        :param agent_status_message: Message describing the agent status
         :return: None
         """
         LOG.warning('Got heartbeat message from node %(node)s, but '
diff --git a/ironic/drivers/modules/agent_base.py b/ironic/drivers/modules/agent_base.py
index ef2fe307c5..c51f8ec812 100644
--- a/ironic/drivers/modules/agent_base.py
+++ b/ironic/drivers/modules/agent_base.py
@@ -612,13 +612,17 @@ class HeartbeatMixin(object):
 
     @METRICS.timer('HeartbeatMixin.heartbeat')
     def heartbeat(self, task, callback_url, agent_version,
-                  agent_verify_ca=None):
+                  agent_verify_ca=None, agent_status=None,
+                  agent_status_message=None):
         """Process a heartbeat.
 
         :param task: task to work with.
         :param callback_url: agent HTTP API URL.
         :param agent_version: The version of the agent that is heartbeating
         :param agent_verify_ca: TLS certificate for the agent.
+        :param agent_status: Status of the heartbeating agent
+        :param agent_status_message: Status message that describes the
+            agent_status
         """
         # NOTE(pas-ha) immediately skip the rest if nothing to do
         if (task.node.provision_state not in self.heartbeat_allowed_states
@@ -649,6 +653,11 @@ class HeartbeatMixin(object):
             timeutils.utcnow().isoformat())
         if agent_verify_ca:
             driver_internal_info['agent_verify_ca'] = agent_verify_ca
+        if agent_status:
+            driver_internal_info['agent_status'] = agent_status
+        if agent_status_message:
+            driver_internal_info['agent_status_message'] = \
+                agent_status_message
         node.driver_internal_info = driver_internal_info
         node.save()
 
diff --git a/ironic/drivers/modules/ks.cfg.template b/ironic/drivers/modules/ks.cfg.template
index 3d74c4f3c8..1a2cecaf3e 100644
--- a/ironic/drivers/modules/ks.cfg.template
+++ b/ironic/drivers/modules/ks.cfg.template
@@ -19,19 +19,19 @@ liveimg --url {{ ks_options.liveimg_url }}
 
 # Following %pre, %onerror and %trackback sections are mandatory
 %pre
-/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept:application/json' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_state": "start", "agent_status": "Deployment starting. Running pre-installation scripts."}' {{ ks_options.heartbeat_url }}
+/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept: application/json' -H 'X-OpenStack-Ironic-API-Version: 1.72' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_status": "start", "agent_status_message": "Deployment starting. Running pre-installation scripts."}' {{ ks_options.heartbeat_url }}
 %end
 
 %onerror
-/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept:application/json' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_state": "error", "agent_status": "Error: Deploying using anaconda. Check console for more information."}' {{ ks_options.heartbeat_url }}
+/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept: application/json' -H 'X-OpenStack-Ironic-API-Version: 1.72' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_status": "error", "agent_status_message": "Error: Deploying using anaconda. Check console for more information."}' {{ ks_options.heartbeat_url }}
 %end
 
 %traceback
-/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept:application/json' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_state": "error", "agent_status": "Error: Installer crashed unexpectedly."}' {{ ks_options.heartbeat_url }}
+/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept: application/json' -H 'X-OpenStack-Ironic-API-Version: 1.72' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_status": "error", "agent_status_message": "Error: Installer crashed unexpectedly."}' {{ ks_options.heartbeat_url }}
 %end
 
 # Sending callback after the installation is mandatory
 %post
-/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept:application/json' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_state": "end", "agent_status": "Deployment completed successfully."}' {{ ks_options.heartbeat_url }}
+/usr/bin/curl -X PUT -H 'Content-Type: application/json' -H 'Accept: application/json' -H 'X-OpenStack-Ironic-API-Version: 1.72' -d '{"agent_token": "{{ ks_options.agent_token }}", "agent_status": "end", "agent_status_message": "Deployment completed successfully."}' {{ ks_options.heartbeat_url }}
 %end
 
diff --git a/ironic/drivers/modules/pxe.py b/ironic/drivers/modules/pxe.py
index 97f8e5961f..07b54acae5 100644
--- a/ironic/drivers/modules/pxe.py
+++ b/ironic/drivers/modules/pxe.py
@@ -18,6 +18,7 @@ PXE Boot Interface
 from ironic_lib import metrics_utils
 from oslo_log import log as logging
 
+from ironic.common import boot_devices
 from ironic.common import exception
 from ironic.common.i18n import _
 from ironic.common import states
@@ -40,7 +41,7 @@ class PXEBoot(pxe_base.PXEBaseMixin, base.BootInterface):
 class PXERamdiskDeploy(agent_base.AgentBaseMixin, agent_base.HeartbeatMixin,
                        base.DeployInterface):
 
-    def get_properties(self, task):
+    def get_properties(self):
         return {}
 
     def validate(self, task):
@@ -121,19 +122,113 @@ class PXERamdiskDeploy(agent_base.AgentBaseMixin, agent_base.HeartbeatMixin,
 class PXEAnacondaDeploy(agent_base.AgentBaseMixin, agent_base.HeartbeatMixin,
                         base.DeployInterface):
 
-    def get_properties(self, task):
+    def get_properties(self):
         return {}
 
     def validate(self, task):
-        pass
+        task.driver.boot.validate(task)
 
     @METRICS.timer('AnacondaDeploy.deploy')
     @base.deploy_step(priority=100)
     @task_manager.require_exclusive_lock
     def deploy(self, task):
-        pass
+        manager_utils.node_power_action(task, states.POWER_OFF)
+        with manager_utils.power_state_for_network_configuration(task):
+            task.driver.network.configure_tenant_networks(task)
+
+        # calling boot.prepare_instance will also set the node
+        # to PXE boot, and update PXE templates accordingly
+        task.driver.boot.prepare_instance(task)
+
+        # Power-on the instance, with PXE prepared, we're done.
+        manager_utils.node_power_action(task, states.POWER_ON)
+        LOG.info('Deployment setup for node %s done', task.node.uuid)
+        return None
 
     @METRICS.timer('AnacondaDeploy.prepare')
     @task_manager.require_exclusive_lock
     def prepare(self, task):
-        pass
+        node = task.node
+
+        deploy_utils.populate_storage_driver_internal_info(task)
+        if node.provision_state == states.DEPLOYING:
+            # Ask the network interface to validate itself so
+            # we can ensure we are able to proceed.
+            task.driver.network.validate(task)
+
+            manager_utils.node_power_action(task, states.POWER_OFF)
+            # NOTE(TheJulia): If this was any other interface, we would
+            # unconfigure tenant networks, add provisioning networks, etc.
+            task.driver.storage.attach_volumes(task)
+        if node.provision_state in (states.ACTIVE, states.UNRESCUING):
+            # In the event of takeover or unrescue.
+            task.driver.boot.prepare_instance(task)
+
+    def deploy_has_started(self, task):
+        agent_status = task.node.driver_internal_info.get('agent_status')
+        if agent_status == 'start':
+            return True
+        return False
+
+    def deploy_is_done(self, task):
+        agent_status = task.node.driver_internal_info.get('agent_status')
+        if agent_status == 'end':
+            return True
+        return False
+
+    def should_manage_boot(self, task):
+        return False
+
+    def reboot_to_instance(self, task):
+        node = task.node
+        try:
+            # anaconda deploy will install the bootloader and the node is ready
+            # to boot from disk.
+
+            deploy_utils.try_set_boot_device(task, boot_devices.DISK)
+        except Exception as e:
+            msg = (_("Failed to change the boot device to %(boot_dev)s "
+                     "when deploying node %(node)s. Error: %(error)s") %
+                   {'boot_dev': boot_devices.DISK, 'node': node.uuid,
+                    'error': e})
+            agent_base.log_and_raise_deployment_error(task, msg)
+
+        try:
+            self.clean_up(task)
+            manager_utils.node_power_action(task, states.POWER_OFF)
+            task.driver.network.remove_provisioning_network(task)
+            task.driver.network.configure_tenant_networks(task)
+            manager_utils.node_power_action(task, states.POWER_ON)
+            node.provision_state = states.ACTIVE
+            node.save()
+        except Exception as e:
+            msg = (_('Error rebooting node %(node)s after deploy. '
+                     'Error: %(error)s') %
+                   {'node': node.uuid, 'error': e})
+            agent_base.log_and_raise_deployment_error(task, msg)
+
+    def _heartbeat_deploy_wait(self, task):
+        node = task.node
+        agent_status_message = node.driver_internal_info.get(
+            'agent_status_message'
+        )
+        msg = {'node_id': node.uuid,
+               'agent_status_message': agent_status_message}
+
+        if self.deploy_has_started(task):
+            LOG.info('The deploy on node %(node_id)s has started. Anaconda '
+                     'returned following message: '
+                     '%(agent_status_message)s ', msg)
+            node.touch_provisioning()
+
+        elif self.deploy_is_done(task):
+            LOG.info('The deploy on node %(node_id)s has ended. Anaconda '
+                     'agent returned following message: '
+                     '%(agent_status_message)s', msg)
+            self.reboot_to_instance(task)
+        else:
+            LOG.error('The deploy on node %(node_id)s failed. Anaconda '
+                      'returned following error message: '
+                      '%(agent_status_message)s', msg)
+            deploy_utils.set_failed_state(task, agent_status_message,
+                                          collect_logs=False)
diff --git a/ironic/tests/base.py b/ironic/tests/base.py
index e19a2380ce..f5000d0be8 100644
--- a/ironic/tests/base.py
+++ b/ironic/tests/base.py
@@ -159,7 +159,7 @@ class TestCase(oslo_test_base.BaseTestCase):
                 values = ['fake']
 
             if iface == 'deploy':
-                values.extend(['iscsi', 'direct'])
+                values.extend(['iscsi', 'direct', 'anaconda'])
             elif iface == 'boot':
                 values.append('pxe')
             elif iface == 'storage':
diff --git a/ironic/tests/unit/api/controllers/v1/test_ramdisk.py b/ironic/tests/unit/api/controllers/v1/test_ramdisk.py
index 51005b772b..ec72f9ea3b 100644
--- a/ironic/tests/unit/api/controllers/v1/test_ramdisk.py
+++ b/ironic/tests/unit/api/controllers/v1/test_ramdisk.py
@@ -226,7 +226,8 @@ class TestHeartbeat(test_api_base.BaseApiTest):
         self.assertEqual(b'', response.body)
         mock_heartbeat.assert_called_once_with(mock.ANY, mock.ANY,
                                                node.uuid, 'url', None, 'x',
-                                               None, topic='test-topic')
+                                               None, None, None,
+                                               topic='test-topic')
 
     @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
     def test_ok_with_json(self, mock_heartbeat):
@@ -241,7 +242,8 @@ class TestHeartbeat(test_api_base.BaseApiTest):
         mock_heartbeat.assert_called_once_with(mock.ANY, mock.ANY,
                                                node.uuid, 'url', None,
                                                'maybe some magic',
-                                               None, topic='test-topic')
+                                               None, None, None,
+                                               topic='test-topic')
 
     @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
     def test_ok_by_name(self, mock_heartbeat):
@@ -255,8 +257,8 @@ class TestHeartbeat(test_api_base.BaseApiTest):
         self.assertEqual(b'', response.body)
         mock_heartbeat.assert_called_once_with(mock.ANY, mock.ANY,
                                                node.uuid, 'url', None,
-                                               'token',
-                                               None, topic='test-topic')
+                                               'token', None, None, None,
+                                               topic='test-topic')
 
     @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
     def test_ok_agent_version(self, mock_heartbeat):
@@ -272,7 +274,8 @@ class TestHeartbeat(test_api_base.BaseApiTest):
         mock_heartbeat.assert_called_once_with(mock.ANY, mock.ANY,
                                                node.uuid, 'url', '1.4.1',
                                                'meow',
-                                               None, topic='test-topic')
+                                               None, None, None,
+                                               topic='test-topic')
 
     @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
     def test_old_API_agent_version_error(self, mock_heartbeat):
@@ -309,7 +312,7 @@ class TestHeartbeat(test_api_base.BaseApiTest):
         self.assertEqual(b'', response.body)
         mock_heartbeat.assert_called_once_with(mock.ANY, mock.ANY,
                                                node.uuid, 'url', None,
-                                               'abcdef1', None,
+                                               'abcdef1', None, None, None,
                                                topic='test-topic')
 
     @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
@@ -325,9 +328,41 @@ class TestHeartbeat(test_api_base.BaseApiTest):
         self.assertEqual(b'', response.body)
         mock_heartbeat.assert_called_once_with(mock.ANY, mock.ANY,
                                                node.uuid, 'url', None,
-                                               'meow', 'abcdef1',
+                                               'meow', 'abcdef1', None, None,
                                                topic='test-topic')
 
+    @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
+    def test_ok_agent_status_and_status(self, mock_heartbeat):
+        node = obj_utils.create_test_node(self.context)
+        response = self.post_json(
+            '/heartbeat/%s' % node.uuid,
+            {'callback_url': 'url',
+             'agent_token': 'meow',
+             'agent_status': 'start',
+             'agent_status_message': 'woof',
+             'agent_verify_ca': 'abcdef1'},
+            headers={api_base.Version.string: str(api_v1.max_version())})
+        self.assertEqual(http_client.ACCEPTED, response.status_int)
+        self.assertEqual(b'', response.body)
+        mock_heartbeat.assert_called_once_with(mock.ANY, mock.ANY,
+                                               node.uuid, 'url', None,
+                                               'meow', 'abcdef1', 'start',
+                                               'woof', topic='test-topic')
+
+    @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
+    def test_bad_invalid_agent_status(self, mock_heartbeat):
+        node = obj_utils.create_test_node(self.context)
+        response = self.post_json(
+            '/heartbeat/%s' % node.uuid,
+            {'callback_url': 'url',
+             'agent_token': 'meow',
+             'agent_status': 'invalid_state',
+             'agent_status_message': 'woof',
+             'agent_verify_ca': 'abcdef1'},
+            headers={api_base.Version.string: str(api_v1.max_version())},
+            expect_errors=True)
+        self.assertEqual(http_client.BAD_REQUEST, response.status_int)
+
     @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
     def test_old_API_agent_verify_ca_error(self, mock_heartbeat):
         node = obj_utils.create_test_node(self.context)
@@ -340,6 +375,20 @@ class TestHeartbeat(test_api_base.BaseApiTest):
             expect_errors=True)
         self.assertEqual(http_client.BAD_REQUEST, response.status_int)
 
+    @mock.patch.object(rpcapi.ConductorAPI, 'heartbeat', autospec=True)
+    def test_old_api_agent_status_error(self, mock_heartbeat):
+        node = obj_utils.create_test_node(self.context)
+        response = self.post_json(
+            '/heartbeat/%s' % node.uuid,
+            {'callback_url': 'url',
+             'agent_token': 'meow',
+             'agent_verify_ca': 'abcd',
+             'agent_status': 'wow',
+             'agent_status_message': 'much status'},
+            headers={api_base.Version.string: '1.71'},
+            expect_errors=True)
+        self.assertEqual(http_client.BAD_REQUEST, response.status_int)
+
 
 @mock.patch.object(auth_token.AuthProtocol, 'process_request',
                    lambda *_: None)
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 2957e6b918..a4ce0ecd48 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -7185,6 +7185,32 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertEqual(exception.InvalidParameterValue, exc.exc_info[0])
         self.assertEqual(expected_string, str(exc.exc_info[1]))
 
+    @mock.patch('ironic.drivers.modules.fake.FakeDeploy.heartbeat',
+                autospec=True)
+    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
+                autospec=True)
+    def test_heartbeat_without_agent_version_anaconda(self, mock_spawn,
+                                                      mock_heartbeat):
+        """Test heartbeating anaconda deploy ramdisk without agent_version"""
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.DEPLOYING,
+            target_provision_state=states.ACTIVE,
+            driver_internal_info={'agent_secret_token': 'magic'})
+
+        self._start_service()
+
+        mock_spawn.reset_mock()
+
+        mock_spawn.side_effect = self._fake_spawn
+
+        self.service.heartbeat(self.context, node.uuid, 'http://callback',
+                               agent_version=None, agent_token='magic',
+                               agent_status='start')
+        mock_heartbeat.assert_called_with(mock.ANY, mock.ANY,
+                                          'http://callback', None,
+                                          None, 'start', None)
+
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.heartbeat',
                 autospec=True)
     @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
@@ -7206,7 +7232,8 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.service.heartbeat(self.context, node.uuid, 'http://callback',
                                '1.4.1', agent_token='magic')
         mock_heartbeat.assert_called_with(mock.ANY, mock.ANY,
-                                          'http://callback', '1.4.1', None)
+                                          'http://callback', '1.4.1', None,
+                                          None, None)
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.heartbeat',
                 autospec=True)
@@ -7254,7 +7281,8 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.service.heartbeat(self.context, node.uuid, 'http://callback',
                                '6.1.0', agent_token='a secret')
         mock_heartbeat.assert_called_with(mock.ANY, mock.ANY,
-                                          'http://callback', '6.1.0', None)
+                                          'http://callback', '6.1.0', None,
+                                          None, None)
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.heartbeat',
                 autospec=True)
@@ -7278,7 +7306,8 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.service.heartbeat(self.context, node.uuid, 'http://callback',
                                '6.1.0', agent_token='a secret')
         mock_heartbeat.assert_called_with(mock.ANY, mock.ANY,
-                                          'http://callback', '6.1.0', None)
+                                          'http://callback', '6.1.0', None,
+                                          None, None)
 
     @mock.patch('ironic.drivers.modules.fake.FakeDeploy.heartbeat',
                 autospec=True)
@@ -7410,8 +7439,8 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
                                agent_version='6.1.0', agent_token='a secret',
                                agent_verify_ca='abcd')
         mock_heartbeat.assert_called_with(
-            mock.ANY, mock.ANY, 'http://callback', '6.1.0',
-            '/path/to/crt')
+            mock.ANY, mock.ANY, 'http://callback', '6.1.0', '/path/to/crt',
+            None, None)
 
 
 @mgr_utils.mock_record_keepalive
diff --git a/ironic/tests/unit/conductor/test_rpcapi.py b/ironic/tests/unit/conductor/test_rpcapi.py
index 3f0ae2040f..5624c6439f 100644
--- a/ironic/tests/unit/conductor/test_rpcapi.py
+++ b/ironic/tests/unit/conductor/test_rpcapi.py
@@ -560,7 +560,7 @@ class RPCAPITestCase(db_base.DbTestCase):
                           node_id='fake-node',
                           callback_url='http://ramdisk.url:port',
                           agent_version=None,
-                          version='1.51')
+                          version='1.54')
 
     def test_heartbeat_agent_token(self):
         self._test_rpcapi('heartbeat',
@@ -569,7 +569,7 @@ class RPCAPITestCase(db_base.DbTestCase):
                           callback_url='http://ramdisk.url:port',
                           agent_version=None,
                           agent_token='xyz1',
-                          version='1.51')
+                          version='1.54')
 
     def test_destroy_volume_connector(self):
         fake_volume_connector = db_utils.get_test_volume_connector()
diff --git a/ironic/tests/unit/drivers/modules/test_pxe.py b/ironic/tests/unit/drivers/modules/test_pxe.py
index 3f7d9e4b73..4b06a495a3 100644
--- a/ironic/tests/unit/drivers/modules/test_pxe.py
+++ b/ironic/tests/unit/drivers/modules/test_pxe.py
@@ -42,6 +42,7 @@ from ironic.drivers.modules import ipxe
 from ironic.drivers.modules import pxe
 from ironic.drivers.modules import pxe_base
 from ironic.drivers.modules.storage import noop as noop_storage
+from ironic import objects
 from ironic.tests.unit.db import base as db_base
 from ironic.tests.unit.db import utils as db_utils
 from ironic.tests.unit.objects import utils as obj_utils
@@ -1045,6 +1046,161 @@ class PXERamdiskDeployTestCase(db_base.DbTestCase):
             self.assertTrue(mock_warning.called)
 
 
+class PXEAnacondaDeployTestCase(db_base.DbTestCase):
+
+    def setUp(self):
+        super(PXEAnacondaDeployTestCase, self).setUp()
+        self.temp_dir = tempfile.mkdtemp()
+        self.config(tftp_root=self.temp_dir, group='pxe')
+        self.config_temp_dir('http_root', group='deploy')
+        self.config(http_url='http://fakeurl', group='deploy')
+        self.temp_dir = tempfile.mkdtemp()
+        self.config(images_path=self.temp_dir, group='pxe')
+        self.config(enabled_deploy_interfaces=['anaconda'])
+        self.config(enabled_boot_interfaces=['pxe'])
+        for iface in drivers_base.ALL_INTERFACES:
+            impl = 'fake'
+            if iface == 'network':
+                impl = 'noop'
+            if iface == 'deploy':
+                impl = 'anaconda'
+            if iface == 'boot':
+                impl = 'pxe'
+            config_kwarg = {'enabled_%s_interfaces' % iface: [impl],
+                            'default_%s_interface' % iface: impl}
+            self.config(**config_kwarg)
+        self.config(enabled_hardware_types=['fake-hardware'])
+        instance_info = INST_INFO_DICT
+        self.node = obj_utils.create_test_node(
+            self.context,
+            driver='fake-hardware',
+            instance_info=instance_info,
+            driver_info=DRV_INFO_DICT,
+            driver_internal_info=DRV_INTERNAL_INFO_DICT)
+        self.port = obj_utils.create_test_port(self.context,
+                                               node_id=self.node.id)
+        self.deploy = pxe.PXEAnacondaDeploy()
+
+    @mock.patch.object(pxe_utils, 'prepare_instance_kickstart_config',
+                       autospec=True)
+    @mock.patch.object(pxe_utils, 'validate_kickstart_file', autospec=True)
+    @mock.patch.object(pxe_utils, 'validate_kickstart_template', autospec=True)
+    @mock.patch.object(deploy_utils, 'switch_pxe_config', autospec=True)
+    @mock.patch.object(dhcp_factory, 'DHCPFactory', autospec=True)
+    @mock.patch.object(pxe_utils, 'cache_ramdisk_kernel', autospec=True)
+    @mock.patch.object(pxe_utils, 'get_instance_image_info', autospec=True)
+    def test_deploy(self, mock_image_info, mock_cache,
+                    mock_dhcp_factory, mock_switch_config, mock_ks_tmpl,
+                    mock_ks_file, mock_prepare_ks_config):
+        image_info = {'kernel': ('', '/path/to/kernel'),
+                      'ramdisk': ('', '/path/to/ramdisk'),
+                      'stage2': ('', '/path/to/stage2'),
+                      'ks_template': ('', '/path/to/ks_template'),
+                      'ks_cfg': ('', '/path/to/ks_cfg')}
+        mock_image_info.return_value = image_info
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            self.assertIsNone(task.driver.deploy.deploy(task))
+            mock_image_info.assert_called_once_with(task, ipxe_enabled=False)
+            mock_cache.assert_called_once_with(
+                task, image_info, ipxe_enabled=False)
+            mock_ks_tmpl.assert_called_once_with(image_info['ks_template'][1])
+            mock_ks_file.assert_called_once_with(mock_ks_tmpl.return_value)
+            mock_prepare_ks_config.assert_called_once_with(task, image_info,
+                                                           anaconda_boot=True)
+
+    @mock.patch.object(pxe.PXEBoot, 'prepare_instance', autospec=True)
+    def test_prepare(self, mock_prepare_instance):
+        node = self.node
+        node.provision_state = states.DEPLOYING
+        node.instance_info = {}
+        node.save()
+        with task_manager.acquire(self.context, node.uuid) as task:
+            task.driver.deploy.prepare(task)
+            self.assertFalse(mock_prepare_instance.called)
+
+    @mock.patch.object(pxe.PXEBoot, 'prepare_instance', autospec=True)
+    def test_prepare_active(self, mock_prepare_instance):
+        node = self.node
+        node.provision_state = states.ACTIVE
+        node.save()
+        with task_manager.acquire(self.context, node.uuid) as task:
+            task.driver.deploy.prepare(task)
+            mock_prepare_instance.assert_called_once_with(mock.ANY, task)
+
+    @mock.patch.object(pxe_utils, 'clean_up_pxe_env', autospec=True)
+    @mock.patch.object(pxe_utils, 'get_instance_image_info', autospec=True)
+    @mock.patch.object(deploy_utils, 'try_set_boot_device', autospec=True)
+    def test_reboot_to_instance(self, mock_set_boot_dev, mock_image_info,
+                                mock_cleanup_pxe_env):
+        image_info = {'kernel': ('', '/path/to/kernel'),
+                      'ramdisk': ('', '/path/to/ramdisk'),
+                      'stage2': ('', '/path/to/stage2'),
+                      'ks_template': ('', '/path/to/ks_template'),
+                      'ks_cfg': ('', '/path/to/ks_cfg')}
+        mock_image_info.return_value = image_info
+        with task_manager.acquire(self.context, self.node.uuid) as task:
+            task.driver.deploy.reboot_to_instance(task)
+            mock_set_boot_dev.assert_called_once_with(task, boot_devices.DISK)
+            mock_cleanup_pxe_env.assert_called_once_with(task, image_info,
+                                                         ipxe_enabled=False)
+
+    @mock.patch.object(objects.node.Node, 'touch_provisioning', autospec=True)
+    def test_heartbeat_deploy_start(self, mock_touch):
+        self.node.provision_state = states.DEPLOYWAIT
+        self.node.save()
+        with task_manager.acquire(self.context, self.node.uuid,
+                                  shared=True) as task:
+            self.deploy.heartbeat(task, 'url', '3.2.0', None, 'start', 'msg')
+            self.assertFalse(task.shared)
+            self.assertEqual(
+                'url', task.node.driver_internal_info['agent_url'])
+            self.assertEqual(
+                '3.2.0',
+                task.node.driver_internal_info['agent_version'])
+            self.assertEqual(
+                'start',
+                task.node.driver_internal_info['agent_status'])
+            mock_touch.assert_called()
+
+    @mock.patch.object(deploy_utils, 'set_failed_state', autospec=True)
+    def test_heartbeat_deploy_error(self, mock_set_failed_state):
+        self.node.provision_state = states.DEPLOYWAIT
+        self.node.save()
+        with task_manager.acquire(self.context, self.node.uuid,
+                                  shared=True) as task:
+            self.deploy.heartbeat(task, 'url', '3.2.0', None, 'error',
+                                  'errmsg')
+            self.assertFalse(task.shared)
+            self.assertEqual(
+                'url', task.node.driver_internal_info['agent_url'])
+            self.assertEqual(
+                '3.2.0',
+                task.node.driver_internal_info['agent_version'])
+            self.assertEqual(
+                'error',
+                task.node.driver_internal_info['agent_status'])
+            mock_set_failed_state.assert_called_once_with(task, 'errmsg',
+                                                          collect_logs=False)
+
+    @mock.patch.object(pxe.PXEAnacondaDeploy, 'reboot_to_instance',
+                       autospec=True)
+    def test_heartbeat_deploy_end(self, mock_reboot_to_instance):
+        self.node.provision_state = states.DEPLOYWAIT
+        self.node.save()
+        with task_manager.acquire(self.context, self.node.uuid,
+                                  shared=True) as task:
+            self.deploy.heartbeat(task, None, None, None, 'end', 'sucess')
+            self.assertFalse(task.shared)
+            self.assertIsNone(
+                task.node.driver_internal_info['agent_url'])
+            self.assertIsNone(
+                task.node.driver_internal_info['agent_version'])
+            self.assertEqual(
+                'end',
+                task.node.driver_internal_info['agent_status'])
+            self.assertTrue(mock_reboot_to_instance.called)
+
+
 class PXEValidateRescueTestCase(db_base.DbTestCase):
 
     def setUp(self):
diff --git a/releasenotes/notes/anaconda-deploy-interface-c04932f6f469227a.yaml b/releasenotes/notes/anaconda-deploy-interface-c04932f6f469227a.yaml
new file mode 100644
index 0000000000..4ef33b6d7e
--- /dev/null
+++ b/releasenotes/notes/anaconda-deploy-interface-c04932f6f469227a.yaml
@@ -0,0 +1,17 @@
+---
+features:
+  - |
+    Add ``anaconda`` deploy interface to Ironic. This driver will deploy
+    the OS using anaconda installer and kickstart file instead of IPA. To
+    support this feature a new configuration group ``anaconda`` is added to
+    Ironic configuration file along with ``default_ks_template`` configuration
+    option.
+
+    The deploy interface uses heartbeat API to communicate. The kickstart
+    template must include %pre %post %onerror and %traceback sections that
+    should send status of the deployment back to Ironic API using heartbeats.
+    An example of such calls to hearbeat API can be found in the default
+    kickstart template. To enable anaconda to send status back to Ironic API
+    via heartbeat ``agent_status`` and ``agent_status_message`` are added to
+    the heartbeat API. Use of these new parameters require API microversion
+    1.72 or greater.