Software upload: create pxeboot versioned dir
Details: This commit is to create pxeboot versioned directory. It combines the lengacy functionalities of import.sh and upgrade-start-pkg-extract scripts that setup to-release pxeboot files. Related to: https://review.opendev.org/c/starlingx/tools/+/902166 Test Plan: PASS: software upload and pxeboot files are in place Task: 48999 Story: 2010676 Change-Id: Ie92d00985fec78c01763775e33902246b1f2911c Signed-off-by: junfeng-li <junfeng.li@windriver.com>
This commit is contained in:
parent
d131e639c3
commit
bd2e03d5a7
@ -74,4 +74,6 @@ override_dh_install:
|
|||||||
${ROOT}/etc/software/ostree_mounts.yaml
|
${ROOT}/etc/software/ostree_mounts.yaml
|
||||||
install -m 755 scripts/create_postgresql_database.sh \
|
install -m 755 scripts/create_postgresql_database.sh \
|
||||||
${ROOT}/usr/sbin/create_postgresql_database.sh
|
${ROOT}/usr/sbin/create_postgresql_database.sh
|
||||||
|
install -m 755 scripts/usm_load_import \
|
||||||
|
${ROOT}/usr/sbin/usm_load_import
|
||||||
dh_install
|
dh_install
|
||||||
|
177
software/scripts/usm_load_import
Normal file
177
software/scripts/usm_load_import
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
#!/usr/bin/python3
|
||||||
|
# -*- encoding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
#
|
||||||
|
# Copyright (c) 2023 Wind River Systems, Inc.
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
#
|
||||||
|
|
||||||
|
"""
|
||||||
|
This script is run during 'software upload' command.
|
||||||
|
It is used to copy the required files from uploaded iso image
|
||||||
|
to the controller.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import glob
|
||||||
|
import logging as LOG
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
AVAILABLE_DIR = "/opt/software/metadata/available"
|
||||||
|
FEED_OSTREE_BASE_DIR = "/var/www/pages/feed"
|
||||||
|
RELEASE_GA_NAME = "starlingx-%s.0"
|
||||||
|
SOFTWARE_STORAGE_DIR = "/opt/software"
|
||||||
|
TMP_DIR = "/tmp"
|
||||||
|
VAR_PXEBOOT_DIR = "/var/pxeboot"
|
||||||
|
|
||||||
|
|
||||||
|
def load_import(from_release, to_release, iso_mount_dir):
|
||||||
|
"""
|
||||||
|
Import the iso files to the feed and pxeboot directories
|
||||||
|
:param to_release: to release version
|
||||||
|
:param release_data: ReleaseData object
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Copy the iso file to /var/www/pages/feed/rel-<release>
|
||||||
|
os.makedirs(FEED_OSTREE_BASE_DIR, exist_ok=True)
|
||||||
|
to_release_feed_dir = os.path.join(FEED_OSTREE_BASE_DIR, ("rel-%s" % to_release))
|
||||||
|
os.makedirs(to_release_feed_dir, exist_ok=True)
|
||||||
|
|
||||||
|
feed_contents = ["install_uuid", "efi.img", "kickstart",
|
||||||
|
"ostree_repo", "pxeboot", "upgrades"]
|
||||||
|
for content in feed_contents:
|
||||||
|
src_abs_path = os.path.join(iso_mount_dir, content)
|
||||||
|
if os.path.isfile(src_abs_path):
|
||||||
|
shutil.copyfile(src_abs_path, os.path.join(to_release_feed_dir, content))
|
||||||
|
LOG.info("Copied %s to %s", src_abs_path, to_release_feed_dir)
|
||||||
|
elif os.path.isdir(src_abs_path):
|
||||||
|
shutil.copytree(src_abs_path, os.path.join(to_release_feed_dir, content))
|
||||||
|
LOG.info("Copied %s to %s", src_abs_path, to_release_feed_dir)
|
||||||
|
|
||||||
|
# Copy install_uuid to /var/www/pages/feed/rel-<release>
|
||||||
|
from_release_feed_dir = os.path.join(FEED_OSTREE_BASE_DIR, ("rel-%s" % from_release))
|
||||||
|
shutil.copyfile(os.path.join(from_release_feed_dir, "install_uuid"),
|
||||||
|
os.path.join(to_release_feed_dir, "install_uuid"))
|
||||||
|
LOG.info("Copied install_uuid to %s", to_release_feed_dir)
|
||||||
|
|
||||||
|
# Copy pxeboot-update-${from_release}.sh to from-release feed /upgrades
|
||||||
|
from_release_iso_upgrades_dir = os.path.join(from_release_feed_dir, "upgrades")
|
||||||
|
os.makedirs(from_release_iso_upgrades_dir, exist_ok=True)
|
||||||
|
shutil.copyfile(os.path.join("/etc", "pxeboot-update-%s.sh" % from_release),
|
||||||
|
os.path.join(from_release_iso_upgrades_dir, "pxeboot-update-%s.sh" % from_release))
|
||||||
|
LOG.info("Copied pxeboot-update-%s.sh to %s", from_release, from_release_iso_upgrades_dir)
|
||||||
|
|
||||||
|
# Copy pxelinux.cfg.files to from-release feed /pxeboot
|
||||||
|
from_release_feed_pxeboot_dir = os.path.join(from_release_feed_dir, "pxeboot")
|
||||||
|
os.makedirs(from_release_feed_pxeboot_dir, exist_ok=True)
|
||||||
|
|
||||||
|
# Find from-release pxelinux.cfg.files
|
||||||
|
pxe_dir = os.path.join(VAR_PXEBOOT_DIR, "pxelinux.cfg.files")
|
||||||
|
from_release_pxe_files = glob.glob(os.path.join(pxe_dir, '*' + from_release))
|
||||||
|
for from_release_pxe_file in from_release_pxe_files:
|
||||||
|
if os.path.isfile(from_release_pxe_file):
|
||||||
|
shutil.copyfile(from_release_pxe_file, os.path.join(from_release_feed_pxeboot_dir,
|
||||||
|
os.path.basename(from_release_pxe_file)))
|
||||||
|
LOG.info("Copied %s to %s", from_release_pxe_file, from_release_feed_pxeboot_dir)
|
||||||
|
|
||||||
|
# Converted from upgrade package extraction script
|
||||||
|
shutil.copyfile(os.path.join(to_release_feed_dir, "kickstart", "kickstart.cfg"),
|
||||||
|
os.path.join(to_release_feed_dir, "kickstart.cfg"))
|
||||||
|
|
||||||
|
# Copy bzImage and initrd
|
||||||
|
bzimage_files = glob.glob(os.path.join(to_release_feed_dir, 'pxeboot', 'bzImage*'))
|
||||||
|
for bzimage_file in bzimage_files:
|
||||||
|
if os.path.isfile(bzimage_file):
|
||||||
|
shutil.copyfile(bzimage_file, os.path.join(VAR_PXEBOOT_DIR,
|
||||||
|
os.path.basename(bzimage_file)))
|
||||||
|
LOG.info("Copied %s to %s", bzimage_file, VAR_PXEBOOT_DIR)
|
||||||
|
|
||||||
|
initrd_files = glob.glob(os.path.join(to_release_feed_dir, 'pxeboot', 'initrd*'))
|
||||||
|
for initrd_file in initrd_files:
|
||||||
|
if os.path.isfile(initrd_file):
|
||||||
|
shutil.copyfile(initrd_file, os.path.join(VAR_PXEBOOT_DIR,
|
||||||
|
os.path.basename(initrd_file)))
|
||||||
|
LOG.info("Copied %s to %s", initrd_file, VAR_PXEBOOT_DIR)
|
||||||
|
|
||||||
|
# Copy to_release_feed/pxelinux.cfg.files to /var/pxeboot/pxelinux.cfg.files
|
||||||
|
pxeboot_cfg_files = glob.glob(os.path.join(to_release_feed_dir, 'pxeboot', 'pxelinux.cfg.files',
|
||||||
|
'*' + from_release))
|
||||||
|
for pxeboot_cfg_file in pxeboot_cfg_files:
|
||||||
|
if os.path.isfile(pxeboot_cfg_file):
|
||||||
|
shutil.copyfile(pxeboot_cfg_file, os.path.join(VAR_PXEBOOT_DIR,
|
||||||
|
'pxelinux.cfg.files',
|
||||||
|
os.path.basename(pxeboot_cfg_file)))
|
||||||
|
LOG.info("Copied %s to %s", pxeboot_cfg_file, VAR_PXEBOOT_DIR)
|
||||||
|
|
||||||
|
# Copy pxeboot-update.sh to /etc
|
||||||
|
pxeboot_update_filename = "pxeboot-update-%s.sh" % to_release
|
||||||
|
shutil.copyfile(os.path.join(to_release_feed_dir, "upgrades", pxeboot_update_filename),
|
||||||
|
os.path.join("/etc", pxeboot_update_filename))
|
||||||
|
LOG.info("Copied pxeboot-update-%s.sh to %s", to_release, "/etc")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception("Load import failed. Error: %s" % str(e))
|
||||||
|
shutil.rmtree(to_release_feed_dir)
|
||||||
|
LOG.info("Removed %s", to_release_feed_dir)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Copy metadata.xml to /opt/software/metadata/available
|
||||||
|
os.makedirs(AVAILABLE_DIR, exist_ok=True)
|
||||||
|
metadata_name = f"{RELEASE_GA_NAME % to_release}-metadata.xml"
|
||||||
|
LOG.info("metadata name: %s", metadata_name)
|
||||||
|
abs_stx_release_metadata_file = os.path.join(iso_mount_dir,
|
||||||
|
'patches',
|
||||||
|
metadata_name)
|
||||||
|
|
||||||
|
# Copy stx release metadata.xml to available metadata dir
|
||||||
|
# TODO(jli14): prepatched iso will have more than one metadata file.
|
||||||
|
shutil.copyfile(abs_stx_release_metadata_file,
|
||||||
|
os.path.join(AVAILABLE_DIR, metadata_name))
|
||||||
|
LOG.info("Copied %s to %s", abs_stx_release_metadata_file, AVAILABLE_DIR)
|
||||||
|
except shutil.Error:
|
||||||
|
LOG.exception("Failed to copy the release %s metadata file to %s" % (to_release, AVAILABLE_DIR))
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Import files from uploaded iso image to controller.",
|
||||||
|
epilog="Use %(prog)s -h for help.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--from-release",
|
||||||
|
required=True,
|
||||||
|
help="The from-release version.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--to-release",
|
||||||
|
required=True,
|
||||||
|
help="The to-release version.",
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--iso-dir",
|
||||||
|
required=True,
|
||||||
|
help="The mounted iso image directory.",
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
try:
|
||||||
|
load_import(args.from_release, args.to_release, args.iso_dir)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception(e)
|
||||||
|
return 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
LOG.basicConfig(filename='/var/log/usm-load-import.log', level=LOG.INFO)
|
||||||
|
sys.exit(main())
|
@ -101,10 +101,7 @@ PATCH_EXTENSION = ".patch"
|
|||||||
SUPPORTED_UPLOAD_FILE_EXT = [ISO_EXTENSION, SIG_EXTENSION, PATCH_EXTENSION]
|
SUPPORTED_UPLOAD_FILE_EXT = [ISO_EXTENSION, SIG_EXTENSION, PATCH_EXTENSION]
|
||||||
SCRATCH_DIR = "/scratch"
|
SCRATCH_DIR = "/scratch"
|
||||||
RELEASE_GA_NAME = "starlingx-%s.0"
|
RELEASE_GA_NAME = "starlingx-%s.0"
|
||||||
|
LOCAL_LOAD_IMPORT_FILE = "/etc/software/usm_load_import"
|
||||||
CONTROLLER_HOSTNAME = 'controller'
|
|
||||||
CONTROLLER_0_HOSTNAME = '%s-0' % CONTROLLER_HOSTNAME
|
|
||||||
CONTROLLER_1_HOSTNAME = '%s-1' % CONTROLLER_HOSTNAME
|
|
||||||
|
|
||||||
# Precheck constants
|
# Precheck constants
|
||||||
LICENSE_FILE = "/etc/platform/.license"
|
LICENSE_FILE = "/etc/platform/.license"
|
||||||
|
@ -975,20 +975,27 @@ class PatchController(PatchService):
|
|||||||
return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
||||||
|
|
||||||
def _process_upload_upgrade_files(self, upgrade_files, release_data):
|
def _process_upload_upgrade_files(self, upgrade_files, release_data):
|
||||||
'''
|
"""
|
||||||
Process the uploaded upgrade files
|
Process the uploaded upgrade files
|
||||||
:param upgrade_files: dict of upgrade files
|
:param upgrade_files: dict of upgrade files
|
||||||
:param release_data: ReleaseData object
|
:param release_data: ReleaseData object
|
||||||
:return: info, warning, error messages
|
:return: info, warning, error messages
|
||||||
'''
|
"""
|
||||||
local_info = ""
|
local_info = ""
|
||||||
local_warning = ""
|
local_warning = ""
|
||||||
local_error = ""
|
local_error = ""
|
||||||
|
|
||||||
|
iso_mount_dir = None
|
||||||
try:
|
try:
|
||||||
if not verify_files([upgrade_files[constants.ISO_EXTENSION]],
|
if not verify_files([upgrade_files[constants.ISO_EXTENSION]],
|
||||||
upgrade_files[constants.SIG_EXTENSION]):
|
upgrade_files[constants.SIG_EXTENSION]):
|
||||||
raise ReleaseValidationFailure("Invalid signature file")
|
raise ReleaseValidationFailure("Invalid signature file")
|
||||||
|
|
||||||
|
msg = ("iso and signature files uploaded completed\n"
|
||||||
|
"Importing iso is in progress\n")
|
||||||
|
LOG.info(msg)
|
||||||
|
local_info += msg
|
||||||
|
|
||||||
iso_file = upgrade_files.get(constants.ISO_EXTENSION)
|
iso_file = upgrade_files.get(constants.ISO_EXTENSION)
|
||||||
|
|
||||||
# Mount the iso file after signature verification
|
# Mount the iso file after signature verification
|
||||||
@ -1004,49 +1011,46 @@ class PatchController(PatchService):
|
|||||||
raise UpgradeNotSupported("Current release %s not supported to upgrade to %s"
|
raise UpgradeNotSupported("Current release %s not supported to upgrade to %s"
|
||||||
% (SW_VERSION, to_release))
|
% (SW_VERSION, to_release))
|
||||||
|
|
||||||
# After successful validation, copy metadata.xml to /opt/software/metadata/available
|
# Run /etc/software/usm-load-import script
|
||||||
os.makedirs(constants.AVAILABLE_DIR, exist_ok=True)
|
LOG.info("Start load importing from %s", iso_file)
|
||||||
to_release_name = constants.RELEASE_GA_NAME % to_release
|
shutil.copyfile(os.path.join(iso_mount_dir, 'upgrades', 'usm_load_import'),
|
||||||
stx_release_metadata_file = "%s-metadata.xml" % to_release_name
|
constants.LOCAL_LOAD_IMPORT_FILE)
|
||||||
abs_stx_release_metadata_file = os.path.join(iso_mount_dir, 'upgrades',
|
os.chmod(constants.LOCAL_LOAD_IMPORT_FILE, 0o755)
|
||||||
stx_release_metadata_file)
|
load_import_cmd = [constants.LOCAL_LOAD_IMPORT_FILE,
|
||||||
|
"--from-release=%s" % SW_VERSION,
|
||||||
# Copy stx release metadata.xml to available metadata dir
|
"--to-release=%s" % to_release,
|
||||||
# TODO(heitormatsui): treat the prepatched iso scenario
|
"--iso-dir=%s" % iso_mount_dir]
|
||||||
shutil.copyfile(abs_stx_release_metadata_file,
|
LOG.info("Running load import command: %s", " ".join(load_import_cmd))
|
||||||
os.path.join(constants.AVAILABLE_DIR, stx_release_metadata_file))
|
load_import_return = subprocess.run(load_import_cmd,
|
||||||
LOG.info("Copied %s to %s", abs_stx_release_metadata_file, constants.AVAILABLE_DIR)
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.STDOUT,
|
||||||
# Copy the iso file to /var/www/pages/feed/rel-<release>
|
check=False,
|
||||||
os.makedirs(constants.FEED_OSTREE_BASE_DIR, exist_ok=True)
|
text=True)
|
||||||
to_release_iso_dir = os.path.join(constants.FEED_OSTREE_BASE_DIR, ("rel-%s" % to_release))
|
if load_import_return.returncode != 0:
|
||||||
shutil.copytree(iso_mount_dir, to_release_iso_dir)
|
local_error += load_import_return.stdout
|
||||||
LOG.info("Copied iso file %s to %s", iso_file, to_release_iso_dir)
|
else:
|
||||||
|
local_info += load_import_return.stdout
|
||||||
|
|
||||||
# Update the release metadata
|
# Update the release metadata
|
||||||
|
abs_stx_release_metadata_file = os.path.join(iso_mount_dir,
|
||||||
|
'upgrades',
|
||||||
|
f"{constants.RELEASE_GA_NAME % to_release}-metadata.xml")
|
||||||
release_data.parse_metadata(abs_stx_release_metadata_file, state=constants.AVAILABLE)
|
release_data.parse_metadata(abs_stx_release_metadata_file, state=constants.AVAILABLE)
|
||||||
LOG.info("Updated release metadata for %s", to_release)
|
LOG.info("Updated release metadata for %s", to_release)
|
||||||
|
|
||||||
# Unmount the iso file.
|
|
||||||
unmount_iso_load(iso_mount_dir)
|
|
||||||
|
|
||||||
except ReleaseValidationFailure:
|
except ReleaseValidationFailure:
|
||||||
msg = "Upgrade file signature verification failed"
|
msg = "Upgrade file signature verification failed"
|
||||||
LOG.exception(msg)
|
LOG.exception(msg)
|
||||||
local_error += msg + "\n"
|
local_error += msg + "\n"
|
||||||
except UpgradeNotSupported:
|
|
||||||
msg = "Upgrade is not supported for current release %s" % SW_VERSION
|
|
||||||
LOG.exception(msg)
|
|
||||||
local_error += msg + "\n"
|
|
||||||
except shutil.Error:
|
|
||||||
msg = "Failed to copy the release %s metadata file to %s" % (to_release, constants.AVAILABLE_DIR)
|
|
||||||
LOG.exception(msg)
|
|
||||||
local_error += msg + "\n"
|
|
||||||
raise SoftwareError(msg)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
msg = "Failed to process upgrade files. Error: %s" % str(e)
|
msg = "Failed to process upgrade files. Error: %s" % str(e)
|
||||||
LOG.exception(msg)
|
LOG.exception(msg)
|
||||||
local_error += msg + "\n"
|
local_error += msg + "\n"
|
||||||
|
finally:
|
||||||
|
# Unmount the iso file
|
||||||
|
if iso_mount_dir:
|
||||||
|
unmount_iso_load(iso_mount_dir)
|
||||||
|
LOG.info("Unmounted iso file %s", iso_file)
|
||||||
|
|
||||||
return local_info, local_warning, local_error
|
return local_info, local_warning, local_error
|
||||||
|
|
||||||
|
@ -3,14 +3,13 @@
|
|||||||
#
|
#
|
||||||
# Copyright (c) 2023 Wind River Systems, Inc.
|
# Copyright (c) 2023 Wind River Systems, Inc.
|
||||||
#
|
#
|
||||||
|
from software.software_controller import PatchController
|
||||||
|
from software.software_controller import ReleaseValidationFailure
|
||||||
import unittest
|
import unittest
|
||||||
from unittest.mock import MagicMock
|
from unittest.mock import MagicMock
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
from unittest.mock import call
|
|
||||||
from software import constants
|
from software import constants
|
||||||
|
|
||||||
from software.software_controller import PatchController
|
|
||||||
|
|
||||||
|
|
||||||
class TestSoftwareController(unittest.TestCase):
|
class TestSoftwareController(unittest.TestCase):
|
||||||
|
|
||||||
@ -26,115 +25,92 @@ class TestSoftwareController(unittest.TestCase):
|
|||||||
@patch('software.software_controller.PatchController.__init__', return_value=None)
|
@patch('software.software_controller.PatchController.__init__', return_value=None)
|
||||||
@patch('software.software_controller.verify_files')
|
@patch('software.software_controller.verify_files')
|
||||||
@patch('software.software_controller.mount_iso_load')
|
@patch('software.software_controller.mount_iso_load')
|
||||||
|
@patch('software.software_controller.shutil.copyfile')
|
||||||
|
@patch('software.software_controller.os.chmod')
|
||||||
@patch('software.software_controller.read_upgrade_metadata')
|
@patch('software.software_controller.read_upgrade_metadata')
|
||||||
@patch('software.software_functions.shutil.copyfile')
|
@patch('software.software_controller.subprocess.run')
|
||||||
@patch('os.makedirs')
|
|
||||||
@patch('software.software_functions.shutil.copytree')
|
|
||||||
@patch('software.software_controller.unmount_iso_load')
|
@patch('software.software_controller.unmount_iso_load')
|
||||||
def test_process_upload_upgrade_files(self,
|
def test_process_upload_upgrade_files(self,
|
||||||
mock_unmount_iso_load, # pylint: disable=unused-argument
|
mock_unmount_iso_load,
|
||||||
mock_copytree, # pylint: disable=unused-argument
|
mock_run,
|
||||||
mock_makedirs, # pylint: disable=unused-argument
|
|
||||||
mock_copyfile, # pylint: disable=unused-argument
|
|
||||||
mock_read_upgrade_metadata,
|
mock_read_upgrade_metadata,
|
||||||
|
mock_chmod, # pylint: disable=unused-argument
|
||||||
|
mock_copyfile, # pylint: disable=unused-argument
|
||||||
mock_mount_iso_load,
|
mock_mount_iso_load,
|
||||||
mock_verify_files,
|
mock_verify_files,
|
||||||
mock_init # pylint: disable=unused-argument
|
mock_init): # pylint: disable=unused-argument
|
||||||
):
|
|
||||||
controller = PatchController()
|
controller = PatchController()
|
||||||
controller.release_data = MagicMock()
|
controller.release_data = MagicMock()
|
||||||
controller.base_pkgdata = MagicMock()
|
|
||||||
|
|
||||||
# Mock the return values of the mocked functions
|
# Mock the return values of the mocked functions
|
||||||
mock_verify_files.return_value = True
|
mock_verify_files.return_value = True
|
||||||
mock_mount_iso_load.return_value = '/mnt/iso'
|
mock_mount_iso_load.return_value = '/test/iso'
|
||||||
mock_read_upgrade_metadata.return_value = ('2.0', [{'version': '1.0'}])
|
mock_read_upgrade_metadata.return_value = ('2.0', [{'version': '1.0'}, {'version': '2.0'}])
|
||||||
|
mock_run.return_value.returncode = 0
|
||||||
# Create a mock ReleaseData object
|
mock_run.return_value.stdout = 'Load import successful'
|
||||||
release_data = MagicMock()
|
|
||||||
|
|
||||||
# Call the function being tested
|
# Call the function being tested
|
||||||
with patch("software.software_controller.SW_VERSION", "1.0"):
|
with patch('software.software_controller.SW_VERSION', '1.0'):
|
||||||
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
|
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
|
||||||
release_data)
|
controller.release_data)
|
||||||
|
|
||||||
# Verify that the expected functions were called with the expected arguments
|
# Verify that the expected functions were called with the expected arguments
|
||||||
mock_mount_iso_load.assert_called_once_with(self.upgrade_files[constants.ISO_EXTENSION], '/tmp')
|
mock_verify_files.assert_called_once_with([self.upgrade_files[constants.ISO_EXTENSION]],
|
||||||
mock_read_upgrade_metadata.assert_called_once_with('/mnt/iso')
|
self.upgrade_files[constants.SIG_EXTENSION])
|
||||||
|
mock_mount_iso_load.assert_called_once_with(self.upgrade_files[constants.ISO_EXTENSION], constants.TMP_DIR)
|
||||||
|
mock_read_upgrade_metadata.assert_called_once_with('/test/iso')
|
||||||
|
|
||||||
|
self.assertEqual(mock_run.call_args[0][0], [constants.LOCAL_LOAD_IMPORT_FILE,
|
||||||
|
"--from-release=1.0", "--to-release=2.0", "--iso-dir=/test/iso"])
|
||||||
|
mock_unmount_iso_load.assert_called_once_with('/test/iso')
|
||||||
|
|
||||||
# Verify that the expected messages were returned
|
# Verify that the expected messages were returned
|
||||||
self.assertEqual(info, '')
|
self.assertEqual(
|
||||||
|
info, 'iso and signature files uploaded completed\nImporting iso is in progress\nLoad import successful')
|
||||||
self.assertEqual(warning, '')
|
self.assertEqual(warning, '')
|
||||||
self.assertEqual(error, '')
|
self.assertEqual(error, '')
|
||||||
|
|
||||||
# Verify that the expected methods were called on the ReleaseData object
|
|
||||||
release_data.parse_metadata.assert_called_once_with('/mnt/iso/upgrades/starlingx-2.0.0-metadata.xml', state='available')
|
|
||||||
|
|
||||||
# Verify that the expected files were copied to the expected directories
|
|
||||||
mock_copyfile.assert_called_once_with('/mnt/iso/upgrades/starlingx-2.0.0-metadata.xml',
|
|
||||||
constants.AVAILABLE_DIR + '/starlingx-2.0.0-metadata.xml')
|
|
||||||
expected_calls = [call(constants.AVAILABLE_DIR, exist_ok=True),
|
|
||||||
call(constants.FEED_OSTREE_BASE_DIR, exist_ok=True)]
|
|
||||||
self.assertEqual(mock_makedirs.call_count, 2)
|
|
||||||
mock_makedirs.assert_has_calls(expected_calls)
|
|
||||||
mock_unmount_iso_load.assert_called_once_with('/mnt/iso')
|
|
||||||
|
|
||||||
@patch('software.software_controller.PatchController.__init__', return_value=None)
|
@patch('software.software_controller.PatchController.__init__', return_value=None)
|
||||||
@patch('software.software_controller.verify_files')
|
@patch('software.software_controller.verify_files')
|
||||||
def test_process_upload_upgrade_files_invalid_signature(self, mock_verify_files, mock_init): # pylint: disable=unused-argument
|
@patch('software.software_controller.mount_iso_load')
|
||||||
|
@patch('software.software_controller.unmount_iso_load')
|
||||||
|
def test_process_upload_upgrade_files_invalid_signature(self,
|
||||||
|
mock_unmount_iso_load, # pylint: disable=unused-argument
|
||||||
|
mock_mount_iso_load,
|
||||||
|
mock_verify_files,
|
||||||
|
mock_init): # pylint: disable=unused-argument
|
||||||
controller = PatchController()
|
controller = PatchController()
|
||||||
controller.release_data = MagicMock()
|
controller.release_data = MagicMock()
|
||||||
controller.base_pkgdata = MagicMock()
|
|
||||||
|
|
||||||
# Mock the return values of the mocked functions
|
# Mock the return values of the mocked functions
|
||||||
mock_verify_files.return_value = False
|
mock_verify_files.return_value = False
|
||||||
|
mock_mount_iso_load.return_value = '/test/iso'
|
||||||
# Create a mock ReleaseData object
|
|
||||||
release_data = MagicMock()
|
|
||||||
|
|
||||||
# Call the function being tested
|
# Call the function being tested
|
||||||
with patch("software.software_controller.SW_VERSION", "1.0"):
|
with patch('software.software_controller.SW_VERSION', '1.0'):
|
||||||
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
|
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
|
||||||
release_data)
|
controller.release_data)
|
||||||
|
|
||||||
# Verify that the expected messages were returned
|
# Verify that the expected messages were returned
|
||||||
self.assertEqual(info, '')
|
self.assertEqual(info, '')
|
||||||
self.assertEqual(warning, '')
|
self.assertEqual(warning, '')
|
||||||
self.assertEqual(error, 'Upgrade file signature verification failed\n')
|
self.assertEqual(error, 'Upgrade file signature verification failed\n')
|
||||||
|
|
||||||
# Verify that the expected methods were called on the ReleaseData object
|
|
||||||
release_data.parse_metadata.assert_not_called()
|
|
||||||
|
|
||||||
@patch('software.software_controller.PatchController.__init__', return_value=None)
|
@patch('software.software_controller.PatchController.__init__', return_value=None)
|
||||||
@patch('software.software_controller.verify_files')
|
@patch('software.software_controller.verify_files', side_effect=ReleaseValidationFailure('Invalid signature file'))
|
||||||
@patch('software.software_controller.mount_iso_load')
|
def test_process_upload_upgrade_files_validation_error(self,
|
||||||
@patch('software.software_controller.read_upgrade_metadata')
|
mock_verify_files,
|
||||||
def test_process_upload_upgrade_files_unsupported_version(self,
|
mock_init): # pylint: disable=unused-argument
|
||||||
mock_read_upgrade_metadata,
|
|
||||||
mock_mount_iso_load,
|
|
||||||
mock_verify_files,
|
|
||||||
mock_init): # pylint: disable=unused-argument
|
|
||||||
controller = PatchController()
|
controller = PatchController()
|
||||||
controller.release_data = MagicMock()
|
controller.release_data = MagicMock()
|
||||||
controller.base_pkgdata = MagicMock()
|
|
||||||
|
|
||||||
# Mock the return values of the mocked functions
|
mock_verify_files.return_value = False
|
||||||
mock_verify_files.return_value = True
|
|
||||||
mock_mount_iso_load.return_value = '/mnt/iso'
|
|
||||||
mock_read_upgrade_metadata.return_value = ('2.0', [{'version': '1.5'}])
|
|
||||||
|
|
||||||
# Create a mock ReleaseData object
|
|
||||||
release_data = MagicMock()
|
|
||||||
|
|
||||||
# Call the function being tested
|
# Call the function being tested
|
||||||
with patch("software.software_controller.SW_VERSION", "1.0"):
|
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
|
||||||
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
|
controller.release_data)
|
||||||
release_data)
|
|
||||||
|
|
||||||
# Verify that the expected messages were returned
|
# Verify that the expected messages were returned
|
||||||
self.assertEqual(info, '')
|
self.assertEqual(info, '')
|
||||||
self.assertEqual(warning, '')
|
self.assertEqual(warning, '')
|
||||||
self.assertEqual(error, 'Upgrade is not supported for current release 1.0\n')
|
self.assertEqual(error, 'Upgrade file signature verification failed\n')
|
||||||
|
|
||||||
# Verify that the expected methods were called on the ReleaseData object
|
|
||||||
release_data.parse_metadata.assert_not_called()
|
|
||||||
|
@ -159,7 +159,9 @@ def save_temp_file(file_item, temp_dir=constants.SCRATCH_DIR):
|
|||||||
try:
|
try:
|
||||||
if not os.path.exists(temp_dir):
|
if not os.path.exists(temp_dir):
|
||||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||||
os.makedirs(temp_dir)
|
os.makedirs(temp_dir, mode=0o755)
|
||||||
|
LOG.info("Created directory %s with free space %s bytes",
|
||||||
|
temp_dir, shutil.disk_usage(temp_dir).free)
|
||||||
except Exception:
|
except Exception:
|
||||||
raise Exception("Failed to create directory {}".format(temp_dir))
|
raise Exception("Failed to create directory {}".format(temp_dir))
|
||||||
|
|
||||||
@ -172,14 +174,18 @@ def save_temp_file(file_item, temp_dir=constants.SCRATCH_DIR):
|
|||||||
LOG.error("Not enough space to save file %s in %s \n \
|
LOG.error("Not enough space to save file %s in %s \n \
|
||||||
Available %s bytes. File size %s", file_name, temp_dir, avail_space, file_size)
|
Available %s bytes. File size %s", file_name, temp_dir, avail_space, file_size)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Failed to get file size in bytes for %s or disk space for %s", file_item, temp_dir)
|
msg = "Failed to get file size in bytes for {} or disk space for {}".format(file_item, temp_dir)
|
||||||
|
LOG.exception(msg)
|
||||||
|
raise Exception(msg)
|
||||||
|
|
||||||
saved_file = os.path.join(temp_dir, os.path.basename(file_name))
|
saved_file = os.path.join(temp_dir, os.path.basename(file_name))
|
||||||
try:
|
try:
|
||||||
with open(saved_file, 'wb') as destination_file:
|
with open(saved_file, 'wb') as destination_file:
|
||||||
destination_file.write(file_item.value)
|
destination_file.write(file_item.value)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Failed to save file %s", file_name)
|
msg = "Failed to save file {} in {}".format(file_name, temp_dir)
|
||||||
|
LOG.exception(msg)
|
||||||
|
raise Exception(msg)
|
||||||
|
|
||||||
|
|
||||||
def delete_temp_file(file_name, temp_dir=constants.SCRATCH_DIR):
|
def delete_temp_file(file_name, temp_dir=constants.SCRATCH_DIR):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user