diff --git a/doc/source/admin/tuning.rst b/doc/source/admin/tuning.rst
index d0e772f29d..662f168826 100644
--- a/doc/source/admin/tuning.rst
+++ b/doc/source/admin/tuning.rst
@@ -149,6 +149,13 @@ using more than one CPU core.
    If you use JSON RPC, you also need to make sure the ports don't conflict by
    setting the :oslo.config:option:`json_rpc.port` option.
 
+Starting with the 2024.1 "Caracal" release cycle, a small proportion of the
+worker threads (specified by the
+:oslo.config:option:`conductor.reserved_workers_pool_percentage` option, 5%
+by default) is reserved for API requests and other critical tasks. Periodic
+tasks and agent heartbeats cannot use these reserved threads. This ensures
+that the API stays responsive even under extreme internal load.
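+
+For example, with a worker pool of 100 threads (an illustrative value, not
+necessarily what your deployment uses), the default reservation of 5% keeps
+5 threads free for critical tasks:
+
+.. code-block:: ini
+
+   [conductor]
+   workers_pool_size = 100
+   reserved_workers_pool_percentage = 5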
+
 .. _eventlet: https://eventlet.net/
 
 Database
diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py
index c6f4b3273c..4cada13501 100644
--- a/ironic/conductor/base_manager.py
+++ b/ironic/conductor/base_manager.py
@@ -99,6 +99,29 @@ class BaseConductorManager(object):
         # clear all locks held by this conductor before registering
         self.dbapi.clear_node_reservations_for_conductor(self.host)
 
+    def _init_executors(self, total_workers, reserved_percentage):
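+        """Split the worker pool into a normal and a reserved executor."""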
+        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
+        # better to reject an incoming request with HTTP 503, or to
+        # reschedule a periodic task, than to end up with a hidden backlog
+        # that is hard to track and debug. Using 1 instead of 0 because of
+        # how things are ordered in futurist (it checks for rejection first).
+        rejection_func = rejection.reject_when_reached(1)
+
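+        # For example, a pool of 100 workers with the default 5% reserved
+        # yields 5 reserved workers and 95 normal workers.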
+        reserved_workers = int(total_workers * reserved_percentage / 100)
+        remaining = total_workers - reserved_workers
+        LOG.info("Starting workers pool: %d normal workers + %d reserved",
+                 remaining, reserved_workers)
+
+        self._executor = futurist.GreenThreadPoolExecutor(
+            max_workers=remaining,
+            check_and_reject=rejection_func)
+        if reserved_workers:
+            self._reserved_executor = futurist.GreenThreadPoolExecutor(
+                max_workers=reserved_workers,
+                check_and_reject=rejection_func)
+        else:
+            self._reserved_executor = None
+
     def init_host(self, admin_context=None, start_consoles=True,
                   start_allocations=True):
         """Initialize the conductor host.
@@ -125,16 +148,8 @@ class BaseConductorManager(object):
         self._keepalive_evt = threading.Event()
         """Event for the keepalive thread."""
 
-        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
-        # better to reject an incoming request with HTTP 503 or reschedule
-        # a periodic task that end up with hidden backlog that is hard
-        # to track and debug. Using 1 instead of 0 because of how things are
-        # ordered in futurist (it checks for rejection first).
-        rejection_func = rejection.reject_when_reached(1)
-        self._executor = futurist.GreenThreadPoolExecutor(
-            max_workers=CONF.conductor.workers_pool_size,
-            check_and_reject=rejection_func)
-        """Executor for performing tasks async."""
+        self._init_executors(CONF.conductor.workers_pool_size,
+                             CONF.conductor.reserved_workers_pool_percentage)
 
         # TODO(jroll) delete the use_groups argument and use the default
         # in Stein.
@@ -358,6 +373,8 @@ class BaseConductorManager(object):
         # having work complete normally.
         self._periodic_tasks.stop()
         self._periodic_tasks.wait()
+        if self._reserved_executor is not None:
+            self._reserved_executor.shutdown(wait=True)
         self._executor.shutdown(wait=True)
 
         if self._zeroconf is not None:
@@ -453,7 +470,8 @@ class BaseConductorManager(object):
             if self._mapped_to_this_conductor(*result[:3]):
                 yield result
 
-    def _spawn_worker(self, func, *args, **kwargs):
+    def _spawn_worker(self, func, *args, _allow_reserved_pool=True,
+                      **kwargs):
 
         """Create a greenthread to run func(*args, **kwargs).
 
@@ -466,6 +484,14 @@ class BaseConductorManager(object):
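+        :param _allow_reserved_pool: whether the call may fall back to the
+            reserved worker pool when the normal pool is exhausted.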
         """
         try:
             return self._executor.submit(func, *args, **kwargs)
+        except futurist.RejectedSubmission:
+            if not _allow_reserved_pool or self._reserved_executor is None:
+                raise exception.NoFreeConductorWorker()
+
+        LOG.debug('Normal workers pool is full, using reserved pool to run %s',
+                  func.__qualname__)
+        try:
+            return self._reserved_executor.submit(func, *args, **kwargs)
         except futurist.RejectedSubmission:
             raise exception.NoFreeConductorWorker()
 
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 5efa3f66af..3e53f15353 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -3209,7 +3209,10 @@ class ConductorManager(base_manager.BaseConductorManager):
             task.spawn_after(
                 self._spawn_worker, task.driver.deploy.heartbeat,
                 task, callback_url, agent_version, agent_verify_ca,
-                agent_status, agent_status_message)
+                agent_status, agent_status_message,
+                # NOTE(dtantsur): heartbeats are not so critical that they
+                # should be allowed to overload the conductor.
+                _allow_reserved_pool=False)
 
     @METRICS.timer('ConductorManager.vif_list')
     @messaging.expected_exceptions(exception.NetworkError,
diff --git a/ironic/conf/conductor.py b/ironic/conf/conductor.py
index 3c6261536f..01f385ba6f 100644
--- a/ironic/conf/conductor.py
+++ b/ironic/conf/conductor.py
@@ -28,6 +28,12 @@ opts = [
                       'itself for handling heart beats and periodic tasks. '
                       'On top of that, `sync_power_state_workers` will take '
                       'up to 7 green threads with the default value of 8.')),
+    cfg.IntOpt('reserved_workers_pool_percentage',
+               default=5, min=0, max=50,
+               help=_('The percentage of the worker pool reserved for API '
+                      'requests and other critical tasks. This part of the '
+                      'pool is never used for periodic tasks or agent '
+                      'heartbeats. Set to 0 to disable the reserved pool.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
diff --git a/ironic/tests/unit/conductor/test_base_manager.py b/ironic/tests/unit/conductor/test_base_manager.py
index b8b60b01ba..eae61eaf4a 100644
--- a/ironic/tests/unit/conductor/test_base_manager.py
+++ b/ironic/tests/unit/conductor/test_base_manager.py
@@ -402,20 +402,55 @@ class ManagerSpawnWorkerTestCase(tests_base.TestCase):
     def setUp(self):
         super(ManagerSpawnWorkerTestCase, self).setUp()
         self.service = manager.ConductorManager('hostname', 'test-topic')
-        self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
-        self.service._executor = self.executor
+        self.service._executor = mock.Mock(
+            spec=futurist.GreenThreadPoolExecutor)
+        self.service._reserved_executor = mock.Mock(
+            spec=futurist.GreenThreadPoolExecutor)
+
+        self.func = lambda: None
 
     def test__spawn_worker(self):
-        self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
+        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')
 
-        self.executor.submit.assert_called_once_with(
-            'fake', 1, 2, foo='bar', cat='meow')
+        self.service._executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+        self.service._reserved_executor.submit.assert_not_called()
 
-    def test__spawn_worker_none_free(self):
-        self.executor.submit.side_effect = futurist.RejectedSubmission()
+    def test__spawn_worker_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+
+        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')
+
+        self.service._executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+        self.service._reserved_executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+
+    def test__spawn_worker_cannot_use_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
 
         self.assertRaises(exception.NoFreeConductorWorker,
-                          self.service._spawn_worker, 'fake')
+                          self.service._spawn_worker, self.func,
+                          _allow_reserved_pool=False)
+
+    def test__spawn_worker_no_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+        self.service._reserved_executor = None
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service._spawn_worker, self.func)
+
+    def test__spawn_worker_none_free(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+        self.service._reserved_executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service._spawn_worker, self.func)
 
 
 @mock.patch.object(objects.Conductor, 'unregister_all_hardware_interfaces',
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index c08137a4ab..4b7c66ba07 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -7609,6 +7609,15 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertEqual(states.NOSTATE, node.target_provision_state)
         self.assertIsNone(node.last_error)
 
+
+@mgr_utils.mock_record_keepalive
+class HeartbeatTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
+
+    def _fake_spawn(self, conductor_obj, func, *args, **kwargs):
+        self.assertFalse(kwargs.pop('_allow_reserved_pool'))
+        func(*args, **kwargs)
+        return mock.MagicMock()
+
     # TODO(TheJulia): We should double check if these heartbeat tests need
     # to move. I have this strange feeling we were lacking rpc testing of
     # heartbeat until we did adoption testing....
diff --git a/releasenotes/notes/reserved-workers-3cc0af8782b00fcc.yaml b/releasenotes/notes/reserved-workers-3cc0af8782b00fcc.yaml
new file mode 100644
index 0000000000..f1466e6b27
--- /dev/null
+++ b/releasenotes/notes/reserved-workers-3cc0af8782b00fcc.yaml
@@ -0,0 +1,6 @@
+---
+fixes:
+  - |
+    Each conductor now reserves a small proportion of its worker threads (5%
+    by default, configurable via the new
+    ``[conductor] reserved_workers_pool_percentage`` option) for API requests
+    and other critical tasks. This ensures that the API stays responsive even
+    under extreme internal load.