root/build-tools/stx/patch/patch-builder
Dostoievski Batista 6633f50248 Add remote signing for patch
This change add the option --remote-sign for the patch-builder script
allowing the user to send the patch to a remote signing server, get the
it signed and download it signed.

Test plan:
    PASS: Build patch without signing it remotely
    PASS: Build patch signing it remotely

Story: 2010676
Task: 51341

Change-Id: I59631ea81f05133f47aa3036f6c9f29e1a02b9c2
Signed-off-by: Dostoievski Batista <dostoievski.albinobatista@windriver.com>
2024-11-14 07:17:09 -03:00

359 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright (c) 2023-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
'''
Builds a Debian patch
'''
import hashlib
import logging
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
import click
import fetch_debs
import metadata
from signing.patch_signing import sign_files
import constants
sys.path.append('..')
import utils
logger = logging.getLogger('patch_builder')
utils.set_logger(logger)
# Patch signature files
detached_signature_file = "signature.v2"
mdsum_signature_file = "signature"
# Signing server variables
signing_server = os.getenv('SIGNING_SERVER', '')
signing_user = os.getenv('SIGNING_USER', '')
# Patch output directory
DEPLOY_DIR = "/localdisk/deploy"
PATCH_OUTPUT = os.path.join(DEPLOY_DIR, "patch_output")
class PatchBuilder(object):
def __init__(self, patch_recipe_file, file_name=None, sign_remote=False):
self.metadata = metadata.PatchMetadata(patch_recipe_file)
self.metadata.parse_input_xml_data()
self.fetch_debs = fetch_debs.FetchDebs()
self.fetch_debs.need_dl_stx_pkgs = self.metadata.stx_packages
self.fetch_debs.need_dl_binary_pkgs = self.metadata.binary_packages
self.patch_name = f'{self.metadata.patch_id}.patch' if file_name == None else file_name
self.sign_remote = sign_remote
def get_md5(self, path):
'''
Utility function for generating the md5sum of a file
:param path: Path to file
'''
md5 = hashlib.md5()
block_size = 8192
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(block_size), b''):
md5.update(chunk)
return int(md5.hexdigest(), 16)
def build_patch(self):
logger.info(f"Generating patch {self.patch_name}")
# Fetch debs from metadata and
# Create software.tar, metadata.tar and signatures
# Create a temporary working directory
logger.debug("Fetching debs...")
self.fetch_debs.fetch_stx_packages()
self.fetch_debs.fetch_external_binaries()
# verify if dir is not empty
dl_dir = os.path.join(self.fetch_debs.output_dir, "downloads", "binary")
if not os.listdir(dl_dir):
logger.error("No debs fetched")
return False
logger.info("################ PATCH BUILD ################")
logger.info("Download completed, building our patch")
tmpdir = tempfile.mkdtemp(prefix="patch_")
os.chdir(tmpdir)
tar = tarfile.open("software.tar", "w")
# copy all files from dl_dir into the tar
for file in os.listdir(dl_dir):
logger.info(f"Saving file {file}")
tar.add(os.path.join(dl_dir, file), arcname=file)
# append deb name into metadata
self.metadata.debs.append(file)
tar.close()
pre_install = self.metadata.pre_install
post_install = self.metadata.post_install
# pre/post install scripts
if pre_install:
logger.debug(f"Copying pre-install script: {pre_install}")
self.copy_rename_script(pre_install, "PRE_INSTALL")
if post_install:
logger.debug(f"Copying post-install script: {post_install}")
self.copy_rename_script(post_install, "POST_INSTALL")
# Copy all activate scripts
if self.metadata.activation_scripts:
for script in self.metadata.activation_scripts:
self.copy_rename_script(path_to_script=script, rename=False)
# if the patch includes the 'software' package we need to make deploy-precheck
# and upgrade_utils.py from .deb file accessible directly from patch file
if 'software' in self.metadata.stx_packages:
logger.info(f"Patch includes the software package, getting scripts from deb file...")
# create temporary folder to hold our files until we copy them to the patch
tmp_folder = tempfile.mkdtemp(prefix='deb_')
# Collect files
files_to_get = [constants.PATCH_SCRIPTS["DEPLOY_PRECHECK"],
constants.PATCH_SCRIPTS["UPGRADE_UTILS"]]
path_files = self.get_files_from_deb(dl_dir, tmp_folder, 'software', files_to_get)
for path in path_files:
self.copy_rename_script(path_to_script=path, rename=False)
# removing the temporary folder
shutil.rmtree(tmp_folder)
if not pre_install and not post_install and self.metadata.reboot_required == 'N':
logger.warn("In service patch without restart scripts provided")
# Generate metadata.xml
logger.debug("Generating metadata file")
self.metadata.generate_patch_metadata("metadata.xml")
tar = tarfile.open("metadata.tar", "w")
tar.add("metadata.xml")
tar.close()
os.remove("metadata.xml")
# Pack .patch file
self.__sign_and_pack(self.patch_name)
def copy_rename_script(self, path_to_script, script_type=None, rename=True):
'''
Copy the script to the directory we are in and rename based
on PATCH_SCRIPT, if necessary.
:param path_to_script: Path to the script
:param script_type: Type of the script from the constant PATCH_SCRIPTS
:param rename: Select if we should
'''
if not os.path.isfile(path_to_script):
erro_msg = f"Install script {path_to_script} not found"
logger.error(erro_msg)
raise FileNotFoundError(erro_msg)
# check if need a rename or not
if rename:
# We check the type to correctly rename the file to a expected value
script_name = constants.PATCH_SCRIPTS.get(script_type, None)
if script_name and rename:
logger.info(f"Renaming {path_to_script} to {script_name}")
shutil.copy(path_to_script, f"./{script_name}")
else:
raise ValueError(f"Script type provided is not valid one: {script_type}")
else:
logger.info(f"Copying {path_to_script}...")
shutil.copy(path_to_script, "./")
def get_files_from_deb(self, download_dir, tmp_folder, package_name, files):
'''
Get files from inside the .deb and make it available in temporary folder
:param download_dir: Full path of directory where the deb is downloaded
:param tmp_folder: Temporary folder where file will be available
:param package_name: Name of the package
:param files: List of name of the files to be extracted
:returns list: full path for the script file
'''
# from download dir, search for {package_name}_*.deb package
pkg_name = None
for file in os.listdir(download_dir):
if file.startswith(f'{package_name}_') and file.endswith('.deb'):
pkg_name = file
if not pkg_name:
erro_msg = f'Unable to find {package_name} package inside download folder'
logger.error(erro_msg)
raise FileNotFoundError(erro_msg)
deb_path = os.path.join(download_dir, pkg_name)
# we copy deb to the temporary folder
shutil.copy(deb_path, tmp_folder)
# We first unpack deb file and get data.tar.xz from there
cmd = ['ar', '-x', os.path.join(tmp_folder, pkg_name)]
subprocess.check_call(cmd, cwd=tmp_folder)
# With data.tar.xz, we try to find script file
data_tar = tarfile.open(os.path.join(tmp_folder, 'data.tar.xz'))
paths = []
for f in files:
file_tarpath = None
for member in data_tar.getnames():
if member.endswith(f):
file_tarpath = member
if not file_tarpath:
erro_msg = f"Unable to find {f} inside data tar."
logger.error(erro_msg)
raise FileNotFoundError(erro_msg)
# We extract said file to the temporary folder
data_tar.extract(file_tarpath, path=tmp_folder)
# add it to our return
paths.append(os.path.join(tmp_folder, file_tarpath))
data_tar.close()
return paths
def __sign_and_pack(self, patch_file):
"""
Generates the patch signatures and pack the .patch file
:param patch_file .patch file full path
"""
filelist = ["metadata.tar", "software.tar"]
if self.metadata.pre_install:
filelist.append(constants.PATCH_SCRIPTS["PRE_INSTALL"])
if self.metadata.post_install:
filelist.append(constants.PATCH_SCRIPTS["POST_INSTALL"])
# Generate the local signature file
logger.debug(f"Generating signature for patch files {filelist}")
sig = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
for f in filelist:
sig ^= self.get_md5(f)
sigfile = open(mdsum_signature_file, "w")
sigfile.write("%x" % sig)
sigfile.close()
# this comes from patch_functions write_patch
# Generate the detached signature
#
# Note: if cert_type requests a formal signature, but the signing key
# is not found, we'll instead sign with the "dev" key and
# need_resign_with_formal is set to True.
need_resign_with_formal = sign_files(
filelist,
detached_signature_file,
cert_type=None)
logger.info(f"Formal signing status: {need_resign_with_formal}")
# Save files into .patch
files = [f for f in os.listdir('.') if os.path.isfile(f)]
if not os.path.exists(PATCH_OUTPUT):
os.makedirs(PATCH_OUTPUT)
patch_full_path = os.path.join(PATCH_OUTPUT, patch_file)
tar = tarfile.open(patch_full_path, "w:gz")
for file in files:
logger.info(f"Saving file {file}")
tar.add(file)
tar.close()
logger.info(f"Patch file created {patch_full_path}")
if self.sign_remote:
self.__sign_patch_remotely(patch_full_path)
def __sign_patch_remotely(self, patch_file):
"""
Send the patch file to be signed remotely by a signing server
:param patch_file full path to the patch file
"""
logger.info("Starting remote signing for: %s", patch_file)
if not signing_server:
logger.exception("SIGNING_SERVER variable not set, unable to continue.")
sys.exit(1)
if not signing_user:
logger.exception("SIGNING_USER variable not set, unable to continue.")
sys.exit(1)
try:
conn_string = f"{signing_user}@{signing_server}"
patch_basename = os.path.basename(patch_file)
# First we get the upload path from the signing server, it should return something
# similar to: "Upload: /tmp/sign_upload.5jR11pS0"
call_path = subprocess.check_output([
"ssh",
"-o StrictHostKeyChecking=no",
conn_string,
f"sudo {constants.GET_UPLOAD_PATH} -r"]).decode(sys.stdout.encoding).strip()
upload_path = call_path.split()[1]
logger.info("Upload path receive from signing server: %s", upload_path)
# We send the patch to the signing server
logger.info("Sending patch to signing server...")
subprocess.check_output([
"scp",
"-q",
patch_file,
f"{conn_string}:{upload_path}"])
# Request the signing server to sign the patch, it should return the full path
# of the patch inside the signing server
logger.info("Signing patch...")
signed_patch_path = subprocess.check_output([
"ssh",
conn_string,
f"sudo {constants.REQUEST_SIGN}",
f"{upload_path}/{patch_basename}",
"usm"]).decode(sys.stdout.encoding).strip()
logger.info("Signing successful, path returned: %s", signed_patch_path)
logger.info("Downloading signed patch...")
subprocess.check_output([
"scp",
"-q",
f"{conn_string}:{signed_patch_path}",
patch_file])
logger.info("Patch successfully signed: %s", patch_file)
except subprocess.CalledProcessError as e:
logger.exception("Failure to sign patch: %s", e)
except Exception as e:
logger.exception("An unexpected error has occurred when signing the patch: %s", e)
@click.command()
@click.option(
'--recipe',
help='Patch recipe input XML file, examples are available under EXAMLES directory',
required=True)
@click.option(
'--name',
help='Allow user to define name of the patch file. e.g.: test-sample-rr.patch. \
Name will default to patch_id if not defined',
required=False)
@click.option(
'--remote-sign',
help='Send the patch to defined SIGNING SERVER to be sign with an different key.',
is_flag=True,
required=False)
def build(recipe, name=None, remote_sign=False):
patch_builder = PatchBuilder(recipe, name, remote_sign)
patch_builder.build_patch()
if __name__ == '__main__':
build()