#!/usr/bin/python3
#
# Copyright (c) 2023-2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
#
# This script sets up bind mounts from the local filesystem
# to the checked out to-release ostree filesystem so that
# the data migration running under chroot has proper access
# to the local filesystem data and configuration.
#

import logging as LOG
import os
import subprocess
import sys

import upgrade_utils


class ChrootMounts:
    """Manage the bind mounts between the local filesystem and a checkout dir.

    The set of mount points is computed once in ``__init__`` and stored as an
    ordered ``{source: destination}`` mapping; ``mount``, ``umount`` and
    ``check`` all iterate over that mapping.
    """

    DEV_PATH = "/dev"
    PLATFORM_PATH = "/opt/platform"
    RABBIT_PATH = "/var/lib/rabbitmq"
    POSTGRES_PATH = "/var/lib/postgresql"
    KUBERNETES_PATH = "/var/lib/kubernetes"
    PLATFORM_CONF_PATH = "/etc/platform"
    TMP_PATH = "/tmp"
    USR_PATH = "/usr"
    ETC_PATH = "/etc"
    PROC_PATH = "/proc"
    LOG_PATH = "/var/log"

    def __init__(self, checkout_dir):
        """Build the source -> destination mount table.

        Args:
            checkout_dir: root of the checked out filesystem that receives
                the bind mounts.
        """

        def in_checkout(path):
            # normpath collapses the doubled slash produced by joining an
            # absolute path onto checkout_dir.
            return os.path.normpath(f"{checkout_dir}/{path}")

        self._mount_points = {
            # src: dst
            os.path.normpath(self.DEV_PATH): in_checkout(self.DEV_PATH),
            os.path.normpath(self.PLATFORM_PATH): in_checkout(self.PLATFORM_PATH),
            os.path.normpath(self.RABBIT_PATH): in_checkout(self.RABBIT_PATH),
            os.path.normpath(self.POSTGRES_PATH): in_checkout(self.POSTGRES_PATH),
            os.path.normpath(self.KUBERNETES_PATH): in_checkout(self.KUBERNETES_PATH),
            os.path.normpath(self.PROC_PATH): in_checkout(self.PROC_PATH),
            os.path.normpath(self.LOG_PATH): in_checkout(self.LOG_PATH),
            # NOTE(review): this destination lives under TMP_PATH on the local
            # filesystem, not under checkout_dir - presumably intentional;
            # confirm against the consumers of /tmp/etc/platform.
            os.path.normpath(self.PLATFORM_CONF_PATH):
                os.path.normpath(f"{self.TMP_PATH}/{self.PLATFORM_CONF_PATH}"),
            # The checkout's own usr/etc is mounted over the checkout's /etc.
            in_checkout(f"{self.USR_PATH}/{self.ETC_PATH}"): in_checkout(self.ETC_PATH),
        }

    def mount(self):
        """Bind mount every source onto its destination.

        Destinations are created if missing. On the first failure every mount
        point is unmounted again (best effort) and the original error is
        re-raised.

        Raises:
            subprocess.CalledProcessError: the ``mount`` command failed.
            OSError: a destination could not be created or ``mount`` could
                not be executed.
        """
        for src, dst in self._mount_points.items():
            try:
                os.makedirs(dst, exist_ok=True)
                subprocess.run(["mount", "--bind", src, dst],
                               check=True, text=True, capture_output=True)
            except (OSError, subprocess.CalledProcessError) as e:
                # Only CalledProcessError carries stderr; fall back to the
                # exception text for makedirs / exec failures.
                detail = getattr(e, "stderr", None) or e
                LOG.error(f"Failed to bind mount {src} to {dst}: {detail}")
                try:
                    self.umount()
                except (OSError, subprocess.CalledProcessError):
                    # umount() logs its own failures; do not let a failed
                    # rollback mask the mount error that triggered it.
                    LOG.error("Rollback of bind mounts did not complete")
                raise
            LOG.info(f"Bind mounted {src} -> {dst}")

    def umount(self):
        """Lazily unmount every destination.

        Destinations that are not mounted are skipped silently. All mount
        points are attempted even if one of them fails; the first failure is
        raised once the loop has finished.

        Raises:
            subprocess.CalledProcessError: at least one ``umount`` failed for
                a reason other than the destination not being mounted.
        """
        first_error = None
        for dst in self._mount_points.values():
            try:
                subprocess.run(["umount", "-l", dst],
                               check=True, text=True, capture_output=True)
            except subprocess.CalledProcessError as e:
                stderr = e.stderr or ""
                # ignore messages that are not harmful
                if "not mounted" in stderr or "no mount point specified" in stderr:
                    continue
                LOG.error(f"Failed to umount {dst}: {e.stderr}")
                if first_error is None:
                    first_error = e
                continue
            LOG.info(f"Unmounted {dst}")

        if first_error is not None:
            raise first_error

    def check(self):
        """Verify that none of the destinations is currently mounted.

        Raises:
            OSError: one or more destinations are reported by ``findmnt``.
        """
        mounted = []
        for dst in self._mount_points.values():
            # A zero exit status from findmnt means dst is a mount point.
            result = subprocess.run(["findmnt", dst], check=False)
            if result.returncode == 0:
                mounted.append(dst)

        if mounted:
            mounted_message = f"Mounted mount points: {', '.join(mounted)}"
            print(mounted_message)
            LOG.error(mounted_message)
            raise OSError(mounted_message)

    def run(self, operation):
        """Execute the operation selected by a command-line flag.

        Args:
            operation: ``-m`` (mount), ``-c`` (check) or ``-u`` (umount).

        Returns:
            0 on success, 1 if the operation failed or the flag is unknown.
        """
        handlers = {
            "-m": self.mount,
            "-c": self.check,
            "-u": self.umount,
        }
        handler = handlers.get(operation)
        if handler is None:
            # An unrecognised flag must not be reported as success.
            LOG.error("Unknown operation: %s", operation)
            return 1

        try:
            handler()
        except Exception:
            # Top-level boundary: record the failure, then map it to the
            # script's exit code.
            LOG.exception("Operation %s failed", operation)
            return 1
        return 0


def main(argv=None):
    """Parse the command line and run the requested operation.

    Args:
        argv: argument vector including the program name; defaults to
            ``sys.argv``.

    Returns:
        Process exit code: 0 on success, 1 on usage error or failure.
    """
    argv = sys.argv if argv is None else argv
    upgrade_utils.configure_logging("/var/log/software.log", log_level=LOG.INFO)

    if len(argv) < 3:
        usage_msg = (f"usage: {os.path.basename(argv[0])} <checkout_dir> <-m|-c|-u>\n"
                     f"-m: bind mounts the local directories into checkout directories\n"
                     f"-c: check if there are existing bind mounts\n"
                     f"-u: unmount the bind mounts")
        print(usage_msg)
        LOG.error(usage_msg)
        return 1

    # Arguments past the second one are ignored.
    checkout_dir, operation = argv[1:3]

    if not os.path.isdir(checkout_dir):
        error_msg = f"Invalid directory: {checkout_dir}"
        print(error_msg)
        LOG.error(error_msg)
        return 1

    return ChrootMounts(checkout_dir).run(operation)


if __name__ == "__main__":
    sys.exit(main())