From 15df33437bd4e9888f46d9b453adf03fb7c59ab6 Mon Sep 17 00:00:00 2001
From: cid <afonnepaulc@gmail.com>
Date: Wed, 22 Jan 2025 23:39:13 +0100
Subject: [PATCH] Apply Rules: inspection rules migration

Change-Id: Icca713bb4ef00d5bffca6e529c8bea4a7fe1f285
---
 ironic/common/inspection_rules/__init__.py   |   0
 ironic/common/inspection_rules/actions.py    | 488 +++++++++++++++++++
 ironic/common/inspection_rules/base.py       | 155 ++++++
 ironic/common/inspection_rules/engine.py     | 235 +++++++++
 ironic/common/inspection_rules/operators.py  | 259 ++++++++++
 ironic/common/inspection_rules/validation.py | 189 +++++++
 ironic/common/utils.py                       |  15 +
 ironic/conductor/inspection.py               |   7 +-
 ironic/conf/inspector.py                     |  16 +
 requirements.txt                             |   1 +
 10 files changed, 1364 insertions(+), 1 deletion(-)
 create mode 100644 ironic/common/inspection_rules/__init__.py
 create mode 100644 ironic/common/inspection_rules/actions.py
 create mode 100644 ironic/common/inspection_rules/base.py
 create mode 100644 ironic/common/inspection_rules/engine.py
 create mode 100644 ironic/common/inspection_rules/operators.py
 create mode 100644 ironic/common/inspection_rules/validation.py

diff --git a/ironic/common/inspection_rules/__init__.py b/ironic/common/inspection_rules/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/ironic/common/inspection_rules/actions.py b/ironic/common/inspection_rules/actions.py
new file mode 100644
index 0000000000..53653981c6
--- /dev/null
+++ b/ironic/common/inspection_rules/actions.py
@@ -0,0 +1,488 @@
+
+# Copyright 2013 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+
+from oslo_log import log
+
+from ironic.common import exception
+from ironic.common.i18n import _
+from ironic.common.inspection_rules import base
+from ironic import objects
+
+
+LOG = log.getLogger(__name__)
+ACTIONS = {
+    "fail": "FailAction",
+    "set-attribute": "SetAttributeAction",
+    "set-capability": "SetCapabilityAction",
+    "unset-capability": "UnsetCapabilityAction",
+    "extend-attribute": "ExtendAttributeAction",
+    "add-trait": "AddTraitAction",
+    "remove-trait": "RemoveTraitAction",
+    "set-plugin-data": "SetPluginDataAction",
+    "extend-plugin-data": "ExtendPluginDataAction",
+    "unset-plugin-data": "UnsetPluginDataAction",
+    "log": "LogAction",
+    "del-attribute": "DelAttributeAction",
+    "set-port-attribute": "SetPortAttributeAction",
+    "extend-port-attribute": "ExtendPortAttributeAction",
+    "del-port-attribute": "DelPortAttributeAction",
+}
+
+
+def get_action(op_name):
+    """Get operator class by name."""
+    class_name = ACTIONS[op_name]
+    return globals()[class_name]
+
+
+def update_nested_dict(d, key_path, value):
+    keys = key_path.split('.') if isinstance(key_path, str) else key_path
+    current = d
+    for key in keys[:-1]:
+        current = current.setdefault(key, {})
+    current[keys[-1]] = value
+    return d
+
+
+class ActionBase(base.Base, metaclass=abc.ABCMeta):
+    """Abstract base class for rule action plugins."""
+
+    OPTIONAL_ARGS = set()
+    """Set with names of optional parameters."""
+
+    FORMATTED_ARGS = []
+    """List of params to be formatted with python format."""
+
+    @abc.abstractmethod
+    def __call__(self, task, *args, **kwargs):
+        """Run action on successful rule match."""
+
+    def _execute_with_loop(self, task, action, inventory, plugin_data):
+        loop_items = action.get('loop', [])
+        results = []
+
+        if isinstance(loop_items, (list, dict)):
+            for item in loop_items:
+                action_copy = action.copy()
+                action_copy['args'] = item
+                results.append(self._execute_action(task, action_copy,
+                                                    inventory, plugin_data))
+        return results
+
+    def _execute_action(self, task, action, inventory, plugin_data):
+        processed_args = self._process_args(task, action, inventory,
+                                            plugin_data)
+
+        arg_values = [processed_args[arg_name]
+                      for arg_name in self.get_arg_names()]
+
+        for optional_arg in self.OPTIONAL_ARGS:
+            arg_values.append(processed_args.get(optional_arg, False))
+
+        return self(task, *arg_values)
+
+
+class LogAction(ActionBase):
+    FORMATTED_ARGS = ['msg']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['msg']
+
+    def __call__(self, task, msg, level='info'):
+        getattr(LOG, level)(msg)
+
+
+class FailAction(ActionBase):
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['msg']
+
+    def __call__(self, task, msg):
+        msg = _('%(msg)s') % {'msg': msg}
+        raise exception.HardwareInspectionFailure(error=msg)
+
+
+class SetAttributeAction(ActionBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['path', 'value']
+
+    def __call__(self, task, path, value):
+        try:
+            attr_path_parts = path.strip('/').split('/')
+            if len(attr_path_parts) == 1:
+                setattr(task.node, attr_path_parts[0], value)
+            else:
+                base_attr = getattr(task.node, attr_path_parts[0])
+                current = base_attr
+                for part in attr_path_parts[1:-1]:
+                    current = current.setdefault(part, {})
+                current[attr_path_parts[-1]] = value
+                setattr(task.node, attr_path_parts[0], base_attr)
+            task.node.save()
+        except Exception as exc:
+            msg = ("Failed to set attribute %(path)s "
+                   "with value %(value)s: %(exc)s" %
+                   {'path': path, 'value': value, 'exc': exc})
+            LOG.error(msg)
+            raise exception.InvalidParameterValue(msg)
+
+
+class ExtendAttributeAction(ActionBase):
+
+    OPTIONAL_ARGS = {'unique'}
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['path', 'value']
+
+    def __call__(self, task, path, value, unique=False):
+        try:
+            attr_path_parts = path.strip('/').split('/')
+            if len(attr_path_parts) == 1:
+                current = getattr(task.node, attr_path_parts[0], [])
+            else:
+                base_attr = getattr(task.node, attr_path_parts[0])
+                current = base_attr
+                for part in attr_path_parts[1:-1]:
+                    current = current.setdefault(part, {})
+                current = current.setdefault(attr_path_parts[-1], [])
+
+            if not isinstance(current, list):
+                current = []
+            if not unique or value not in current:
+                current.append(value)
+
+            if len(attr_path_parts) == 1:
+                setattr(task.node, attr_path_parts[0], current)
+            else:
+                setattr(task.node, attr_path_parts[0], base_attr)
+            task.node.save()
+        except Exception as exc:
+            msg = ("Failed to extend attribute %(path)s: %(exc)s") % {
+                'path': path, 'exc': exc}
+            raise exception.InvalidParameterValue(msg)
+
+
+class DelAttributeAction(ActionBase):
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['path']
+
+    def __call__(self, task, path):
+        try:
+            attr_path_parts = path.strip('/').split('/')
+            if len(attr_path_parts) == 1:
+                delattr(task.node, attr_path_parts[0])
+            else:
+                base_attr = getattr(task.node, attr_path_parts[0])
+                current = base_attr
+                for part in attr_path_parts[1:-1]:
+                    current = current[part]
+                del current[attr_path_parts[-1]]
+                setattr(task.node, attr_path_parts[0], base_attr)
+            task.node.save()
+        except Exception as exc:
+            msg = ("Failed to delete attribute at %(path)s: %(exc)s") % {
+                'path': path, 'exc': exc}
+            raise exception.InvalidParameterValue(msg)
+
+
+class AddTraitAction(ActionBase):
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['name']
+
+    def __call__(self, task, name):
+        try:
+            new_trait = objects.Trait(task.context, node_id=task.node.id,
+                                      trait=name)
+            new_trait.create()
+        except Exception as exc:
+            msg = (_("Failed to add new trait %(name)s: %(exc)s") %
+                   {'name': name, 'exc': exc})
+            raise exception.InvalidParameterValue(msg)
+
+
+class RemoveTraitAction(ActionBase):
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['name']
+
+    def __call__(self, task, name):
+        try:
+            objects.Trait.destroy(task.context, node_id=task.node.id,
+                                  trait=name)
+        except exception.NodeTraitNotFound as exc:
+            LOG.warning(_("Failed to remove trait %(name)s: %(exc)s"),
+                        {'name': name, 'exc': exc})
+        except Exception as exc:
+            msg = (_("Failed to remove trait %(name)s: %(exc)s") %
+                   {'name': name, 'exc': exc})
+            raise exception.InvalidParameterValue(msg)
+
+
+class SetCapabilityAction(ActionBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['name', 'value']
+
+    def __call__(self, task, name, value):
+        try:
+            properties = task.node.properties.copy()
+            capabilities = properties.get('capabilities', '')
+            caps = dict(cap.split(':', 1)
+                        for cap in capabilities.split(',') if cap)
+            caps[name] = value
+            properties['capabilities'] = ','.join('%s:%s' % (k, v)
+                                                  for k, v in caps.items())
+            task.node.properties = properties
+            task.node.save()
+        except Exception as exc:
+            raise exception.InvalidParameterValue(
+                "Failed to set capability %(name)s: %(exc)s" %
+                {'name': name, 'exc': exc})
+
+
+class UnsetCapabilityAction(ActionBase):
+    @classmethod
+    def get_arg_names(cls):
+        return ['name']
+
+    def __call__(self, task, name):
+        try:
+            properties = task.node.properties.copy()
+            capabilities = properties.get('capabilities', '')
+            caps = dict(cap.split(':', 1)
+                        for cap in capabilities.split(',') if cap)
+            caps.pop(name, None)
+            properties['capabilities'] = ','.join('%s:%s' % (k, v)
+                                                  for k, v in caps.items())
+            task.node.properties = properties
+            task.node.save()
+        except Exception as exc:
+            raise exception.InvalidParameterValue(
+                "Failed to unset capability %(name)s: %(exc)s" %
+                {'name': name, 'exc': exc})
+
+
+class SetPluginDataAction(ActionBase):
+
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['path', 'value', 'plugin_data']
+
+    def __call__(self, task, path, value, plugin_data):
+        try:
+            update_nested_dict(plugin_data, path, value)
+            return {'plugin_data': plugin_data}
+        except Exception as exc:
+            msg = ("Failed to set plugin data at %(path)s: %(exc)s" % {
+                'path': path, 'exc': exc})
+            raise exception.InvalidParameterValue(msg)
+
+
+class ExtendPluginDataAction(ActionBase):
+
+    OPTIONAL_ARGS = {'unique'}
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['path', 'value', 'plugin_data']
+
+    def __call__(self, task, path, value, plugin_data, unique=False):
+        try:
+            current = self._get_nested_value(plugin_data, path)
+            if current is None:
+                current = []
+                update_nested_dict(plugin_data, path, current)
+            elif not isinstance(current, list):
+                current = []
+                update_nested_dict(plugin_data, path, current)
+            if not unique or value not in current:
+                current.append(value)
+            return {'plugin_data': plugin_data}
+        except Exception as exc:
+            msg = ("Failed to extend plugin data at %(path)s: %(exc)s") % {
+                'path': path, 'exc': exc}
+            raise exception.InvalidParameterValue(msg)
+
+    @staticmethod
+    def _get_nested_value(d, key_path, default=None):
+        keys = key_path.split('.') if isinstance(key_path, str) else key_path
+        current = d
+        try:
+            for key in keys:
+                current = current[key]
+            return current
+        except (KeyError, TypeError):
+            return default
+
+
+class UnsetPluginDataAction(ActionBase):
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['path', 'plugin_data']
+
+    def __call__(self, task, path, plugin_data):
+        try:
+            if not self._unset_nested_dict(plugin_data, path):
+                LOG.warning("Path %s not found", path)
+            return {'plugin_data': plugin_data}
+        except Exception as exc:
+            msg = ("Failed to unset plugin data at %(path)s: %(exc)s") % {
+                'path': path, 'exc': exc}
+            raise exception.InvalidParameterValue(msg)
+
+    @staticmethod
+    def _unset_nested_dict(d, key_path):
+        keys = key_path.split('.') if isinstance(key_path, str) else key_path
+        current = d
+        for key in keys[:-1]:
+            if not isinstance(current, dict) or key not in current:
+                return False
+            current = current[key]
+
+        target_key = keys[-1]
+        if isinstance(current, dict) and target_key in current:
+            if len(current) == 1:
+                parent = d
+                for key in keys[:-2]:
+                    parent = parent[key]
+                if len(keys) > 1:
+                    del parent[keys[-2]]
+            else:
+                del current[target_key]
+            return True
+        return False
+
+
+class SetPortAttributeAction(ActionBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['port_id', 'path', 'value']
+
+    def __call__(self, task, port_id, path, value):
+        port = next((p for p in task.ports if p.uuid == port_id), None)
+        if not port:
+            raise exception.PortNotFound(port=port_id)
+        try:
+            attr_path_parts = path.strip('/').split('/')
+            if len(attr_path_parts) == 1:
+                setattr(port, attr_path_parts[0], value)
+            else:
+                base_attr = getattr(port, attr_path_parts[0])
+                current = base_attr
+                for part in attr_path_parts[1:-1]:
+                    current = current.setdefault(part, {})
+                current[attr_path_parts[-1]] = value
+                setattr(port, attr_path_parts[0], base_attr)
+            port.save()
+        except Exception as exc:
+            msg = ("Failed to set attribute %(path)s for port "
+                   "%(port_id)s: %(exc)s") % {'path': path,
+                                              'port_id': port_id,
+                                              'exc': str(exc)}
+            LOG.warning(msg)
+
+
+class ExtendPortAttributeAction(ActionBase):
+    OPTIONAL_ARGS = {'unique'}
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['port_id', 'path', 'value']
+
+    def __call__(self, task, port_id, path, value, unique=False):
+        port = next((p for p in task.ports if p.uuid == port_id), None)
+        if not port:
+            raise exception.PortNotFound(port=port_id)
+        try:
+            attr_path_parts = path.strip('/').split('/')
+            if len(attr_path_parts) == 1:
+                current = getattr(port, attr_path_parts[0], [])
+            else:
+                base_attr = getattr(port, attr_path_parts[0])
+                current = base_attr
+                for part in attr_path_parts[1:-1]:
+                    current = current.setdefault(part, {})
+                current = current.setdefault(attr_path_parts[-1], [])
+
+            if not isinstance(current, list):
+                current = []
+            if not unique or value not in current:
+                current.append(value)
+
+            if len(attr_path_parts) == 1:
+                setattr(port, attr_path_parts[0], current)
+            else:
+                setattr(port, attr_path_parts[0], base_attr)
+            port.save()
+        except Exception as exc:
+            msg = ("Failed to extend attribute %(path)s for port "
+                   "%(port_id)s: %(exc)s") % {'path': path,
+                                              'port_id': port_id,
+                                              'exc': str(exc)}
+            LOG.warning(msg)
+
+
+class DelPortAttributeAction(ActionBase):
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['port_id', 'path']
+
+    def __call__(self, task, port_id, path):
+        port = next((p for p in task.ports if p.uuid == port_id), None)
+        if not port:
+            raise exception.PortNotFound(port=port_id)
+        try:
+            attr_path_parts = path.strip('/').split('/')
+            if len(attr_path_parts) == 1:
+                delattr(port, attr_path_parts[0])
+            else:
+                base_attr = getattr(port, attr_path_parts[0])
+                current = base_attr
+                for part in attr_path_parts[1:-1]:
+                    current = current[part]
+                del current[attr_path_parts[-1]]
+                setattr(port, attr_path_parts[0], base_attr)
+            port.save()
+        except Exception as exc:
+            msg = ("Failed to delete attribute %(path)s for port "
+                   "%(port_id)s: %(exc)s") % {'path': path,
+                                              'port_id': port_id,
+                                              'exc': str(exc)}
+            LOG.warning(msg)
diff --git a/ironic/common/inspection_rules/base.py b/ironic/common/inspection_rules/base.py
new file mode 100644
index 0000000000..39b7c6dbd6
--- /dev/null
+++ b/ironic/common/inspection_rules/base.py
@@ -0,0 +1,155 @@
+# Copyright 2013 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+
+from oslo_log import log
+
+from ironic.common.i18n import _
+from ironic.common import utils as common_utils
+import ironic.conf
+
+
+CONF = ironic.conf.CONF
+LOG = log.getLogger(__name__)
+SENSITIVE_FIELDS = ['password', 'auth_token', 'bmc_password']
+
+
+class Base(object):
+
+    USES_PLUGIN_DATA = False
+    """Flag to indicate if this action needs plugin_data as an arg."""
+
+    OPTIONAL_ARGS = set()
+    """Set with names of optional parameters."""
+
+    @classmethod
+    @abc.abstractmethod
+    def get_arg_names(cls):
+        """Return list of argument names in order expected."""
+        raise NotImplementedError
+
+    def _normalize_list_args(self, *args, **kwargs):
+        """Convert list arguments into dictionary format.
+
+        """
+        op_name = kwargs['op']
+        arg_list = kwargs['args']
+        if not isinstance(arg_list, list):
+            if isinstance(arg_list, dict) and 'plugin-data' in op_name:
+                arg_list['plugin_data'] = {}
+            return arg_list
+
+        # plugin_data is a required argument during validation but since
+        # it comes from the inspection data and added later, we need to
+        # make sure validation does not fail for that sake.
+        if 'plugin-data' in op_name:
+            arg_list.append('{}')
+
+        arg_names = set(self.__class__.get_arg_names())
+        if len(arg_list) < len(arg_names):
+            missing = arg_names[len(arg_list):]
+            msg = (_("Not enough arguments provided. Missing: %s"),
+                   ", ".join(missing))
+            LOG.error(msg)
+            raise ValueError(msg)
+
+        arg_list = {name: arg_list[i] for i, name in enumerate(arg_names)}
+
+        # Add optional args if they exist in the input
+        start_idx = len(arg_names)
+        for i, opt_arg in enumerate(self.OPTIONAL_ARGS):
+            if start_idx + i < len(arg_list):
+                arg_list[opt_arg] = arg_list[start_idx + i]
+
+        return arg_list
+
+    def validate(self, *args, **kwargs):
+        """Validate args passed during creation.
+
+        Default implementation checks for presence of required fields.
+
+        :param args: args as a dictionary
+        :param kwargs: used for extensibility without breaking existing plugins
+        :raises: ValueError on validation failure
+        """
+        required_args = set(self.__class__.get_arg_names())
+        normalized_args = self._normalize_list_args(
+            args=kwargs.get('args', {}), op=kwargs['op'])
+
+        if isinstance(normalized_args, dict):
+            provided = set(normalized_args.keys())
+            missing = required_args - provided
+            unexpected = provided - (required_args | self.OPTIONAL_ARGS)
+
+            msg = []
+            if missing:
+                msg.append(_('missing required argument(s): %s')
+                           % ', '.join(missing))
+            if unexpected:
+                msg.append(_('unexpected argument(s): %s')
+                           % ', '.join(unexpected))
+            if msg:
+                raise ValueError('; '.join(msg))
+        else:
+            raise ValueError(_("args must be either a list or dictionary"))
+
+    @staticmethod
+    def interpolate_variables(value, node, inventory, plugin_data):
+        if isinstance(value, str):
+            try:
+                return value.format(node=node, inventory=inventory,
+                                    plugin_data=plugin_data)
+            except (AttributeError, KeyError, ValueError, IndexError,
+                    TypeError) as e:
+                LOG.warning(
+                    "Interpolation failed: %(value)s: %(error_class)s, "
+                    "%(error)s", {'value': value,
+                                  'error_class': e.__class__.__name__,
+                                  'error': e})
+                return value
+        elif isinstance(value, dict):
+            return {
+                Base.interpolate_variables(k, node, inventory, plugin_data):
+                Base.interpolate_variables(v, node, inventory, plugin_data)
+                for k, v in value.items()}
+        elif isinstance(value, list):
+            return [Base.interpolate_variables(
+                v, node, inventory, plugin_data) for v in value]
+        return value
+
+    def _process_args(self, task, operation, inventory, plugin_data):
+        "Normalize and process args based on the operator."
+
+        op = operation.get('op')
+        if not op:
+            raise ValueError("Operation must contain 'op' key")
+
+        op, invtd = common_utils.parse_inverted_operator(op)
+        dict_args = self._normalize_list_args(args=operation.get('args', {}),
+                                              op=op)
+
+        # plugin-data becomes available during inspection,
+        # we need to populate with the actual value.
+        if 'plugin_data' in dict_args or 'plugin-data' in op:
+            dict_args['plugin_data'] = plugin_data
+
+        node = task.node
+        formatted_args = getattr(self, 'FORMATTED_ARGS', [])
+        return {
+            k: (self.interpolate_variables(v, node, inventory, plugin_data)
+                if k in formatted_args else v)
+            for k, v in dict_args.items()
+        }
diff --git a/ironic/common/inspection_rules/engine.py b/ironic/common/inspection_rules/engine.py
new file mode 100644
index 0000000000..3db447f047
--- /dev/null
+++ b/ironic/common/inspection_rules/engine.py
@@ -0,0 +1,235 @@
+# Copyright 2013 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+from oslo_log import log
+import yaml
+
+from ironic.common import exception
+from ironic.common.i18n import _
+from ironic.common.inspection_rules import actions
+from ironic.common.inspection_rules import operators
+from ironic.common.inspection_rules import validation
+from ironic.common import utils as common_utils
+from ironic.conf import CONF
+from ironic import objects
+
+
+LOG = log.getLogger(__name__)
+SENSITIVE_FIELDS = ['password', 'auth_token', 'bmc_password']
+
+
+def get_built_in_rules():
+    """Load built-in inspection rules."""
+    built_in_rules = []
+    built_in_rules_dir = CONF.inspection_rules.built_in_rules
+
+    if not built_in_rules_dir:
+        return built_in_rules
+
+    try:
+        with open(built_in_rules_dir, 'r') as f:
+            rules_data = yaml.safe_load(f)
+
+        for rule_data in rules_data:
+            try:
+                rule = {
+                    'uuid': rule_data.get('uuid'),
+                    'priority': rule_data.get('priority', 0),
+                    'description': rule_data.get('description'),
+                    'scope': rule_data.get('scope'),
+                    'sensitive': rule_data.get('sensitive', False),
+                    'phase': rule_data.get('phase', 'main'),
+                    'actions': rule_data.get('actions', []),
+                    'conditions': rule_data.get('conditions', []),
+                    'built_in': True
+                }
+                validation.validate_inspection_rule(rule)
+                built_in_rules.append(rule)
+            except Exception as e:
+                LOG.error(_("Error parsing built-in rule: %s"), e)
+                raise
+    except FileNotFoundError:
+        LOG.error(_("Built-in rules file not found: %s"),
+                  built_in_rules_dir)
+        raise
+    except yaml.YAMLError as e:
+        LOG.error(_("Error parsing YAML in built-in rules file %s: %s"),
+                  built_in_rules_dir, e)
+        raise
+    except Exception as e:
+        LOG.error(_("Error loading built-in rules from %s: %s"),
+                  built_in_rules_dir, e)
+        raise
+
+    return built_in_rules
+
+
+def _mask_sensitive_data(data):
+    """Recursively mask sensitive fields in data."""
+    if isinstance(data, dict):
+        return {key: (_mask_sensitive_data(value)
+                      if key not in SENSITIVE_FIELDS else '***')
+                for key, value in data.items()}
+    elif isinstance(data, list):
+        return [_mask_sensitive_data(item) for item in data]
+    return data
+
+
+def check_conditions(task, rule, inventory, plugin_data):
+    try:
+        if not rule.get('conditions', None):
+            return True
+
+        for condition in rule['conditions']:
+            op, invtd = common_utils.parse_inverted_operator(
+                condition['op'])
+
+            if op not in operators.OPERATORS:
+                supported_ops = ', '.join(operators.OPERATORS.keys())
+                msg = (_("Unsupported operator: '%(op)s'. Supported "
+                         "operators are: %(supported_ops)s.") % {
+                             'op': op, 'supported_ops': supported_ops})
+                raise ValueError(msg)
+
+            result = False
+            plugin = operators.get_operator(op)
+            if 'loop' in condition:
+                result = plugin()._check_with_loop(task, condition, inventory,
+                                                   plugin_data)
+            else:
+                result = plugin()._check_condition(task, condition, inventory,
+                                                   plugin_data)
+            if not result:
+                LOG.debug("Skipping rule %(rule)s on node %(node)s: "
+                          "condition check '%(op)s': '%(args)s' failed ",
+                          {'rule': rule['uuid'], 'node': task.node.uuid,
+                           'op': condition['op'], 'args': condition['args']})
+                return False
+        return True
+
+    except Exception as err:
+        LOG.error("Error checking condition on node %(node)s: %(err)s.",
+                  {'node': task.node.uuid, 'err': err})
+        raise
+
+
+def apply_actions(task, rule, inventory, plugin_data):
+
+    result = {'plugin_data': plugin_data}
+    for action in rule['actions']:
+        try:
+            op = action['op']
+            if op not in actions.ACTIONS:
+                supported_ops = ', '.join(actions.ACTIONS.keys())
+                msg = (_("Unsupported action: '%(op)s'. Supported actions "
+                         "are: %(supported_ops)s.") % {
+                             'op': op, 'supported_ops': supported_ops})
+                raise ValueError(msg)
+
+            plugin = actions.get_action(op)
+            if 'loop' in action:
+                action_result = plugin()._execute_with_loop(
+                    task, action, inventory, result['plugin_data'])
+            else:
+                action_result = plugin()._execute_action(
+                    task, action, inventory, result['plugin_data'])
+
+            if action_result is not None and isinstance(action_result, dict):
+                result['plugin_data'] = action_result.get(
+                    'plugin_data', result['plugin_data'])
+        except exception.IronicException as err:
+            LOG.error("Error applying action on node %(node)s: %(err)s.",
+                      {'node': task.node.uuid, 'err': err})
+            raise
+        except Exception as err:
+            LOG.exception("Unexpected error applying action on node "
+                          "%(node)s: %(err)s.", {'node': task.node.uuid,
+                                                 'err': err})
+            raise
+    return result
+
+
+def apply_rules(task, inventory, plugin_data, inspection_phase):
+    """Apply inspection rules to a node."""
+    node = task.node
+
+    all_rules = objects.InspectionRule.list(
+        context=task.context,
+        filters={'phase': inspection_phase})
+
+    built_in_rules = get_built_in_rules()
+    rules = all_rules + built_in_rules
+
+    if not rules:
+        LOG.debug("No inspection rules to apply for phase "
+                  "'%(phase)s on node: %(node)s'", {
+                      'phase': inspection_phase,
+                      'node': node.uuid})
+        return
+
+    mask_secrets = CONF.inspection_rules.mask_secrets
+    if mask_secrets == 'always':
+        inventory = _mask_sensitive_data(inventory)
+        plugin_data = _mask_sensitive_data(plugin_data)
+    elif mask_secrets == 'sensitive':
+        # Mask secrets unless the rule is marked as sensitive
+        for rule in rules:
+            if not rule.get('sensitive', False):
+                inventory = _mask_sensitive_data(inventory)
+                plugin_data = _mask_sensitive_data(plugin_data)
+                break
+
+    rules.sort(key=lambda rule: rule['priority'], reverse=True)
+    LOG.debug("Applying %(count)d inspection rules to node %(node)s",
+              {'count': len(rules), 'node': node.uuid})
+
+    result = {'plugin_data': plugin_data}
+    for rule in rules:
+        try:
+            if not check_conditions(task, rule, inventory, plugin_data):
+                continue
+
+            LOG.info("Applying actions for rule %(rule)s to node %(node)s",
+                     {'rule': rule['uuid'], 'node': node.uuid})
+
+            rule_result = apply_actions(task, rule, inventory, plugin_data)
+            if rule_result and 'plugin_data' in rule_result:
+                result['plugin_data'] = rule_result['plugin_data']
+
+        except exception.HardwareInspectionFailure:
+            raise
+        except exception.IronicException as e:
+            if rule['sensitive']:
+                LOG.error("Error applying sensitive rule %(rule)s to node "
+                          "%(node)s", {'rule': rule['uuid'],
+                                       'node': node.uuid})
+            else:
+                LOG.error("Error applying rule %(rule)s to node "
+                          "%(node)s: %(error)s", {'rule': rule['uuid'],
+                                                  'node': node.uuid,
+                                                  'error': e})
+            raise
+        except Exception as e:
+            msg = ("Failed to apply rule %(rule)s to node %(node)s: "
+                   "%(error)s" % {'rule': rule['uuid'], 'node': node.uuid,
+                                  'error': e})
+
+            LOG.exception(msg)
+
+            raise exception.IronicException(msg)
+
+    LOG.info("Finished applying inspection rules to node %s", node.uuid)
+    return result
diff --git a/ironic/common/inspection_rules/operators.py b/ironic/common/inspection_rules/operators.py
new file mode 100644
index 0000000000..9aa55522d0
--- /dev/null
+++ b/ironic/common/inspection_rules/operators.py
@@ -0,0 +1,259 @@
+# Copyright 2013 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+import operator
+import re
+
+import netaddr
+from oslo_log import log
+
+from ironic.common.i18n import _
+from ironic.common.inspection_rules import base
+from ironic.common import utils as common_utils
+
+
+LOG = log.getLogger(__name__)
+OPERATORS = {
+    "eq": "EqOperator",
+    "lt": "LtOperator",
+    "gt": "GtOperator",
+    "is-empty": "EmptyOperator",
+    "in-net": "NetOperator",
+    "matches": "MatchesOperator",
+    "contains": "ContainsOperator",
+    "one-of": "OneOfOperator",
+    "is-none": "IsNoneOperator",
+    "is-true": "IsTrueOperator",
+    "is-false": "IsFalseOperator",
+}
+
+
+def get_operator(op_name):
+    """Get operator class by name."""
+    class_name = OPERATORS[op_name]
+    return globals()[class_name]
+
+
+def coerce(value, expected):
+    if isinstance(expected, float):
+        return float(value)
+    elif isinstance(expected, int):
+        return int(value)
+    else:
+        return value
+
+
+class OperatorBase(base.Base, metaclass=abc.ABCMeta):
+    """Abstract base class for rule condition plugins."""
+
+    OPTIONAL_ARGS = set()
+    """Set with names of optional parameters."""
+
+    @abc.abstractmethod
+    def check(self, *args, **kwargs):
+        """Check if condition holds for a given field."""
+
+    def _check_with_loop(self, task, condition, inventory, plugin_data):
+        loop_items = condition.get('loop', [])
+        multiple = condition.get('multiple', 'any')
+        results = []
+
+        if isinstance(loop_items, (list, dict)):
+            for item in loop_items:
+                condition_copy = condition.copy()
+                condition_copy['args'] = item
+                result = self._check_condition(task, condition_copy,
+                                               inventory, plugin_data)
+                results.append(result)
+
+                if multiple == 'first' and result:
+                    return True
+                elif multiple == 'last':
+                    results = [result]
+
+            if multiple == 'any':
+                return any(results)
+            elif multiple == 'all':
+                return all(results)
+            return results[0] if results else False
+        return self._check_condition(task, condition, inventory, plugin_data)
+
+    def _check_condition(self, task, condition, inventory, plugin_data):
+        """Process condition arguments and apply the check logic.
+
+        :param task: TaskManger instance
+        :param condition: condition to check
+        :param args: parameters as a dictionary, changing it here will change
+                     what will be stored in database
+        :param kwargs: used for extensibility without breaking existing plugins
+        :raises ValueError: on unacceptable field value
+        :returns: True if check succeeded, otherwise False
+        """
+        op, is_inverted = common_utils.parse_inverted_operator(
+            condition['op'])
+
+        processed_args = self._process_args(task, condition, inventory,
+                                            plugin_data)
+        arg_values = [processed_args[arg_name]
+                      for arg_name in self.get_arg_names()]
+
+        for optional_arg in self.OPTIONAL_ARGS:
+            arg_values.append(processed_args.get(optional_arg, False))
+
+        result = self.check(*arg_values)
+        return not result if is_inverted else result
+
+
+class SimpleOperator(OperatorBase):
+
+    op = None
+    OPTIONAL_ARGS = {'force_strings'}
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['values']
+
+    def check(self, values, force_strings=False):
+        if force_strings:
+            values = [coerce(value, str) for value in values]
+        return self.op(values)
+
+
+class EqOperator(SimpleOperator):
+    op = operator.eq
+
+
+class LtOperator(SimpleOperator):
+    op = operator.lt
+
+
+class GtOperator(SimpleOperator):
+    op = operator.gt
+
+
+class EmptyOperator(OperatorBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['value']
+
+    def check(self, value):
+        return str(value) in ("", 'None', '[]', '{}')
+
+
+class NetOperator(OperatorBase):
+    FORMATTED_ARGS = ['address', 'subnet']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['address', 'subnet']
+
+    def validate(self, address, subnet):
+        try:
+            netaddr.IPNetwork(subnet)
+        except netaddr.AddrFormatError as exc:
+            LOG.error(_('invalid value: %s'), exc)
+
+    def check(self, address, subnet):
+        network = netaddr.IPNetwork(subnet)
+        return netaddr.IPAddress(address) in network
+
+
+class IsTrueOperator(OperatorBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['value']
+
+    def check(self, value):
+        if isinstance(value, bool):
+            return value
+        if isinstance(value, (int, float)):
+            return bool(value)
+        if isinstance(value, str):
+            return value.lower() in ('yes', 'true')
+        return False
+
+
+class IsFalseOperator(OperatorBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['value']
+
+    def check(self, value):
+        if isinstance(value, bool):
+            return not value
+        if isinstance(value, (int, float)):
+            return not bool(value)
+        if isinstance(value, str):
+            return value.lower() in ('no', 'false')
+        return value is None
+
+
+class IsNoneOperator(OperatorBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['value']
+
+    def check(self, value):
+        return str(value) == 'None'
+
+
+class OneOfOperator(OperatorBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['value', 'values']
+
+    def check(self, value, values=[]):
+        return value in values
+
+
+class ReOperator(OperatorBase):
+    FORMATTED_ARGS = ['value']
+
+    @classmethod
+    def get_arg_names(cls):
+        return ['value', 'regex']
+
+    def validate_regex(self, regex):
+        try:
+            re.compile(regex)
+        except re.error as exc:
+            raise ValueError(_('invalid regular expression: %s') % exc)
+
+
+class MatchesOperator(ReOperator):
+
+    def check(self, value, regex):
+        self.validate_regex(regex)
+        if regex[-1] != '$':
+            regex += '$'
+        return re.match(regex, str(value)) is not None
+
+
+class ContainsOperator(ReOperator):
+
+    def check(self, value, regex):
+        self.validate_regex(regex)
+        return re.search(regex, str(value)) is not None
diff --git a/ironic/common/inspection_rules/validation.py b/ironic/common/inspection_rules/validation.py
new file mode 100644
index 0000000000..42b210d2db
--- /dev/null
+++ b/ironic/common/inspection_rules/validation.py
@@ -0,0 +1,189 @@
+# Copyright 2013 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import enum
+
+import jsonschema
+
+from ironic.common import args
+from ironic.common import exception
+from ironic.common.i18n import _
+from ironic.common.inspection_rules import actions
+from ironic.common.inspection_rules import operators
+from ironic.common import utils as common_utils
+
+
+_CONDITIONS_SCHEMA = None
+_ACTIONS_SCHEMA = None
+
+
+class InspectionPhase(enum.Enum):
+    MAIN = 'main'
+
+
+def conditions_schema():
+    global _CONDITIONS_SCHEMA
+    if _CONDITIONS_SCHEMA is None:
+        condition_plugins = list(operators.OPERATORS.keys())
+        condition_plugins.extend(
+            ["!%s" % op for op in list(condition_plugins)])
+        _CONDITIONS_SCHEMA = {
+            "title": "Inspection rule conditions schema",
+            "type": "array",
+            "minItems": 0,
+            "items": {
+                "type": "object",
+                "required": ["op", "args"],
+                "properties": {
+                    "op": {
+                        "description": "Condition operator",
+                        "enum": condition_plugins
+                    },
+                    "args": {
+                        "description": "Arguments for the condition",
+                        "type": ["array", "object"]
+                    },
+                    "multiple": {
+                        "description": "How to treat multiple values",
+                        "enum": ["any", "all", "first", "last"]
+                    },
+                    "loop": {
+                        "description": "Loop behavior for conditions",
+                        "type": ["array", "object"]
+                    },
+                },
+                # other properties are validated by plugins
+                "additionalProperties": True
+            }
+        }
+
+    return _CONDITIONS_SCHEMA
+
+
+def actions_schema():
+    global _ACTIONS_SCHEMA
+    if _ACTIONS_SCHEMA is None:
+        action_plugins = list(actions.ACTIONS.keys())
+        _ACTIONS_SCHEMA = {
+            "title": "Inspection rule actions schema",
+            "type": "array",
+            "minItems": 1,
+            "items": {
+                "type": "object",
+                "required": ["op", "args"],
+                "properties": {
+                    "op": {
+                        "description": "action operator",
+                        "enum": action_plugins
+                    },
+                    "args": {
+                        "description": "Arguments for the action",
+                        "type": ["array", "object"]
+                    },
+                    "loop": {
+                        "description": "Loop behavior for actions",
+                        "type": ["array", "object"]
+                    },
+                },
+                "additionalProperties": True
+            }
+        }
+
+    return _ACTIONS_SCHEMA
+
+
+SCHEMA = {
+    'type': 'object',
+    'properties': {
+        'uuid': {'type': ['string', 'null']},
+        'priority': {'type': 'integer', "minimum": 0},
+        'description': {'type': ['string', 'null'], 'maxLength': 255},
+        'sensitive': {'type': ['boolean', 'null']},
+        'phase': {'type': ['string', 'null'], 'maxLength': 16},
+        "conditions": conditions_schema(),
+        "actions": actions_schema()
+    },
+    'required': ['actions'],
+    "additionalProperties": False
+}
+
+VALIDATOR = args.and_valid(
+    args.schema(SCHEMA),
+    args.dict_valid(uuid=args.uuid)
+)
+
+
+def validate_inspection_rule(rule):
+    """Validate an inspection rule using the JSON schema.
+
+    :param rule: The inspection rule to validate.
+    :raises: Invalid if the rule is invalid.
+    """
+    if not rule.get('conditions'):
+        rule['conditions'] = []
+
+    errors = []
+    try:
+        jsonschema.validate(rule, SCHEMA)
+    except jsonschema.ValidationError as e:
+        errors.append(_('Validation failed for inspection rule: %s') % e)
+
+    phase = rule.get('phase', InspectionPhase.MAIN.value)
+    if phase not in (p.value for p in InspectionPhase):
+        errors.append(
+            _('Invalid phase: %(phase)s. Valid phases are: %(valid)s') % {
+                'phase': phase, 'valid': ', '.join(
+                    [p.value for p in InspectionPhase])
+            })
+
+    priority = rule.get('priority', 0)
+    if priority < 0 and not rule['built_in']:
+        errors.append(
+            _("Priority cannot be negative for user-defined rules."))
+    if priority > 9999 and not rule['built_in']:
+        errors.append(
+            _("Priority must be between 0 and 9999 for user-defined rules."))
+
+    # Additional plugin-specific validation
+    for condition in rule.get('conditions', []):
+        op, invtd = common_utils.parse_inverted_operator(
+            condition['op'])
+        plugin = operators.get_operator(op)
+        if not plugin or not callable(plugin):
+            errors.append(
+                _('Unsupported condition operator: %s') % op)
+        try:
+            plugin().validate(**condition)
+        except ValueError as exc:
+            errors.append(_('Invalid parameters for condition operator '
+                            '%(op)s: %(error)s') % {'op': op,
+                                                    'error': exc})
+
+    for action in rule['actions']:
+        plugin = actions.get_action(action['op'])
+        if not plugin or not callable(plugin):
+            errors.append(_('Unsupported action operator: %s') % action['op'])
+        try:
+            plugin().validate(**action)
+        except ValueError as exc:
+            errors.append(_('Invalid parameters for action operator %(op)s: '
+                            '%(error)s') % {'op': action['op'], 'error': exc})
+
+    if errors:
+        if len(errors) == 1:
+            raise exception.Invalid(errors[0])
+        else:
+            raise exception.Invalid(_('Multiple validation errors occurred: '
+                                      '%s') % '; '.join(errors))
diff --git a/ironic/common/utils.py b/ironic/common/utils.py
index b126668558..42113fba5a 100644
--- a/ironic/common/utils.py
+++ b/ironic/common/utils.py
@@ -1147,3 +1147,18 @@ def get_route_source(dest, ignore_link_local=True):
     except (IndexError, ValueError):
         LOG.debug('No route to host %(dest)s, route record: %(rec)s',
                   {'dest': dest, 'rec': out})
+
+
+def parse_inverted_operator(op):
+    """Handle inverted operators."""
+    op = op.strip()
+    if op.count('!') > 1:
+        msg = _("Multiple exclamation marks are not allowed. "
+                "To apply the invert of an operation, simply put an "
+                "exclamation mark (with an optional space) before "
+                "the op, e.g. eq - !eq.")
+        raise ValueError(msg)
+
+    is_inverted = op.startswith('!')
+    op = op.lstrip('!').strip()
+    return op, is_inverted
diff --git a/ironic/conductor/inspection.py b/ironic/conductor/inspection.py
index fb8f7f5d55..f3abf454a4 100644
--- a/ironic/conductor/inspection.py
+++ b/ironic/conductor/inspection.py
@@ -17,6 +17,7 @@ from oslo_utils import excutils
 
 from ironic.common import exception
 from ironic.common.i18n import _
+from ironic.common.inspection_rules import engine
 from ironic.common import states
 from ironic.conductor import task_manager
 from ironic.conductor import utils
@@ -66,7 +67,7 @@ def inspect_hardware(task):
                  {'node': node.uuid})
     elif new_state == states.INSPECTWAIT:
         task.process_event('wait')
-        LOG.info('Successfully started introspection on node %(node)s',
+        LOG.info('Successfully started inspection on node %(node)s',
                  {'node': node.uuid})
     else:
         error = (_("During inspection, driver returned unexpected "
@@ -131,6 +132,10 @@ def continue_inspection(task, inventory, plugin_data):
                       'asynchronously for node %s', node.uuid)
             return
 
+        result = engine.apply_rules(task, inventory, plugin_data, 'main')
+        if result and 'plugin_data' in result:
+            plugin_data = result['plugin_data']
+
         # NOTE(dtantsur): logs can be huge and are stored separately
         plugin_data.pop('logs', None)
 
diff --git a/ironic/conf/inspector.py b/ironic/conf/inspector.py
index 4b33607cec..aa0115b589 100644
--- a/ironic/conf/inspector.py
+++ b/ironic/conf/inspector.py
@@ -155,6 +155,10 @@ discovery_opts = [
                mutable=True,
                help=_("The default driver to use for newly enrolled nodes. "
                       "Must be set when enabling auto-discovery.")),
+    cfg.StrOpt('inspection_scope',
+               default=None,
+               help=_("The default inspection scope for nodes enrolled via "
+                      "auto-discovery.")),
 ]
 
 pxe_filter_opts = [
@@ -174,11 +178,23 @@ pxe_filter_opts = [
                       "of dnsmasq with the database.")),
 ]
 
+inspection_rule_opts = [
+    cfg.StrOpt('built_in_rules',
+               mutable=True,
+               help=_("Path to YAML file of built-in inspection rules.")),
+    cfg.StrOpt('mask_secrets',
+               default='always',
+               choices=['always', 'never', 'sensitive'],
+               help=_("Whether to mask secrets in the node information "
+                      "passed to the rules."))
+]
+
 
 def register_opts(conf):
     conf.register_opts(opts, group='inspector')
     conf.register_opts(discovery_opts, group='auto_discovery')
     conf.register_opts(pxe_filter_opts, group='pxe_filter')
+    conf.register_opts(inspection_rule_opts, group='inspection_rules')
     auth.register_auth_opts(conf, 'inspector',
                             service_type='baremetal-introspection')
 
diff --git a/requirements.txt b/requirements.txt
index 2f3705f9bb..799bc23683 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -47,3 +47,4 @@ microversion-parse>=1.0.1 # Apache-2.0
 zeroconf>=0.24.0 # LGPL
 os-service-types>=1.7.0 # Apache-2.0
 bcrypt>=3.1.3 # Apache-2.0
+PyYAML>=6.0.2 # MIT