Merge "Use threading instead of eventlet for engine queue"

This commit is contained in:
Zuul 2025-02-07 21:24:20 +00:00 committed by Gerrit Code Review
commit b6e80e1813

View File

@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import functools
from oslo_config import cfg
from oslo_log import log as logging
from osprofiler import profiler
import threading
from mistral import context
from mistral.db import utils as db_utils
@ -39,8 +39,8 @@ _THREAD_LOCAL_NAME = "__operation_queue_thread_local"
def _prepare():
# Register two queues: transactional and non transactional operations.
utils.set_thread_local(_THREAD_LOCAL_NAME, (list(), list()))
# Register queue for both transactional and non transactional operations.
utils.set_thread_local(_THREAD_LOCAL_NAME, list())
def _clear():
@ -48,22 +48,25 @@ def _clear():
def register_operation(func, args=None, in_tx=False):
"""Register an operation."""
"""Register an operation.
_get_queues()[0 if in_tx else 1].append((func, args or []))
An operation can be transactional (in_tx=True) or not.
"""
_get_queue().append((func, args or [], in_tx))
def _get_queues():
queues = utils.get_thread_local(_THREAD_LOCAL_NAME)
def _get_queue():
queue = utils.get_thread_local(_THREAD_LOCAL_NAME)
if queues is None:
if queue is None:
raise RuntimeError(
'Operation queue is not initialized for the current thread.'
' Most likely some engine method is not decorated with'
' operation_queue.run()'
)
return queues
return queue
def run(func):
@ -71,6 +74,7 @@ def run(func):
Various engine methods may register such operations. All such methods must
be decorated with this decorator.
Take a look at default_engine methods with @post_tx_queue.run
"""
@functools.wraps(func)
def decorate(*args, **kw):
@ -79,12 +83,9 @@ def run(func):
try:
res = func(*args, **kw)
queues = _get_queues()
queue = _get_queue()
tx_queue = queues[0]
non_tx_queue = queues[1]
if not tx_queue and not non_tx_queue:
if not queue:
return res
auth_ctx = context.ctx() if context.has_ctx() else None
@ -99,15 +100,12 @@ def run(func):
context.set_ctx(auth_ctx)
try:
if tx_queue:
_process_tx_queue(tx_queue)
if non_tx_queue:
_process_non_tx_queue(non_tx_queue)
_process_queue(queue)
finally:
context.set_ctx(old_auth_ctx)
eventlet.spawn(_within_new_thread)
t = threading.Thread(target=_within_new_thread)
t.start()
finally:
_clear()
@ -118,20 +116,26 @@ def run(func):
@db_utils.retry_on_db_error
@run
def _process_tx_queue(queue):
with db_api.transaction():
for func, args in queue:
def _process_queue(queue):
"""Run the functions from the queue
Note that this function is also decorated with @run because we may
register new operations (i.e. calling again register_operation()) from
one of the function currently being processed.
"""
for func, args, in_tx in queue:
if in_tx:
with db_api.transaction():
try:
func(*args)
except Exception:
LOG.exception(
"Failed to run transactional engine operation.")
raise
else:
try:
func(*args)
except Exception:
LOG.exception("Failed to run transactional engine operation.")
raise
def _process_non_tx_queue(queue):
for func, args in queue:
try:
func(*args)
except Exception:
LOG.exception("Failed to run non-transactional engine operation.")
LOG.exception(
"Failed to run non-transactional engine operation.")