Clean-up: Inspection Rules Testing
Change-Id: I72fd942b72b3e89c9688b6b19614b0bd7a67d0e7
This commit is contained in:
parent
3f387f3f3d
commit
910ee2e38c
@ -56,10 +56,6 @@ class ShallowMaskList(abc.MutableSequence):
|
||||
def __delitem__(self, index):
|
||||
del self._original[index]
|
||||
|
||||
def __iter__(self):
|
||||
for item in self._original:
|
||||
yield self._mask_value(item)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._original)
|
||||
|
||||
|
@ -23,10 +23,9 @@ from ironic.tests.unit.db import base as db_base
|
||||
from ironic.tests.unit.objects import utils as obj_utils
|
||||
|
||||
|
||||
class TestApplyRules(db_base.DbTestCase):
|
||||
|
||||
class TestInspectionRules(db_base.DbTestCase):
|
||||
def setUp(self):
|
||||
super(TestApplyRules, self).setUp()
|
||||
super(TestInspectionRules, self).setUp()
|
||||
self.node = obj_utils.create_test_node(self.context,
|
||||
driver='fake-hardware')
|
||||
self.sensitive_fields = ['password', 'auth_token', 'bmc_password']
|
||||
@ -66,121 +65,18 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
self.sensitive_rule = obj_utils.create_test_inspection_rule(
|
||||
self.context, sensitive=True)
|
||||
|
||||
def test_set_mask_enabled(self):
|
||||
"""Test that set_mask_enabled properly toggles masking."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
self.assertEqual('***', masked_dict['password'])
|
||||
|
||||
masked_dict.set_mask_enabled(False)
|
||||
self.assertEqual('secret123', masked_dict['password'])
|
||||
|
||||
masked_dict.set_mask_enabled(True)
|
||||
self.assertEqual('***', masked_dict['password'])
|
||||
|
||||
def test_getitem_masked(self):
|
||||
"""Test that __getitem__ masks sensitive fields."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
self.assertEqual('***', masked_dict['password'])
|
||||
self.assertEqual('***', masked_dict['auth_token'])
|
||||
|
||||
self.assertEqual('testuser', masked_dict['username'])
|
||||
|
||||
def test_getitem_not_masked(self):
|
||||
"""Test that __getitem__ doesn't mask when disabled."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=False)
|
||||
|
||||
self.assertEqual('secret123', masked_dict['password'])
|
||||
self.assertEqual('abc123token', masked_dict['auth_token'])
|
||||
|
||||
def test_nested_dict_masking(self):
|
||||
"""Test that nested dictionaries are properly masked."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
nested = masked_dict['nested']
|
||||
self.assertIsInstance(nested, utils.ShallowMaskDict)
|
||||
self.assertEqual('***', nested['password'])
|
||||
|
||||
self.assertEqual('value', nested['normal'])
|
||||
|
||||
def test_list_masking(self):
|
||||
"""Test that lists containing dictionaries are properly masked."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
list_data = masked_dict['list_data']
|
||||
self.assertEqual('***', list_data[0]['password'])
|
||||
|
||||
self.assertEqual('item1', list_data[0]['name'])
|
||||
self.assertEqual('value2', list_data[1]['normal'])
|
||||
|
||||
def test_items_masked(self):
|
||||
"""Test that items() method returns masked values."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
items = dict(masked_dict.items())
|
||||
|
||||
self.assertEqual('***', items['password'])
|
||||
self.assertEqual('***', items['auth_token'])
|
||||
self.assertEqual('***', items['nested']['password'])
|
||||
|
||||
def test_values_masked(self):
|
||||
"""Test that values() method masks sensitive values."""
|
||||
test_data = {'username': 'user', 'password': 'secret'}
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
values = list(masked_dict.values())
|
||||
|
||||
self.assertIn('user', values)
|
||||
self.assertIn('***', values)
|
||||
self.assertNotIn('secret', values)
|
||||
|
||||
def test_get_method_masked(self):
|
||||
"""Test that the get() method properly masks sensitive fields."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
self.assertEqual('***', masked_dict.get('password'))
|
||||
self.assertEqual('testuser', masked_dict.get('username'))
|
||||
|
||||
# Non-existent field should return default
|
||||
self.assertIsNone(masked_dict.get('nonexistent'))
|
||||
self.assertEqual('default', masked_dict.get('nonexistent', 'default'))
|
||||
|
||||
def test_modifying_dict(self):
|
||||
"""Test that modifications affect the original data."""
|
||||
original_data = {'username': 'user', 'data': [1, 2, 3]}
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
original_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
masked_dict['new_key'] = 'new_value'
|
||||
masked_dict['data'].append(4)
|
||||
|
||||
self.assertEqual('new_value', original_data['new_key'])
|
||||
self.assertEqual([1, 2, 3, 4], original_data['data'])
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
class TestApplyRules(TestInspectionRules):
|
||||
def setUp(self):
|
||||
super(TestApplyRules, self).setUp()
|
||||
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
def test_apply_rules_no_rules(self, mock_list, mock_apply_actions,
|
||||
mock_check_conditions, mock_get_built_in):
|
||||
def test_apply_rules_no_rules(self, mock_apply_actions,
|
||||
mock_check_conditions, mock_get_built_in,
|
||||
mock_list):
|
||||
mock_list.return_value = []
|
||||
mock_get_built_in.return_value = []
|
||||
|
||||
@ -200,9 +96,9 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
def test_apply_rules_success(self, mock_list, mock_apply_actions,
|
||||
mock_check_conditions, mock_get_built_in):
|
||||
def test_apply_rules_success(self, mock_apply_actions,
|
||||
mock_check_conditions, mock_get_built_in,
|
||||
mock_list):
|
||||
rule1 = {'uuid': 'rule-1', 'priority': 100, 'conditions': [],
|
||||
'actions': [{'op': 'set-attribute',
|
||||
'args': {'path': 'a', 'value': 'b'}}]}
|
||||
@ -217,18 +113,21 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
with task_manager.acquire(self.context, self.node.uuid) as task:
|
||||
engine.apply_rules(task, self.inventory, self.plugin_data, 'main')
|
||||
|
||||
mock_get_built_in.assert_called_once()
|
||||
mock_list.assert_called_once_with(
|
||||
context=self.context,
|
||||
filters={'phase': 'main'})
|
||||
|
||||
mock_get_built_in.assert_called_once()
|
||||
self.assertEqual(2, mock_check_conditions.call_count)
|
||||
self.assertEqual(2, mock_apply_actions.call_count)
|
||||
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
def test_apply_rules_condition_false(self, mock_list, mock_apply_actions,
|
||||
mock_check_conditions,
|
||||
mock_get_built_in):
|
||||
def test_apply_rules_some_conditions_pass(self, mock_apply_actions,
|
||||
mock_check_conditions,
|
||||
mock_get_built_in,
|
||||
mock_list):
|
||||
"""Test that rules are skipped when conditions don't match."""
|
||||
mock_list.return_value = [self.rule1]
|
||||
mock_get_built_in.return_value = [self.rule2]
|
||||
@ -239,17 +138,35 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
engine.apply_rules(task, self.inventory, self.plugin_data, 'main')
|
||||
|
||||
self.assertEqual(2, mock_check_conditions.call_count)
|
||||
mock_apply_actions.assert_called_once()
|
||||
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
def test_apply_rules_all_conditions_fail(self, mock_apply_actions,
|
||||
mock_check_conditions,
|
||||
mock_get_built_in,
|
||||
mock_list):
|
||||
"""Test that rules are skipped when conditions don't match."""
|
||||
mock_list.return_value = [self.rule1]
|
||||
mock_get_built_in.return_value = [self.rule2]
|
||||
|
||||
mock_check_conditions.side_effect = [False, False]
|
||||
|
||||
with task_manager.acquire(self.context, self.node.uuid) as task:
|
||||
engine.apply_rules(task, self.inventory, self.plugin_data, 'main')
|
||||
|
||||
self.assertEqual(2, mock_check_conditions.call_count)
|
||||
mock_apply_actions.assert_not_called()
|
||||
|
||||
@mock.patch.object(engine, 'LOG', autospec=True)
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
def test_apply_rules_ironic_exception(self, mock_list,
|
||||
mock_apply_actions,
|
||||
def test_apply_rules_ironic_exception(self, mock_apply_actions,
|
||||
mock_check_conditions,
|
||||
mock_get_built_in,
|
||||
mock_log):
|
||||
mock_log, mock_list):
|
||||
"""Test that IronicException is re-raised."""
|
||||
mock_list.return_value = [self.rule1, self.rule2]
|
||||
mock_get_built_in.return_value = []
|
||||
@ -273,11 +190,10 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
def test_apply_rules_with_always_mask(self, mock_list, mock_apply_actions,
|
||||
def test_apply_rules_with_always_mask(self, mock_apply_actions,
|
||||
mock_check_conditions,
|
||||
mock_get_built_in,
|
||||
mock_masked_dict):
|
||||
mock_masked_dict, mock_list):
|
||||
"""Test apply_rules with mask_secrets='always'."""
|
||||
self.config(mask_secrets='always', group='inspection_rules')
|
||||
|
||||
@ -312,10 +228,10 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
def test_apply_rules_with_never_mask(self, mock_list, mock_apply_actions,
|
||||
def test_apply_rules_with_never_mask(self, mock_apply_actions,
|
||||
mock_check_conditions,
|
||||
mock_get_built_in, mock_masked_dict):
|
||||
mock_get_built_in, mock_masked_dict,
|
||||
mock_list):
|
||||
"""Test apply_rules with mask_secrets='never'."""
|
||||
self.config(mask_secrets='never', group='inspection_rules')
|
||||
|
||||
@ -350,12 +266,10 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
@mock.patch.object(engine, 'get_built_in_rules', autospec=True)
|
||||
@mock.patch.object(engine, 'check_conditions', autospec=True)
|
||||
@mock.patch.object(engine, 'apply_actions', autospec=True)
|
||||
@mock.patch('ironic.objects.InspectionRule.list', autospec=True)
|
||||
def test_apply_rules_with_sensitive_mask(self, mock_list,
|
||||
mock_apply_actions,
|
||||
def test_apply_rules_with_sensitive_mask(self, mock_apply_actions,
|
||||
mock_check_conditions,
|
||||
mock_get_built_in,
|
||||
mock_masked_dict):
|
||||
mock_masked_dict, mock_list):
|
||||
"""Test apply_rules with mask_secrets='sensitive'."""
|
||||
self.config(mask_secrets='sensitive', group='inspection_rules')
|
||||
|
||||
@ -391,11 +305,9 @@ class TestApplyRules(db_base.DbTestCase):
|
||||
])
|
||||
|
||||
|
||||
class TestOperators(db_base.DbTestCase):
|
||||
class TestOperators(TestInspectionRules):
|
||||
def setUp(self):
|
||||
super(TestOperators, self).setUp()
|
||||
self.node = obj_utils.create_test_node(self.context,
|
||||
driver='fake-hardware')
|
||||
|
||||
def test_operator_exceptions(self):
|
||||
"""Test that operators raise proper exceptions for invalid inputs."""
|
||||
@ -560,12 +472,10 @@ class TestOperators(db_base.DbTestCase):
|
||||
self.assertFalse(result)
|
||||
|
||||
|
||||
class TestActions(db_base.DbTestCase):
|
||||
class TestActions(TestInspectionRules):
|
||||
"""Test inspection rule actions"""
|
||||
def setUp(self):
|
||||
super(TestActions, self).setUp()
|
||||
self.node = obj_utils.create_test_node(self.context,
|
||||
driver='fake-hardware')
|
||||
|
||||
@mock.patch.object(inspection_rules.actions.LOG, 'info', autospec=True)
|
||||
def test_log_action(self, mock_log):
|
||||
@ -782,3 +692,117 @@ class TestActions(db_base.DbTestCase):
|
||||
self.assertEqual(2, len(results))
|
||||
self.assertEqual('value1', task.node.extra['test1'])
|
||||
self.assertEqual('value2', task.node.extra['test2'])
|
||||
|
||||
|
||||
class TestShallowMask(TestInspectionRules):
|
||||
def setUp(self):
|
||||
super(TestShallowMask, self).setUp()
|
||||
|
||||
def test_set_mask_enabled(self):
|
||||
"""Test that set_mask_enabled properly toggles masking."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
self.assertEqual('***', masked_dict['password'])
|
||||
|
||||
masked_dict.set_mask_enabled(False)
|
||||
self.assertEqual('secret123', masked_dict['password'])
|
||||
|
||||
masked_dict.set_mask_enabled(True)
|
||||
self.assertEqual('***', masked_dict['password'])
|
||||
|
||||
def test_getitem_masked(self):
|
||||
"""Test that __getitem__ masks sensitive fields."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
self.assertEqual('***', masked_dict['password'])
|
||||
self.assertEqual('***', masked_dict['auth_token'])
|
||||
|
||||
self.assertEqual('testuser', masked_dict['username'])
|
||||
|
||||
def test_getitem_not_masked(self):
|
||||
"""Test that __getitem__ doesn't mask when disabled."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=False)
|
||||
|
||||
self.assertEqual('secret123', masked_dict['password'])
|
||||
self.assertEqual('abc123token', masked_dict['auth_token'])
|
||||
|
||||
def test_nested_dict_masking(self):
|
||||
"""Test that nested dictionaries are properly masked."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
nested = masked_dict['nested']
|
||||
self.assertIsInstance(nested, utils.ShallowMaskDict)
|
||||
self.assertEqual('***', nested['password'])
|
||||
|
||||
self.assertEqual('value', nested['normal'])
|
||||
|
||||
def test_list_masking(self):
|
||||
"""Test that lists containing dictionaries are properly masked."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
list_data = masked_dict['list_data']
|
||||
self.assertEqual('***', list_data[0]['password'])
|
||||
|
||||
self.assertEqual('item1', list_data[0]['name'])
|
||||
self.assertEqual('value2', list_data[1]['normal'])
|
||||
|
||||
def test_items_masked(self):
|
||||
"""Test that items() method returns masked values."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
items = dict(masked_dict.items())
|
||||
|
||||
self.assertEqual('***', items['password'])
|
||||
self.assertEqual('***', items['auth_token'])
|
||||
self.assertEqual('***', items['nested']['password'])
|
||||
|
||||
def test_values_masked(self):
|
||||
"""Test that values() method masks sensitive values."""
|
||||
test_data = {'username': 'user', 'password': 'secret'}
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
values = list(masked_dict.values())
|
||||
|
||||
self.assertIn('user', values)
|
||||
self.assertIn('***', values)
|
||||
self.assertNotIn('secret', values)
|
||||
|
||||
def test_get_method_masked(self):
|
||||
"""Test that the get() method properly masks sensitive fields."""
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
self.test_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
self.assertEqual('***', masked_dict.get('password'))
|
||||
self.assertEqual('testuser', masked_dict.get('username'))
|
||||
|
||||
# Non-existent field should return default
|
||||
self.assertIsNone(masked_dict.get('nonexistent'))
|
||||
self.assertEqual('default', masked_dict.get('nonexistent', 'default'))
|
||||
|
||||
def test_modifying_dict(self):
|
||||
"""Test that modifications affect the original data."""
|
||||
original_data = {'username': 'user', 'data': [1, 2, 3]}
|
||||
masked_dict = utils.ShallowMaskDict(
|
||||
original_data, sensitive_fields=self.sensitive_fields,
|
||||
mask_enabled=True)
|
||||
|
||||
masked_dict['new_key'] = 'new_value'
|
||||
masked_dict['data'].append(4)
|
||||
|
||||
self.assertEqual('new_value', original_data['new_key'])
|
||||
self.assertEqual([1, 2, 3, 4], original_data['data'])
|
||||
|
Loading…
x
Reference in New Issue
Block a user