diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 8c6b209e49..363c8ca111 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -816,8 +816,8 @@ class ConductorManager(base_manager.BaseConductorManager):
             if node.provision_state == states.CLEANWAIT:
                 task.process_event('resume', target_state=target_state)
 
-            task.set_spawn_error_hook(utils.cleaning_error_handler, task.node,
-                                      _('Failed to run next clean step'))
+            task.set_spawn_error_hook(utils.spawn_cleaning_error_handler,
+                                      task.node)
             task.spawn_after(
                 self._spawn_worker,
                 self._do_next_clean_step,
diff --git a/ironic/conductor/utils.py b/ironic/conductor/utils.py
index 60f4b0b9ca..99b0236d22 100644
--- a/ironic/conductor/utils.py
+++ b/ironic/conductor/utils.py
@@ -227,6 +227,15 @@ def cleaning_error_handler(task, msg, tear_down_cleaning=True,
         task.process_event('fail', target_state=target_state)
 
 
+def spawn_cleaning_error_handler(e, node):
+    """Handle spawning error for node cleaning."""
+    if isinstance(e, exception.NoFreeConductorWorker):
+        node.last_error = (_("No free conductor workers available"))
+        node.save()
+        LOG.warning(_LW("No free conductor workers available to perform "
+                        "cleaning on node %(node)s"), {'node': node.uuid})
+
+
 def power_state_error_handler(e, node, power_state):
     """Set the node's power states if error occurs.
 
diff --git a/ironic/drivers/modules/agent_base_vendor.py b/ironic/drivers/modules/agent_base_vendor.py
index cd25d06c93..0ae5876e11 100644
--- a/ironic/drivers/modules/agent_base_vendor.py
+++ b/ironic/drivers/modules/agent_base_vendor.py
@@ -350,15 +350,20 @@ class BaseAgentVendor(base.VendorInterface):
             # release starts.
             elif node.provision_state in (states.CLEANWAIT, states.CLEANING):
                 node.touch_provisioning()
-                if not node.clean_step:
-                    LOG.debug('Node %s just booted to start cleaning.',
-                              node.uuid)
-                    msg = _('Node failed to start the next cleaning step.')
-                    manager_utils.set_node_cleaning_steps(task)
-                    self.notify_conductor_resume_clean(task)
-                else:
-                    msg = _('Node failed to check cleaning progress.')
-                    self.continue_cleaning(task, **kwargs)
+                try:
+                    if not node.clean_step:
+                        LOG.debug('Node %s just booted to start cleaning.',
+                                  node.uuid)
+                        msg = _('Node failed to start the next cleaning step.')
+                        manager_utils.set_node_cleaning_steps(task)
+                        self.notify_conductor_resume_clean(task)
+                    else:
+                        msg = _('Node failed to check cleaning progress.')
+                        self.continue_cleaning(task, **kwargs)
+                except exception.NoFreeConductorWorker:
+                    # waiting for the next heartbeat, node.last_error and
+                    # logging message is filled already via conductor's hook
+                    pass
 
         except Exception as e:
             err_info = {'node': node.uuid, 'msg': msg, 'e': e}
diff --git a/ironic/tests/unit/conductor/test_utils.py b/ironic/tests/unit/conductor/test_utils.py
index 96b07a9ae9..a029a0dfff 100644
--- a/ironic/tests/unit/conductor/test_utils.py
+++ b/ironic/tests/unit/conductor/test_utils.py
@@ -538,3 +538,109 @@ class NodeCleaningStepsTestCase(base.DbTestCase):
                                     conductor_utils._validate_user_clean_steps,
                                     task, user_steps)
             mock_steps.assert_called_once_with(task, enabled=False, sort=False)
+
+
+class ErrorHandlersTestCase(tests_base.TestCase):
+    def setUp(self):
+        super(ErrorHandlersTestCase, self).setUp()
+        self.task = mock.Mock(spec=task_manager.TaskManager)
+        self.task.driver = mock.Mock(spec_set=['deploy'])
+        self.task.node = mock.Mock(spec_set=objects.Node)
+        self.node = self.task.node
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_provision_error_handler_no_worker(self, log_mock):
+        exc = exception.NoFreeConductorWorker()
+        conductor_utils.provisioning_error_handler(exc, self.node, 'state-one',
+                                                   'state-two')
+        self.node.save.assert_called_once_with()
+        self.assertEqual('state-one', self.node.provision_state)
+        self.assertEqual('state-two', self.node.target_provision_state)
+        self.assertIn('No free conductor workers', self.node.last_error)
+        self.assertTrue(log_mock.warning.called)
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_provision_error_handler_other_error(self, log_mock):
+        exc = Exception('foo')
+        conductor_utils.provisioning_error_handler(exc, self.node, 'state-one',
+                                                   'state-two')
+        self.assertFalse(self.node.save.called)
+        self.assertFalse(log_mock.warning.called)
+
+    def test_cleaning_error_handler(self):
+        self.node.provision_state = states.CLEANING
+        target = 'baz'
+        self.node.target_provision_state = target
+        msg = 'error bar'
+        conductor_utils.cleaning_error_handler(self.task, msg)
+        self.node.save.assert_called_once_with()
+        self.assertEqual({}, self.node.clean_step)
+        self.assertEqual(msg, self.node.last_error)
+        self.assertTrue(self.node.maintenance)
+        self.assertEqual(msg, self.node.maintenance_reason)
+        driver = self.task.driver.deploy
+        driver.tear_down_cleaning.assert_called_once_with(self.task)
+        self.task.process_event.assert_called_once_with('fail',
+                                                        target_state=None)
+
+    def test_cleaning_error_handler_manual(self):
+        target = states.MANAGEABLE
+        self.node.target_provision_state = target
+        conductor_utils.cleaning_error_handler(self.task, 'foo')
+        self.task.process_event.assert_called_once_with('fail',
+                                                        target_state=target)
+
+    def test_cleaning_error_handler_no_teardown(self):
+        target = states.MANAGEABLE
+        self.node.target_provision_state = target
+        conductor_utils.cleaning_error_handler(self.task, 'foo',
+                                               tear_down_cleaning=False)
+        self.assertFalse(self.task.driver.deploy.tear_down_cleaning.called)
+        self.task.process_event.assert_called_once_with('fail',
+                                                        target_state=target)
+
+    def test_cleaning_error_handler_no_fail(self):
+        conductor_utils.cleaning_error_handler(self.task, 'foo',
+                                               set_fail_state=False)
+        driver = self.task.driver.deploy
+        driver.tear_down_cleaning.assert_called_once_with(self.task)
+        self.assertFalse(self.task.process_event.called)
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_cleaning_error_handler_tear_down_error(self, log_mock):
+        driver = self.task.driver.deploy
+        driver.tear_down_cleaning.side_effect = Exception('bar')
+        conductor_utils.cleaning_error_handler(self.task, 'foo')
+        self.assertTrue(log_mock.exception.called)
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_spawn_cleaning_error_handler_no_worker(self, log_mock):
+        exc = exception.NoFreeConductorWorker()
+        conductor_utils.spawn_cleaning_error_handler(exc, self.node)
+        self.node.save.assert_called_once_with()
+        self.assertIn('No free conductor workers', self.node.last_error)
+        self.assertTrue(log_mock.warning.called)
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_spawn_cleaning_error_handler_other_error(self, log_mock):
+        exc = Exception('foo')
+        conductor_utils.spawn_cleaning_error_handler(exc, self.node)
+        self.assertFalse(self.node.save.called)
+        self.assertFalse(log_mock.warning.called)
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_power_state_error_handler_no_worker(self, log_mock):
+        exc = exception.NoFreeConductorWorker()
+        conductor_utils.power_state_error_handler(exc, self.node, 'newstate')
+        self.node.save.assert_called_once_with()
+        self.assertEqual('newstate', self.node.power_state)
+        self.assertEqual(states.NOSTATE, self.node.target_power_state)
+        self.assertIn('No free conductor workers', self.node.last_error)
+        self.assertTrue(log_mock.warning.called)
+
+    @mock.patch.object(conductor_utils, 'LOG')
+    def test_power_state_error_handler_other_error(self, log_mock):
+        exc = Exception('foo')
+        conductor_utils.power_state_error_handler(exc, self.node, 'foo')
+        self.assertFalse(self.node.save.called)
+        self.assertFalse(log_mock.warning.called)
diff --git a/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py b/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
index 831e871b10..62994ef407 100644
--- a/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
+++ b/ironic/tests/unit/drivers/modules/test_agent_base_vendor.py
@@ -436,6 +436,35 @@ class TestBaseAgentVendor(db_base.DbTestCase):
             mock_handler.reset_mock()
             mock_continue.reset_mock()
 
+    @mock.patch.object(manager_utils, 'cleaning_error_handler')
+    @mock.patch.object(agent_base_vendor.BaseAgentVendor,
+                       'continue_cleaning', autospec=True)
+    def test_heartbeat_continue_cleaning_no_worker(self, mock_continue,
+                                                   mock_handler):
+        kwargs = {
+            'agent_url': 'http://127.0.0.1:9999/bar'
+        }
+        self.node.clean_step = {
+            'priority': 10,
+            'interface': 'deploy',
+            'step': 'foo',
+            'reboot_requested': False
+        }
+
+        mock_continue.side_effect = exception.NoFreeConductorWorker()
+
+        for state in (states.CLEANWAIT, states.CLEANING):
+            self.node.provision_state = state
+            self.node.save()
+            with task_manager.acquire(
+                    self.context, self.node.uuid, shared=True) as task:
+                self.passthru.heartbeat(task, **kwargs)
+
+            mock_continue.assert_called_once_with(mock.ANY, task, **kwargs)
+            self.assertFalse(mock_handler.called)
+            mock_handler.reset_mock()
+            mock_continue.reset_mock()
+
     @mock.patch.object(agent_base_vendor.BaseAgentVendor, 'continue_deploy',
                        autospec=True)
     @mock.patch.object(agent_base_vendor.BaseAgentVendor, 'reboot_to_instance',
diff --git a/releasenotes/notes/fix-cleaning-spawn-error-60b60281f3be51c2.yaml b/releasenotes/notes/fix-cleaning-spawn-error-60b60281f3be51c2.yaml
new file mode 100644
index 0000000000..153924c9ee
--- /dev/null
+++ b/releasenotes/notes/fix-cleaning-spawn-error-60b60281f3be51c2.yaml
@@ -0,0 +1,4 @@
+---
+fixes:
+  - Fix issues with error handling when spawning a new thread to continue
+    cleaning. See https://bugs.launchpad.net/ironic/+bug/1539118.