Merge "Switch cloud.networking.qos* operations to rely on proxy layer"

This commit is contained in:
Zuul 2021-03-02 21:44:06 +00:00 committed by Gerrit Code Review
commit 1724bf3d43
12 changed files with 284 additions and 355 deletions

View File

@ -34,11 +34,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
@_utils.cache_on_arguments()
def _neutron_extensions(self):
extensions = set()
resp = self.network.get('/extensions')
data = proxy._json_response(
resp,
error_message="Error fetching extension list for neutron")
for extension in self._get_and_munchify('extensions', data):
for extension in self.network.extensions():
extensions.add(extension['alias'])
return extensions
@ -232,8 +228,15 @@ class NetworkCloudMixin(_normalize.Normalizer):
found.
"""
return _utils._get_entity(
self, 'qos_policie', name_or_id, filters)
if not self._has_neutron_extension('qos'):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
if not filters:
filters = {}
return self.network.find_qos_policy(
name_or_id=name_or_id,
ignore_missing=True,
**filters)
def search_qos_policies(self, name_or_id=None, filters=None):
"""Search QoS policies
@ -247,8 +250,15 @@ class NetworkCloudMixin(_normalize.Normalizer):
:raises: ``OpenStackCloudException`` if something goes wrong during the
OpenStack API call.
"""
policies = self.list_qos_policies(filters)
return _utils._filter_list(policies, name_or_id, filters)
if not self._has_neutron_extension('qos'):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
query = {}
if name_or_id:
query['name'] = name_or_id
if filters:
query.update(filters)
return list(self.network.qos_policies(**query))
def list_qos_rule_types(self, filters=None):
"""List all available QoS rule types.
@ -264,11 +274,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
# Translate None from search interface to empty {} for kwargs below
if not filters:
filters = {}
resp = self.network.get("/qos/rule-types", params=filters)
data = proxy._json_response(
resp,
error_message="Error fetching QoS rule types list")
return self._get_and_munchify('rule_types', data)
return list(self.network.qos_rule_types(**filters))
def get_qos_rule_type_details(self, rule_type, filters=None):
"""Get a QoS rule type details by rule type name.
@ -288,13 +294,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
'qos-rule-type-details extension is not available '
'on target cloud')
resp = self.network.get(
"/qos/rule-types/{rule_type}".format(rule_type=rule_type))
data = proxy._json_response(
resp,
error_message="Error fetching QoS details of {rule_type} "
"rule type".format(rule_type=rule_type))
return self._get_and_munchify('rule_type', data)
return self.network.get_qos_rule_type(rule_type)
def list_qos_policies(self, filters=None):
"""List all available QoS policies.
@ -309,11 +309,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
# Translate None from search interface to empty {} for kwargs below
if not filters:
filters = {}
resp = self.network.get("/qos/policies", params=filters)
data = proxy._json_response(
resp,
error_message="Error fetching QoS policies list")
return self._get_and_munchify('policies', data)
return list(self.network.qos_policies(**filters))
def get_network(self, name_or_id, filters=None):
"""Get a network by name or ID.
@ -1193,8 +1189,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
self.log.debug("'qos-default' extension is not available on "
"target cloud")
data = self.network.post("/qos/policies", json={'policy': kwargs})
return self._get_and_munchify('policy', data)
return self.network.create_qos_policy(**kwargs)
@_utils.valid_kwargs("name", "description", "shared", "default",
"project_id")
@ -1231,16 +1226,11 @@ class NetworkCloudMixin(_normalize.Normalizer):
self.log.debug("No QoS policy data to update")
return
curr_policy = self.get_qos_policy(name_or_id)
curr_policy = self.network.find_qos_policy(name_or_id)
if not curr_policy:
raise exc.OpenStackCloudException(
"QoS policy %s not found." % name_or_id)
data = self.network.put(
"/qos/policies/{policy_id}".format(
policy_id=curr_policy['id']),
json={'policy': kwargs})
return self._get_and_munchify('policy', data)
return self.network.update_qos_policy(curr_policy, **kwargs)
def delete_qos_policy(self, name_or_id):
"""Delete a QoS policy.
@ -1254,18 +1244,17 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not self._has_neutron_extension('qos'):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(name_or_id)
policy = self.network.find_qos_policy(name_or_id)
if not policy:
self.log.debug("QoS policy %s not found for deleting", name_or_id)
return False
exceptions.raise_from_response(self.network.delete(
"/qos/policies/{policy_id}".format(policy_id=policy['id'])))
self.network.delete_qos_policy(policy)
return True
def search_qos_bandwidth_limit_rules(self, policy_name_or_id, rule_id=None,
filters=None):
def search_qos_bandwidth_limit_rules(
self, policy_name_or_id, rule_id=None, filters=None
):
"""Search QoS bandwidth limit rules
:param string policy_name_or_id: Name or ID of the QoS policy to which
@ -1298,7 +1287,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
@ -1308,15 +1297,8 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not filters:
filters = {}
resp = self.network.get(
"/qos/policies/{policy_id}/bandwidth_limit_rules".format(
policy_id=policy['id']),
params=filters)
data = proxy._json_response(
resp,
error_message="Error fetching QoS bandwidth limit rules from "
"{policy}".format(policy=policy['id']))
return self._get_and_munchify('bandwidth_limit_rules', data)
return list(self.network.qos_bandwidth_limit_rules(
qos_policy=policy, **filters))
def get_qos_bandwidth_limit_rule(self, policy_name_or_id, rule_id):
"""Get a QoS bandwidth limit rule by name or ID.
@ -1333,21 +1315,14 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
resp = self.network.get(
"/qos/policies/{policy_id}/bandwidth_limit_rules/{rule_id}".
format(policy_id=policy['id'], rule_id=rule_id))
data = proxy._json_response(
resp,
error_message="Error fetching QoS bandwidth limit rule {rule_id} "
"from {policy}".format(rule_id=rule_id,
policy=policy['id']))
return self._get_and_munchify('bandwidth_limit_rule', data)
return self.network.get_qos_bandwidth_limit_rule(
rule_id, policy)
@_utils.valid_kwargs("max_burst_kbps", "direction")
def create_qos_bandwidth_limit_rule(self, policy_name_or_id, max_kbps,
@ -1369,7 +1344,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
@ -1383,11 +1358,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
"target cloud")
kwargs['max_kbps'] = max_kbps
data = self.network.post(
"/qos/policies/{policy_id}/bandwidth_limit_rules".format(
policy_id=policy['id']),
json={'bandwidth_limit_rule': kwargs})
return self._get_and_munchify('bandwidth_limit_rule', data)
return self.network.create_qos_bandwidth_limit_rule(policy, **kwargs)
@_utils.valid_kwargs("max_kbps", "max_burst_kbps", "direction")
def update_qos_bandwidth_limit_rule(self, policy_name_or_id, rule_id,
@ -1410,7 +1381,9 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(
policy_name_or_id,
ignore_missing=True)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
@ -1427,19 +1400,16 @@ class NetworkCloudMixin(_normalize.Normalizer):
self.log.debug("No QoS bandwidth limit rule data to update")
return
curr_rule = self.get_qos_bandwidth_limit_rule(
policy_name_or_id, rule_id)
curr_rule = self.network.get_qos_bandwidth_limit_rule(
qos_rule=rule_id, qos_policy=policy)
if not curr_rule:
raise exc.OpenStackCloudException(
"QoS bandwidth_limit_rule {rule_id} not found in policy "
"{policy_id}".format(rule_id=rule_id,
policy_id=policy['id']))
data = self.network.put(
"/qos/policies/{policy_id}/bandwidth_limit_rules/{rule_id}".
format(policy_id=policy['id'], rule_id=rule_id),
json={'bandwidth_limit_rule': kwargs})
return self._get_and_munchify('bandwidth_limit_rule', data)
return self.network.update_qos_bandwidth_limit_rule(
qos_rule=curr_rule, qos_policy=policy, **kwargs)
def delete_qos_bandwidth_limit_rule(self, policy_name_or_id, rule_id):
"""Delete a QoS bandwidth limit rule.
@ -1454,17 +1424,16 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
try:
exceptions.raise_from_response(self.network.delete(
"/qos/policies/{policy}/bandwidth_limit_rules/{rule}".
format(policy=policy['id'], rule=rule_id)))
except exc.OpenStackCloudURINotFound:
self.network.delete_qos_bandwidth_limit_rule(
rule_id, policy, ignore_missing=False)
except exceptions.ResourceNotFound:
self.log.debug(
"QoS bandwidth limit rule {rule_id} not found in policy "
"{policy_id}. Ignoring.".format(rule_id=rule_id,
@ -1507,7 +1476,8 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(
policy_name_or_id, ignore_missing=True)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
@ -1517,15 +1487,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not filters:
filters = {}
resp = self.network.get(
"/qos/policies/{policy_id}/dscp_marking_rules".format(
policy_id=policy['id']),
params=filters)
data = proxy._json_response(
resp,
error_message="Error fetching QoS DSCP marking rules from "
"{policy}".format(policy=policy['id']))
return self._get_and_munchify('dscp_marking_rules', data)
return list(self.network.qos_dscp_marking_rules(policy, **filters))
def get_qos_dscp_marking_rule(self, policy_name_or_id, rule_id):
"""Get a QoS DSCP marking rule by name or ID.
@ -1542,21 +1504,13 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
resp = self.network.get(
"/qos/policies/{policy_id}/dscp_marking_rules/{rule_id}".
format(policy_id=policy['id'], rule_id=rule_id))
data = proxy._json_response(
resp,
error_message="Error fetching QoS DSCP marking rule {rule_id} "
"from {policy}".format(rule_id=rule_id,
policy=policy['id']))
return self._get_and_munchify('dscp_marking_rule', data)
return self.network.get_qos_dscp_marking_rule(rule_id, policy)
def create_qos_dscp_marking_rule(self, policy_name_or_id, dscp_mark):
"""Create a QoS DSCP marking rule.
@ -1572,20 +1526,14 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
body = {
'dscp_mark': dscp_mark
}
data = self.network.post(
"/qos/policies/{policy_id}/dscp_marking_rules".format(
policy_id=policy['id']),
json={'dscp_marking_rule': body})
return self._get_and_munchify('dscp_marking_rule', data)
return self.network.create_qos_dscp_marking_rule(
policy, dscp_mark=dscp_mark)
@_utils.valid_kwargs("dscp_mark")
def update_qos_dscp_marking_rule(self, policy_name_or_id, rule_id,
@ -1604,7 +1552,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
@ -1614,19 +1562,16 @@ class NetworkCloudMixin(_normalize.Normalizer):
self.log.debug("No QoS DSCP marking rule data to update")
return
curr_rule = self.get_qos_dscp_marking_rule(
policy_name_or_id, rule_id)
curr_rule = self.network.get_qos_dscp_marking_rule(
rule_id, policy)
if not curr_rule:
raise exc.OpenStackCloudException(
"QoS dscp_marking_rule {rule_id} not found in policy "
"{policy_id}".format(rule_id=rule_id,
policy_id=policy['id']))
data = self.network.put(
"/qos/policies/{policy_id}/dscp_marking_rules/{rule_id}".
format(policy_id=policy['id'], rule_id=rule_id),
json={'dscp_marking_rule': kwargs})
return self._get_and_munchify('dscp_marking_rule', data)
return self.network.update_qos_dscp_marking_rule(
curr_rule, policy, **kwargs)
def delete_qos_dscp_marking_rule(self, policy_name_or_id, rule_id):
"""Delete a QoS DSCP marking rule.
@ -1641,17 +1586,16 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
try:
exceptions.raise_from_response(self.network.delete(
"/qos/policies/{policy}/dscp_marking_rules/{rule}".
format(policy=policy['id'], rule=rule_id)))
except exc.OpenStackCloudURINotFound:
self.network.delete_qos_dscp_marking_rule(
rule_id, policy, ignore_missing=False)
except exceptions.ResourceNotFound:
self.log.debug(
"QoS DSCP marking rule {rule_id} not found in policy "
"{policy_id}. Ignoring.".format(rule_id=rule_id,
@ -1696,7 +1640,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
@ -1706,15 +1650,9 @@ class NetworkCloudMixin(_normalize.Normalizer):
if not filters:
filters = {}
resp = self.network.get(
"/qos/policies/{policy_id}/minimum_bandwidth_rules".format(
policy_id=policy['id']),
params=filters)
data = proxy._json_response(
resp,
error_message="Error fetching QoS minimum bandwidth rules from "
"{policy}".format(policy=policy['id']))
return self._get_and_munchify('minimum_bandwidth_rules', data)
return list(
self.network.qos_minimum_bandwidth_rules(
policy, **filters))
def get_qos_minimum_bandwidth_rule(self, policy_name_or_id, rule_id):
"""Get a QoS minimum bandwidth rule by name or ID.
@ -1731,25 +1669,18 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
resp = self.network.get(
"/qos/policies/{policy_id}/minimum_bandwidth_rules/{rule_id}".
format(policy_id=policy['id'], rule_id=rule_id))
data = proxy._json_response(
resp,
error_message="Error fetching QoS minimum_bandwidth rule {rule_id}"
" from {policy}".format(rule_id=rule_id,
policy=policy['id']))
return self._get_and_munchify('minimum_bandwidth_rule', data)
return self.network.get_qos_minimum_bandwidth_rule(rule_id, policy)
@_utils.valid_kwargs("direction")
def create_qos_minimum_bandwidth_rule(self, policy_name_or_id, min_kbps,
**kwargs):
def create_qos_minimum_bandwidth_rule(
self, policy_name_or_id, min_kbps, **kwargs
):
"""Create a QoS minimum bandwidth limit rule.
:param string policy_name_or_id: Name or ID of the QoS policy to which
@ -1765,22 +1696,19 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
kwargs['min_kbps'] = min_kbps
data = self.network.post(
"/qos/policies/{policy_id}/minimum_bandwidth_rules".format(
policy_id=policy['id']),
json={'minimum_bandwidth_rule': kwargs})
return self._get_and_munchify('minimum_bandwidth_rule', data)
return self.network.create_qos_minimum_bandwidth_rule(policy, **kwargs)
@_utils.valid_kwargs("min_kbps", "direction")
def update_qos_minimum_bandwidth_rule(self, policy_name_or_id, rule_id,
**kwargs):
def update_qos_minimum_bandwidth_rule(
self, policy_name_or_id, rule_id, **kwargs
):
"""Update a QoS minimum bandwidth rule.
:param string policy_name_or_id: Name or ID of the QoS policy to which
@ -1797,7 +1725,7 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
@ -1807,19 +1735,16 @@ class NetworkCloudMixin(_normalize.Normalizer):
self.log.debug("No QoS minimum bandwidth rule data to update")
return
curr_rule = self.get_qos_minimum_bandwidth_rule(
policy_name_or_id, rule_id)
curr_rule = self.network.get_qos_minimum_bandwidth_rule(
rule_id, policy)
if not curr_rule:
raise exc.OpenStackCloudException(
"QoS minimum_bandwidth_rule {rule_id} not found in policy "
"{policy_id}".format(rule_id=rule_id,
policy_id=policy['id']))
data = self.network.put(
"/qos/policies/{policy_id}/minimum_bandwidth_rules/{rule_id}".
format(policy_id=policy['id'], rule_id=rule_id),
json={'minimum_bandwidth_rule': kwargs})
return self._get_and_munchify('minimum_bandwidth_rule', data)
return self.network.update_qos_minimum_bandwidth_rule(
curr_rule, policy, **kwargs)
def delete_qos_minimum_bandwidth_rule(self, policy_name_or_id, rule_id):
"""Delete a QoS minimum bandwidth rule.
@ -1834,17 +1759,16 @@ class NetworkCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudUnavailableExtension(
'QoS extension is not available on target cloud')
policy = self.get_qos_policy(policy_name_or_id)
policy = self.network.find_qos_policy(policy_name_or_id)
if not policy:
raise exc.OpenStackCloudResourceNotFound(
"QoS policy {name_or_id} not Found.".format(
name_or_id=policy_name_or_id))
try:
exceptions.raise_from_response(self.network.delete(
"/qos/policies/{policy}/minimum_bandwidth_rules/{rule}".
format(policy=policy['id'], rule=rule_id)))
except exc.OpenStackCloudURINotFound:
self.network.delete_qos_minimum_bandwidth_rule(
rule_id, policy, ignore_missing=False)
except exceptions.ResourceNotFound:
self.log.debug(
"QoS minimum bandwidth rule {rule_id} not found in policy "
"{policy_id}. Ignoring.".format(rule_id=rule_id,

View File

@ -38,7 +38,9 @@ class QoSPolicy(resource.Resource, resource.TagMixin):
name = resource.Body('name')
#: The ID of the project who owns the network. Only administrative
#: users can specify a project ID other than their own.
project_id = resource.Body('tenant_id')
project_id = resource.Body('project_id', alias='tenant_id')
#: Tenant_id (deprecated attribute).
tenant_id = resource.Body('tenant_id', deprecated=True)
#: The QoS policy description.
description = resource.Body('description')
#: Indicates whether this QoS policy is the default policy for this

View File

@ -29,6 +29,6 @@ class QoSRuleType(resource.Resource):
# Properties
#: QoS rule type name.
type = resource.Body('type')
type = resource.Body('type', alternate_id=True)
#: List of QoS backend drivers supporting this QoS rule type
drivers = resource.Body('drivers')

View File

@ -49,7 +49,7 @@ class TestQosPolicy(base.BaseFunctionalTest):
policy = self.operator_cloud.create_qos_policy(name=self.policy_name)
self.assertIn('id', policy)
self.assertEqual(self.policy_name, policy['name'])
self.assertFalse(policy['shared'])
self.assertFalse(policy['is_shared'])
self.assertFalse(policy['is_default'])
def test_create_qos_policy_shared(self):
@ -57,7 +57,7 @@ class TestQosPolicy(base.BaseFunctionalTest):
name=self.policy_name, shared=True)
self.assertIn('id', policy)
self.assertEqual(self.policy_name, policy['name'])
self.assertTrue(policy['shared'])
self.assertTrue(policy['is_shared'])
self.assertFalse(policy['is_default'])
def test_create_qos_policy_default(self):
@ -68,19 +68,19 @@ class TestQosPolicy(base.BaseFunctionalTest):
name=self.policy_name, default=True)
self.assertIn('id', policy)
self.assertEqual(self.policy_name, policy['name'])
self.assertFalse(policy['shared'])
self.assertFalse(policy['is_shared'])
self.assertTrue(policy['is_default'])
def test_update_qos_policy(self):
policy = self.operator_cloud.create_qos_policy(name=self.policy_name)
self.assertEqual(self.policy_name, policy['name'])
self.assertFalse(policy['shared'])
self.assertFalse(policy['is_shared'])
self.assertFalse(policy['is_default'])
updated_policy = self.operator_cloud.update_qos_policy(
policy['id'], shared=True, default=True)
self.assertEqual(self.policy_name, updated_policy['name'])
self.assertTrue(updated_policy['shared'])
self.assertTrue(updated_policy['is_shared'])
self.assertTrue(updated_policy['is_default'])
def test_list_qos_policies_filtered(self):

View File

@ -455,7 +455,8 @@ class TestMemoryCache(base.TestCase):
self.assertEqual([], self.cloud.list_flavors())
fake_flavor_dicts = [
_flavor.Flavor(connection=self.cloud, **f)
_flavor.Flavor(connection=self.cloud,
**f)._to_munch(original_names=False)
for f in fakes.FAKE_FLAVOR_LIST
]

View File

@ -16,6 +16,7 @@
import copy
from openstack.cloud import exc
from openstack.network.v2 import qos_bandwidth_limit_rule
from openstack.tests.unit import base
@ -67,6 +68,12 @@ class TestQosBandwidthLimitRule(base.TestCase):
enabled_neutron_extensions = [qos_extension,
qos_bw_limit_direction_extension]
def _compare_rules(self, exp, real):
self.assertDictEqual(
qos_bandwidth_limit_rule.QoSBandwidthLimitRule(**exp).to_dict(
computed=False),
real.to_dict(computed=False))
def test_get_qos_bandwidth_limit_rule(self):
self.register_uris([
dict(method='GET',
@ -75,12 +82,14 @@ class TestQosBandwidthLimitRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
@ -91,7 +100,7 @@ class TestQosBandwidthLimitRule(base.TestCase):
])
r = self.cloud.get_qos_bandwidth_limit_rule(self.policy_name,
self.rule_id)
self.assertDictEqual(self.mock_rule, r)
self._compare_rules(self.mock_rule, r)
self.assert_calls()
def test_get_qos_bandwidth_limit_rule_no_qos_policy_found(self):
@ -102,12 +111,14 @@ class TestQosBandwidthLimitRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': []})
])
self.assertRaises(
@ -137,12 +148,14 @@ class TestQosBandwidthLimitRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='POST',
uri=self.get_mock_url(
@ -153,7 +166,7 @@ class TestQosBandwidthLimitRule(base.TestCase):
])
rule = self.cloud.create_qos_bandwidth_limit_rule(
self.policy_name, max_kbps=self.rule_max_kbps)
self.assertDictEqual(self.mock_rule, rule)
self._compare_rules(self.mock_rule, rule)
self.assert_calls()
def test_create_qos_bandwidth_limit_rule_no_qos_extension(self):
@ -177,12 +190,14 @@ class TestQosBandwidthLimitRule(base.TestCase):
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
@ -197,7 +212,7 @@ class TestQosBandwidthLimitRule(base.TestCase):
])
rule = self.cloud.create_qos_bandwidth_limit_rule(
self.policy_name, max_kbps=self.rule_max_kbps, direction="ingress")
self.assertDictEqual(self.mock_rule, rule)
self._compare_rules(self.mock_rule, rule)
self.assert_calls()
def test_update_qos_bandwidth_limit_rule(self):
@ -207,29 +222,12 @@ class TestQosBandwidthLimitRule(base.TestCase):
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
append=['v2.0', 'qos', 'policies', self.policy_id]),
json=self.mock_policy),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
@ -248,7 +246,7 @@ class TestQosBandwidthLimitRule(base.TestCase):
])
rule = self.cloud.update_qos_bandwidth_limit_rule(
self.policy_id, self.rule_id, max_kbps=self.rule_max_kbps + 100)
self.assertDictEqual(expected_rule, rule)
self._compare_rules(expected_rule, rule)
self.assert_calls()
def test_update_qos_bandwidth_limit_rule_no_qos_extension(self):
@ -272,38 +270,20 @@ class TestQosBandwidthLimitRule(base.TestCase):
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
append=['v2.0', 'qos', 'policies', self.policy_id]),
json=self.mock_policy),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_id,
'bandwidth_limit_rules',
self.rule_id]),
'bandwidth_limit_rules', self.rule_id]),
json={'bandwidth_limit_rule': self.mock_rule}),
dict(method='PUT',
uri=self.get_mock_url(
@ -321,7 +301,7 @@ class TestQosBandwidthLimitRule(base.TestCase):
direction="ingress")
# Even if there was attempt to change direction to 'ingress' it should
# be not changed in returned rule
self.assertDictEqual(expected_rule, rule)
self._compare_rules(expected_rule, rule)
self.assert_calls()
def test_delete_qos_bandwidth_limit_rule(self):
@ -332,12 +312,14 @@ class TestQosBandwidthLimitRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(
@ -372,12 +354,14 @@ class TestQosBandwidthLimitRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(

View File

@ -16,6 +16,7 @@
import copy
from openstack.cloud import exc
from openstack.network.v2 import qos_dscp_marking_rule
from openstack.tests.unit import base
@ -54,6 +55,12 @@ class TestQosDscpMarkingRule(base.TestCase):
enabled_neutron_extensions = [qos_extension]
def _compare_rules(self, exp, real):
self.assertDictEqual(
qos_dscp_marking_rule.QoSDSCPMarkingRule(**exp).to_dict(
computed=False),
real.to_dict(computed=False))
def test_get_qos_dscp_marking_rule(self):
self.register_uris([
dict(method='GET',
@ -62,12 +69,14 @@ class TestQosDscpMarkingRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
@ -78,7 +87,7 @@ class TestQosDscpMarkingRule(base.TestCase):
])
r = self.cloud.get_qos_dscp_marking_rule(self.policy_name,
self.rule_id)
self.assertDictEqual(self.mock_rule, r)
self._compare_rules(self.mock_rule, r)
self.assert_calls()
def test_get_qos_dscp_marking_rule_no_qos_policy_found(self):
@ -89,12 +98,14 @@ class TestQosDscpMarkingRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': []})
])
self.assertRaises(
@ -124,12 +135,14 @@ class TestQosDscpMarkingRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='POST',
uri=self.get_mock_url(
@ -140,7 +153,7 @@ class TestQosDscpMarkingRule(base.TestCase):
])
rule = self.cloud.create_qos_dscp_marking_rule(
self.policy_name, dscp_mark=self.rule_dscp_mark)
self.assertDictEqual(self.mock_rule, rule)
self._compare_rules(self.mock_rule, rule)
self.assert_calls()
def test_create_qos_dscp_marking_rule_no_qos_extension(self):
@ -165,28 +178,11 @@ class TestQosDscpMarkingRule(base.TestCase):
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
append=['v2.0', 'qos', 'policies', self.policy_id]),
json=self.mock_policy),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
@ -205,7 +201,7 @@ class TestQosDscpMarkingRule(base.TestCase):
])
rule = self.cloud.update_qos_dscp_marking_rule(
self.policy_id, self.rule_id, dscp_mark=new_dscp_mark_value)
self.assertDictEqual(expected_rule, rule)
self._compare_rules(expected_rule, rule)
self.assert_calls()
def test_update_qos_dscp_marking_rule_no_qos_extension(self):
@ -229,12 +225,14 @@ class TestQosDscpMarkingRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(
@ -270,12 +268,14 @@ class TestQosDscpMarkingRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(

View File

@ -16,6 +16,7 @@
import copy
from openstack.cloud import exc
from openstack.network.v2 import qos_minimum_bandwidth_rule
from openstack.tests.unit import base
@ -55,6 +56,12 @@ class TestQosMinimumBandwidthRule(base.TestCase):
enabled_neutron_extensions = [qos_extension]
def _compare_rules(self, exp, real):
self.assertDictEqual(
qos_minimum_bandwidth_rule.QoSMinimumBandwidthRule(
**exp).to_dict(computed=False),
real.to_dict(computed=False))
def test_get_qos_minimum_bandwidth_rule(self):
self.register_uris([
dict(method='GET',
@ -63,13 +70,16 @@ class TestQosMinimumBandwidthRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
@ -79,7 +89,7 @@ class TestQosMinimumBandwidthRule(base.TestCase):
])
r = self.cloud.get_qos_minimum_bandwidth_rule(self.policy_name,
self.rule_id)
self.assertDictEqual(self.mock_rule, r)
self._compare_rules(self.mock_rule, r)
self.assert_calls()
def test_get_qos_minimum_bandwidth_rule_no_qos_policy_found(self):
@ -90,13 +100,15 @@ class TestQosMinimumBandwidthRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': []})
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': []}),
])
self.assertRaises(
exc.OpenStackCloudResourceNotFound,
@ -125,12 +137,14 @@ class TestQosMinimumBandwidthRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='POST',
uri=self.get_mock_url(
@ -141,7 +155,7 @@ class TestQosMinimumBandwidthRule(base.TestCase):
])
rule = self.cloud.create_qos_minimum_bandwidth_rule(
self.policy_name, min_kbps=self.rule_min_kbps)
self.assertDictEqual(self.mock_rule, rule)
self._compare_rules(self.mock_rule, rule)
self.assert_calls()
def test_create_qos_minimum_bandwidth_rule_no_qos_extension(self):
@ -165,28 +179,11 @@ class TestQosMinimumBandwidthRule(base.TestCase):
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
append=['v2.0', 'qos', 'policies', self.policy_id]),
json=self.mock_policy),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
@ -205,7 +202,7 @@ class TestQosMinimumBandwidthRule(base.TestCase):
])
rule = self.cloud.update_qos_minimum_bandwidth_rule(
self.policy_id, self.rule_id, min_kbps=self.rule_min_kbps + 100)
self.assertDictEqual(expected_rule, rule)
self._compare_rules(expected_rule, rule)
self.assert_calls()
def test_update_qos_minimum_bandwidth_rule_no_qos_extension(self):
@ -229,12 +226,14 @@ class TestQosMinimumBandwidthRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(
@ -269,12 +268,14 @@ class TestQosMinimumBandwidthRule(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(

View File

@ -16,6 +16,7 @@
import copy
from openstack.cloud import exc
from openstack.network.v2 import qos_policy as _policy
from openstack.tests.unit import base
@ -33,7 +34,8 @@ class TestQosPolicy(base.TestCase):
'project_id': project_id,
'tenant_id': project_id,
'shared': False,
'is_default': False
'is_default': False,
'tags': [],
}
qos_extension = {
@ -54,6 +56,11 @@ class TestQosPolicy(base.TestCase):
enabled_neutron_extensions = [qos_extension, qos_default_extension]
def _compare_policies(self, exp, real):
self.assertDictEqual(
_policy.QoSPolicy(**exp).to_dict(computed=False),
real.to_dict(computed=False))
def test_get_qos_policy(self):
self.register_uris([
dict(method='GET',
@ -63,12 +70,19 @@ class TestQosPolicy(base.TestCase):
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]})
append=['v2.0', 'qos',
'policies', self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
])
r = self.cloud.get_qos_policy(self.policy_name)
self.assertIsNotNone(r)
self.assertDictEqual(self.mock_policy, r)
self._compare_policies(self.mock_policy, r)
self.assert_calls()
def test_get_qos_policy_no_qos_extension(self):
@ -97,7 +111,7 @@ class TestQosPolicy(base.TestCase):
])
policy = self.cloud.create_qos_policy(
name=self.policy_name, project_id=self.project_id)
self.assertDictEqual(self.mock_policy, policy)
self._compare_policies(self.mock_policy, policy)
self.assert_calls()
def test_create_qos_policy_no_qos_extension(self):
@ -134,7 +148,7 @@ class TestQosPolicy(base.TestCase):
])
policy = self.cloud.create_qos_policy(
name=self.policy_name, project_id=self.project_id, default=True)
self.assertDictEqual(self.mock_policy, policy)
self._compare_policies(self.mock_policy, policy)
self.assert_calls()
def test_delete_qos_policy(self):
@ -145,12 +159,15 @@ class TestQosPolicy(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies',
self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [self.mock_policy]}),
dict(method='DELETE',
uri=self.get_mock_url(
@ -181,12 +198,14 @@ class TestQosPolicy(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies', 'goofy']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=goofy']),
json={'policies': []})
])
self.assertFalse(self.cloud.delete_qos_policy('goofy'))
@ -202,36 +221,34 @@ class TestQosPolicy(base.TestCase):
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
'network', 'public',
append=['v2.0', 'qos', 'policies',
self.policy_name]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [policy1, policy2]})
append=['v2.0', 'qos', 'policies'],
qs_elements=['name=%s' % self.policy_name]),
json={'policies': [policy1, policy2]}),
])
self.assertRaises(exc.OpenStackCloudException,
self.cloud.delete_qos_policy,
self.policy_name)
self.assert_calls()
def test_delete_qos_policy_multiple_using_id(self):
def test_delete_qos_policy_using_id(self):
policy1 = self.mock_policy
policy2 = dict(id='456', name=self.policy_name)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [policy1, policy2]}),
append=['v2.0', 'qos', 'policies', policy1['id']]),
json=policy1),
dict(method='DELETE',
uri=self.get_mock_url(
'network', 'public',
@ -249,15 +266,11 @@ class TestQosPolicy(base.TestCase):
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': self.enabled_neutron_extensions}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
append=['v2.0', 'qos', 'policies', self.policy_id]),
json=self.mock_policy),
dict(method='PUT',
uri=self.get_mock_url(
'network', 'public',
@ -268,7 +281,7 @@ class TestQosPolicy(base.TestCase):
])
policy = self.cloud.update_qos_policy(
self.policy_id, name='goofy')
self.assertDictEqual(expected_policy, policy)
self._compare_policies(expected_policy, policy)
self.assert_calls()
def test_update_qos_policy_no_qos_extension(self):
@ -295,15 +308,11 @@ class TestQosPolicy(base.TestCase):
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public', append=['v2.0', 'extensions']),
json={'extensions': [self.qos_extension]}),
dict(method='GET',
uri=self.get_mock_url(
'network', 'public',
append=['v2.0', 'qos', 'policies']),
json={'policies': [self.mock_policy]}),
append=['v2.0', 'qos', 'policies', self.policy_id]),
json=self.mock_policy),
dict(method='PUT',
uri=self.get_mock_url(
'network', 'public',
@ -314,5 +323,5 @@ class TestQosPolicy(base.TestCase):
])
policy = self.cloud.update_qos_policy(
self.policy_id, name='goofy', default=True)
self.assertDictEqual(expected_policy, policy)
self._compare_policies(expected_policy, policy)
self.assert_calls()

View File

@ -14,6 +14,7 @@
# limitations under the License.
from openstack.cloud import exc
from openstack.network.v2 import qos_rule_type
from openstack.tests.unit import base
@ -66,6 +67,11 @@ class TestQosRuleType(base.TestCase):
'type': rule_type_name
}
def _compare_rule_types(self, exp, real):
self.assertDictEqual(
qos_rule_type.QoSRuleType(**exp).to_dict(computed=False),
real.to_dict(computed=False))
def test_list_qos_rule_types(self):
self.register_uris([
dict(method='GET',
@ -79,7 +85,8 @@ class TestQosRuleType(base.TestCase):
json={'rule_types': self.mock_rule_types})
])
rule_types = self.cloud.list_qos_rule_types()
self.assertEqual(self.mock_rule_types, rule_types)
for a, b in zip(self.mock_rule_types, rule_types):
self._compare_rule_types(a, b)
self.assert_calls()
def test_list_qos_rule_types_no_qos_extension(self):
@ -114,7 +121,8 @@ class TestQosRuleType(base.TestCase):
self.rule_type_name]),
json={'rule_type': self.mock_rule_type_details})
])
self.assertEqual(
self._compare_rule_types(
self.mock_rule_type_details,
self.cloud.get_qos_rule_type_details(self.rule_type_name)
)

View File

@ -17,6 +17,7 @@ import testtools
from openstack.cloud import exc
from openstack import connection
from openstack import exceptions
from openstack.tests import fakes
from openstack.tests.unit import base
from openstack import utils
@ -637,8 +638,7 @@ class TestShade(base.TestCase):
status_code=404)
])
with testtools.ExpectedException(
exc.OpenStackCloudURINotFound,
"Error fetching extension list for neutron"
exceptions.ResourceNotFound
):
self.cloud._neutron_extensions()

View File

@ -45,8 +45,8 @@ class TestQoSPolicy(base.TestCase):
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertTrue(sot.is_shared)
self.assertEqual(EXAMPLE['tenant_id'], sot.project_id)
self.assertEqual(EXAMPLE['tenant_id'], sot.tenant_id)
self.assertEqual(EXAMPLE['rules'], sot.rules)
self.assertEqual(EXAMPLE['is_default'], sot.is_default)
self.assertEqual(EXAMPLE['tags'], sot.tags)