API/Testing: Inspection rules migration

Migrates Inspector rules over to Ironic

This change defers the addition of the configuration options;
``supported_interfaces``, ``[auto_discovery]`` ``inspection_scope`` and
the ``default_scope`` as otherwise specified.

And, while the ``scope`` field may remain in the database for easier
migration, it's also outside of the 'scope' of this change as well,
and there's a chance we use the existing `traits` field in nodes as an
alternative for node-to-rule association instead.

A future follow-up should address these excluded implementations.

Change-Id: I6baf00273e63bb96e133f0cf5da6d8953f97af5a
This commit is contained in:
cid 2025-01-22 23:50:55 +01:00
parent 18343b9be6
commit f23930bc94
10 changed files with 646 additions and 4 deletions

View File

@ -32,6 +32,7 @@ from ironic.api.controllers.v1 import conductor
from ironic.api.controllers.v1 import deploy_template
from ironic.api.controllers.v1 import driver
from ironic.api.controllers.v1 import event
from ironic.api.controllers.v1 import inspection_rule
from ironic.api.controllers.v1 import node
from ironic.api.controllers.v1 import port
from ironic.api.controllers.v1 import portgroup
@ -79,6 +80,7 @@ VERSIONED_CONTROLLERS = {
'deploy_templates': utils.allow_deploy_templates,
'shards': utils.allow_shards_endpoint,
'runbooks': utils.allow_runbooks,
'inspection_rules': utils.allow_inspection_rules,
# NOTE(dtantsur): continue_inspection is available in 1.1 as a
# compatibility hack to make it usable with IPA without changes.
# Hide this fact from consumers since it was not actually available
@ -133,7 +135,8 @@ class Controller(object):
'deploy_templates': deploy_template.DeployTemplatesController(),
'shards': shard.ShardController(),
'continue_inspection': ramdisk.ContinueInspectionController(),
'runbooks': runbook.RunbooksController()
'runbooks': runbook.RunbooksController(),
'inspection_rules': inspection_rule.InspectionRuleController()
}
@method.expose()

View File

@ -0,0 +1,295 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from http import client as http_client
from ironic.common import metrics_utils
from oslo_log import log
from oslo_utils import uuidutils
import pecan
from pecan import rest
from webob import exc as webob_exc
from ironic import api
from ironic.api.controllers import link
from ironic.api.controllers.v1 import collection
from ironic.api.controllers.v1 import notification_utils as notify
from ironic.api.controllers.v1 import utils as api_utils
from ironic.api import method
from ironic.common import args
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common.inspection_rules import validation
import ironic.conf
from ironic import objects
CONF = ironic.conf.CONF
LOG = log.getLogger(__name__)
METRICS = metrics_utils.get_metrics_logger(__name__)
DEFAULT_RETURN_FIELDS = ['uuid', 'priority', 'phase', 'description']
def convert_actions(rpc_actions):
return [{
'op': action['op'],
'args': action['args'],
'loop': action.get('loop', [])
} for action in rpc_actions]
def convert_conditions(rpc_conditions):
converted_conditions = []
for condition in rpc_conditions:
result = {
'op': condition['op'],
'args': condition['args']
}
if condition.get('loop', []):
result['loop'] = condition['loop']
result['multiple'] = condition.get('multiple', 'any')
converted_conditions.append(result)
return converted_conditions
def rules_sanitize(inspection_rule, fields):
"""Removes sensitive and unrequested data.
Will only keep the fields specified in the ``fields`` parameter.
:param fields:
list of fields to preserve, or ``None`` to preserve them all
:type fields: list of str
"""
if inspection_rule.get('sensitive'):
inspection_rule['conditions'] = None
inspection_rule['actions'] = None
api_utils.sanitize_dict(inspection_rule, fields)
def convert_with_links(rpc_rule, fields=None, sanitize=True):
"""Add links to the inspection rule."""
inspection_rule = api_utils.object_to_dict(
rpc_rule,
fields=('description', 'priority', 'sensitive', 'phase', 'conditions',
'actions'),
link_resource='inspection',
)
inspection_rule['actions'] = convert_actions(rpc_rule.actions)
inspection_rule['conditions'] = convert_conditions(rpc_rule.conditions)
if fields is not None:
api_utils.check_for_invalid_fields(fields, inspection_rule)
if sanitize:
rules_sanitize(inspection_rule, fields)
return inspection_rule
def list_convert_with_links(rpc_rules, limit, fields=None, **kwargs):
return collection.list_convert_with_links(
items=[convert_with_links(t, fields=fields, sanitize=False)
for t in rpc_rules],
item_name='inspection_rules',
url='inspection',
limit=limit,
fields=fields,
sanitize_func=rules_sanitize,
**kwargs
)
class InspectionRuleController(rest.RestController):
"""REST controller for inspection rules."""
invalid_sort_key_list = ['actions', 'conditions']
@pecan.expose()
def _route(self, args, request=None):
if not api_utils.allow_inspection_rules():
msg = _("The API version does not allow inspection rules")
if api.request.method == "GET":
raise webob_exc.HTTPNotFound(msg)
else:
raise webob_exc.HTTPMethodNotAllowed(msg)
return super()._route(args, request)
@METRICS.timer('InspectionRuleController.get_all')
@method.expose()
@args.validate(marker=args.name, limit=args.integer, sort_key=args.string,
sort_dir=args.string, fields=args.string_list,
detail=args.boolean)
def get_all(self, marker=None, limit=None, sort_key='id', sort_dir='asc',
fields=None, detail=None, phase=None):
"""Retrieve a list of inspection rules.
:param marker: pagination marker for large data sets.
:param limit: maximum number of resources to return in a single result.
This value cannot be larger than the value of max_limit
in the [api] section of the ironic configuration, or only
max_limit resources will be returned.
:param sort_key: column to sort results by. Default: id.
:param sort_dir: direction to sort. "asc" or "desc". Default: asc.
:param fields: Optional, a list with a specified set of fields
of the resource to be returned.
:param detail: Optional, boolean to indicate whether retrieve a list
of inspection rules with detail.
"""
api_utils.check_policy('baremetal:inspection_rule:get')
api_utils.check_allowed_fields(fields)
api_utils.check_allowed_fields([sort_key])
fields = api_utils.get_request_return_fields(fields, detail,
DEFAULT_RETURN_FIELDS)
limit = api_utils.validate_limit(limit)
sort_dir = api_utils.validate_sort_dir(sort_dir)
if sort_key in self.invalid_sort_key_list:
raise exception.InvalidParameterValue(
_("The sort_key value %(key)s is an invalid field for "
"sorting") % {'key': sort_key})
marker_obj = None
if marker:
marker_obj = objects.InspectionRule.get_by_uuid(
api.request.context, marker)
rules = objects.InspectionRule.list(
api.request.context, limit=limit, marker=marker_obj,
sort_key=sort_key, sort_dir=sort_dir)
parameters = {'sort_key': sort_key, 'sort_dir': sort_dir}
if detail is not None:
parameters['detail'] = detail
filters = {}
if phase:
filters['phase'] = phase
return list_convert_with_links(
rules, limit, fields=fields, filters=filters, **parameters)
@METRICS.timer('InspectionRuleController.get_one')
@method.expose()
@args.validate(inspection_rule_uuid=args.uuid, fields=args.string_list)
def get_one(self, inspection_rule_uuid, fields=None):
"""Retrieve information about the given inspection rule.
:param inspection_rule_uuid: UUID of an inspection rule.
:param fields: Optional, a list with a specified set of fields
of the resource to be returned.
"""
api_utils.check_policy('baremetal:inspection_rule:get')
inspection_rule = objects.InspectionRule.get_by_uuid(
api.request.context, inspection_rule_uuid)
api_utils.check_allowed_fields(fields)
return convert_with_links(inspection_rule, fields=fields)
@METRICS.timer('InspectionRuleController.post')
@method.expose(status_code=http_client.CREATED)
@method.body('inspection_rule')
def post(self, inspection_rule):
"""Create a new inspection rule.
:param inspection_rule: a inspection rule within the request body.
"""
context = api.request.context
api_utils.check_policy('baremetal:inspection_rule:create')
validation.validate_rule(inspection_rule)
if not inspection_rule.get('uuid'):
inspection_rule['uuid'] = uuidutils.generate_uuid()
new_rule = objects.InspectionRule(context, **inspection_rule)
notify.emit_start_notification(context, new_rule, 'create')
with notify.handle_error_notification(context, new_rule, 'create'):
new_rule.create()
api.response.location = link.build_url('inspection_rules',
new_rule.uuid)
api_rule = convert_with_links(new_rule)
notify.emit_end_notification(context, new_rule, 'create')
return api_rule
@METRICS.timer('InspectionRuleController.patch')
@method.expose()
@method.body('patch')
@args.validate(inspection_rule_uuid=args.uuid)
def patch(self, inspection_rule_uuid, patch=None):
"""Update an existing inspection rule.
:param inspection_rule_uuid: UUID of the rule to update.
:param patch: a json PATCH document to apply to this inspection rule.
"""
context = api.request.context
api_utils.check_policy('baremetal:inspection_rule:update')
rpc_rule = objects.InspectionRule.get_by_uuid(context,
inspection_rule_uuid)
rule = rpc_rule.as_dict()
sensitive_patch = api_utils.get_patch_values(patch, '/sensitive')
sensitive = sensitive_patch[0] if sensitive_patch else None
if (not sensitive) and sensitive is not None:
if rpc_rule['sensitive']:
msg = _("Inspection rules cannot have "
"the sensitive flag unset.")
raise exception.PatchError(patch=patch, reason=msg)
rule = api_utils.apply_jsonpatch(rule, patch)
api_utils.patched_validate_with_schema(
rule, validation.SCHEMA,
validation.VALIDATOR)
validation.validate_rule(rule)
api_utils.patch_update_changed_fields(
rule, rpc_rule, fields=objects.InspectionRule.fields,
schema=validation.SCHEMA
)
notify.emit_start_notification(context, rpc_rule, 'update')
with notify.handle_error_notification(context, rpc_rule, 'update'):
rpc_rule.save()
api_rule = convert_with_links(rpc_rule)
notify.emit_end_notification(context, rpc_rule, 'update')
return api_rule
@METRICS.timer('InspectionRuleController.delete')
@method.expose(status_code=http_client.NO_CONTENT)
@args.validate(inspection_rule_uuid=args.uuid)
def delete(self, inspection_rule_uuid):
"""Delete an inspection rule.
:param inspection_rule_uuid: UUID of an inspection rule.
:param confirm: Confirmation string. Must be 'true' for bulk deletion.
"""
context = api.request.context
api_utils.check_policy('baremetal:inspection_rule:delete')
inspection_rule = objects.InspectionRule.get_by_uuid(
context, inspection_rule_uuid)
notify.emit_start_notification(context, inspection_rule, 'delete')
with notify.handle_error_notification(context, inspection_rule,
'delete'):
inspection_rule.destroy()
notify.emit_end_notification(context, inspection_rule, 'delete')

View File

@ -24,6 +24,7 @@ from ironic.objects import allocation as allocation_objects
from ironic.objects import chassis as chassis_objects
from ironic.objects import deploy_template as deploy_template_objects
from ironic.objects import fields
from ironic.objects import inspection_rule as inspection_rule_objects
from ironic.objects import node as node_objects
from ironic.objects import notification
from ironic.objects import port as port_objects
@ -57,6 +58,9 @@ CRUD_NOTIFY_OBJ = {
'volumetarget':
(volume_target_objects.VolumeTargetCRUDNotification,
volume_target_objects.VolumeTargetCRUDPayload),
'inspectionrule':
(inspection_rule_objects.InspectionRuleCRUDNotification,
inspection_rule_objects.InspectionRuleCRUDPayload),
}

View File

@ -1621,6 +1621,14 @@ def allow_runbooks():
return api.request.version.minor >= versions.MINOR_92_RUNBOOKS
def allow_inspection_rules():
"""Check if accessing inspection rule endpoints is allowed.
Version 1.96 of the API exposed rule endpoints.
"""
return api.request.version.minor >= versions.MINOR_96_INSPECTION_RULES
def check_owner_policy(object_type, policy_name, owner, lessee=None,
conceal_node=False):
"""Check if the policy authorizes this request on an object.

View File

@ -133,6 +133,7 @@ BASE_VERSION = 1
# v1.93: Add GET API for virtual media
# v1.94: Add node name support for port creation
# v1.95: Add node support for disable_power_off
# v1.96: Migrate inspection rules from Inspector
MINOR_0_JUNO = 0
MINOR_1_INITIAL_VERSION = 1
@ -230,6 +231,7 @@ MINOR_92_RUNBOOKS = 92
MINOR_93_GET_VMEDIA = 93
MINOR_94_PORT_NODENAME = 94
MINOR_95_DISABLE_POWER_OFF = 95
MINOR_96_INSPECTION_RULES = 96
# When adding another version, update:
# - MINOR_MAX_VERSION
@ -237,7 +239,7 @@ MINOR_95_DISABLE_POWER_OFF = 95
# explanation of what changed in the new version
# - common/release_mappings.py, RELEASE_MAPPING['master']['api']
MINOR_MAX_VERSION = MINOR_95_DISABLE_POWER_OFF
MINOR_MAX_VERSION = MINOR_96_INSPECTION_RULES
# String representations of the minor and maximum versions
_MIN_VERSION_STRING = '{}.{}'.format(BASE_VERSION, MINOR_1_INITIAL_VERSION)

View File

@ -1020,7 +1020,7 @@ node_policies = [
name='baremetal:node:inventory:get',
check_str=SYSTEM_OR_OWNER_READER,
scope_types=['system', 'project'],
description='Retrieve introspection data for a node.',
description='Retrieve inspection data for a node.',
operations=[
{'path': '/nodes/{node_ident}/inventory', 'method': 'GET'},
],
@ -2009,6 +2009,50 @@ runbook_policies = [
)
]
rule_policies = [
policy.DocumentedRuleDefault(
name='baremetal:inspection_rule:get',
check_str=SYSTEM_READER,
scope_types=['system', 'project'],
description='Get inspection rule(s)',
operations=[{'path': '/inspection_rules', 'method': 'GET'},
{'path': '/inspection_rules/{rule_id}', 'method': 'GET'}],
),
policy.DocumentedRuleDefault(
name='baremetal:inspection_rule:list_all',
check_str=SYSTEM_READER,
scope_types=['system', 'project'],
description='Retrieve all inspection_rule records',
operations=[
{'path': '/inspection_rules', 'method': 'GET'}
],
),
policy.DocumentedRuleDefault(
name='baremetal:inspection_rule:create',
check_str=SYSTEM_ADMIN,
scope_types=['system', 'project'],
description='Create inspection rule',
operations=[{'path': '/inspection_rules', 'method': 'POST'}],
),
policy.DocumentedRuleDefault(
name='baremetal:inspection_rule:update',
check_str=SYSTEM_ADMIN,
scope_types=['system', 'project'],
description='Update an inspection rule',
operations=[{'path': '/inspection_rules/{rule_id}',
'method': 'PATCH'}],
),
policy.DocumentedRuleDefault(
name='baremetal:inspection_rule:delete',
check_str=SYSTEM_ADMIN,
scope_types=['system', 'project'],
description='Delete an inspection rule',
operations=[{'path': '/inspection_rules', 'method': 'DELETE'},
{'path': '/inspection_rules/{rule_id}',
'method': 'DELETE'}],
)
]
def list_policies():
policies = itertools.chain(
@ -2026,6 +2070,7 @@ def list_policies():
event_policies,
deploy_template_policies,
runbook_policies,
rule_policies,
)
return policies

View File

@ -776,7 +776,7 @@ RELEASE_MAPPING = {
# make it below. To release, we will preserve a version matching
# the release as a separate block of text, like above.
'master': {
'api': '1.95',
'api': '1.96',
'rpc': '1.61',
'objects': {
'Allocation': ['1.1'],

View File

@ -0,0 +1,267 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for the API /inspection_rules methods.
"""
import datetime
from http import client as http_client
from unittest import mock
from oslo_utils import timeutils
from oslo_utils import uuidutils
from ironic.api.controllers import base as api_base
from ironic.api.controllers import v1 as api_v1
from ironic.api.controllers.v1 import notification_utils
from ironic import objects
from ironic.objects import fields as obj_fields
from ironic.tests.unit.api import base as test_api_base
from ironic.tests.unit.api import utils as test_api_utils
from ironic.tests.unit.objects import utils as obj_utils
class BaseInspectionRulesAPITest(test_api_base.BaseApiTest):
headers = {api_base.Version.string: str(api_v1.max_version())}
invalid_version_headers = {api_base.Version.string: '1.92'}
class TestListInspectionRules(BaseInspectionRulesAPITest):
def test_empty(self):
data = self.get_json('/inspection_rules', headers=self.headers)
self.assertEqual([], data['inspection_rules'])
def test_one(self):
inspection_rule = obj_utils.create_test_inspection_rule(self.context)
data = self.get_json('/inspection_rules', headers=self.headers)
self.assertEqual(1, len(data['inspection_rules']))
self.assertEqual(inspection_rule.uuid,
data['inspection_rules'][0]['uuid'])
self.assertEqual(inspection_rule.description,
data['inspection_rules'][0]['description'])
self.assertNotIn('actions', data['inspection_rules'][0])
self.assertNotIn('conditions', data['inspection_rules'][0])
def test_get_one(self):
rule = obj_utils.create_test_inspection_rule(self.context)
data = self.get_json('/inspection_rules/%s' % rule.uuid,
headers=self.headers)
self.assertEqual(rule.uuid, data['uuid'])
self.assertIn('conditions', data)
self.assertIn('actions', data)
def test_get_rule_data(self):
"""Test get normal rule does not hide conditions and actions"""
idict = test_api_utils.post_get_test_inspection_rule()
idict['sensitive'] = False
idict['conditions'] = [{'op': 'eq', 'args': {'values': [1, 1]}}]
idict['actions'] = [{'op': 'set-attribute',
'args': {'path': 'test', 'value': 'secret'}}]
response = self.post_json('/inspection_rules', idict,
headers=self.headers)
self.assertEqual(201, response.status_int)
rule = self.get_json('/inspection_rules/%s' % idict['uuid'],
headers=self.headers)
self.assertFalse(rule['sensitive'])
self.assertIsNotNone(rule['conditions'])
self.assertIsNotNone(rule['actions'])
def test_get_sensitive_rule_hides_data(self):
"""Test get sensitive rule hides conditions and actions"""
idict = test_api_utils.post_get_test_inspection_rule()
idict['sensitive'] = True
idict['conditions'] = [{'op': 'eq', 'args': {'values': [1, 1]}}]
idict['actions'] = [{'op': 'set-attribute',
'args': {'path': 'test', 'value': 'secret'}}]
response = self.post_json('/inspection_rules', idict,
headers=self.headers)
self.assertEqual(201, response.status_int)
rule = self.get_json('/inspection_rules/%s' % idict['uuid'],
headers=self.headers)
self.assertTrue(rule['sensitive'])
self.assertIsNone(rule['conditions'])
self.assertIsNone(rule['actions'])
def test_list_hides_sensitive_data(self):
"""Test that listing rules hides sensitive data for sensitive rules."""
sensitive_rule = test_api_utils.post_get_test_inspection_rule()
sensitive_rule['sensitive'] = True
sensitive_rule['uuid'] = uuidutils.generate_uuid()
normal_rule = test_api_utils.post_get_test_inspection_rule()
normal_rule['sensitive'] = False
normal_rule['uuid'] = uuidutils.generate_uuid()
self.post_json('/inspection_rules', sensitive_rule,
headers=self.headers)
self.post_json('/inspection_rules', normal_rule, headers=self.headers)
data = self.get_json('/inspection_rules?detail=true',
headers=self.headers)
sensitive_result = next(r for r in data['inspection_rules']
if r['uuid'] == sensitive_rule['uuid'])
normal_result = next(r for r in data['inspection_rules']
if r['uuid'] == normal_rule['uuid'])
self.assertTrue(sensitive_result['sensitive'])
self.assertIsNone(sensitive_result['conditions'])
self.assertIsNone(sensitive_result['actions'])
self.assertFalse(normal_result['sensitive'])
self.assertIsNotNone(normal_result['conditions'])
self.assertIsNotNone(normal_result['actions'])
def test_get_all_invalid_api_version(self):
obj_utils.create_test_inspection_rule(self.context)
response = self.get_json('/inspection_rules',
headers=self.invalid_version_headers,
expect_errors=True)
self.assertEqual(http_client.NOT_FOUND, response.status_int)
def test_get_one_invalid_api_version(self):
inspection_rule = obj_utils.create_test_inspection_rule(self.context)
response = self.get_json(
'/inspection_rules/%s' % (inspection_rule.uuid),
headers=self.invalid_version_headers,
expect_errors=True)
self.assertEqual(http_client.NOT_FOUND, response.status_int)
def test_get_all(self):
obj_utils.create_test_inspection_rule(self.context)
obj_utils.create_test_inspection_rule(self.context)
data = self.get_json('/inspection_rules', headers=self.headers)
self.assertEqual(2, len(data['inspection_rules']))
class TestPost(BaseInspectionRulesAPITest):
@mock.patch.object(timeutils, 'utcnow', autospec=True)
def test_create_rule(self, mock_utcnow):
idict = test_api_utils.post_get_test_inspection_rule()
test_time = datetime.datetime(2024, 8, 27, 0, 0)
mock_utcnow.return_value = test_time
response = self.post_json('/inspection_rules', idict,
headers=self.headers)
self.assertEqual('application/json', response.content_type)
self.assertEqual(201, response.status_int)
result = self.get_json('/inspection_rules/%s' % idict['uuid'],
headers=self.headers)
self.assertEqual(idict['uuid'], result['uuid'])
self.assertFalse(result['updated_at'])
return_created_at = timeutils.parse_isotime(
result['created_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_created_at)
# Check location header
self.assertIsNotNone(response.location)
expected_location = '/v1/inspection_rules/%s' % idict['uuid']
self.assertEqual(expected_location,
response.location[response.location.index('/v1'):])
def test_create_rule_generate_uuid(self):
idict = test_api_utils.post_get_test_inspection_rule()
del idict['uuid']
response = self.post_json('/inspection_rules', idict,
headers=self.headers)
result = self.get_json('/inspection_rules/%s' % response.json['uuid'],
headers=self.headers)
self.assertTrue(uuidutils.is_uuid_like(result['uuid']))
self.assertNotIn('id', result)
def test_create_rule_with_optional_args(self):
idict = test_api_utils.post_get_test_inspection_rule()
idict['conditions'] = [
{'op': 'eq', 'args': {'values': [5, 5]}, 'force_strings': True},
{'op': 'gt', 'args': {'values': [10, 5]}}
]
idict['actions'] = [
{'op': 'extend-attribute', 'args': {
'path': 'properties/capabilities', 'value': 'test:value'},
'unique': True},
{'op': 'set-attribute', 'args': {
'path': 'properties/test', 'value': 'test-value'}}
]
response = self.post_json('/inspection_rules', idict,
headers=self.headers)
self.assertEqual(201, response.status_int)
class TestPatch(BaseInspectionRulesAPITest):
def test_patch_invalid_api_version(self):
rule = obj_utils.create_test_inspection_rule(self.context)
patch = [{'op': 'replace', 'path': '/description',
'value': 'New description'}]
response = self.patch_json('/inspection_rules/%s' % rule.uuid,
patch, headers=self.invalid_version_headers,
expect_errors=True)
self.assertEqual(http_client.METHOD_NOT_ALLOWED, response.status_int)
def test_set_sensitive_field(self):
idict = test_api_utils.post_get_test_inspection_rule()
idict['sensitive'] = False
response = self.post_json('/inspection_rules', idict,
headers=self.headers)
self.assertEqual(201, response.status_int)
# A non-sensitive rule can be marked sensitive, but not if already set
patch = [{'op': 'replace', 'path': '/sensitive', 'value': True}]
response = self.patch_json(
'/inspection_rules/%s' % idict['uuid'],
patch,
headers=self.headers,
expect_errors=True
)
# Should succeed
self.assertEqual(http_client.OK, response.status_int)
# Should fail
new_patch = [{'op': 'replace', 'path': '/sensitive', 'value': False}]
response = self.patch_json(
'/inspection_rules/%s' % idict['uuid'],
new_patch,
headers=self.headers,
expect_errors=True
)
self.assertEqual(http_client.BAD_REQUEST, response.status_int)
@mock.patch.object(objects.InspectionRule, 'destroy', autospec=True)
class TestDelete(BaseInspectionRulesAPITest):
@mock.patch.object(notification_utils, '_emit_api_notification',
autospec=True)
def test_delete_by_uuid(self, mock_notify, mock_destroy):
rule = obj_utils.create_test_inspection_rule(self.context)
self.delete('/inspection_rules/%s' % rule.uuid,
headers=self.headers)
mock_destroy.assert_called_once_with(mock.ANY)
mock_notify.assert_has_calls([mock.call(mock.ANY, mock.ANY, 'delete',
obj_fields.NotificationLevel.INFO,
obj_fields.NotificationStatus.START),
mock.call(mock.ANY, mock.ANY, 'delete',
obj_fields.NotificationLevel.INFO,
obj_fields.NotificationStatus.END)])
def test_delete_invalid_api_version(self, mock_destroy):
rule = obj_utils.create_test_inspection_rule(self.context)
response = self.delete(
'/inspection_rules/%s' % rule.uuid,
expect_errors=True,
headers=self.invalid_version_headers)
self.assertEqual(http_client.METHOD_NOT_ALLOWED, response.status_int)

View File

@ -106,6 +106,12 @@ class TestV1Routing(api_base.BaseApiTest):
})
self.assertEqual({
'id': 'v1',
'inspection_rules': [
{'href': 'http://localhost/v1/inspection_rules/',
'rel': 'self'},
{'href': 'http://localhost/inspection_rules/',
'rel': 'bookmark'}
],
'links': [
{'href': 'http://localhost/v1/', 'rel': 'self'},
{'href': 'https://docs.openstack.org//ironic/latest'

View File

@ -224,3 +224,15 @@ def post_get_test_deploy_template(**kw):
def post_get_test_runbook(**kw):
"""Return a Runbook object with appropriate attributes."""
return runbook_post_data(**kw)
def inspection_rule_post_data(**kw):
"""Return a Inspection Rule object"""
inspection_rule = db_utils.get_test_inspection_rule(**kw)
inspection_rule.pop('version')
return inspection_rule
def post_get_test_inspection_rule(**kw):
"""Return a Inspection Rule object"""
return inspection_rule_post_data(**kw)