powertrain-build/powertrain_build/interface/model_yaml_verification.py
Axel Andersson 816553f1bc Added powertrain-build entrypoint
The entrypoint will make it easier to install powertrain-build to an
isolated venv without having to call python -m powertrain_build.

Change-Id: I3850c97d17707f9bc03640bd1d997508637d97ba
2024-10-30 10:33:42 +01:00

349 lines
14 KiB
Python

# Copyright 2024 Volvo Car Corporation
# Licensed under Apache 2.0.
"""Module for verifying the model yaml files."""
import argparse
import logging
import sys
import typing
from pathlib import Path
from voluptuous import All, MultipleInvalid, Optional, Required, Schema
from ruamel.yaml import YAML
from powertrain_build.interface.application import Application
from powertrain_build.interface.base import BaseApplication
PARSER_HELP = "Verify the model yaml files."
class ModelYmlVerification(BaseApplication):
"""Class for verifying the model yaml files."""
def __init__(self, base_application):
self.base_application = base_application
self.raw = {}
self.app_insignals = self.get_insignals_name()
self.app_outsignals = self.get_outsignals_name()
self.signal_properties = {}
self.model_name = None
self.error_printed = False
def read_translation(self, translation_file):
"""Read specification of the yaml file.
Args:
translation_file (Path): file with specs.
"""
if not translation_file.is_file():
return {}
with open(translation_file, encoding="utf-8") as translation:
try:
yaml = YAML(typ='safe', pure=True)
self.raw = yaml.load(translation)
except yaml.YAMLError as e:
self.raw = {}
if hasattr(e, 'problem_mark'):
mark = e.problem_mark
self.error("Error while reading model file, verification of this file cannot continue until this "
f"is fixed:\nFile: {translation_file}\nLine: {mark.line + 1}\nColumn: {mark.column + 1}")
else:
self.error("Error while reading model file, verification of this file cannot continue until this "
f"is fixed:\nFile: {translation_file}")
def validate_signal_schema(self, signal_spec, signal_direction, is_hal, is_service):
"""Validate if signal have a correct schema in model yaml file.
Args:
signal_spec (dict): signal specification.
signal_direction (str): insignal or outsignal.
is_hal (Bool): signal comming from hal
is_service (Bool): signal comming from service
"""
if is_hal:
signal_schema = Schema({Required('insignal'): All(str), Optional('property'): All(str)})
elif is_service:
signal_schema = Schema({Required(signal_direction): All(str), Optional('property'): All(str)})
else:
signal_schema = Schema({Required(signal_direction): All(str), Required('property'): All(str)})
try:
signal_schema(signal_spec)
except MultipleInvalid as e:
self.error(f"{e} in {self.model_name}")
def validate_group_schema(self, group_spec):
"""Validate if device proxy signal group and hal endpoint
have correct schema.
Args:
group_spec (dict): dp signal group or hal endpoint.
"""
group_schema = Schema({Required(str): [{Required(str): list}]})
try:
group_schema(group_spec)
except MultipleInvalid as e:
self.error(self.model_name + ' ' + str(e))
def validate_service_schema(self, service_spec):
"""Validate if service schema in model yaml file.
Args:
service_spec (dict): service in model yaml file.
"""
service_schema = Schema(
{
Required(str): {
Optional('properties'): [{Required(str): list}],
Optional('methods'): [{Required(str): list}]
}
}
)
try:
service_schema(service_spec)
except MultipleInvalid as e:
self.error(self.model_name + ' ' + str(e))
def validate_schema(self):
"""Validate interface schema in model yaml file.
Interface could be hal, dp(signal/signal group) and serivece.
"""
schema = Schema({
Optional('hal'): dict, Optional('signal_groups'): dict,
Optional('service'): dict, Optional('signals'): dict})
try:
schema(self.raw)
except MultipleInvalid as e:
self.error(self.model_name + ' ' + str(e))
def parse_hal_definition(self, hal):
"""Parse hal definition.
Args:
hal (dict): hal specification in model yaml file.
"""
self.parse_group_definitions(hal, is_hal=True)
def parse_service_definitions(self, service):
"""Parse service.
Args:
service (dict): service in model yaml file.
"""
if service:
self.validate_service_schema(service)
for service_name, definition in service.items():
for endpoints in definition.get('properties', []):
for endpoint, signals in endpoints.items():
self.verify_signals({service_name: signals}, is_service=True, endpoint=endpoint)
for endpoints in definition.get('methods', []):
for endpoint, signals in endpoints.items():
self.verify_signals({service_name: signals}, is_service=True, endpoint=endpoint)
def parse_group_definitions(self, signal_groups, is_hal=False):
"""Parse signal groups.
Args:
signal_groups (dict): Hal/dp signal group in yaml file.
is_hal (Bool): hal signal
"""
if signal_groups:
self.validate_group_schema(signal_groups)
for interface, group_definitions in signal_groups.items():
for group in group_definitions:
for group_name, signals in group.items():
self.verify_signals({interface: signals}, is_hal, group=group_name)
def verify_signals(self, signals_definition, is_hal=False, is_service=False, endpoint=None, group=None):
"""verify signal in-signal and out-signal in model yaml file.
Args:
signals_definition (dict): parsed signals in model yaml file.
is_hal (Bool): hal signal.
is_service (Bool): service signal.
endpoint (str): service endpoint.
group(str): hal group.
"""
for interface, specifications in signals_definition.items():
for specification in specifications:
in_out_signal = [key for key in specification.keys() if 'signal' in key]
signal_name = specification[in_out_signal[0]]
self.validate_signal_schema(specification, in_out_signal[0], is_hal, is_service)
if in_out_signal == []:
self.error(f"signal is not defined for {interface} in {self.model_name}!")
if 'in' not in in_out_signal[0] and 'out' not in in_out_signal[0]:
self.error(f"in and out signal must be added to signal {specification['signal']}")
elif 'in' in in_out_signal[0] and specification[in_out_signal[0]] not in self.app_insignals:
self.error(
f"{specification['insignal']} is not defined as an insignal in json file")
elif "out" in in_out_signal[0] and specification[in_out_signal[0]] not in self.app_outsignals:
self.error(
f"{specification['outsignal']} is not defined as an outsignal in json file")
else:
if is_service:
if specification.get('property') is None:
self.verify_primitive(f"{interface}.{endpoint}.{signal_name}", in_out_signal[0])
else:
self.verify_primitive(
f"{interface}.{endpoint}.{specification['property']}.{signal_name}",
in_out_signal[0])
elif is_hal:
if specification.get('property') is None:
self.verify_primitive(f"{interface}.{group}.{signal_name}", in_out_signal[0])
else:
self.verify_primitive(f"{interface}.{group}.{specification['property']}.{signal_name}",
in_out_signal[0])
else:
self.verify_primitive(f"{interface}.{specification['property']}.{signal_name}",
in_out_signal[0])
def check_duplicate_signals(self, property_spec, in_out):
"""Check if each signal appears only once for each model.
It is ok for two insignals with the same name to be mapped to the same primitive
if they are in different models.
It is not ok for a outsignal to map to the same interface twice, but it is ok to map to
different interfaces (with same or different property name).
Args:
property_spec (str): property specification.
in_out (str): whether it is an in- or outsignal.
"""
signal_name = property_spec.split('.')[-1]
interface_name = property_spec.split('.')[0]
for model, spec in self.signal_properties.items():
for primitive in spec:
if signal_name in primitive.split('.'):
if property_spec not in primitive:
if "in" in in_out:
self.error(
f"You can't connect a signal {signal_name} in {self.model_name} model to two "
f"different primitives. It's already connected in {model} model")
else:
if interface_name in primitive.split('.'):
self.error(
f"You can't connect a signal {signal_name} in {self.model_name} model "
f"to the same interface ({interface_name}) twice. "
f"It's already connected as {primitive} in model {model}.")
else:
if model == self.model_name:
self.error(f"You can't connect signal {signal_name} in {self.model_name} model twice.")
elif "out" in in_out:
self.error(
f"You can't connect signal {signal_name} in {self.model_name} model to the same "
f"primitive as in another model. It is already defined in {model}")
def check_property(self, property_spec):
"""Check if we have only one signal written for the same property.
Args:
property_spec (str): property specification.
"""
signal_name = property_spec.split('.')[-1]
for model, spec in self.signal_properties.items():
for primitive in spec:
if ('.'.join(property_spec.split('.')[:-1]) == '.'.join(primitive.split('.')[:-1])
and signal_name != primitive.split('.')[-1]):
self.error(
f"You can't connect another signal to the existing property {property_spec} in "
f"{self.model_name} model, because it is already defined in {model} model.")
def verify_primitive(self, property_spec, in_out):
"""Runs the necessary tests.
Args:
property_spec (str): property specification.
in_out (str): whether it is an in- or outsignal.
"""
self.check_duplicate_signals(property_spec, in_out)
self.check_property(property_spec)
self.signal_properties[self.model_name].append(property_spec)
def get_insignals_name(self):
"""Base application in-signals.
"""
app_insignals = []
for signal in self.base_application.insignals:
app_insignals.append(signal.name)
return app_insignals
def get_outsignals_name(self):
"""Base application out-signals.
"""
app_outsignals = []
for signal in self.base_application.outsignals:
app_outsignals.append(signal.name)
return app_outsignals
def parse_definition(self, definition):
"""Parses all definition files
Args:
definition (list(Path)): model yaml files.
"""
for translation in definition:
path = Path(translation)
self.model_name = path.name.replace(".yaml", "")
self.signal_properties[self.model_name] = []
self.read_translation(translation)
self.validate_schema()
self.verify_signals(self.raw.get("signals", {}))
self.parse_group_definitions(self.raw.get("signal_groups", {}))
self.parse_hal_definition(self.raw.get("hal", {}))
self.parse_service_definitions(self.raw.get("service", {}))
def error(self, msg):
""" Prints an error message to the terminal.
Args:
msg (string): The message to be printed.
"""
self.error_printed = True
logging.error(f"{msg}\n")
def print_success_msg(self):
""" Prints a success message if no error messages have been printed.
"""
if not self.error_printed:
print('Yaml verification done without any errors.')
def get_app(project_config):
""" Get an app specification for the current project.
Args:
config (pathlib.Path): Path to the ProjectCfg.json.
Returns:
app (Application): powertrain-build project.
"""
app = Application()
app.parse_definition(project_config)
return app
def configure_parser(parser: argparse.ArgumentParser):
"""Configure the argument parser."""
parser.add_argument("config", help="The SPA2 project config file", type=Path)
parser.set_defaults(func=model_yaml_verification_cli)
def model_yaml_verification_cli(args: argparse.Namespace):
"""CLI function for model yaml verification."""
app = get_app(args.config)
model_yamls = app.get_translation_files()
model_yaml_ver = ModelYmlVerification(app)
model_yaml_ver.parse_definition(model_yamls)
model_yaml_ver.print_success_msg()
def main(argv: typing.Optional[typing.List[str]] = None):
"""Main function for model yaml verification."""
parser = argparse.ArgumentParser(description=PARSER_HELP)
configure_parser(parser)
args = parser.parse_args(argv)
args.func(args)
if __name__ == "__main__":
main(sys.argv[1:])