callback: disable threading by default

Threading is prone to database lock exceptions when using the default
database backend (sqlite) and so disable it by default.

Users can enable threading when using mysql or postgresql by specifying
the ARA_CALLBACK_THREADS environment variable or callback_threads in
ansible.cfg.

Fixes: https://github.com/ansible-community/ara/issues/195
Change-Id: I80edfd6268684722c2783e01323355d791c19b9d
This commit is contained in:
David Moreau Simard 2020-12-17 21:26:14 -05:00
parent a000a888a7
commit 4fe18c6492
No known key found for this signature in database
GPG Key ID: 7D4729EC4E64E8B7
5 changed files with 64 additions and 27 deletions

@ -160,6 +160,18 @@ options:
ini:
- section: ara
key: ignored_files
callback_threads:
description:
- The number of threads to use in API client thread pools
- When set to 0, no threading will be used (default) which is appropriate for usage with sqlite
- Using threads is recommended when the server is using MySQL or PostgreSQL
type: integer
default: 0
env:
- name: ARA_CALLBACK_THREADS
ini:
- section: ara
key: callback_threads
"""
@ -177,10 +189,7 @@ class CallbackModule(CallbackBase):
self.log = logging.getLogger("ara.plugins.callback.default")
# These are configured in self.set_options
self.client = None
self.thread_count = None
self.global_threads = None
# Need individual threads for tasks to ensure all results are saved before moving on to next task
self.task_threads = None
self.callback_threads = None
self.ignored_facts = []
self.ignored_arguments = []
@ -223,14 +232,26 @@ class CallbackModule(CallbackBase):
# TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter.
# In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE.
# Otherwise we can hit "urllib3.connectionpool: Connection pool is full"
# TODO: Using >= 2 threads with the offline client can result in execution getting locked up
self.thread_count = 1 if client == "offline" else 4
self.global_threads = ThreadPoolExecutor(max_workers=self.thread_count)
self.log.debug("working with %s thread(s)" % self.thread_count)
self.callback_threads = self.get_option("callback_threads")
if self.callback_threads > 4:
self.callback_threads = 4
def _submit_thread(self, threadpool, func, *args, **kwargs):
# Manages whether or not the function should be threaded to keep things DRY
if self.callback_threads:
# Pick from one of two thread pools (global or task)
threads = getattr(self, threadpool + "_threads")
threads.submit(func, *args, **kwargs)
else:
func(*args, **kwargs)
def v2_playbook_on_start(self, playbook):
self.log.debug("v2_playbook_on_start")
if self.callback_threads:
self.global_threads = ThreadPoolExecutor(max_workers=self.callback_threads)
self.log.debug("Global thread pool initialized with %s thread(s)" % self.callback_threads)
content = None
if playbook._file_name == "__adhoc_playbook__":
@ -275,7 +296,7 @@ class CallbackModule(CallbackBase):
)
# Record the playbook file
self.global_threads.submit(self._get_or_create_file, path, content)
self._submit_thread("global", self._get_or_create_file, path, content)
return self.playbook
@ -287,7 +308,7 @@ class CallbackModule(CallbackBase):
# Load variables to verify if there is anything relevant for ara
play_vars = play._variable_manager.get_vars(play=play)["vars"]
if "ara_playbook_name" in play_vars:
self._set_playbook_name(name=play_vars["ara_playbook_name"])
self._submit_thread("global", self._set_playbook_name, play_vars["ara_playbook_name"])
labels = self.default_labels + self.argument_labels
if "ara_playbook_labels" in play_vars:
@ -301,11 +322,11 @@ class CallbackModule(CallbackBase):
else:
raise TypeError("ara_playbook_labels must be a list or a comma-separated string")
if labels:
self._set_playbook_labels(labels=labels)
self._submit_thread("global", self._set_playbook_labels, labels)
# Record all the files involved in the play
for path in play._loader._FILE_CACHE.keys():
self.global_threads.submit(self._get_or_create_file, path)
self._submit_thread("global", self._get_or_create_file, path)
# Create the play
self.play = self.client.post(
@ -327,7 +348,10 @@ class CallbackModule(CallbackBase):
def v2_playbook_on_task_start(self, task, is_conditional, handler=False):
self.log.debug("v2_playbook_on_task_start")
self._end_task()
self.task_threads = ThreadPoolExecutor(max_workers=self.thread_count)
if self.callback_threads:
self.task_threads = ThreadPoolExecutor(max_workers=self.callback_threads)
self.log.debug("Task thread pool initialized with %s thread(s)" % self.callback_threads)
pathspec = task.get_path()
if pathspec:
@ -362,16 +386,16 @@ class CallbackModule(CallbackBase):
self.result_started[host.get_name()] = datetime.datetime.now(datetime.timezone.utc).isoformat()
def v2_runner_on_ok(self, result, **kwargs):
self.task_threads.submit(self._load_result, result, "ok", **kwargs)
self._submit_thread("task", self._load_result, result, "ok", **kwargs)
def v2_runner_on_unreachable(self, result, **kwargs):
self.task_threads.submit(self._load_result, result, "unreachable", **kwargs)
self._submit_thread("task", self._load_result, result, "unreachable", **kwargs)
def v2_runner_on_failed(self, result, **kwargs):
self.task_threads.submit(self._load_result, result, "failed", **kwargs)
self._submit_thread("task", self._load_result, result, "failed", **kwargs)
def v2_runner_on_skipped(self, result, **kwargs):
self.task_threads.submit(self._load_result, result, "skipped", **kwargs)
self._submit_thread("task", self._load_result, result, "skipped", **kwargs)
def v2_playbook_on_stats(self, stats):
self.log.debug("v2_playbook_on_stats")
@ -382,21 +406,24 @@ class CallbackModule(CallbackBase):
def _end_task(self):
if self.task is not None:
self.task_threads.submit(
self._submit_thread(
"task",
self.client.patch,
"/api/v1/tasks/%s" % self.task["id"],
status="completed",
ended=datetime.datetime.now(datetime.timezone.utc).isoformat(),
)
# Flush threads before moving on to next task to make sure all results are saved
self.log.debug("waiting for task threads...")
self.task_threads.shutdown(wait=True)
self.task_threads = None
if self.callback_threads:
# Flush threads before moving on to next task to make sure all results are saved
self.log.debug("waiting for task threads...")
self.task_threads.shutdown(wait=True)
self.task_threads = None
self.task = None
def _end_play(self):
if self.play is not None:
self.global_threads.submit(
self._submit_thread(
"global",
self.client.patch,
"/api/v1/plays/%s" % self.play["id"],
status="completed",
@ -411,20 +438,24 @@ class CallbackModule(CallbackBase):
else:
status = "completed"
self.global_threads.submit(
self._submit_thread(
"global",
self.client.patch,
"/api/v1/playbooks/%s" % self.playbook["id"],
status=status,
ended=datetime.datetime.now(datetime.timezone.utc).isoformat(),
)
self.log.debug("waiting for global threads...")
self.global_threads.shutdown(wait=True)
if self.callback_threads:
self.log.debug("waiting for global threads...")
self.global_threads.shutdown(wait=True)
def _set_playbook_name(self, name):
if self.playbook["name"] != name:
self.playbook = self.client.patch("/api/v1/playbooks/%s" % self.playbook["id"], name=name)
def _set_playbook_labels(self, labels):
# Only update labels if our cache doesn't match
current_labels = [label["name"] for label in self.playbook["labels"]]
if sorted(current_labels) != sorted(labels):
self.log.debug("Updating playbook labels to match: %s" % ",".join(labels))
@ -514,7 +545,8 @@ class CallbackModule(CallbackBase):
host = self._get_or_create_host(hostname)
host_stats = stats.summarize(hostname)
self.global_threads.submit(
self._submit_thread(
"global",
self.client.patch,
"/api/v1/hosts/%s" % host["id"],
changed=host_stats["changed"],

@ -29,6 +29,7 @@ For example, a customized callback plugin configuration might look like this in
api_username = user
api_password = password
api_timeout = 15
callback_threads = 4
argument_labels = check,tags,subset
default_labels = prod,deploy
ignored_facts = ansible_env,ansible_all_ipv4_addresses
@ -43,6 +44,7 @@ or as environment variables:
export ARA_API_USERNAME=user
export ARA_API_PASSWORD=password
export ARA_API_TIMEOUT=15
export ARA_CALLBACK_THREADS=4
export ARA_ARGUMENT_LABELS=check,tags,subset
export ARA_DEFAULT_LABELS=prod,deploy
export ARA_IGNORED_FACTS=ansible_env,ansible_all_ipv4_addresses

@ -110,6 +110,7 @@
[ara]
api_client = {{ ara_api_client | default('offline') }}
api_server = {{ ara_api_server | default('http://127.0.0.1') }}
callback_threads = {{ ara_callback_threads | default(0) }}
{% if _default_labels is defined %}
default_labels = {{ _default_labels | join(',') }}
{% endif %}

@ -1,4 +1,5 @@
ara_tests_cleanup: true
ara_callback_threads: 4
ara_api_root_dir: "{{ ansible_user_dir }}/.ara-tests"
ara_api_secret_key: testing
ara_api_debug: true

@ -1,4 +1,5 @@
ara_tests_cleanup: true
ara_callback_threads: 4
ara_api_root_dir: "{{ ansible_user_dir }}/.ara-tests"
ara_api_secret_key: testing
ara_api_debug: true