Add allowlist and denylist for action providers and actions

This adds allowlist and denylist ListOpt config options for
filtering what action providers is loaded

Also adds allowlist and denylist ListOpt config options for
filtering what actions is loaded by the legacy action provider.

This is used in a security context where we need to restrict
a environment to only load actions using the legacy action
provider and then filter what actions is explicitly allowed
or not allowed to be used.

Change-Id: I57cd468cba227cc8cc7960e84e9a111217e80cad
This commit is contained in:
Tobias Urdin 2025-03-11 10:54:21 +01:00
parent e9079fa11e
commit 11785d1e31
6 changed files with 205 additions and 0 deletions

View File

@ -99,7 +99,17 @@ class LegacyActionProvider(ml_actions.ActionProvider):
invoke_on_load=False
)
allowlist = CONF.legacy_action_provider.allowlist
denylist = CONF.legacy_action_provider.denylist
for action_name in ext_mgr.names():
if allowlist:
if action_name not in allowlist:
continue
elif denylist:
if action_name in denylist:
continue
action_cls = ext_mgr[action_name].plugin
if CONF.legacy_action_provider.only_builtin_actions:

View File

@ -64,6 +64,29 @@ auth_type_opt = cfg.StrOpt(
help=_('Authentication type (valid options: keystone, keycloak-oidc)')
)
action_providers_opts = [
cfg.ListOpt(
'allowlist',
default=[],
help=_(
'Allowlist with action providers that is allowed to be '
'loaded from the entry point "mistral.action.providers", '
'if empty all action providers will be allowed unless '
'denylist is set.'
),
),
cfg.ListOpt(
'denylist',
default=[],
help=_(
'Denylist with action providers that is not allowed to '
'be loaded from the entry point "mistral.action.providers", '
'allowlist takes precendence, if empty all action providers '
'will be allowed.'
),
),
]
legacy_action_provider_opts = [
cfg.BoolOpt(
'load_action_plugins',
@ -91,6 +114,25 @@ legacy_action_provider_opts = [
'This property is needed mostly for testing.'
)
),
cfg.ListOpt(
'allowlist',
default=[],
help=_(
'Allowlist with actions that is allowed to be '
'loaded from the entry point "mistral.actions", '
'if empty all actions will be allowed.'
),
),
cfg.ListOpt(
'denylist',
default=[],
help=_(
'Denylist with actions that is not allowed to '
'be loaded from the entry point "mistral.actions", '
'allowlist takes precedence, if empty all actions '
'will be allowed.'
),
),
]
api_opts = [
@ -755,6 +797,7 @@ healthcheck_opts = [
CONF = cfg.CONF
ACTION_PROVIDERS_GROUP = 'action_providers'
LEGACY_ACTION_PROVIDER_GROUP = 'legacy_action_provider'
API_GROUP = 'api'
ENGINE_GROUP = 'engine'
@ -785,6 +828,7 @@ CONF.register_opt(oslo_rpc_executor)
CONF.register_opt(expiration_token_duration)
CONF.register_opts(service_opts.service_opts)
CONF.register_opts(action_providers_opts, group=ACTION_PROVIDERS_GROUP)
CONF.register_opts(
legacy_action_provider_opts,
group=LEGACY_ACTION_PROVIDER_GROUP
@ -842,6 +886,7 @@ _DEFAULT_LOG_LEVELS = [
def list_opts():
return [
(ACTION_PROVIDERS_GROUP, action_providers_opts),
(API_GROUP, api_opts),
(ENGINE_GROUP, engine_opts),
(EXECUTOR_GROUP, executor_opts),

View File

@ -18,6 +18,7 @@ collection of functions for accessing information about actions
available in the system.
"""
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import extension
@ -39,7 +40,17 @@ def _get_registered_providers():
invoke_on_load=False
)
allowlist = cfg.CONF.action_providers.allowlist
denylist = cfg.CONF.action_providers.denylist
for provider_name in mgr.names():
if allowlist:
if provider_name not in allowlist:
continue
elif denylist:
if provider_name in denylist:
continue
provider_cls = mgr[provider_name].plugin
try:

View File

@ -202,3 +202,61 @@ class LegacyActionProviderTest(base.BaseTest):
)
self.assertEqual('Goodbye, Lieutenant Dan!', goodbye_action.run(None))
def test_allowlist(self):
self.override_config(
'load_action_generators',
False,
'legacy_action_provider'
)
self.override_config(
'allowlist', ['std.noop'],
'legacy_action_provider'
)
provider = legacy.LegacyActionProvider()
action_descs = provider.find_all()
self.assertEqual(1, len(action_descs))
self.assertIsNotNone(provider.find('std.noop'))
def test_denylist(self):
self.override_config(
'load_action_generators',
False,
'legacy_action_provider'
)
self.override_config(
'denylist', ['std.noop'],
'legacy_action_provider'
)
provider = legacy.LegacyActionProvider()
action_descs = provider.find_all()
self.assertTrue(
all(
[
a_d.name != 'std.noop'
for a_d in action_descs
]
)
)
def test_allowlist_and_denylist(self):
self.override_config(
'load_action_generators',
False,
'legacy_action_provider'
)
self.override_config(
'allowlist', ['std.noop'],
'legacy_action_provider'
)
self.override_config(
'denylist', ['std.fail'],
'legacy_action_provider'
)
provider = legacy.LegacyActionProvider()
action_descs = provider.find_all()
self.assertEqual(1, len(action_descs))
self.assertIsNotNone(provider.find('std.noop'))

View File

@ -0,0 +1,71 @@
# Copyright 2025 Binero.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from mistral import config
from mistral.services.actions import _get_registered_providers
from mistral.tests.unit import base
class FakeExtManager:
def __init__(self):
self.plugins = ['adhoc', 'dynamic', 'legacy']
def names(self):
return self.plugins
class ActionsTest(base.DbTestCase):
def setUp(self):
super(ActionsTest, self).setUp()
@mock.patch('stevedore.enabled.ExtensionManager')
def test_get_registered_providers(self, mock_ext_mgr):
mock_ext_mgr.return_value = FakeExtManager()
providers = _get_registered_providers()
self.assertEqual(3, len(providers))
self.assertEqual('adhoc', providers[0].name)
self.assertEqual('dynamic', providers[1].name)
self.assertEqual('legacy', providers[2].name)
@mock.patch('stevedore.enabled.ExtensionManager')
def test_get_registered_providers_allowlist(self, mock_ext_mgr):
self.override_config(
'allowlist', ['dynamic'], config.ACTION_PROVIDERS_GROUP)
mock_ext_mgr.return_value = FakeExtManager()
providers = _get_registered_providers()
self.assertEqual(1, len(providers))
self.assertEqual('dynamic', providers[0].name)
@mock.patch('stevedore.enabled.ExtensionManager')
def test_get_registered_providers_denylist(self, mock_ext_mgr):
self.override_config(
'denylist', ['legacy'], config.ACTION_PROVIDERS_GROUP)
mock_ext_mgr.return_value = FakeExtManager()
providers = _get_registered_providers()
self.assertEqual(2, len(providers))
self.assertEqual('adhoc', providers[0].name)
self.assertEqual('dynamic', providers[1].name)
@mock.patch('stevedore.enabled.ExtensionManager')
def test_get_registered_providers_allow_and_deny(self, mock_ext_mgr):
self.override_config(
'allowlist', ['adhoc'], group=config.ACTION_PROVIDERS_GROUP)
self.override_config(
'denylist', ['dynamic'], group=config.ACTION_PROVIDERS_GROUP)
mock_ext_mgr.return_value = FakeExtManager()
providers = _get_registered_providers()
self.assertEqual(1, len(providers))
self.assertEqual('adhoc', providers[0].name)

View File

@ -0,0 +1,10 @@
---
features:
- |
Added mutually exclusive config options ``[action_providers]/allowlist``
and ``[action_providers]/denylist`` that can be used to filter what action
providers is loaded.
- |
Added mutually exclusive config options ``[legacy_action_provider]/allowlist``
and ``[legacy_action_provider]/denylist`` that can be used to filter what
actions is loaded by the legacy action provider.