From 4fe18c649220b271864af251f0b23e54aea35ab3 Mon Sep 17 00:00:00 2001 From: David Moreau Simard Date: Thu, 17 Dec 2020 21:26:14 -0500 Subject: [PATCH] callback: disable threading by default Threading is prone to database lock exceptions when using the default database backend (sqlite) and so disable it by default. Users can enable threading when using mysql or postgresql by specifying the ARA_CALLBACK_THREADS environment variable or callback_threads in ansible.cfg. Fixes: https://github.com/ansible-community/ara/issues/195 Change-Id: I80edfd6268684722c2783e01323355d791c19b9d --- ara/plugins/callback/ara_default.py | 86 ++++++++++++++------ doc/source/ansible-plugins-and-use-cases.rst | 2 + tests/basic.yaml | 1 + tests/vars/mysql_tests.yaml | 1 + tests/vars/postgresql_tests.yaml | 1 + 5 files changed, 64 insertions(+), 27 deletions(-) diff --git a/ara/plugins/callback/ara_default.py b/ara/plugins/callback/ara_default.py index ea8a1988..bbbaf3c9 100644 --- a/ara/plugins/callback/ara_default.py +++ b/ara/plugins/callback/ara_default.py @@ -160,6 +160,18 @@ options: ini: - section: ara key: ignored_files + callback_threads: + description: + - The number of threads to use in API client thread pools + - When set to 0, no threading will be used (default) which is appropriate for usage with sqlite + - Using threads is recommended when the server is using MySQL or PostgreSQL + type: integer + default: 0 + env: + - name: ARA_CALLBACK_THREADS + ini: + - section: ara + key: callback_threads """ @@ -177,10 +189,7 @@ class CallbackModule(CallbackBase): self.log = logging.getLogger("ara.plugins.callback.default") # These are configured in self.set_options self.client = None - self.thread_count = None - self.global_threads = None - # Need individual threads for tasks to ensure all results are saved before moving on to next task - self.task_threads = None + self.callback_threads = None self.ignored_facts = [] self.ignored_arguments = [] @@ -223,14 +232,26 @@ class CallbackModule(CallbackBase): # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter. # In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE. # Otherwise we can hit "urllib3.connectionpool: Connection pool is full" - # TODO: Using >= 2 threads with the offline client can result in execution getting locked up - self.thread_count = 1 if client == "offline" else 4 - self.global_threads = ThreadPoolExecutor(max_workers=self.thread_count) - self.log.debug("working with %s thread(s)" % self.thread_count) + self.callback_threads = self.get_option("callback_threads") + if self.callback_threads > 4: + self.callback_threads = 4 + + def _submit_thread(self, threadpool, func, *args, **kwargs): + # Manages whether or not the function should be threaded to keep things DRY + if self.callback_threads: + # Pick from one of two thread pools (global or task) + threads = getattr(self, threadpool + "_threads") + threads.submit(func, *args, **kwargs) + else: + func(*args, **kwargs) def v2_playbook_on_start(self, playbook): self.log.debug("v2_playbook_on_start") + if self.callback_threads: + self.global_threads = ThreadPoolExecutor(max_workers=self.callback_threads) + self.log.debug("Global thread pool initialized with %s thread(s)" % self.callback_threads) + content = None if playbook._file_name == "__adhoc_playbook__": @@ -275,7 +296,7 @@ class CallbackModule(CallbackBase): ) # Record the playbook file - self.global_threads.submit(self._get_or_create_file, path, content) + self._submit_thread("global", self._get_or_create_file, path, content) return self.playbook @@ -287,7 +308,7 @@ class CallbackModule(CallbackBase): # Load variables to verify if there is anything relevant for ara play_vars = play._variable_manager.get_vars(play=play)["vars"] if "ara_playbook_name" in play_vars: - self._set_playbook_name(name=play_vars["ara_playbook_name"]) + self._submit_thread("global", self._set_playbook_name, play_vars["ara_playbook_name"]) labels = self.default_labels + self.argument_labels if "ara_playbook_labels" in play_vars: @@ -301,11 +322,11 @@ class CallbackModule(CallbackBase): else: raise TypeError("ara_playbook_labels must be a list or a comma-separated string") if labels: - self._set_playbook_labels(labels=labels) + self._submit_thread("global", self._set_playbook_labels, labels) # Record all the files involved in the play for path in play._loader._FILE_CACHE.keys(): - self.global_threads.submit(self._get_or_create_file, path) + self._submit_thread("global", self._get_or_create_file, path) # Create the play self.play = self.client.post( @@ -327,7 +348,10 @@ class CallbackModule(CallbackBase): def v2_playbook_on_task_start(self, task, is_conditional, handler=False): self.log.debug("v2_playbook_on_task_start") self._end_task() - self.task_threads = ThreadPoolExecutor(max_workers=self.thread_count) + + if self.callback_threads: + self.task_threads = ThreadPoolExecutor(max_workers=self.callback_threads) + self.log.debug("Task thread pool initialized with %s thread(s)" % self.callback_threads) pathspec = task.get_path() if pathspec: @@ -362,16 +386,16 @@ class CallbackModule(CallbackBase): self.result_started[host.get_name()] = datetime.datetime.now(datetime.timezone.utc).isoformat() def v2_runner_on_ok(self, result, **kwargs): - self.task_threads.submit(self._load_result, result, "ok", **kwargs) + self._submit_thread("task", self._load_result, result, "ok", **kwargs) def v2_runner_on_unreachable(self, result, **kwargs): - self.task_threads.submit(self._load_result, result, "unreachable", **kwargs) + self._submit_thread("task", self._load_result, result, "unreachable", **kwargs) def v2_runner_on_failed(self, result, **kwargs): - self.task_threads.submit(self._load_result, result, "failed", **kwargs) + self._submit_thread("task", self._load_result, result, "failed", **kwargs) def v2_runner_on_skipped(self, result, **kwargs): - self.task_threads.submit(self._load_result, result, "skipped", **kwargs) + self._submit_thread("task", self._load_result, result, "skipped", **kwargs) def v2_playbook_on_stats(self, stats): self.log.debug("v2_playbook_on_stats") @@ -382,21 +406,24 @@ class CallbackModule(CallbackBase): def _end_task(self): if self.task is not None: - self.task_threads.submit( + self._submit_thread( + "task", self.client.patch, "/api/v1/tasks/%s" % self.task["id"], status="completed", ended=datetime.datetime.now(datetime.timezone.utc).isoformat(), ) - # Flush threads before moving on to next task to make sure all results are saved - self.log.debug("waiting for task threads...") - self.task_threads.shutdown(wait=True) - self.task_threads = None + if self.callback_threads: + # Flush threads before moving on to next task to make sure all results are saved + self.log.debug("waiting for task threads...") + self.task_threads.shutdown(wait=True) + self.task_threads = None self.task = None def _end_play(self): if self.play is not None: - self.global_threads.submit( + self._submit_thread( + "global", self.client.patch, "/api/v1/plays/%s" % self.play["id"], status="completed", @@ -411,20 +438,24 @@ class CallbackModule(CallbackBase): else: status = "completed" - self.global_threads.submit( + self._submit_thread( + "global", self.client.patch, "/api/v1/playbooks/%s" % self.playbook["id"], status=status, ended=datetime.datetime.now(datetime.timezone.utc).isoformat(), ) - self.log.debug("waiting for global threads...") - self.global_threads.shutdown(wait=True) + + if self.callback_threads: + self.log.debug("waiting for global threads...") + self.global_threads.shutdown(wait=True) def _set_playbook_name(self, name): if self.playbook["name"] != name: self.playbook = self.client.patch("/api/v1/playbooks/%s" % self.playbook["id"], name=name) def _set_playbook_labels(self, labels): + # Only update labels if our cache doesn't match current_labels = [label["name"] for label in self.playbook["labels"]] if sorted(current_labels) != sorted(labels): self.log.debug("Updating playbook labels to match: %s" % ",".join(labels)) @@ -514,7 +545,8 @@ class CallbackModule(CallbackBase): host = self._get_or_create_host(hostname) host_stats = stats.summarize(hostname) - self.global_threads.submit( + self._submit_thread( + "global", self.client.patch, "/api/v1/hosts/%s" % host["id"], changed=host_stats["changed"], diff --git a/doc/source/ansible-plugins-and-use-cases.rst b/doc/source/ansible-plugins-and-use-cases.rst index c145eb03..ee7f30bd 100644 --- a/doc/source/ansible-plugins-and-use-cases.rst +++ b/doc/source/ansible-plugins-and-use-cases.rst @@ -29,6 +29,7 @@ For example, a customized callback plugin configuration might look like this in api_username = user api_password = password api_timeout = 15 + callback_threads = 4 argument_labels = check,tags,subset default_labels = prod,deploy ignored_facts = ansible_env,ansible_all_ipv4_addresses @@ -43,6 +44,7 @@ or as environment variables: export ARA_API_USERNAME=user export ARA_API_PASSWORD=password export ARA_API_TIMEOUT=15 + export ARA_CALLBACK_THREADS=4 export ARA_ARGUMENT_LABELS=check,tags,subset export ARA_DEFAULT_LABELS=prod,deploy export ARA_IGNORED_FACTS=ansible_env,ansible_all_ipv4_addresses diff --git a/tests/basic.yaml b/tests/basic.yaml index e5daa368..ff98ff5a 100644 --- a/tests/basic.yaml +++ b/tests/basic.yaml @@ -110,6 +110,7 @@ [ara] api_client = {{ ara_api_client | default('offline') }} api_server = {{ ara_api_server | default('http://127.0.0.1') }} + callback_threads = {{ ara_callback_threads | default(0) }} {% if _default_labels is defined %} default_labels = {{ _default_labels | join(',') }} {% endif %} diff --git a/tests/vars/mysql_tests.yaml b/tests/vars/mysql_tests.yaml index 39752df8..0bad87f4 100644 --- a/tests/vars/mysql_tests.yaml +++ b/tests/vars/mysql_tests.yaml @@ -1,4 +1,5 @@ ara_tests_cleanup: true +ara_callback_threads: 4 ara_api_root_dir: "{{ ansible_user_dir }}/.ara-tests" ara_api_secret_key: testing ara_api_debug: true diff --git a/tests/vars/postgresql_tests.yaml b/tests/vars/postgresql_tests.yaml index d1983f1a..dc44bc31 100644 --- a/tests/vars/postgresql_tests.yaml +++ b/tests/vars/postgresql_tests.yaml @@ -1,4 +1,5 @@ ara_tests_cleanup: true +ara_callback_threads: 4 ara_api_root_dir: "{{ ansible_user_dir }}/.ara-tests" ara_api_secret_key: testing ara_api_debug: true