diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 7fcccec041..66013b99d6 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -861,7 +861,7 @@ class ConductorManager(base_manager.BaseConductorManager):
 
         """
         LOG.debug("RPC continue_node_deploy called for node %s.", node_id)
-        with task_manager.acquire(context, node_id, shared=False,
+        with task_manager.acquire(context, node_id, shared=False, patient=True,
                                   purpose='continue node deploying') as task:
             node = task.node
 
@@ -1118,7 +1118,7 @@ class ConductorManager(base_manager.BaseConductorManager):
         """
         LOG.debug("RPC continue_node_clean called for node %s.", node_id)
 
-        with task_manager.acquire(context, node_id, shared=False,
+        with task_manager.acquire(context, node_id, shared=False, patient=True,
                                   purpose='continue node cleaning') as task:
             node = task.node
             if node.target_provision_state == states.MANAGEABLE:
diff --git a/ironic/conductor/task_manager.py b/ironic/conductor/task_manager.py
index 7ed3eab3c4..8a337ee332 100644
--- a/ironic/conductor/task_manager.py
+++ b/ironic/conductor/task_manager.py
@@ -170,7 +170,7 @@ class TaskManager(object):
     """
 
     def __init__(self, context, node_id, shared=False,
-                 purpose='unspecified action', retry=True,
+                 purpose='unspecified action', retry=True, patient=False,
                  load_driver=True):
         """Create a new TaskManager.
 
@@ -185,6 +185,12 @@ class TaskManager(object):
                        lock. Default: False.
         :param purpose: human-readable purpose to put to debug logs.
         :param retry: whether to retry locking if it fails. Default: True.
+        :param patient: whether to retry locking indefinitely if it fails.
+                        Set this to True for operations that have no
+                        fallback or built-in retry mechanism, such as
+                        finalizing a state transition during deploy/clean.
+                        The default behavior is to retry a configured
+                        number of times and then give up. Default: False.
         :param load_driver: whether to load the ``driver`` object. Set this to
                             False if loading the driver is undesired or
                             impossible.
@@ -203,6 +209,7 @@ class TaskManager(object):
         self.node_id = node_id
         self.shared = shared
         self._retry = retry
+        self._patient = patient
 
         self.fsm = states.machine.copy()
         self._purpose = purpose
@@ -260,7 +267,9 @@ class TaskManager(object):
     def _lock(self):
         self._debug_timer.restart()
 
-        if self._retry:
+        if self._patient:
+            attempts = None
+        elif self._retry:
             attempts = CONF.conductor.node_locked_retry_attempts
         else:
             attempts = 1
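The hunk above only chooses a value for ``attempts``; turning ``None`` into "retry forever" is left to the retry wrapper around the reservation call later in ``_lock()``, which this diff does not touch. A rough, self-contained sketch of the intended semantics — ``lock_with_retries``, the placeholder ``NodeLocked`` class and the ``interval`` argument are illustrative stand-ins, not Ironic's actual helper:

    import time

    class NodeLocked(Exception):
        """Stand-in for ironic.common.exception.NodeLocked."""

    def lock_with_retries(reserve, attempts, interval=1.0):
        """Call reserve() until it succeeds.

        attempts=None (the patient case) retries indefinitely; a positive
        integer gives up and re-raises after that many failed calls,
        matching how _lock() computes `attempts` above.
        """
        tried = 0
        while True:
            try:
                return reserve()
            except NodeLocked:
                tried += 1
                if attempts is not None and tried >= attempts:
                    raise
                time.sleep(interval)

Under this reading, ``retry=False`` maps to ``attempts = 1`` (the first ``NodeLocked`` propagates immediately), the default maps to the configured ``node_locked_retry_attempts``, and ``patient=True`` maps to ``attempts = None``, where the loop only ends once the reservation succeeds.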
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index 648e86c981..93023484f7 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -1784,6 +1784,27 @@ class ContinueNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
                                       deployments.continue_node_deploy,
                                       mock.ANY)
 
+    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
+                autospec=True)
+    def test_continue_node_deploy_locked(self, mock_spawn):
+        """Test that continuing a deploy via RPC cannot fail due to locks."""
+        max_attempts = 3
+        self.config(node_locked_retry_attempts=max_attempts, group='conductor')
+        prv_state = states.DEPLOYWAIT
+        tgt_prv_state = states.ACTIVE
+        node = obj_utils.create_test_node(self.context, driver='fake-hardware',
+                                          provision_state=prv_state,
+                                          target_provision_state=tgt_prv_state,
+                                          last_error=None,
+                                          deploy_step=self.deploy_steps[0])
+        self._start_service()
+        with mock.patch.object(objects.Node, 'reserve', autospec=True) as mck:
+            mck.side_effect = (
+                ([exception.NodeLocked(node='foo', host='foo')] * max_attempts)
+                + [node])
+            self.service.continue_node_deploy(self.context, node.uuid)
+        self._stop_service()
+
     @mock.patch.object(task_manager.TaskManager, 'process_event',
                        autospec=True)
     @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
@@ -2582,6 +2603,28 @@ class DoNodeCleanTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
     def test_continue_node_clean_manual_abort_last_clean_step(self):
         self._continue_node_clean_abort_last_clean_step(manual=True)
 
+    @mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker',
+                autospec=True)
+    def test_continue_node_clean_locked(self, mock_spawn):
+        """Test that continuing a clean via RPC cannot fail due to locks."""
+        max_attempts = 3
+        self.config(node_locked_retry_attempts=max_attempts, group='conductor')
+        driver_info = {'clean_steps': [self.clean_steps[0]],
+                       'clean_step_index': 0}
+        tgt_prov_state = states.AVAILABLE
+        node = obj_utils.create_test_node(
+            self.context, driver='fake-hardware',
+            provision_state=states.CLEANWAIT,
+            target_provision_state=tgt_prov_state, last_error=None,
+            driver_internal_info=driver_info, clean_step=self.clean_steps[0])
+        self._start_service()
+        with mock.patch.object(objects.Node, 'reserve', autospec=True) as mck:
+            mck.side_effect = (
+                ([exception.NodeLocked(node='foo', host='foo')] * max_attempts)
+                + [node])
+            self.service.continue_node_clean(self.context, node.uuid)
+        self._stop_service()
+
 
 class DoNodeRescueTestCase(mgr_utils.CommonMixIn, mgr_utils.ServiceSetUpMixin,
                            db_base.DbTestCase):
diff --git a/ironic/tests/unit/conductor/test_task_manager.py b/ironic/tests/unit/conductor/test_task_manager.py
index e49982d593..c20d053e3e 100644
--- a/ironic/tests/unit/conductor/test_task_manager.py
+++ b/ironic/tests/unit/conductor/test_task_manager.py
@@ -222,6 +222,25 @@ class TaskManagerTestCase(db_base.DbTestCase):
         reserve_mock.assert_called_once_with(self.context, self.host,
                                              'fake-node-id')
 
+    def test_excl_lock_exception_patient(
+            self, get_voltgt_mock, get_volconn_mock, get_portgroups_mock,
+            get_ports_mock, build_driver_mock,
+            reserve_mock, release_mock, node_get_mock):
+        retry_attempts = 3
+        self.config(node_locked_retry_attempts=retry_attempts,
+                    group='conductor')
+
+        # Exhaust the configured retry limit, then succeed on the 4th call.
+        reserve_mock.side_effect = (
+            ([exception.NodeLocked(node='foo', host='foo')] * 3) + [self.node])
+
+        task_manager.TaskManager(self.context, 'fake-node-id', patient=True)
+
+        expected_calls = [mock.call(self.context, self.host,
+                                    'fake-node-id')] * 4
+        reserve_mock.assert_has_calls(expected_calls)
+        self.assertEqual(4, reserve_mock.call_count)
+
     def test_excl_lock_reserve_exception(
             self, get_voltgt_mock, get_volconn_mock, get_portgroups_mock,
             get_ports_mock, build_driver_mock,
diff --git a/releasenotes/notes/story-2008323-fix-stuck-deploying-state-43d51149a02c08b8.yaml b/releasenotes/notes/story-2008323-fix-stuck-deploying-state-43d51149a02c08b8.yaml
new file mode 100644
index 0000000000..a8f1abd5e4
--- /dev/null
+++ b/releasenotes/notes/story-2008323-fix-stuck-deploying-state-43d51149a02c08b8.yaml
@@ -0,0 +1,7 @@
+---
+fixes:
+  - |
+    Fixes a bug where a conductor could fail to complete a deployment if
+    there was contention on the node's lock. This would manifest as an
+    instance stuck in the ``deploying`` state even though the node had in
+    fact started or even completed its final boot.