Rework network functional tests

Fix skip conditions of the functional tests in the network area and
switch to use of the proper cloud. This is required to allow executing
functional tests on real OpenStack clouds differentiating regular user
vs administrator accounts.

Change-Id: I792dd52136c31fd95cd98be903f12a9273fa6c76
This commit is contained in:
Artem Goncharov 2022-09-23 17:52:16 +02:00
parent 0a313734b1
commit 3159933778
46 changed files with 1251 additions and 844 deletions

View File

@ -1,6 +1,7 @@
# This file contains list of tests that can work with regular user privileges
# Until all tests are modified to properly identify whether they are able to
# run or must skip the ones that are known to work are listed here.
openstack.tests.functional.network
openstack.tests.functional.block_storage.v3.test_volume
# Do not enable test_backup for now, since it is not capable to determine
# backup capabilities of the cloud

View File

@ -61,6 +61,8 @@ class BaseFunctionalTest(base.TestCase):
self._set_user_cloud()
if self._op_name:
self._set_operator_cloud()
else:
self.operator_cloud = None
self.identity_version = \
self.user_cloud.config.get_api_version('identity')

View File

@ -18,64 +18,71 @@ from openstack.tests.functional import base
class TestAddressGroup(base.BaseFunctionalTest):
ADDRESS_GROUP_ID = None
ADDRESSES = ['10.0.0.1/32', '2001:db8::/32']
ADDRESSES = ["10.0.0.1/32", "2001:db8::/32"]
def setUp(self):
super(TestAddressGroup, self).setUp()
# Skip the tests if address group extension is not enabled.
if not self.conn.network.find_extension('address-group'):
self.skipTest('Network Address Group extension disabled')
if not self.user_cloud.network.find_extension("address-group"):
self.skipTest("Network Address Group extension disabled")
self.ADDRESS_GROUP_NAME = self.getUniqueString()
self.ADDRESS_GROUP_DESCRIPTION = self.getUniqueString()
self.ADDRESS_GROUP_NAME_UPDATED = self.getUniqueString()
self.ADDRESS_GROUP_DESCRIPTION_UPDATED = self.getUniqueString()
address_group = self.conn.network.create_address_group(
address_group = self.user_cloud.network.create_address_group(
name=self.ADDRESS_GROUP_NAME,
description=self.ADDRESS_GROUP_DESCRIPTION,
addresses=self.ADDRESSES
addresses=self.ADDRESSES,
)
assert isinstance(address_group, _address_group.AddressGroup)
self.assertEqual(self.ADDRESS_GROUP_NAME, address_group.name)
self.assertEqual(self.ADDRESS_GROUP_DESCRIPTION,
address_group.description)
self.assertEqual(
self.ADDRESS_GROUP_DESCRIPTION, address_group.description
)
self.assertCountEqual(self.ADDRESSES, address_group.addresses)
self.ADDRESS_GROUP_ID = address_group.id
def tearDown(self):
sot = self.conn.network.delete_address_group(self.ADDRESS_GROUP_ID)
sot = self.user_cloud.network.delete_address_group(
self.ADDRESS_GROUP_ID)
self.assertIsNone(sot)
super(TestAddressGroup, self).tearDown()
def test_find(self):
sot = self.conn.network.find_address_group(self.ADDRESS_GROUP_NAME)
sot = self.user_cloud.network.find_address_group(
self.ADDRESS_GROUP_NAME)
self.assertEqual(self.ADDRESS_GROUP_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_address_group(self.ADDRESS_GROUP_ID)
sot = self.user_cloud.network.get_address_group(self.ADDRESS_GROUP_ID)
self.assertEqual(self.ADDRESS_GROUP_NAME, sot.name)
def test_list(self):
names = [ag.name for ag in self.conn.network.address_groups()]
names = [ag.name for ag in self.user_cloud.network.address_groups()]
self.assertIn(self.ADDRESS_GROUP_NAME, names)
def test_update(self):
sot = self.conn.network.update_address_group(
sot = self.user_cloud.network.update_address_group(
self.ADDRESS_GROUP_ID,
name=self.ADDRESS_GROUP_NAME_UPDATED,
description=self.ADDRESS_GROUP_DESCRIPTION_UPDATED)
description=self.ADDRESS_GROUP_DESCRIPTION_UPDATED,
)
self.assertEqual(self.ADDRESS_GROUP_NAME_UPDATED, sot.name)
self.assertEqual(self.ADDRESS_GROUP_DESCRIPTION_UPDATED,
sot.description)
self.assertEqual(
self.ADDRESS_GROUP_DESCRIPTION_UPDATED, sot.description
)
def test_add_remove_addresses(self):
addrs = ['127.0.0.1/32', 'fe80::/10']
sot = self.conn.network.add_addresses_to_address_group(
self.ADDRESS_GROUP_ID, addrs)
addrs = ["127.0.0.1/32", "fe80::/10"]
sot = self.user_cloud.network.add_addresses_to_address_group(
self.ADDRESS_GROUP_ID, addrs
)
updated_addrs = self.ADDRESSES.copy()
updated_addrs.extend(addrs)
self.assertCountEqual(updated_addrs, sot.addresses)
sot = self.conn.network.remove_addresses_from_address_group(
self.ADDRESS_GROUP_ID, addrs)
sot = self.user_cloud.network.remove_addresses_from_address_group(
self.ADDRESS_GROUP_ID, addrs
)
self.assertCountEqual(self.ADDRESSES, sot.addresses)

View File

@ -25,7 +25,7 @@ class TestAddressScope(base.BaseFunctionalTest):
super(TestAddressScope, self).setUp()
self.ADDRESS_SCOPE_NAME = self.getUniqueString()
self.ADDRESS_SCOPE_NAME_UPDATED = self.getUniqueString()
address_scope = self.conn.network.create_address_scope(
address_scope = self.user_cloud.network.create_address_scope(
ip_version=self.IP_VERSION,
name=self.ADDRESS_SCOPE_NAME,
shared=self.IS_SHARED,
@ -35,26 +35,28 @@ class TestAddressScope(base.BaseFunctionalTest):
self.ADDRESS_SCOPE_ID = address_scope.id
def tearDown(self):
sot = self.conn.network.delete_address_scope(self.ADDRESS_SCOPE_ID)
sot = self.user_cloud.network.delete_address_scope(
self.ADDRESS_SCOPE_ID)
self.assertIsNone(sot)
super(TestAddressScope, self).tearDown()
def test_find(self):
sot = self.conn.network.find_address_scope(self.ADDRESS_SCOPE_NAME)
sot = self.user_cloud.network.find_address_scope(
self.ADDRESS_SCOPE_NAME)
self.assertEqual(self.ADDRESS_SCOPE_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_address_scope(self.ADDRESS_SCOPE_ID)
sot = self.user_cloud.network.get_address_scope(self.ADDRESS_SCOPE_ID)
self.assertEqual(self.ADDRESS_SCOPE_NAME, sot.name)
self.assertEqual(self.IS_SHARED, sot.is_shared)
self.assertEqual(self.IP_VERSION, sot.ip_version)
def test_list(self):
names = [o.name for o in self.conn.network.address_scopes()]
names = [o.name for o in self.user_cloud.network.address_scopes()]
self.assertIn(self.ADDRESS_SCOPE_NAME, names)
def test_update(self):
sot = self.conn.network.update_address_scope(
self.ADDRESS_SCOPE_ID,
name=self.ADDRESS_SCOPE_NAME_UPDATED)
sot = self.user_cloud.network.update_address_scope(
self.ADDRESS_SCOPE_ID, name=self.ADDRESS_SCOPE_NAME_UPDATED
)
self.assertEqual(self.ADDRESS_SCOPE_NAME_UPDATED, sot.name)

View File

@ -19,7 +19,7 @@ from openstack.tests.functional import base
class TestAgent(base.BaseFunctionalTest):
AGENT = None
DESC = 'test description'
DESC = "test description"
def validate_uuid(self, s):
try:
@ -30,21 +30,27 @@ class TestAgent(base.BaseFunctionalTest):
def setUp(self):
super(TestAgent, self).setUp()
agent_list = list(self.conn.network.agents())
if not self.user_cloud._has_neutron_extension("agent"):
self.skipTest("Neutron agent extension is required for this test")
agent_list = list(self.user_cloud.network.agents())
if len(agent_list) == 0:
self.skipTest("No agents available")
self.AGENT = agent_list[0]
assert isinstance(self.AGENT, agent.Agent)
def test_list(self):
agent_list = list(self.conn.network.agents())
agent_list = list(self.user_cloud.network.agents())
self.AGENT = agent_list[0]
assert isinstance(self.AGENT, agent.Agent)
self.assertTrue(self.validate_uuid(self.AGENT.id))
def test_get(self):
sot = self.conn.network.get_agent(self.AGENT.id)
sot = self.user_cloud.network.get_agent(self.AGENT.id)
self.assertEqual(self.AGENT.id, sot.id)
def test_update(self):
sot = self.conn.network.update_agent(self.AGENT.id,
description=self.DESC)
sot = self.user_cloud.network.update_agent(
self.AGENT.id, description=self.DESC
)
self.assertEqual(self.DESC, sot.description)

View File

@ -23,33 +23,43 @@ class TestAgentNetworks(base.BaseFunctionalTest):
def setUp(self):
super(TestAgentNetworks, self).setUp()
if not self.user_cloud._has_neutron_extension("agent"):
self.skipTest("Neutron agent extension is required for this test")
self.NETWORK_NAME = self.getUniqueString('network')
net = self.conn.network.create_network(name=self.NETWORK_NAME)
self.addCleanup(self.conn.network.delete_network, net.id)
self.NETWORK_NAME = self.getUniqueString("network")
net = self.user_cloud.network.create_network(name=self.NETWORK_NAME)
self.addCleanup(self.user_cloud.network.delete_network, net.id)
assert isinstance(net, network.Network)
self.NETWORK_ID = net.id
agent_list = list(self.conn.network.agents())
agents = [agent for agent in agent_list
if agent.agent_type == 'DHCP agent']
agent_list = list(self.user_cloud.network.agents())
agents = [
agent for agent in agent_list if agent.agent_type == "DHCP agent"
]
if len(agent_list) == 0:
self.skipTest("No agents available")
self.AGENT = agents[0]
self.AGENT_ID = self.AGENT.id
def test_add_remove_agent(self):
net = self.AGENT.add_agent_to_network(self.conn.network,
network_id=self.NETWORK_ID)
net = self.AGENT.add_agent_to_network(
self.user_cloud.network, network_id=self.NETWORK_ID
)
self._verify_add(net)
net = self.AGENT.remove_agent_from_network(self.conn.network,
network_id=self.NETWORK_ID)
net = self.AGENT.remove_agent_from_network(
self.user_cloud.network, network_id=self.NETWORK_ID
)
self._verify_remove(net)
def _verify_add(self, network):
net = self.conn.network.dhcp_agent_hosting_networks(self.AGENT_ID)
net = self.user_cloud.network.dhcp_agent_hosting_networks(
self.AGENT_ID)
net_ids = [n.id for n in net]
self.assertIn(self.NETWORK_ID, net_ids)
def _verify_remove(self, network):
net = self.conn.network.dhcp_agent_hosting_networks(self.AGENT_ID)
net = self.user_cloud.network.dhcp_agent_hosting_networks(
self.AGENT_ID)
net_ids = [n.id for n in net]
self.assertNotIn(self.NETWORK_ID, net_ids)

View File

@ -22,24 +22,32 @@ class TestAgentRouters(base.BaseFunctionalTest):
def setUp(self):
super(TestAgentRouters, self).setUp()
if not self.user_cloud._has_neutron_extension("agent"):
self.skipTest("Neutron agent extension is required for this test")
self.ROUTER_NAME = 'router-name-' + self.getUniqueString('router-name')
self.ROUTER = self.conn.network.create_router(name=self.ROUTER_NAME)
self.addCleanup(self.conn.network.delete_router, self.ROUTER)
self.ROUTER_NAME = "router-name-" + self.getUniqueString("router-name")
self.ROUTER = self.user_cloud.network.create_router(
name=self.ROUTER_NAME)
self.addCleanup(self.user_cloud.network.delete_router, self.ROUTER)
assert isinstance(self.ROUTER, router.Router)
agent_list = list(self.conn.network.agents())
agents = [agent for agent in agent_list
if agent.agent_type == 'L3 agent']
agent_list = list(self.user_cloud.network.agents())
agents = [
agent for agent in agent_list if agent.agent_type == "L3 agent"
]
if len(agent_list) == 0:
self.skipTest("No agents available")
self.AGENT = agents[0]
def test_add_router_to_agent(self):
self.conn.network.add_router_to_agent(self.AGENT, self.ROUTER)
rots = self.conn.network.agent_hosted_routers(self.AGENT)
self.user_cloud.network.add_router_to_agent(self.AGENT, self.ROUTER)
rots = self.user_cloud.network.agent_hosted_routers(self.AGENT)
routers = [router.id for router in rots]
self.assertIn(self.ROUTER.id, routers)
def test_remove_router_from_agent(self):
self.conn.network.remove_router_from_agent(self.AGENT, self.ROUTER)
rots = self.conn.network.agent_hosted_routers(self.AGENT)
self.user_cloud.network.remove_router_from_agent(
self.AGENT, self.ROUTER)
rots = self.user_cloud.network.agent_hosted_routers(self.AGENT)
routers = [router.id for router in rots]
self.assertNotIn(self.ROUTER.id, routers)

View File

@ -15,46 +15,62 @@ from openstack.tests.functional import base
class TestAutoAllocatedTopology(base.BaseFunctionalTest):
NETWORK_NAME = 'auto_allocated_network'
NETWORK_NAME = "auto_allocated_network"
NETWORK_ID = None
PROJECT_ID = None
def setUp(self):
super(TestAutoAllocatedTopology, self).setUp()
projects = [o.project_id for o in self.conn.network.networks()]
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud._has_neutron_extension(
"auto-allocated-topology"
):
self.skipTest(
"Neutron auto-allocated-topology extension is "
"required for this test"
)
projects = [
o.project_id
for o in self.operator_cloud.network.networks()]
self.PROJECT_ID = projects[0]
def tearDown(self):
res = self.conn.network.delete_auto_allocated_topology(self.PROJECT_ID)
res = self.operator_cloud.network.delete_auto_allocated_topology(
self.PROJECT_ID)
self.assertIsNone(res)
super(TestAutoAllocatedTopology, self).tearDown()
def test_dry_run_option_pass(self):
# Dry run will only pass if there is a public network
networks = self.conn.network.networks()
networks = self.operator_cloud.network.networks()
self._set_network_external(networks)
# Dry run option will return "dry-run=pass" in the 'id' resource
top = self.conn.network.validate_auto_allocated_topology(
self.PROJECT_ID)
top = self.operator_cloud.network.validate_auto_allocated_topology(
self.PROJECT_ID
)
self.assertEqual(self.PROJECT_ID, top.project)
self.assertEqual('dry-run=pass', top.id)
self.assertEqual("dry-run=pass", top.id)
def test_show_no_project_option(self):
top = self.conn.network.get_auto_allocated_topology()
top = self.operator_cloud.network.get_auto_allocated_topology()
project = self.conn.session.get_project_id()
network = self.conn.network.get_network(top.id)
network = self.operator_cloud.network.get_network(top.id)
self.assertEqual(top.project_id, project)
self.assertEqual(top.id, network.id)
def test_show_project_option(self):
top = self.conn.network.get_auto_allocated_topology(self.PROJECT_ID)
network = self.conn.network.get_network(top.id)
top = self.operator_cloud.network.get_auto_allocated_topology(
self.PROJECT_ID)
network = self.operator_cloud.network.get_network(top.id)
self.assertEqual(top.project_id, network.project_id)
self.assertEqual(top.id, network.id)
self.assertEqual(network.name, 'auto_allocated_network')
self.assertEqual(network.name, "auto_allocated_network")
def _set_network_external(self, networks):
for network in networks:
if network.name == 'public':
self.conn.network.update_network(network, is_default=True)
if network.name == "public":
self.operator_cloud.network.update_network(
network, is_default=True)

View File

@ -15,12 +15,11 @@ from openstack.tests.functional import base
class TestAvailabilityZone(base.BaseFunctionalTest):
def test_list(self):
availability_zones = list(self.conn.network.availability_zones())
self.assertGreater(len(availability_zones), 0)
availability_zones = list(self.user_cloud.network.availability_zones())
if len(availability_zones) > 0:
for az in availability_zones:
self.assertIsInstance(az.name, str)
self.assertIsInstance(az.resource, str)
self.assertIsInstance(az.state, str)
for az in availability_zones:
self.assertIsInstance(az.name, str)
self.assertIsInstance(az.resource, str)
self.assertIsInstance(az.state, str)

View File

@ -21,34 +21,44 @@ class TestDVRRouter(base.BaseFunctionalTest):
def setUp(self):
super(TestDVRRouter, self).setUp()
if not self.operator_cloud:
# Current policies forbid regular user use it
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud._has_neutron_extension("dvr"):
self.skipTest("dvr service not supported by cloud")
self.NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
sot = self.conn.network.create_router(name=self.NAME, distributed=True)
sot = self.operator_cloud.network.create_router(
name=self.NAME, distributed=True)
assert isinstance(sot, router.Router)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_router(self.ID, ignore_missing=False)
sot = self.operator_cloud.network.delete_router(
self.ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestDVRRouter, self).tearDown()
def test_find(self):
sot = self.conn.network.find_router(self.NAME)
sot = self.operator_cloud.network.find_router(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_router(self.ID)
sot = self.operator_cloud.network.get_router(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
self.assertTrue(sot.is_distributed)
def test_list(self):
names = [o.name for o in self.conn.network.routers()]
names = [o.name for o in self.operator_cloud.network.routers()]
self.assertIn(self.NAME, names)
dvr = [o.is_distributed for o in self.conn.network.routers()]
dvr = [o.is_distributed for o in self.operator_cloud.network.routers()]
self.assertTrue(dvr)
def test_update(self):
sot = self.conn.network.update_router(self.ID, name=self.UPDATE_NAME)
sot = self.operator_cloud.network.update_router(
self.ID, name=self.UPDATE_NAME)
self.assertEqual(self.UPDATE_NAME, sot.name)

View File

@ -15,9 +15,8 @@ from openstack.tests.functional import base
class TestExtension(base.BaseFunctionalTest):
def test_list(self):
extensions = list(self.conn.network.extensions())
extensions = list(self.user_cloud.network.extensions())
self.assertGreater(len(extensions), 0)
for ext in extensions:
@ -25,5 +24,5 @@ class TestExtension(base.BaseFunctionalTest):
self.assertIsInstance(ext.alias, str)
def test_find(self):
extension = self.conn.network.find_extension('external-net')
self.assertEqual('Neutron external network', extension.name)
extension = self.user_cloud.network.find_extension("external-net")
self.assertEqual("Neutron external network", extension.name)

View File

@ -24,29 +24,30 @@ class TestFirewallGroup(base.BaseFunctionalTest):
def setUp(self):
super(TestFirewallGroup, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
if not self.user_cloud._has_neutron_extension("fwaas_v2"):
self.skipTest("fwaas_v2 service not supported by cloud")
self.NAME = self.getUniqueString()
sot = self.conn.network.create_firewall_group(name=self.NAME)
sot = self.user_cloud.network.create_firewall_group(name=self.NAME)
assert isinstance(sot, firewall_group.FirewallGroup)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_firewall_group(self.ID,
ignore_missing=False)
sot = self.user_cloud.network.delete_firewall_group(
self.ID, ignore_missing=False
)
self.assertIs(None, sot)
super(TestFirewallGroup, self).tearDown()
def test_find(self):
sot = self.conn.network.find_firewall_group(self.NAME)
sot = self.user_cloud.network.find_firewall_group(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_firewall_group(self.ID)
sot = self.user_cloud.network.get_firewall_group(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
def test_list(self):
names = [o.name for o in self.conn.network.firewall_groups()]
names = [o.name for o in self.user_cloud.network.firewall_groups()]
self.assertIn(self.NAME, names)

View File

@ -24,29 +24,30 @@ class TestFirewallPolicy(base.BaseFunctionalTest):
def setUp(self):
super(TestFirewallPolicy, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
if not self.user_cloud._has_neutron_extension("fwaas_v2"):
self.skipTest("fwaas_v2 service not supported by cloud")
self.NAME = self.getUniqueString()
sot = self.conn.network.create_firewall_policy(name=self.NAME)
sot = self.user_cloud.network.create_firewall_policy(name=self.NAME)
assert isinstance(sot, firewall_policy.FirewallPolicy)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_firewall_policy(self.ID,
ignore_missing=False)
sot = self.user_cloud.network.delete_firewall_policy(
self.ID, ignore_missing=False
)
self.assertIs(None, sot)
super(TestFirewallPolicy, self).tearDown()
def test_find(self):
sot = self.conn.network.find_firewall_policy(self.NAME)
sot = self.user_cloud.network.find_firewall_policy(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_firewall_policy(self.ID)
sot = self.user_cloud.network.get_firewall_policy(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
def test_list(self):
names = [o.name for o in self.conn.network.firewall_policies()]
names = [o.name for o in self.user_cloud.network.firewall_policies()]
self.assertIn(self.NAME, names)

View File

@ -20,41 +20,47 @@ from openstack.tests.functional import base
class TestFirewallRule(base.BaseFunctionalTest):
ACTION = 'allow'
DEST_IP = '10.0.0.0/24'
DEST_PORT = '80'
ACTION = "allow"
DEST_IP = "10.0.0.0/24"
DEST_PORT = "80"
IP_VERSION = 4
PROTOCOL = 'tcp'
SOUR_IP = '10.0.1.0/24'
SOUR_PORT = '8000'
PROTOCOL = "tcp"
SOUR_IP = "10.0.1.0/24"
SOUR_PORT = "8000"
ID = None
def setUp(self):
super(TestFirewallRule, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
if not self.user_cloud._has_neutron_extension("fwaas_v2"):
self.skipTest("fwaas_v2 service not supported by cloud")
self.NAME = self.getUniqueString()
sot = self.conn.network.create_firewall_rule(
name=self.NAME, action=self.ACTION, source_port=self.SOUR_PORT,
destination_port=self.DEST_PORT, source_ip_address=self.SOUR_IP,
destination_ip_address=self.DEST_IP, ip_version=self.IP_VERSION,
protocol=self.PROTOCOL)
sot = self.user_cloud.network.create_firewall_rule(
name=self.NAME,
action=self.ACTION,
source_port=self.SOUR_PORT,
destination_port=self.DEST_PORT,
source_ip_address=self.SOUR_IP,
destination_ip_address=self.DEST_IP,
ip_version=self.IP_VERSION,
protocol=self.PROTOCOL,
)
assert isinstance(sot, firewall_rule.FirewallRule)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_firewall_rule(self.ID,
ignore_missing=False)
sot = self.user_cloud.network.delete_firewall_rule(
self.ID, ignore_missing=False
)
self.assertIs(None, sot)
super(TestFirewallRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_firewall_rule(self.NAME)
sot = self.user_cloud.network.find_firewall_rule(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_firewall_rule(self.ID)
sot = self.user_cloud.network.get_firewall_rule(self.ID)
self.assertEqual(self.ID, sot.id)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ACTION, sot.action)
@ -65,5 +71,5 @@ class TestFirewallRule(base.BaseFunctionalTest):
self.assertEqual(self.SOUR_PORT, sot.source_port)
def test_list(self):
ids = [o.id for o in self.conn.network.firewall_rules()]
ids = [o.id for o in self.user_cloud.network.firewall_rules()]
self.assertIn(self.ID, ids)

View File

@ -31,15 +31,18 @@ class TestFirewallPolicyRuleAssociations(base.BaseFunctionalTest):
def setUp(self):
super(TestFirewallPolicyRuleAssociations, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
rul1 = self.conn.network.create_firewall_rule(name=self.RULE1_NAME)
if not self.user_cloud._has_neutron_extension("fwaas_v2"):
self.skipTest("fwaas_v2 service not supported by cloud")
rul1 = self.user_cloud.network.create_firewall_rule(
name=self.RULE1_NAME)
assert isinstance(rul1, firewall_rule.FirewallRule)
self.assertEqual(self.RULE1_NAME, rul1.name)
rul2 = self.conn.network.create_firewall_rule(name=self.RULE2_NAME)
rul2 = self.user_cloud.network.create_firewall_rule(
name=self.RULE2_NAME)
assert isinstance(rul2, firewall_rule.FirewallRule)
self.assertEqual(self.RULE2_NAME, rul2.name)
pol = self.conn.network.create_firewall_policy(name=self.POLICY_NAME)
pol = self.user_cloud.network.create_firewall_policy(
name=self.POLICY_NAME)
assert isinstance(pol, firewall_policy.FirewallPolicy)
self.assertEqual(self.POLICY_NAME, pol.name)
self.RULE1_ID = rul1.id
@ -47,45 +50,51 @@ class TestFirewallPolicyRuleAssociations(base.BaseFunctionalTest):
self.POLICY_ID = pol.id
def tearDown(self):
sot = self.conn.network.delete_firewall_policy(self.POLICY_ID,
ignore_missing=False)
sot = self.user_cloud.network.delete_firewall_policy(
self.POLICY_ID, ignore_missing=False
)
self.assertIs(None, sot)
sot = self.conn.network.delete_firewall_rule(self.RULE1_ID,
ignore_missing=False)
sot = self.user_cloud.network.delete_firewall_rule(
self.RULE1_ID, ignore_missing=False
)
self.assertIs(None, sot)
sot = self.conn.network.delete_firewall_rule(self.RULE2_ID,
ignore_missing=False)
sot = self.user_cloud.network.delete_firewall_rule(
self.RULE2_ID, ignore_missing=False
)
self.assertIs(None, sot)
super(TestFirewallPolicyRuleAssociations, self).tearDown()
def test_insert_rule_into_policy(self):
policy = self.conn.network.insert_rule_into_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE1_ID)
self.assertIn(self.RULE1_ID, policy['firewall_rules'])
policy = self.conn.network.insert_rule_into_policy(
policy = self.user_cloud.network.insert_rule_into_policy(
self.POLICY_ID, firewall_rule_id=self.RULE1_ID
)
self.assertIn(self.RULE1_ID, policy["firewall_rules"])
policy = self.user_cloud.network.insert_rule_into_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE2_ID,
insert_before=self.RULE1_ID)
self.assertEqual(self.RULE1_ID, policy['firewall_rules'][1])
self.assertEqual(self.RULE2_ID, policy['firewall_rules'][0])
insert_before=self.RULE1_ID,
)
self.assertEqual(self.RULE1_ID, policy["firewall_rules"][1])
self.assertEqual(self.RULE2_ID, policy["firewall_rules"][0])
def test_remove_rule_from_policy(self):
# insert rules into policy before we remove it again
policy = self.conn.network.insert_rule_into_policy(
self.POLICY_ID, firewall_rule_id=self.RULE1_ID)
self.assertIn(self.RULE1_ID, policy['firewall_rules'])
policy = self.user_cloud.network.insert_rule_into_policy(
self.POLICY_ID, firewall_rule_id=self.RULE1_ID
)
self.assertIn(self.RULE1_ID, policy["firewall_rules"])
policy = self.conn.network.insert_rule_into_policy(
self.POLICY_ID, firewall_rule_id=self.RULE2_ID)
self.assertIn(self.RULE2_ID, policy['firewall_rules'])
policy = self.user_cloud.network.insert_rule_into_policy(
self.POLICY_ID, firewall_rule_id=self.RULE2_ID
)
self.assertIn(self.RULE2_ID, policy["firewall_rules"])
policy = self.conn.network.remove_rule_from_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE1_ID)
self.assertNotIn(self.RULE1_ID, policy['firewall_rules'])
policy = self.user_cloud.network.remove_rule_from_policy(
self.POLICY_ID, firewall_rule_id=self.RULE1_ID
)
self.assertNotIn(self.RULE1_ID, policy["firewall_rules"])
policy = self.conn.network.remove_rule_from_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE2_ID)
self.assertNotIn(self.RULE2_ID, policy['firewall_rules'])
policy = self.user_cloud.network.remove_rule_from_policy(
self.POLICY_ID, firewall_rule_id=self.RULE2_ID
)
self.assertNotIn(self.RULE2_ID, policy["firewall_rules"])

View File

@ -25,54 +25,80 @@ class TestFlavor(base.BaseFunctionalTest):
def setUp(self):
super(TestFlavor, self).setUp()
self.FLAVOR_NAME = self.getUniqueString('flavor')
flavors = self.conn.network.create_flavor(
name=self.FLAVOR_NAME,
service_type=self.SERVICE_TYPE)
assert isinstance(flavors, flavor.Flavor)
self.assertEqual(self.FLAVOR_NAME, flavors.name)
self.assertEqual(self.SERVICE_TYPE, flavors.service_type)
if not self.user_cloud._has_neutron_extension("flavors"):
self.skipTest("Neutron flavor extension is required for this test")
self.ID = flavors.id
self.FLAVOR_NAME = self.getUniqueString("flavor")
if self.operator_cloud:
flavors = self.operator_cloud.network.create_flavor(
name=self.FLAVOR_NAME, service_type=self.SERVICE_TYPE
)
assert isinstance(flavors, flavor.Flavor)
self.assertEqual(self.FLAVOR_NAME, flavors.name)
self.assertEqual(self.SERVICE_TYPE, flavors.service_type)
self.service_profiles = self.conn.network.create_service_profile(
description=self.SERVICE_PROFILE_DESCRIPTION,
metainfo=self.METAINFO,)
self.ID = flavors.id
self.service_profiles = (
self.operator_cloud.network.create_service_profile(
description=self.SERVICE_PROFILE_DESCRIPTION,
metainfo=self.METAINFO,
)
)
def tearDown(self):
flavors = self.conn.network.delete_flavor(self.ID, ignore_missing=True)
self.assertIsNone(flavors)
if self.operator_cloud and self.ID:
flavors = self.operator_cloud.network.delete_flavor(
self.ID, ignore_missing=True
)
self.assertIsNone(flavors)
service_profiles = self.conn.network.delete_service_profile(
self.ID, ignore_missing=True)
self.assertIsNone(service_profiles)
service_profiles = self.user_cloud.network.delete_service_profile(
self.ID, ignore_missing=True
)
self.assertIsNone(service_profiles)
super(TestFlavor, self).tearDown()
def test_find(self):
flavors = self.conn.network.find_flavor(self.FLAVOR_NAME)
self.assertEqual(self.ID, flavors.id)
if self.ID:
flavors = self.user_cloud.network.find_flavor(self.FLAVOR_NAME)
self.assertEqual(self.ID, flavors.id)
else:
self.user_cloud.network.find_flavor("definitely_missing")
def test_get(self):
flavors = self.conn.network.get_flavor(self.ID)
if not self.ID:
self.skipTest("Operator cloud required for this test")
flavors = self.user_cloud.network.get_flavor(self.ID)
self.assertEqual(self.FLAVOR_NAME, flavors.name)
self.assertEqual(self.ID, flavors.id)
def test_list(self):
names = [f.name for f in self.conn.network.flavors()]
self.assertIn(self.FLAVOR_NAME, names)
names = [f.name for f in self.user_cloud.network.flavors()]
if self.ID:
self.assertIn(self.FLAVOR_NAME, names)
def test_update(self):
flavor = self.conn.network.update_flavor(self.ID,
name=self.UPDATE_NAME)
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
flavor = self.operator_cloud.network.update_flavor(
self.ID, name=self.UPDATE_NAME
)
self.assertEqual(self.UPDATE_NAME, flavor.name)
def test_associate_disassociate_flavor_with_service_profile(self):
response = \
self.conn.network.associate_flavor_with_service_profile(
self.ID, self.service_profiles.id)
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
response = (
self.operator_cloud.network.associate_flavor_with_service_profile(
self.ID, self.service_profiles.id
)
)
self.assertIsNotNone(response)
response = \
self.conn.network.disassociate_flavor_from_service_profile(
self.ID, self.service_profiles.id)
response = self.operator_cloud.network \
.disassociate_flavor_from_service_profile(
self.ID, self.service_profiles.id
)
self.assertIsNone(response)

View File

@ -36,78 +36,107 @@ class TestFloatingIP(base.BaseFunctionalTest):
def setUp(self):
super(TestFloatingIP, self).setUp()
if not self.conn.has_service('dns'):
self.skipTest('dns service not supported by cloud')
if not self.user_cloud._has_neutron_extension("external-net"):
self.skipTest(
"Neutron external-net extension is required for this test"
)
self.TIMEOUT_SCALING_FACTOR = 1.5
self.ROT_NAME = self.getUniqueString()
self.EXT_NET_NAME = self.getUniqueString()
self.EXT_SUB_NAME = self.getUniqueString()
self.INT_NET_NAME = self.getUniqueString()
self.INT_SUB_NAME = self.getUniqueString()
# Create Exeternal Network
args = {'router:external': True}
net = self._create_network(self.EXT_NET_NAME, **args)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR)
self.EXT_SUB_ID = sub.id
self.EXT_NET_ID = None
self.EXT_SUB_ID = None
self.is_dns_supported = False
# Find External Network
for net in self.user_cloud.network.networks(is_router_external=True):
self.EXT_NET_ID = net.id
# Find subnet of the chosen external net
for sub in self.user_cloud.network.subnets(network_id=self.EXT_NET_ID):
self.EXT_SUB_ID = sub.id
if not self.EXT_NET_ID and self.operator_cloud:
# There is no existing external net, but operator
# credentials available
# WARNING: this external net is not dropped
# Create External Network
args = {"router:external": True}
net = self._create_network(self.EXT_NET_NAME, **args)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR
)
self.EXT_SUB_ID = sub.id
# Create Internal Network
net = self._create_network(self.INT_NET_NAME)
self.INT_NET_ID = net.id
sub = self._create_subnet(
self.INT_SUB_NAME, self.INT_NET_ID, self.INT_CIDR)
self.INT_SUB_NAME, self.INT_NET_ID, self.INT_CIDR
)
self.INT_SUB_ID = sub.id
# Create Router
args = {'external_gateway_info': {'network_id': self.EXT_NET_ID}}
sot = self.conn.network.create_router(name=self.ROT_NAME, **args)
args = {"external_gateway_info": {"network_id": self.EXT_NET_ID}}
sot = self.user_cloud.network.create_router(name=self.ROT_NAME, **args)
assert isinstance(sot, router.Router)
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id
self.ROT = sot
# Add Router's Interface to Internal Network
sot = self.ROT.add_interface(
self.conn.network, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
self.user_cloud.network, subnet_id=self.INT_SUB_ID
)
self.assertEqual(sot["subnet_id"], self.INT_SUB_ID)
# Create Port in Internal Network
prt = self.conn.network.create_port(network_id=self.INT_NET_ID)
prt = self.user_cloud.network.create_port(network_id=self.INT_NET_ID)
assert isinstance(prt, port.Port)
self.PORT_ID = prt.id
self.PORT = prt
# Create Floating IP.
fip = self.conn.network.create_ip(
fip_args = dict(
floating_network_id=self.EXT_NET_ID,
dns_domain=self.DNS_DOMAIN, dns_name=self.DNS_NAME)
)
if (
self.user_cloud._has_neutron_extension("dns-integration")
and self.user_cloud.has_service("dns")
):
self.is_dns_supported = True
fip_args.update(
dict(dns_domain=self.DNS_DOMAIN, dns_name=self.DNS_NAME)
)
fip = self.user_cloud.network.create_ip(**fip_args)
assert isinstance(fip, floating_ip.FloatingIP)
self.FIP = fip
def tearDown(self):
sot = self.conn.network.delete_ip(self.FIP.id, ignore_missing=False)
sot = self.user_cloud.network.delete_ip(
self.FIP.id, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_port(self.PORT_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_port(
self.PORT_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.ROT.remove_interface(
self.conn.network, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
sot = self.conn.network.delete_router(
self.ROT_ID, ignore_missing=False)
self.user_cloud.network, subnet_id=self.INT_SUB_ID
)
self.assertEqual(sot["subnet_id"], self.INT_SUB_ID)
sot = self.user_cloud.network.delete_router(
self.ROT_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.EXT_SUB_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_subnet(
self.INT_SUB_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.EXT_NET_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.INT_SUB_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.INT_NET_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.INT_NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestFloatingIP, self).tearDown()
def _create_network(self, name, **args):
self.name = name
net = self.conn.network.create_network(name=name, **args)
net = self.user_cloud.network.create_network(name=name, **args)
assert isinstance(net, network.Network)
self.assertEqual(self.name, net.name)
return net
@ -116,32 +145,33 @@ class TestFloatingIP(base.BaseFunctionalTest):
self.name = name
self.net_id = net_id
self.cidr = cidr
sub = self.conn.network.create_subnet(
sub = self.user_cloud.network.create_subnet(
name=self.name,
ip_version=self.IPV4,
network_id=self.net_id,
cidr=self.cidr)
cidr=self.cidr,
)
assert isinstance(sub, subnet.Subnet)
self.assertEqual(self.name, sub.name)
return sub
def test_find_by_id(self):
sot = self.conn.network.find_ip(self.FIP.id)
sot = self.user_cloud.network.find_ip(self.FIP.id)
self.assertEqual(self.FIP.id, sot.id)
def test_find_by_ip_address(self):
sot = self.conn.network.find_ip(self.FIP.floating_ip_address)
sot = self.user_cloud.network.find_ip(self.FIP.floating_ip_address)
self.assertEqual(self.FIP.floating_ip_address, sot.floating_ip_address)
self.assertEqual(self.FIP.floating_ip_address, sot.name)
def test_find_available_ip(self):
sot = self.conn.network.find_available_ip()
sot = self.user_cloud.network.find_available_ip()
self.assertIsNotNone(sot.id)
self.assertIsNone(sot.port_id)
self.assertIsNone(sot.port_details)
def test_get(self):
sot = self.conn.network.get_ip(self.FIP.id)
sot = self.user_cloud.network.get_ip(self.FIP.id)
self.assertEqual(self.EXT_NET_ID, sot.floating_network_id)
self.assertEqual(self.FIP.id, sot.id)
self.assertEqual(self.FIP.floating_ip_address, sot.floating_ip_address)
@ -149,37 +179,41 @@ class TestFloatingIP(base.BaseFunctionalTest):
self.assertEqual(self.FIP.port_id, sot.port_id)
self.assertEqual(self.FIP.port_details, sot.port_details)
self.assertEqual(self.FIP.router_id, sot.router_id)
self.assertEqual(self.DNS_DOMAIN, sot.dns_domain)
self.assertEqual(self.DNS_NAME, sot.dns_name)
if self.is_dns_supported:
self.assertEqual(self.DNS_DOMAIN, sot.dns_domain)
self.assertEqual(self.DNS_NAME, sot.dns_name)
def test_list(self):
ids = [o.id for o in self.conn.network.ips()]
ids = [o.id for o in self.user_cloud.network.ips()]
self.assertIn(self.FIP.id, ids)
def test_update(self):
sot = self.conn.network.update_ip(self.FIP.id, port_id=self.PORT_ID)
sot = self.user_cloud.network.update_ip(
self.FIP.id, port_id=self.PORT_ID
)
self.assertEqual(self.PORT_ID, sot.port_id)
self._assert_port_details(self.PORT, sot.port_details)
self.assertEqual(self.FIP.id, sot.id)
def test_set_tags(self):
sot = self.conn.network.get_ip(self.FIP.id)
sot = self.user_cloud.network.get_ip(self.FIP.id)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_ip(self.FIP.id)
self.assertEqual(['blue'], sot.tags)
self.user_cloud.network.set_tags(sot, ["blue"])
sot = self.user_cloud.network.get_ip(self.FIP.id)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_ip(self.FIP.id)
self.user_cloud.network.set_tags(sot, [])
sot = self.user_cloud.network.get_ip(self.FIP.id)
self.assertEqual([], sot.tags)
def _assert_port_details(self, port, port_details):
self.assertEqual(port.name, port_details['name'])
self.assertEqual(port.network_id, port_details['network_id'])
self.assertEqual(port.mac_address, port_details['mac_address'])
self.assertEqual(port.is_admin_state_up,
port_details['admin_state_up'])
self.assertEqual(port.status, port_details['status'])
self.assertEqual(port.device_id, port_details['device_id'])
self.assertEqual(port.device_owner, port_details['device_owner'])
self.assertEqual(port.name, port_details["name"])
self.assertEqual(port.network_id, port_details["network_id"])
self.assertEqual(port.mac_address, port_details["mac_address"])
self.assertEqual(
port.is_admin_state_up, port_details["admin_state_up"]
)
self.assertEqual(port.status, port_details["status"])
self.assertEqual(port.device_id, port_details["device_id"])
self.assertEqual(port.device_owner, port_details["device_owner"])

View File

@ -27,48 +27,52 @@ class TestL3ConntrackHelper(base.BaseFunctionalTest):
def setUp(self):
super(TestL3ConntrackHelper, self).setUp()
if not self.conn.network.find_extension('l3-conntrack-helper'):
self.skipTest('L3 conntrack helper extension disabled')
if not self.user_cloud.network.find_extension("l3-conntrack-helper"):
self.skipTest("L3 conntrack helper extension disabled")
self.ROT_NAME = self.getUniqueString()
# Create Router
sot = self.conn.network.create_router(name=self.ROT_NAME)
sot = self.user_cloud.network.create_router(name=self.ROT_NAME)
self.assertIsInstance(sot, router.Router)
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id
self.ROT = sot
# Create conntrack helper
ct_helper = self.conn.network.create_conntrack_helper(
ct_helper = self.user_cloud.network.create_conntrack_helper(
router=self.ROT,
protocol=self.PROTOCOL,
helper=self.HELPER,
port=self.PORT)
port=self.PORT,
)
self.assertIsInstance(ct_helper, _l3_conntrack_helper.ConntrackHelper)
self.CT_HELPER = ct_helper
def tearDown(self):
sot = self.conn.network.delete_router(
self.ROT_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_router(
self.ROT_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestL3ConntrackHelper, self).tearDown()
def test_get(self):
sot = self.conn.network.get_conntrack_helper(
self.CT_HELPER, self.ROT_ID)
sot = self.user_cloud.network.get_conntrack_helper(
self.CT_HELPER, self.ROT_ID
)
self.assertEqual(self.PROTOCOL, sot.protocol)
self.assertEqual(self.HELPER, sot.helper)
self.assertEqual(self.PORT, sot.port)
def test_list(self):
helper_ids = [o.id for o in
self.conn.network.conntrack_helpers(self.ROT_ID)]
helper_ids = [
o.id
for o in self.user_cloud.network.conntrack_helpers(self.ROT_ID)
]
self.assertIn(self.CT_HELPER.id, helper_ids)
def test_update(self):
NEW_PORT = 90
sot = self.conn.network.update_conntrack_helper(
self.CT_HELPER.id,
self.ROT_ID,
port=NEW_PORT)
sot = self.user_cloud.network.update_conntrack_helper(
self.CT_HELPER.id, self.ROT_ID, port=NEW_PORT
)
self.assertEqual(NEW_PORT, sot.port)

View File

@ -24,45 +24,46 @@ class TestLocalIP(base.BaseFunctionalTest):
def setUp(self):
super(TestLocalIP, self).setUp()
if not self.conn.network.find_extension('local_ip'):
self.skipTest('Local IP extension disabled')
if not self.user_cloud.network.find_extension("local_ip"):
self.skipTest("Local IP extension disabled")
self.LOCAL_IP_NAME = self.getUniqueString()
self.LOCAL_IP_DESCRIPTION = self.getUniqueString()
self.LOCAL_IP_NAME_UPDATED = self.getUniqueString()
self.LOCAL_IP_DESCRIPTION_UPDATED = self.getUniqueString()
local_ip = self.conn.network.create_local_ip(
local_ip = self.user_cloud.network.create_local_ip(
name=self.LOCAL_IP_NAME,
description=self.LOCAL_IP_DESCRIPTION,
)
assert isinstance(local_ip, _local_ip.LocalIP)
self.assertEqual(self.LOCAL_IP_NAME, local_ip.name)
self.assertEqual(self.LOCAL_IP_DESCRIPTION,
local_ip.description)
self.assertEqual(self.LOCAL_IP_DESCRIPTION, local_ip.description)
self.LOCAL_IP_ID = local_ip.id
def tearDown(self):
sot = self.conn.network.delete_local_ip(self.LOCAL_IP_ID)
sot = self.user_cloud.network.delete_local_ip(self.LOCAL_IP_ID)
self.assertIsNone(sot)
super(TestLocalIP, self).tearDown()
def test_find(self):
sot = self.conn.network.find_local_ip(self.LOCAL_IP_NAME)
sot = self.user_cloud.network.find_local_ip(self.LOCAL_IP_NAME)
self.assertEqual(self.LOCAL_IP_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_local_ip(self.LOCAL_IP_ID)
sot = self.user_cloud.network.get_local_ip(self.LOCAL_IP_ID)
self.assertEqual(self.LOCAL_IP_NAME, sot.name)
def test_list(self):
names = [local_ip.name for local_ip in self.conn.network.local_ips()]
names = [
local_ip.name
for local_ip in self.user_cloud.network.local_ips()]
self.assertIn(self.LOCAL_IP_NAME, names)
def test_update(self):
sot = self.conn.network.update_local_ip(
sot = self.user_cloud.network.update_local_ip(
self.LOCAL_IP_ID,
name=self.LOCAL_IP_NAME_UPDATED,
description=self.LOCAL_IP_DESCRIPTION_UPDATED)
description=self.LOCAL_IP_DESCRIPTION_UPDATED,
)
self.assertEqual(self.LOCAL_IP_NAME_UPDATED, sot.name)
self.assertEqual(self.LOCAL_IP_DESCRIPTION_UPDATED,
sot.description)
self.assertEqual(self.LOCAL_IP_DESCRIPTION_UPDATED, sot.description)

View File

@ -25,44 +25,51 @@ class TestLocalIPAssociation(base.BaseFunctionalTest):
def setUp(self):
super(TestLocalIPAssociation, self).setUp()
if not self.conn.network.find_extension('local_ip'):
self.skipTest('Local IP extension disabled')
if not self.user_cloud.network.find_extension("local_ip"):
self.skipTest("Local IP extension disabled")
self.LOCAL_IP_ID = self.getUniqueString()
self.FIXED_PORT_ID = self.getUniqueString()
self.FIXED_IP = self.getUniqueString()
local_ip_association = self.conn.network.create_local_ip_association(
local_ip=self.LOCAL_IP_ID,
fixed_port_id=self.FIXED_PORT_ID,
fixed_ip=self.FIXED_IP
local_ip_association = self.user_cloud.network \
.create_local_ip_association(
local_ip=self.LOCAL_IP_ID,
fixed_port_id=self.FIXED_PORT_ID,
fixed_ip=self.FIXED_IP,
)
assert isinstance(
local_ip_association, _local_ip_association.LocalIPAssociation
)
assert isinstance(local_ip_association,
_local_ip_association.LocalIPAssociation)
self.assertEqual(self.LOCAL_IP_ID, local_ip_association.local_ip_id)
self.assertEqual(self.FIXED_PORT_ID,
local_ip_association.fixed_port_id)
self.assertEqual(self.FIXED_IP,
local_ip_association.fixed_ip)
self.assertEqual(
self.FIXED_PORT_ID, local_ip_association.fixed_port_id
)
self.assertEqual(self.FIXED_IP, local_ip_association.fixed_ip)
def tearDown(self):
sot = self.conn.network.delete_local_ip_association(
self.LOCAL_IP_ID,
self.FIXED_PORT_ID)
sot = self.user_cloud.network.delete_local_ip_association(
self.LOCAL_IP_ID, self.FIXED_PORT_ID
)
self.assertIsNone(sot)
super(TestLocalIPAssociation, self).tearDown()
def test_find(self):
sot = self.conn.network.find_local_ip_association(self.FIXED_PORT_ID,
self.LOCAL_IP_ID)
sot = self.user_cloud.network.find_local_ip_association(
self.FIXED_PORT_ID, self.LOCAL_IP_ID
)
self.assertEqual(self.FIXED_PORT_ID, sot.fixed_port_id)
def test_get(self):
sot = self.conn.network.get_local_ip_association(self.FIXED_PORT_ID,
self.LOCAL_IP_ID)
sot = self.user_cloud.network.get_local_ip_association(
self.FIXED_PORT_ID, self.LOCAL_IP_ID
)
self.assertEqual(self.FIXED_PORT_ID, sot.fixed_port_id)
def test_list(self):
fixed_port_id = [obj.fixed_port_id for obj in
self.conn.network.local_ip_associations(
self.LOCAL_IP_ID)]
fixed_port_id = [
obj.fixed_port_id
for obj in self.user_cloud.network.local_ip_associations(
self.LOCAL_IP_ID
)
]
self.assertIn(self.FIXED_PORT_ID, fixed_port_id)

View File

@ -33,80 +33,91 @@ class TestNDPProxy(base.BaseFunctionalTest):
def setUp(self):
super(TestNDPProxy, self).setUp()
if not self.conn.network.find_extension('l3-ndp-proxy'):
self.skipTest('L3 ndp proxy extension disabled')
if not self.user_cloud.network.find_extension("l3-ndp-proxy"):
self.skipTest("L3 ndp proxy extension disabled")
self.ROT_NAME = self.getUniqueString()
self.EXT_NET_NAME = self.getUniqueString()
self.EXT_SUB_NAME = self.getUniqueString()
self.INT_NET_NAME = self.getUniqueString()
self.INT_SUB_NAME = self.getUniqueString()
# Create Exeternal Network
args = {'router:external': True}
net = self._create_network(self.EXT_NET_NAME, **args)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR)
self.EXT_SUB_ID = sub.id
# Create Internal Network
net = self._create_network(self.INT_NET_NAME)
self.INT_NET_ID = net.id
sub = self._create_subnet(
self.INT_SUB_NAME, self.INT_NET_ID, self.INT_CIDR)
self.INT_SUB_ID = sub.id
# Find External Network
for net in self.user_cloud.network.networks(is_router_external=True):
self.EXT_NET_ID = net.id
# Find subnet of the chosen external net
for sub in self.user_cloud.network.subnets(network_id=self.EXT_NET_ID):
self.EXT_SUB_ID = sub.id
if not self.EXT_NET_ID and self.operator_cloud:
# There is no existing external net, but operator
# credentials available
# WARNING: this external net is not dropped
# Create External Network
args = {"router:external": True}
net = self._create_network(self.EXT_NET_NAME, **args)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR
)
self.EXT_SUB_ID = sub.id
# Create Router
args = {'external_gateway_info': {'network_id': self.EXT_NET_ID},
'enable_ndp_proxy': True}
sot = self.conn.network.create_router(name=self.ROT_NAME, **args)
args = {
"external_gateway_info": {"network_id": self.EXT_NET_ID},
"enable_ndp_proxy": True,
}
sot = self.user_cloud.network.create_router(name=self.ROT_NAME, **args)
assert isinstance(sot, router.Router)
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id
self.ROT = sot
# Add Router's Interface to Internal Network
sot = self.ROT.add_interface(
self.conn.network, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
self.user_cloud.network, subnet_id=self.INT_SUB_ID
)
self.assertEqual(sot["subnet_id"], self.INT_SUB_ID)
# Create Port in Internal Network
prt = self.conn.network.create_port(network_id=self.INT_NET_ID)
prt = self.user_cloud.network.create_port(network_id=self.INT_NET_ID)
assert isinstance(prt, port.Port)
self.INTERNAL_PORT_ID = prt.id
self.INTERNAL_IP_ADDRESS = prt.fixed_ips[0]['ip_address']
self.INTERNAL_IP_ADDRESS = prt.fixed_ips[0]["ip_address"]
# Create ndp proxy
np = self.conn.network.create_ndp_proxy(
router_id=self.ROT_ID, port_id=self.INTERNAL_PORT_ID)
np = self.user_cloud.network.create_ndp_proxy(
router_id=self.ROT_ID, port_id=self.INTERNAL_PORT_ID
)
assert isinstance(np, _ndp_proxy.NDPProxy)
self.NP = np
def tearDown(self):
sot = self.conn.network.delete_ndp_proxy(
self.NP.id, ignore_missing=False)
sot = self.user_cloud.network.delete_ndp_proxy(
self.NP.id, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_port(
self.INTERNAL_PORT_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_port(
self.INTERNAL_PORT_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.ROT.remove_interface(
self.conn.network, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
sot = self.conn.network.delete_router(
self.ROT_ID, ignore_missing=False)
self.user_cloud.network, subnet_id=self.INT_SUB_ID
)
self.assertEqual(sot["subnet_id"], self.INT_SUB_ID)
sot = self.user_cloud.network.delete_router(
self.ROT_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.EXT_SUB_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_subnet(
self.INT_SUB_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.EXT_NET_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.INT_SUB_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.INT_NET_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.INT_NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestNDPProxy, self).tearDown()
def _create_network(self, name, **args):
self.name = name
net = self.conn.network.create_network(name=name, **args)
net = self.user_cloud.network.create_network(name=name, **args)
assert isinstance(net, network.Network)
self.assertEqual(self.name, net.name)
return net
@ -115,33 +126,35 @@ class TestNDPProxy(base.BaseFunctionalTest):
self.name = name
self.net_id = net_id
self.cidr = cidr
sub = self.conn.network.create_subnet(
sub = self.user_cloud.network.create_subnet(
name=self.name,
ip_version=self.IPV6,
network_id=self.net_id,
cidr=self.cidr)
cidr=self.cidr,
)
assert isinstance(sub, subnet.Subnet)
self.assertEqual(self.name, sub.name)
return sub
def test_find(self):
sot = self.conn.network.find_ndp_proxy(self.NP.id)
sot = self.user_cloud.network.find_ndp_proxy(self.NP.id)
self.assertEqual(self.ROT_ID, sot.router_id)
self.assertEqual(self.INTERNAL_PORT_ID, sot.port_id)
self.assertEqual(self.INTERNAL_IP_ADDRESS, sot.ip_address)
def test_get(self):
sot = self.conn.network.get_ndp_proxy(self.NP.id)
sot = self.user_cloud.network.get_ndp_proxy(self.NP.id)
self.assertEqual(self.ROT_ID, sot.router_id)
self.assertEqual(self.INTERNAL_PORT_ID, sot.port_id)
self.assertEqual(self.INTERNAL_IP_ADDRESS, sot.ip_address)
def test_list(self):
np_ids = [o.id for o in self.conn.network.ndp_proxies()]
np_ids = [o.id for o in self.user_cloud.network.ndp_proxies()]
self.assertIn(self.NP.id, np_ids)
def test_update(self):
description = "balabalbala"
sot = self.conn.network.update_ndp_proxy(
self.NP.id, description=description)
sot = self.user_cloud.network.update_ndp_proxy(
self.NP.id, description=description
)
self.assertEqual(description, sot.description)

View File

@ -19,10 +19,8 @@ def create_network(conn, name, cidr):
try:
network = conn.network.create_network(name=name)
subnet = conn.network.create_subnet(
name=name,
ip_version=4,
network_id=network.id,
cidr=cidr)
name=name, ip_version=4, network_id=network.id, cidr=cidr
)
return (network, subnet)
except Exception as e:
print(str(e))
@ -44,50 +42,57 @@ class TestNetwork(base.BaseFunctionalTest):
def setUp(self):
super(TestNetwork, self).setUp()
self.NAME = self.getUniqueString()
sot = self.conn.network.create_network(name=self.NAME)
sot = self.user_cloud.network.create_network(name=self.NAME)
assert isinstance(sot, network.Network)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_network(self.ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestNetwork, self).tearDown()
def test_find(self):
sot = self.conn.network.find_network(self.NAME)
sot = self.user_cloud.network.find_network(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_find_with_filter(self):
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
project_id_1 = "1"
project_id_2 = "2"
sot1 = self.conn.network.create_network(name=self.NAME,
project_id=project_id_1)
sot2 = self.conn.network.create_network(name=self.NAME,
project_id=project_id_2)
sot = self.conn.network.find_network(self.NAME,
project_id=project_id_1)
sot1 = self.operator_cloud.network.create_network(
name=self.NAME, project_id=project_id_1
)
sot2 = self.operator_cloud.network.create_network(
name=self.NAME, project_id=project_id_2
)
sot = self.operator_cloud.network.find_network(
self.NAME, project_id=project_id_1
)
self.assertEqual(project_id_1, sot.project_id)
self.conn.network.delete_network(sot1.id)
self.conn.network.delete_network(sot2.id)
self.operator_cloud.network.delete_network(sot1.id)
self.operator_cloud.network.delete_network(sot2.id)
def test_get(self):
sot = self.conn.network.get_network(self.ID)
sot = self.user_cloud.network.get_network(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
def test_list(self):
names = [o.name for o in self.conn.network.networks()]
names = [o.name for o in self.user_cloud.network.networks()]
self.assertIn(self.NAME, names)
def test_set_tags(self):
sot = self.conn.network.get_network(self.ID)
sot = self.user_cloud.network.get_network(self.ID)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_network(self.ID)
self.assertEqual(['blue'], sot.tags)
self.user_cloud.network.set_tags(sot, ["blue"])
sot = self.user_cloud.network.get_network(self.ID)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_network(self.ID)
self.user_cloud.network.set_tags(sot, [])
sot = self.user_cloud.network.get_network(self.ID)
self.assertEqual([], sot.tags)

View File

@ -27,48 +27,65 @@ class TestNetworkIPAvailability(base.BaseFunctionalTest):
def setUp(self):
super(TestNetworkIPAvailability, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
if not self.operator_cloud._has_neutron_extension(
"network-ip-availability"
):
self.skipTest(
"Neutron network-ip-availability extension is required "
"for this test"
)
self.NET_NAME = self.getUniqueString()
self.SUB_NAME = self.getUniqueString()
self.PORT_NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
net = self.operator_cloud.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
self.assertEqual(self.NET_NAME, net.name)
self.NET_ID = net.id
sub = self.conn.network.create_subnet(
sub = self.operator_cloud.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=self.NET_ID,
cidr=self.CIDR)
cidr=self.CIDR,
)
assert isinstance(sub, subnet.Subnet)
self.assertEqual(self.SUB_NAME, sub.name)
self.SUB_ID = sub.id
prt = self.conn.network.create_port(
name=self.PORT_NAME,
network_id=self.NET_ID)
prt = self.operator_cloud.network.create_port(
name=self.PORT_NAME, network_id=self.NET_ID
)
assert isinstance(prt, port.Port)
self.assertEqual(self.PORT_NAME, prt.name)
self.PORT_ID = prt.id
def tearDown(self):
sot = self.conn.network.delete_port(self.PORT_ID)
sot = self.operator_cloud.network.delete_port(self.PORT_ID)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(self.SUB_ID)
sot = self.operator_cloud.network.delete_subnet(self.SUB_ID)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(self.NET_ID)
sot = self.operator_cloud.network.delete_network(self.NET_ID)
self.assertIsNone(sot)
super(TestNetworkIPAvailability, self).tearDown()
def test_find(self):
sot = self.conn.network.find_network_ip_availability(self.NET_ID)
sot = self.operator_cloud.network.find_network_ip_availability(
self.NET_ID
)
self.assertEqual(self.NET_ID, sot.network_id)
def test_get(self):
sot = self.conn.network.get_network_ip_availability(self.NET_ID)
sot = self.operator_cloud.network.get_network_ip_availability(
self.NET_ID
)
self.assertEqual(self.NET_ID, sot.network_id)
self.assertEqual(self.NET_NAME, sot.network_name)
def test_list(self):
ids = [o.network_id for o in
self.conn.network.network_ip_availabilities()]
ids = [
o.network_id
for o in self.operator_cloud.network.network_ip_availabilities()
]
self.assertIn(self.NET_ID, ids)

View File

@ -20,44 +20,52 @@ from openstack.tests.functional import base
class TestNetworkSegmentRange(base.BaseFunctionalTest):
NETWORK_SEGMENT_RANGE_ID = None
NAME = 'test_name'
NAME = "test_name"
DEFAULT = False
SHARED = False
PROJECT_ID = '2018'
NETWORK_TYPE = 'vlan'
PHYSICAL_NETWORK = 'phys_net'
PROJECT_ID = "2018"
NETWORK_TYPE = "vlan"
PHYSICAL_NETWORK = "phys_net"
MINIMUM = 100
MAXIMUM = 200
def setUp(self):
super(TestNetworkSegmentRange, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
# NOTE(kailun): The network segment range extension is not yet enabled
# by default.
# Skip the tests if not enabled.
if not self.conn.network.find_extension('network-segment-range'):
self.skipTest('Network Segment Range extension disabled')
if not self.operator_cloud.network.find_extension(
"network-segment-range"
):
self.skipTest("Network Segment Range extension disabled")
test_seg_range = self.conn.network.create_network_segment_range(
name=self.NAME,
default=self.DEFAULT,
shared=self.SHARED,
project_id=self.PROJECT_ID,
network_type=self.NETWORK_TYPE,
physical_network=self.PHYSICAL_NETWORK,
minimum=self.MINIMUM,
maximum=self.MAXIMUM,
test_seg_range = (
self.operator_cloud.network.create_network_segment_range(
name=self.NAME,
default=self.DEFAULT,
shared=self.SHARED,
project_id=self.PROJECT_ID,
network_type=self.NETWORK_TYPE,
physical_network=self.PHYSICAL_NETWORK,
minimum=self.MINIMUM,
maximum=self.MAXIMUM,
)
)
self.assertIsInstance(
test_seg_range, network_segment_range.NetworkSegmentRange
)
self.assertIsInstance(test_seg_range,
network_segment_range.NetworkSegmentRange)
self.NETWORK_SEGMENT_RANGE_ID = test_seg_range.id
self.assertEqual(self.NAME, test_seg_range.name)
self.assertEqual(self.DEFAULT, test_seg_range.default)
self.assertEqual(self.SHARED, test_seg_range.shared)
self.assertEqual(self.PROJECT_ID, test_seg_range.project_id)
self.assertEqual(self.NETWORK_TYPE, test_seg_range.network_type)
self.assertEqual(self.PHYSICAL_NETWORK,
test_seg_range.physical_network)
self.assertEqual(
self.PHYSICAL_NETWORK, test_seg_range.physical_network
)
self.assertEqual(self.MINIMUM, test_seg_range.minimum)
self.assertEqual(self.MAXIMUM, test_seg_range.maximum)
@ -65,35 +73,48 @@ class TestNetworkSegmentRange(base.BaseFunctionalTest):
super(TestNetworkSegmentRange, self).tearDown()
def test_create_delete(self):
del_test_seg_range = self.conn.network.delete_network_segment_range(
self.NETWORK_SEGMENT_RANGE_ID)
del_test_seg_range = (
self.operator_cloud.network.delete_network_segment_range(
self.NETWORK_SEGMENT_RANGE_ID
)
)
self.assertIsNone(del_test_seg_range)
def test_find(self):
test_seg_range = self.conn.network.find_network_segment_range(
self.NETWORK_SEGMENT_RANGE_ID)
test_seg_range = (
self.operator_cloud.network.find_network_segment_range(
self.NETWORK_SEGMENT_RANGE_ID
)
)
self.assertEqual(self.NETWORK_SEGMENT_RANGE_ID, test_seg_range.id)
def test_get(self):
test_seg_range = self.conn.network.get_network_segment_range(
self.NETWORK_SEGMENT_RANGE_ID)
test_seg_range = self.operator_cloud.network.get_network_segment_range(
self.NETWORK_SEGMENT_RANGE_ID
)
self.assertEqual(self.NETWORK_SEGMENT_RANGE_ID, test_seg_range.id)
self.assertEqual(self.NAME, test_seg_range.name)
self.assertEqual(self.DEFAULT, test_seg_range.default)
self.assertEqual(self.SHARED, test_seg_range.shared)
self.assertEqual(self.PROJECT_ID, test_seg_range.project_id)
self.assertEqual(self.NETWORK_TYPE, test_seg_range.network_type)
self.assertEqual(self.PHYSICAL_NETWORK,
test_seg_range.physical_network)
self.assertEqual(
self.PHYSICAL_NETWORK, test_seg_range.physical_network
)
self.assertEqual(self.MINIMUM, test_seg_range.minimum)
self.assertEqual(self.MAXIMUM, test_seg_range.maximum)
def test_list(self):
ids = [o.id for o in self.conn.network.network_segment_ranges(
name=None)]
ids = [
o.id
for o in self.operator_cloud.network.network_segment_ranges(
name=None
)
]
self.assertIn(self.NETWORK_SEGMENT_RANGE_ID, ids)
def test_update(self):
update_seg_range = self.conn.network.update_segment(
self.NETWORK_SEGMENT_RANGE_ID, name='update_test_name')
self.assertEqual('update_test_name', update_seg_range.name)
update_seg_range = self.operator_cloud.network.update_segment(
self.NETWORK_SEGMENT_RANGE_ID, name="update_test_name"
)
self.assertEqual("update_test_name", update_seg_range.name)

View File

@ -31,64 +31,69 @@ class TestPort(base.BaseFunctionalTest):
self.SUB_NAME = self.getUniqueString()
self.PORT_NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
net = self.user_cloud.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
self.assertEqual(self.NET_NAME, net.name)
self.NET_ID = net.id
sub = self.conn.network.create_subnet(
sub = self.user_cloud.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=self.NET_ID,
cidr=self.CIDR)
cidr=self.CIDR,
)
assert isinstance(sub, subnet.Subnet)
self.assertEqual(self.SUB_NAME, sub.name)
self.SUB_ID = sub.id
prt = self.conn.network.create_port(
name=self.PORT_NAME,
network_id=self.NET_ID)
prt = self.user_cloud.network.create_port(
name=self.PORT_NAME, network_id=self.NET_ID
)
assert isinstance(prt, port.Port)
self.assertEqual(self.PORT_NAME, prt.name)
self.PORT_ID = prt.id
def tearDown(self):
sot = self.conn.network.delete_port(
self.PORT_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_port(
self.PORT_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.SUB_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_subnet(
self.SUB_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestPort, self).tearDown()
def test_find(self):
sot = self.conn.network.find_port(self.PORT_NAME)
sot = self.user_cloud.network.find_port(self.PORT_NAME)
self.assertEqual(self.PORT_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_port(self.PORT_ID)
sot = self.user_cloud.network.get_port(self.PORT_ID)
self.assertEqual(self.PORT_ID, sot.id)
self.assertEqual(self.PORT_NAME, sot.name)
self.assertEqual(self.NET_ID, sot.network_id)
def test_list(self):
ids = [o.id for o in self.conn.network.ports()]
ids = [o.id for o in self.user_cloud.network.ports()]
self.assertIn(self.PORT_ID, ids)
def test_update(self):
sot = self.conn.network.update_port(self.PORT_ID,
name=self.UPDATE_NAME)
sot = self.user_cloud.network.update_port(
self.PORT_ID, name=self.UPDATE_NAME
)
self.assertEqual(self.UPDATE_NAME, sot.name)
def test_set_tags(self):
sot = self.conn.network.get_port(self.PORT_ID)
sot = self.user_cloud.network.get_port(self.PORT_ID)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_port(self.PORT_ID)
self.assertEqual(['blue'], sot.tags)
self.user_cloud.network.set_tags(sot, ["blue"])
sot = self.user_cloud.network.get_port(self.PORT_ID)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_port(self.PORT_ID)
self.user_cloud.network.set_tags(sot, [])
sot = self.user_cloud.network.get_port(self.PORT_ID)
self.assertEqual([], sot.tags)

View File

@ -37,97 +37,122 @@ class TestPortForwarding(base.BaseFunctionalTest):
INTERNAL_PORT = 8080
EXTERNAL_PORT = 80
PROTOCOL = "tcp"
DESCRIPTION = 'description'
DESCRIPTION = "description"
def setUp(self):
super(TestPortForwarding, self).setUp()
if not self.conn.network.find_extension('floating-ip-port-forwarding'):
self.skipTest('Floating IP Port Forwarding extension disabled')
if not self.user_cloud._has_neutron_extension("external-net"):
self.skipTest(
"Neutron external-net extension is required for this test"
)
if not self.user_cloud.network.find_extension(
"floating-ip-port-forwarding"
):
self.skipTest("Floating IP Port Forwarding extension disabled")
self.ROT_NAME = self.getUniqueString()
self.EXT_NET_NAME = self.getUniqueString()
self.EXT_SUB_NAME = self.getUniqueString()
self.INT_NET_NAME = self.getUniqueString()
self.INT_SUB_NAME = self.getUniqueString()
# Create Exeternal Network
args = {'router:external': True}
net = self._create_network(self.EXT_NET_NAME, **args)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR)
self.EXT_SUB_ID = sub.id
self.EXT_NET_ID = None
self.EXT_SUB_ID = None
# Find External Network
for net in self.user_cloud.network.networks(is_router_external=True):
self.EXT_NET_ID = net.id
# Find subnet of the chosen external net
for sub in self.user_cloud.network.subnets(network_id=self.EXT_NET_ID):
self.EXT_SUB_ID = sub.id
if not self.EXT_NET_ID and self.operator_cloud:
# There is no existing external net, but operator
# credentials available
# WARNING: this external net is not dropped
# Create External Network
args = {"router:external": True}
net = self._create_network(self.EXT_NET_NAME, **args)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR
)
self.EXT_SUB_ID = sub.id
# Create Internal Network
net = self._create_network(self.INT_NET_NAME)
self.INT_NET_ID = net.id
sub = self._create_subnet(
self.INT_SUB_NAME, self.INT_NET_ID, self.INT_CIDR)
self.INT_SUB_NAME, self.INT_NET_ID, self.INT_CIDR
)
self.INT_SUB_ID = sub.id
# Create Router
args = {'external_gateway_info': {'network_id': self.EXT_NET_ID}}
sot = self.conn.network.create_router(name=self.ROT_NAME, **args)
args = {"external_gateway_info": {"network_id": self.EXT_NET_ID}}
sot = self.user_cloud.network.create_router(name=self.ROT_NAME, **args)
assert isinstance(sot, router.Router)
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id
self.ROT = sot
# Add Router's Interface to Internal Network
sot = self.ROT.add_interface(
self.conn.network, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
self.user_cloud.network, subnet_id=self.INT_SUB_ID
)
self.assertEqual(sot["subnet_id"], self.INT_SUB_ID)
# Create Port in Internal Network
prt = self.conn.network.create_port(network_id=self.INT_NET_ID)
prt = self.user_cloud.network.create_port(network_id=self.INT_NET_ID)
assert isinstance(prt, port.Port)
self.INTERNAL_PORT_ID = prt.id
self.INTERNAL_IP_ADDRESS = prt.fixed_ips[0]['ip_address']
self.INTERNAL_IP_ADDRESS = prt.fixed_ips[0]["ip_address"]
# Create Floating IP.
fip = self.conn.network.create_ip(
floating_network_id=self.EXT_NET_ID)
fip = self.user_cloud.network.create_ip(
floating_network_id=self.EXT_NET_ID
)
assert isinstance(fip, floating_ip.FloatingIP)
self.FIP_ID = fip.id
# Create Port Forwarding
pf = self.conn.network.create_port_forwarding(
pf = self.user_cloud.network.create_port_forwarding(
floatingip_id=self.FIP_ID,
internal_port_id=self.INTERNAL_PORT_ID,
internal_ip_address=self.INTERNAL_IP_ADDRESS,
internal_port=self.INTERNAL_PORT,
external_port=self.EXTERNAL_PORT,
protocol=self.PROTOCOL,
description=self.DESCRIPTION)
description=self.DESCRIPTION,
)
assert isinstance(pf, _port_forwarding.PortForwarding)
self.PF = pf
def tearDown(self):
sot = self.conn.network.delete_port_forwarding(
self.PF, self.FIP_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_port_forwarding(
self.PF, self.FIP_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_ip(self.FIP_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_ip(
self.FIP_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_port(
self.INTERNAL_PORT_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_port(
self.INTERNAL_PORT_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.ROT.remove_interface(
self.conn.network, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
sot = self.conn.network.delete_router(
self.ROT_ID, ignore_missing=False)
self.user_cloud.network, subnet_id=self.INT_SUB_ID
)
self.assertEqual(sot["subnet_id"], self.INT_SUB_ID)
sot = self.user_cloud.network.delete_router(
self.ROT_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.EXT_SUB_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_subnet(
self.INT_SUB_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.EXT_NET_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.INT_SUB_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.INT_NET_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.INT_NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestPortForwarding, self).tearDown()
def _create_network(self, name, **args):
self.name = name
net = self.conn.network.create_network(name=name, **args)
net = self.user_cloud.network.create_network(name=name, **args)
assert isinstance(net, network.Network)
self.assertEqual(self.name, net.name)
return net
@ -136,18 +161,20 @@ class TestPortForwarding(base.BaseFunctionalTest):
self.name = name
self.net_id = net_id
self.cidr = cidr
sub = self.conn.network.create_subnet(
sub = self.user_cloud.network.create_subnet(
name=self.name,
ip_version=self.IPV4,
network_id=self.net_id,
cidr=self.cidr)
cidr=self.cidr,
)
assert isinstance(sub, subnet.Subnet)
self.assertEqual(self.name, sub.name)
return sub
def test_find(self):
sot = self.conn.network.find_port_forwarding(
self.PF.id, self.FIP_ID)
sot = self.user_cloud.network.find_port_forwarding(
self.PF.id, self.FIP_ID
)
self.assertEqual(self.INTERNAL_PORT_ID, sot.internal_port_id)
self.assertEqual(self.INTERNAL_IP_ADDRESS, sot.internal_ip_address)
self.assertEqual(self.INTERNAL_PORT, sot.internal_port)
@ -156,8 +183,7 @@ class TestPortForwarding(base.BaseFunctionalTest):
self.assertEqual(self.DESCRIPTION, sot.description)
def test_get(self):
sot = self.conn.network.get_port_forwarding(
self.PF, self.FIP_ID)
sot = self.user_cloud.network.get_port_forwarding(self.PF, self.FIP_ID)
self.assertEqual(self.INTERNAL_PORT_ID, sot.internal_port_id)
self.assertEqual(self.INTERNAL_IP_ADDRESS, sot.internal_ip_address)
self.assertEqual(self.INTERNAL_PORT, sot.internal_port)
@ -166,14 +192,14 @@ class TestPortForwarding(base.BaseFunctionalTest):
self.assertEqual(self.DESCRIPTION, sot.description)
def test_list(self):
pf_ids = [o.id for o in
self.conn.network.port_forwardings(self.FIP_ID)]
pf_ids = [
o.id for o in self.user_cloud.network.port_forwardings(self.FIP_ID)
]
self.assertIn(self.PF.id, pf_ids)
def test_update(self):
NEW_EXTERNAL_PORT = 90
sot = self.conn.network.update_port_forwarding(
self.PF.id,
self.FIP_ID,
external_port=NEW_EXTERNAL_PORT)
sot = self.user_cloud.network.update_port_forwarding(
self.PF.id, self.FIP_ID, external_port=NEW_EXTERNAL_PORT
)
self.assertEqual(NEW_EXTERNAL_PORT, sot.external_port)

View File

@ -11,8 +11,9 @@
# under the License.
from openstack.network.v2 import (qos_bandwidth_limit_rule as
_qos_bandwidth_limit_rule)
from openstack.network.v2 import (
qos_bandwidth_limit_rule as _qos_bandwidth_limit_rule
)
from openstack.tests.functional import base
@ -25,58 +26,67 @@ class TestQoSBandwidthLimitRule(base.BaseFunctionalTest):
RULE_MAX_KBPS_NEW = 1800
RULE_MAX_BURST_KBPS = 1100
RULE_MAX_BURST_KBPS_NEW = 1300
RULE_DIRECTION = 'egress'
RULE_DIRECTION_NEW = 'ingress'
RULE_DIRECTION = "egress"
RULE_DIRECTION_NEW = "ingress"
def setUp(self):
super(TestQoSBandwidthLimitRule, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
# Skip the tests if qos-bw-limit-direction extension is not enabled.
if not self.conn.network.find_extension('qos-bw-limit-direction'):
self.skipTest('Network qos-bw-limit-direction extension disabled')
if not self.operator_cloud.network.find_extension(
"qos-bw-limit-direction"
):
self.skipTest("Network qos-bw-limit-direction extension disabled")
self.QOS_POLICY_NAME = self.getUniqueString()
self.RULE_ID = self.getUniqueString()
qos_policy = self.conn.network.create_qos_policy(
qos_policy = self.operator_cloud.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.QOS_IS_SHARED,
)
self.QOS_POLICY_ID = qos_policy.id
qos_rule = self.conn.network.create_qos_bandwidth_limit_rule(
self.QOS_POLICY_ID, max_kbps=self.RULE_MAX_KBPS,
qos_rule = self.operator_cloud.network.create_qos_bandwidth_limit_rule(
self.QOS_POLICY_ID,
max_kbps=self.RULE_MAX_KBPS,
max_burst_kbps=self.RULE_MAX_BURST_KBPS,
direction=self.RULE_DIRECTION,
)
assert isinstance(qos_rule,
_qos_bandwidth_limit_rule.QoSBandwidthLimitRule)
assert isinstance(
qos_rule, _qos_bandwidth_limit_rule.QoSBandwidthLimitRule
)
self.assertEqual(self.RULE_MAX_KBPS, qos_rule.max_kbps)
self.assertEqual(self.RULE_MAX_BURST_KBPS, qos_rule.max_burst_kbps)
self.assertEqual(self.RULE_DIRECTION, qos_rule.direction)
self.RULE_ID = qos_rule.id
def tearDown(self):
rule = self.conn.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
qos_policy = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
rule = self.operator_cloud.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
qos_policy = self.operator_cloud.network.delete_qos_policy(
self.QOS_POLICY_ID
)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
super(TestQoSBandwidthLimitRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_bandwidth_limit_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
sot = self.operator_cloud.network.find_qos_bandwidth_limit_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.RULE_MAX_KBPS, sot.max_kbps)
self.assertEqual(self.RULE_MAX_BURST_KBPS, sot.max_burst_kbps)
self.assertEqual(self.RULE_DIRECTION, sot.direction)
def test_get(self):
sot = self.conn.network.get_qos_bandwidth_limit_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
sot = self.operator_cloud.network.get_qos_bandwidth_limit_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.QOS_POLICY_ID, sot.qos_policy_id)
self.assertEqual(self.RULE_MAX_KBPS, sot.max_kbps)
@ -84,18 +94,22 @@ class TestQoSBandwidthLimitRule(base.BaseFunctionalTest):
self.assertEqual(self.RULE_DIRECTION, sot.direction)
def test_list(self):
rule_ids = [o.id for o in
self.conn.network.qos_bandwidth_limit_rules(
self.QOS_POLICY_ID)]
rule_ids = [
o.id
for o in self.operator_cloud.network.qos_bandwidth_limit_rules(
self.QOS_POLICY_ID
)
]
self.assertIn(self.RULE_ID, rule_ids)
def test_update(self):
sot = self.conn.network.update_qos_bandwidth_limit_rule(
sot = self.operator_cloud.network.update_qos_bandwidth_limit_rule(
self.RULE_ID,
self.QOS_POLICY_ID,
max_kbps=self.RULE_MAX_KBPS_NEW,
max_burst_kbps=self.RULE_MAX_BURST_KBPS_NEW,
direction=self.RULE_DIRECTION_NEW)
direction=self.RULE_DIRECTION_NEW,
)
self.assertEqual(self.RULE_MAX_KBPS_NEW, sot.max_kbps)
self.assertEqual(self.RULE_MAX_BURST_KBPS_NEW, sot.max_burst_kbps)
self.assertEqual(self.RULE_DIRECTION_NEW, sot.direction)

View File

@ -11,8 +11,9 @@
# under the License.
from openstack.network.v2 import (qos_dscp_marking_rule as
_qos_dscp_marking_rule)
from openstack.network.v2 import (
qos_dscp_marking_rule as _qos_dscp_marking_rule
)
from openstack.tests.functional import base
@ -27,9 +28,12 @@ class TestQoSDSCPMarkingRule(base.BaseFunctionalTest):
def setUp(self):
super(TestQoSDSCPMarkingRule, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# Skip the tests if qos extension is not enabled.
if not self.conn.network.find_extension('qos'):
self.skipTest('Network qos extension disabled')
if not self.conn.network.find_extension("qos"):
self.skipTest("Network qos extension disabled")
self.QOS_POLICY_NAME = self.getUniqueString()
self.RULE_ID = self.getUniqueString()
@ -40,7 +44,8 @@ class TestQoSDSCPMarkingRule(base.BaseFunctionalTest):
)
self.QOS_POLICY_ID = qos_policy.id
qos_rule = self.conn.network.create_qos_dscp_marking_rule(
self.QOS_POLICY_ID, dscp_mark=self.RULE_DSCP_MARK,
self.QOS_POLICY_ID,
dscp_mark=self.RULE_DSCP_MARK,
)
assert isinstance(qos_rule, _qos_dscp_marking_rule.QoSDSCPMarkingRule)
self.assertEqual(self.RULE_DSCP_MARK, qos_rule.dscp_mark)
@ -48,8 +53,8 @@ class TestQoSDSCPMarkingRule(base.BaseFunctionalTest):
def tearDown(self):
rule = self.conn.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
self.RULE_ID, self.QOS_POLICY_ID
)
qos_policy = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
@ -57,28 +62,30 @@ class TestQoSDSCPMarkingRule(base.BaseFunctionalTest):
def test_find(self):
sot = self.conn.network.find_qos_dscp_marking_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.RULE_DSCP_MARK, sot.dscp_mark)
def test_get(self):
sot = self.conn.network.get_qos_dscp_marking_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.QOS_POLICY_ID, sot.qos_policy_id)
self.assertEqual(self.RULE_DSCP_MARK, sot.dscp_mark)
def test_list(self):
rule_ids = [o.id for o in
self.conn.network.qos_dscp_marking_rules(
self.QOS_POLICY_ID)]
rule_ids = [
o.id
for o in self.conn.network.qos_dscp_marking_rules(
self.QOS_POLICY_ID
)
]
self.assertIn(self.RULE_ID, rule_ids)
def test_update(self):
sot = self.conn.network.update_qos_dscp_marking_rule(
self.RULE_ID,
self.QOS_POLICY_ID,
dscp_mark=self.RULE_DSCP_MARK_NEW)
self.RULE_ID, self.QOS_POLICY_ID, dscp_mark=self.RULE_DSCP_MARK_NEW
)
self.assertEqual(self.RULE_DSCP_MARK_NEW, sot.dscp_mark)

View File

@ -11,8 +11,9 @@
# under the License.
from openstack.network.v2 import (qos_minimum_bandwidth_rule as
_qos_minimum_bandwidth_rule)
from openstack.network.v2 import (
qos_minimum_bandwidth_rule as _qos_minimum_bandwidth_rule
)
from openstack.tests.functional import base
@ -24,67 +25,78 @@ class TestQoSMinimumBandwidthRule(base.BaseFunctionalTest):
RULE_ID = None
RULE_MIN_KBPS = 1200
RULE_MIN_KBPS_NEW = 1800
RULE_DIRECTION = 'egress'
RULE_DIRECTION = "egress"
def setUp(self):
super(TestQoSMinimumBandwidthRule, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# Skip the tests if qos-bw-limit-direction extension is not enabled.
if not self.conn.network.find_extension('qos-bw-limit-direction'):
self.skipTest('Network qos-bw-limit-direction extension disabled')
if not self.operator_cloud.network.find_extension(
"qos-bw-limit-direction"):
self.skipTest("Network qos-bw-limit-direction extension disabled")
self.QOS_POLICY_NAME = self.getUniqueString()
qos_policy = self.conn.network.create_qos_policy(
qos_policy = self.operator_cloud.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.QOS_IS_SHARED,
)
self.QOS_POLICY_ID = qos_policy.id
qos_min_bw_rule = self.conn.network.create_qos_minimum_bandwidth_rule(
self.QOS_POLICY_ID, direction=self.RULE_DIRECTION,
min_kbps=self.RULE_MIN_KBPS,
qos_min_bw_rule = self.operator_cloud.network \
.create_qos_minimum_bandwidth_rule(
self.QOS_POLICY_ID,
direction=self.RULE_DIRECTION,
min_kbps=self.RULE_MIN_KBPS,
)
assert isinstance(
qos_min_bw_rule,
_qos_minimum_bandwidth_rule.QoSMinimumBandwidthRule,
)
assert isinstance(qos_min_bw_rule,
_qos_minimum_bandwidth_rule.QoSMinimumBandwidthRule)
self.assertEqual(self.RULE_MIN_KBPS, qos_min_bw_rule.min_kbps)
self.assertEqual(self.RULE_DIRECTION, qos_min_bw_rule.direction)
self.RULE_ID = qos_min_bw_rule.id
def tearDown(self):
rule = self.conn.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID,
rule = self.operator_cloud.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
qos_policy = self.operator_cloud.network.delete_qos_policy(
self.QOS_POLICY_ID)
qos_policy = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
super(TestQoSMinimumBandwidthRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
sot = self.operator_cloud.network.find_qos_minimum_bandwidth_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.RULE_DIRECTION, sot.direction)
self.assertEqual(self.RULE_MIN_KBPS, sot.min_kbps)
def test_get(self):
sot = self.conn.network.get_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
sot = self.operator_cloud.network.get_qos_minimum_bandwidth_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.QOS_POLICY_ID, sot.qos_policy_id)
self.assertEqual(self.RULE_DIRECTION, sot.direction)
self.assertEqual(self.RULE_MIN_KBPS, sot.min_kbps)
def test_list(self):
rule_ids = [o.id for o in
self.conn.network.qos_minimum_bandwidth_rules(
self.QOS_POLICY_ID)]
rule_ids = [
o.id
for o in self.operator_cloud.network.qos_minimum_bandwidth_rules(
self.QOS_POLICY_ID
)
]
self.assertIn(self.RULE_ID, rule_ids)
def test_update(self):
sot = self.conn.network.update_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID,
min_kbps=self.RULE_MIN_KBPS_NEW)
sot = self.operator_cloud.network.update_qos_minimum_bandwidth_rule(
self.RULE_ID, self.QOS_POLICY_ID, min_kbps=self.RULE_MIN_KBPS_NEW
)
self.assertEqual(self.RULE_MIN_KBPS_NEW, sot.min_kbps)

View File

@ -11,8 +11,9 @@
# under the License.
from openstack.network.v2 import (qos_minimum_packet_rate_rule as
_qos_minimum_packet_rate_rule)
from openstack.network.v2 import (
qos_minimum_packet_rate_rule as _qos_minimum_packet_rate_rule
)
from openstack.tests.functional import base
@ -24,71 +25,84 @@ class TestQoSMinimumPacketRateRule(base.BaseFunctionalTest):
RULE_ID = None
RULE_MIN_KPPS = 1200
RULE_MIN_KPPS_NEW = 1800
RULE_DIRECTION = 'egress'
RULE_DIRECTION_NEW = 'ingress'
RULE_DIRECTION = "egress"
RULE_DIRECTION_NEW = "ingress"
def setUp(self):
super(TestQoSMinimumPacketRateRule, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# Skip the tests if qos-pps-minimum extension is not enabled.
if not self.conn.network.find_extension('qos-pps-minimum'):
self.skipTest('Network qos-pps-minimum extension disabled')
if not self.operator_cloud.network.find_extension("qos-pps-minimum"):
self.skipTest("Network qos-pps-minimum extension disabled")
self.QOS_POLICY_NAME = self.getUniqueString()
qos_policy = self.conn.network.create_qos_policy(
qos_policy = self.operator_cloud.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.QOS_IS_SHARED,
)
self.QOS_POLICY_ID = qos_policy.id
qos_min_pps_rule = (
self.conn.network.create_qos_minimum_packet_rate_rule(
self.QOS_POLICY_ID, direction=self.RULE_DIRECTION,
min_kpps=self.RULE_MIN_KPPS))
self.operator_cloud.network.create_qos_minimum_packet_rate_rule(
self.QOS_POLICY_ID,
direction=self.RULE_DIRECTION,
min_kpps=self.RULE_MIN_KPPS,
)
)
assert isinstance(
qos_min_pps_rule,
_qos_minimum_packet_rate_rule.QoSMinimumPacketRateRule)
_qos_minimum_packet_rate_rule.QoSMinimumPacketRateRule,
)
self.assertEqual(self.RULE_MIN_KPPS, qos_min_pps_rule.min_kpps)
self.assertEqual(self.RULE_DIRECTION, qos_min_pps_rule.direction)
self.RULE_ID = qos_min_pps_rule.id
def tearDown(self):
rule = self.conn.network.delete_qos_minimum_packet_rate_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
qos_policy = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
rule = self.operator_cloud.network.delete_qos_minimum_packet_rate_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
qos_policy = self.operator_cloud.network.delete_qos_policy(
self.QOS_POLICY_ID
)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
super(TestQoSMinimumPacketRateRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_minimum_packet_rate_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
sot = self.operator_cloud.network.find_qos_minimum_packet_rate_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.RULE_DIRECTION, sot.direction)
self.assertEqual(self.RULE_MIN_KPPS, sot.min_kpps)
def test_get(self):
sot = self.conn.network.get_qos_minimum_packet_rate_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
sot = self.operator_cloud.network.get_qos_minimum_packet_rate_rule(
self.RULE_ID, self.QOS_POLICY_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.QOS_POLICY_ID, sot.qos_policy_id)
self.assertEqual(self.RULE_DIRECTION, sot.direction)
self.assertEqual(self.RULE_MIN_KPPS, sot.min_kpps)
def test_list(self):
rule_ids = [o.id for o in
self.conn.network.qos_minimum_packet_rate_rules(
self.QOS_POLICY_ID)]
rule_ids = [
o.id
for o in self.operator_cloud.network.qos_minimum_packet_rate_rules(
self.QOS_POLICY_ID
)
]
self.assertIn(self.RULE_ID, rule_ids)
def test_update(self):
sot = self.conn.network.update_qos_minimum_packet_rate_rule(
sot = self.operator_cloud.network.update_qos_minimum_packet_rate_rule(
self.RULE_ID,
self.QOS_POLICY_ID,
min_kpps=self.RULE_MIN_KPPS_NEW,
direction=self.RULE_DIRECTION_NEW)
direction=self.RULE_DIRECTION_NEW,
)
self.assertEqual(self.RULE_MIN_KPPS_NEW, sot.min_kpps)
self.assertEqual(self.RULE_DIRECTION_NEW, sot.direction)

View File

@ -26,13 +26,16 @@ class TestQoSPolicy(base.BaseFunctionalTest):
def setUp(self):
super(TestQoSPolicy, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# Skip the tests if qos extension is not enabled.
if not self.conn.network.find_extension('qos'):
self.skipTest('Network qos extension disabled')
if not self.operator_cloud.network.find_extension("qos"):
self.skipTest("Network qos extension disabled")
self.QOS_POLICY_NAME = self.getUniqueString()
self.QOS_POLICY_NAME_UPDATED = self.getUniqueString()
qos = self.conn.network.create_qos_policy(
qos = self.operator_cloud.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.IS_SHARED,
@ -43,16 +46,16 @@ class TestQoSPolicy(base.BaseFunctionalTest):
self.QOS_POLICY_ID = qos.id
def tearDown(self):
sot = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
sot = self.operator_cloud.network.delete_qos_policy(self.QOS_POLICY_ID)
self.assertIsNone(sot)
super(TestQoSPolicy, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_policy(self.QOS_POLICY_NAME)
sot = self.operator_cloud.network.find_qos_policy(self.QOS_POLICY_NAME)
self.assertEqual(self.QOS_POLICY_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_qos_policy(self.QOS_POLICY_ID)
sot = self.operator_cloud.network.get_qos_policy(self.QOS_POLICY_ID)
self.assertEqual(self.QOS_POLICY_NAME, sot.name)
self.assertEqual(self.IS_SHARED, sot.is_shared)
self.assertEqual(self.RULES, sot.rules)
@ -60,23 +63,23 @@ class TestQoSPolicy(base.BaseFunctionalTest):
self.assertEqual(self.IS_DEFAULT, sot.is_default)
def test_list(self):
names = [o.name for o in self.conn.network.qos_policies()]
names = [o.name for o in self.operator_cloud.network.qos_policies()]
self.assertIn(self.QOS_POLICY_NAME, names)
def test_update(self):
sot = self.conn.network.update_qos_policy(
self.QOS_POLICY_ID,
name=self.QOS_POLICY_NAME_UPDATED)
sot = self.operator_cloud.network.update_qos_policy(
self.QOS_POLICY_ID, name=self.QOS_POLICY_NAME_UPDATED
)
self.assertEqual(self.QOS_POLICY_NAME_UPDATED, sot.name)
def test_set_tags(self):
sot = self.conn.network.get_qos_policy(self.QOS_POLICY_ID)
sot = self.operator_cloud.network.get_qos_policy(self.QOS_POLICY_ID)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_qos_policy(self.QOS_POLICY_ID)
self.assertEqual(['blue'], sot.tags)
self.operator_cloud.network.set_tags(sot, ["blue"])
sot = self.operator_cloud.network.get_qos_policy(self.QOS_POLICY_ID)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_qos_policy(self.QOS_POLICY_ID)
self.operator_cloud.network.set_tags(sot, [])
sot = self.operator_cloud.network.get_qos_policy(self.QOS_POLICY_ID)
self.assertEqual([], sot.tags)

View File

@ -20,22 +20,27 @@ class TestQoSRuleType(base.BaseFunctionalTest):
def setUp(self):
super(TestQoSRuleType, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# Skip the tests if qos-rule-type-details extension is not enabled.
if not self.conn.network.find_extension('qos-rule-type-details'):
self.skipTest('Network qos-rule-type-details extension disabled')
if not self.operator_cloud.network.find_extension(
"qos-rule-type-details"):
self.skipTest("Network qos-rule-type-details extension disabled")
def test_find(self):
sot = self.conn.network.find_qos_rule_type(self.QOS_RULE_TYPE)
sot = self.operator_cloud.network.find_qos_rule_type(
self.QOS_RULE_TYPE)
self.assertEqual(self.QOS_RULE_TYPE, sot.type)
self.assertIsInstance(sot.drivers, list)
def test_get(self):
sot = self.conn.network.get_qos_rule_type(self.QOS_RULE_TYPE)
sot = self.operator_cloud.network.get_qos_rule_type(self.QOS_RULE_TYPE)
self.assertEqual(self.QOS_RULE_TYPE, sot.type)
self.assertIsInstance(sot.drivers, list)
def test_list(self):
rule_types = list(self.conn.network.qos_rule_types())
rule_types = list(self.operator_cloud.network.qos_rule_types())
self.assertGreater(len(rule_types), 0)
for rule_type in rule_types:

View File

@ -14,23 +14,32 @@ from openstack.tests.functional import base
class TestQuota(base.BaseFunctionalTest):
def setUp(self):
super(TestQuota, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
def test_list(self):
for qot in self.conn.network.quotas():
for qot in self.operator_cloud.network.quotas():
self.assertIsNotNone(qot.project_id)
self.assertIsNotNone(qot.networks)
def test_list_details(self):
expected_keys = ['limit', 'used', 'reserved']
project_id = self.conn.session.get_project_id()
quota_details = self.conn.network.get_quota(project_id, details=True)
expected_keys = ["limit", "used", "reserved"]
project_id = self.operator_cloud.session.get_project_id()
quota_details = self.operator_cloud.network.get_quota(
project_id, details=True
)
for details in quota_details._body.attributes.values():
for expected_key in expected_keys:
self.assertTrue(expected_key in details.keys())
def test_set(self):
attrs = {'networks': 123456789}
for project_quota in self.conn.network.quotas():
self.conn.network.update_quota(project_quota, **attrs)
new_quota = self.conn.network.get_quota(project_quota.project_id)
attrs = {"networks": 123456789}
for project_quota in self.operator_cloud.network.quotas():
self.operator_cloud.network.update_quota(project_quota, **attrs)
new_quota = self.operator_cloud.network.get_quota(
project_quota.project_id
)
self.assertEqual(123456789, new_quota.networks)

View File

@ -18,47 +18,77 @@ from openstack.tests.functional import base
class TestRBACPolicy(base.BaseFunctionalTest):
ACTION = 'access_as_shared'
OBJ_TYPE = 'network'
TARGET_TENANT_ID = '*'
ACTION = "access_as_shared"
OBJ_TYPE = "network"
TARGET_TENANT_ID = "*"
NET_ID = None
ID = None
def setUp(self):
super(TestRBACPolicy, self).setUp()
self.NET_NAME = self.getUniqueString('net')
if not self.user_cloud._has_neutron_extension("rbac-policies"):
self.skipTest(
"Neutron rbac-policies extension is required for this test"
)
self.NET_NAME = self.getUniqueString("net")
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
net = self.user_cloud.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
self.NET_ID = net.id
sot = self.conn.network.create_rbac_policy(
action=self.ACTION,
object_type=self.OBJ_TYPE,
target_tenant=self.TARGET_TENANT_ID,
object_id=self.NET_ID)
assert isinstance(sot, rbac_policy.RBACPolicy)
self.ID = sot.id
if self.operator_cloud:
sot = self.operator_cloud.network.create_rbac_policy(
action=self.ACTION,
object_type=self.OBJ_TYPE,
target_tenant=self.TARGET_TENANT_ID,
object_id=self.NET_ID,
)
assert isinstance(sot, rbac_policy.RBACPolicy)
self.ID = sot.id
else:
sot = self.user_cloud.network.create_rbac_policy(
action=self.ACTION,
object_type=self.OBJ_TYPE,
target_tenant=self.user_cloud.current_project_id,
object_id=self.NET_ID,
)
assert isinstance(sot, rbac_policy.RBACPolicy)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_rbac_policy(
self.ID,
ignore_missing=False)
if self.operator_cloud:
sot = self.operator_cloud.network.delete_rbac_policy(
self.ID, ignore_missing=False
)
else:
sot = self.user_cloud.network.delete_rbac_policy(
self.ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID,
ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestRBACPolicy, self).tearDown()
def test_find(self):
sot = self.conn.network.find_rbac_policy(self.ID)
if self.operator_cloud:
sot = self.operator_cloud.network.find_rbac_policy(self.ID)
else:
sot = self.user_cloud.network.find_rbac_policy(self.ID)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_rbac_policy(self.ID)
if self.operator_cloud:
sot = self.operator_cloud.network.get_rbac_policy(self.ID)
else:
sot = self.user_cloud.network.get_rbac_policy(self.ID)
self.assertEqual(self.ID, sot.id)
def test_list(self):
ids = [o.id for o in self.conn.network.rbac_policies()]
self.assertIn(self.ID, ids)
if self.operator_cloud:
ids = [o.id for o in self.operator_cloud.network.rbac_policies()]
else:
ids = [o.id for o in self.user_cloud.network.rbac_policies()]
if self.ID:
self.assertIn(self.ID, ids)

View File

@ -23,44 +23,50 @@ class TestRouter(base.BaseFunctionalTest):
super(TestRouter, self).setUp()
self.NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
sot = self.conn.network.create_router(name=self.NAME)
sot = self.user_cloud.network.create_router(name=self.NAME)
assert isinstance(sot, router.Router)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_router(self.ID, ignore_missing=False)
sot = self.user_cloud.network.delete_router(
self.ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestRouter, self).tearDown()
def test_find(self):
sot = self.conn.network.find_router(self.NAME)
sot = self.user_cloud.network.find_router(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_router(self.ID)
sot = self.user_cloud.network.get_router(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
self.assertFalse(sot.is_ha)
if not self.user_cloud._has_neutron_extension("l3-ha"):
self.assertFalse(sot.is_ha)
def test_list(self):
names = [o.name for o in self.conn.network.routers()]
names = [o.name for o in self.user_cloud.network.routers()]
self.assertIn(self.NAME, names)
ha = [o.is_ha for o in self.conn.network.routers()]
self.assertIn(False, ha)
if not self.user_cloud._has_neutron_extension("l3-ha"):
ha = [o.is_ha for o in self.user_cloud.network.routers()]
self.assertIn(False, ha)
def test_update(self):
sot = self.conn.network.update_router(self.ID, name=self.UPDATE_NAME)
sot = self.user_cloud.network.update_router(
self.ID, name=self.UPDATE_NAME
)
self.assertEqual(self.UPDATE_NAME, sot.name)
def test_set_tags(self):
sot = self.conn.network.get_router(self.ID)
sot = self.user_cloud.network.get_router(self.ID)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_router(self.ID)
self.assertEqual(['blue'], sot.tags)
self.user_cloud.network.set_tags(sot, ["blue"])
sot = self.user_cloud.network.get_router(self.ID)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_router(self.ID)
self.user_cloud.network.set_tags(sot, [])
sot = self.user_cloud.network.get_router(self.ID)
self.assertEqual([], sot.tags)

View File

@ -31,17 +31,18 @@ class TestRouterInterface(base.BaseFunctionalTest):
self.ROUTER_NAME = self.getUniqueString()
self.NET_NAME = self.getUniqueString()
self.SUB_NAME = self.getUniqueString()
sot = self.conn.network.create_router(name=self.ROUTER_NAME)
sot = self.user_cloud.network.create_router(name=self.ROUTER_NAME)
assert isinstance(sot, router.Router)
self.assertEqual(self.ROUTER_NAME, sot.name)
net = self.conn.network.create_network(name=self.NET_NAME)
net = self.user_cloud.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
self.assertEqual(self.NET_NAME, net.name)
sub = self.conn.network.create_subnet(
sub = self.user_cloud.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=net.id,
cidr=self.CIDR)
cidr=self.CIDR,
)
assert isinstance(sub, subnet.Subnet)
self.assertEqual(self.SUB_NAME, sub.name)
self.ROUTER_ID = sot.id
@ -50,25 +51,30 @@ class TestRouterInterface(base.BaseFunctionalTest):
self.SUB_ID = sub.id
def tearDown(self):
sot = self.conn.network.delete_router(
self.ROUTER_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_router(
self.ROUTER_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.SUB_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_subnet(
self.SUB_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestRouterInterface, self).tearDown()
def test_router_add_remove_interface(self):
iface = self.ROT.add_interface(self.conn.network,
subnet_id=self.SUB_ID)
iface = self.ROT.add_interface(
self.user_cloud.network, subnet_id=self.SUB_ID
)
self._verification(iface)
iface = self.ROT.remove_interface(self.conn.network,
subnet_id=self.SUB_ID)
iface = self.ROT.remove_interface(
self.user_cloud.network, subnet_id=self.SUB_ID
)
self._verification(iface)
def _verification(self, interface):
self.assertEqual(interface['subnet_id'], self.SUB_ID)
self.assertIn('port_id', interface)
self.assertEqual(interface["subnet_id"], self.SUB_ID)
self.assertIn("port_id", interface)

View File

@ -22,42 +22,45 @@ class TestSecurityGroup(base.BaseFunctionalTest):
def setUp(self):
super(TestSecurityGroup, self).setUp()
self.NAME = self.getUniqueString()
sot = self.conn.network.create_security_group(name=self.NAME)
sot = self.user_cloud.network.create_security_group(name=self.NAME)
assert isinstance(sot, security_group.SecurityGroup)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_security_group(
self.ID, ignore_missing=False)
sot = self.user_cloud.network.delete_security_group(
self.ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestSecurityGroup, self).tearDown()
def test_find(self):
sot = self.conn.network.find_security_group(self.NAME)
sot = self.user_cloud.network.find_security_group(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_security_group(self.ID)
sot = self.user_cloud.network.get_security_group(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
def test_list(self):
names = [o.name for o in self.conn.network.security_groups()]
names = [o.name for o in self.user_cloud.network.security_groups()]
self.assertIn(self.NAME, names)
def test_list_query_list_of_ids(self):
ids = [o.id for o in self.conn.network.security_groups(id=[self.ID])]
ids = [
o.id for o in self.user_cloud.network.security_groups(id=[self.ID])
]
self.assertIn(self.ID, ids)
def test_set_tags(self):
sot = self.conn.network.get_security_group(self.ID)
sot = self.user_cloud.network.get_security_group(self.ID)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_security_group(self.ID)
self.assertEqual(['blue'], sot.tags)
self.user_cloud.network.set_tags(sot, ["blue"])
sot = self.user_cloud.network.get_security_group(self.ID)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_security_group(self.ID)
self.user_cloud.network.set_tags(sot, [])
sot = self.user_cloud.network.get_security_group(self.ID)
self.assertEqual([], sot.tags)

View File

@ -18,43 +18,49 @@ from openstack.tests.functional import base
class TestSecurityGroupRule(base.BaseFunctionalTest):
IPV4 = 'IPv4'
PROTO = 'tcp'
IPV4 = "IPv4"
PROTO = "tcp"
PORT = 22
DIR = 'ingress'
DIR = "ingress"
ID = None
RULE_ID = None
def setUp(self):
super(TestSecurityGroupRule, self).setUp()
self.NAME = self.getUniqueString()
sot = self.conn.network.create_security_group(name=self.NAME)
sot = self.user_cloud.network.create_security_group(name=self.NAME)
assert isinstance(sot, security_group.SecurityGroup)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
rul = self.conn.network.create_security_group_rule(
direction=self.DIR, ethertype=self.IPV4,
port_range_max=self.PORT, port_range_min=self.PORT,
protocol=self.PROTO, security_group_id=self.ID)
rul = self.user_cloud.network.create_security_group_rule(
direction=self.DIR,
ethertype=self.IPV4,
port_range_max=self.PORT,
port_range_min=self.PORT,
protocol=self.PROTO,
security_group_id=self.ID,
)
assert isinstance(rul, security_group_rule.SecurityGroupRule)
self.assertEqual(self.ID, rul.security_group_id)
self.RULE_ID = rul.id
def tearDown(self):
sot = self.conn.network.delete_security_group_rule(
self.RULE_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_security_group_rule(
self.RULE_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_security_group(
self.ID, ignore_missing=False)
sot = self.user_cloud.network.delete_security_group(
self.ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestSecurityGroupRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_security_group_rule(self.RULE_ID)
sot = self.user_cloud.network.find_security_group_rule(self.RULE_ID)
self.assertEqual(self.RULE_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_security_group_rule(self.RULE_ID)
sot = self.user_cloud.network.get_security_group_rule(self.RULE_ID)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.DIR, sot.direction)
self.assertEqual(self.PROTO, sot.protocol)
@ -63,5 +69,5 @@ class TestSecurityGroupRule(base.BaseFunctionalTest):
self.assertEqual(self.ID, sot.security_group_id)
def test_list(self):
ids = [o.id for o in self.conn.network.security_group_rules()]
ids = [o.id for o in self.user_cloud.network.security_group_rules()]
self.assertIn(self.RULE_ID, ids)

View File

@ -29,20 +29,25 @@ class TestSegment(base.BaseFunctionalTest):
super(TestSegment, self).setUp()
self.NETWORK_NAME = self.getUniqueString()
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
# NOTE(rtheis): The segment extension is not yet enabled by default.
# Skip the tests if not enabled.
if not self.conn.network.find_extension('segment'):
self.skipTest('Segment extension disabled')
if not self.operator_cloud.network.find_extension("segment"):
self.skipTest("Segment extension disabled")
# Create a network to hold the segment.
net = self.conn.network.create_network(name=self.NETWORK_NAME)
net = self.operator_cloud.network.create_network(
name=self.NETWORK_NAME
)
assert isinstance(net, network.Network)
self.assertEqual(self.NETWORK_NAME, net.name)
self.NETWORK_ID = net.id
if self.SEGMENT_EXTENSION:
# Get the segment for the network.
for seg in self.conn.network.segments():
for seg in self.operator_cloud.network.segments():
assert isinstance(seg, segment.Segment)
if self.NETWORK_ID == seg.network_id:
self.NETWORK_TYPE = seg.network_type
@ -52,36 +57,36 @@ class TestSegment(base.BaseFunctionalTest):
break
def tearDown(self):
sot = self.conn.network.delete_network(
self.NETWORK_ID,
ignore_missing=False)
sot = self.operator_cloud.network.delete_network(
self.NETWORK_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestSegment, self).tearDown()
def test_create_delete(self):
sot = self.conn.network.create_segment(
description='test description',
name='test name',
sot = self.operator_cloud.network.create_segment(
description="test description",
name="test name",
network_id=self.NETWORK_ID,
network_type='geneve',
network_type="geneve",
segmentation_id=2055,
)
self.assertIsInstance(sot, segment.Segment)
del_sot = self.conn.network.delete_segment(sot.id)
self.assertEqual('test description', sot.description)
self.assertEqual('test name', sot.name)
del_sot = self.operator_cloud.network.delete_segment(sot.id)
self.assertEqual("test description", sot.description)
self.assertEqual("test name", sot.name)
self.assertEqual(self.NETWORK_ID, sot.network_id)
self.assertEqual('geneve', sot.network_type)
self.assertEqual("geneve", sot.network_type)
self.assertIsNone(sot.physical_network)
self.assertEqual(2055, sot.segmentation_id)
self.assertIsNone(del_sot)
def test_find(self):
sot = self.conn.network.find_segment(self.SEGMENT_ID)
sot = self.operator_cloud.network.find_segment(self.SEGMENT_ID)
self.assertEqual(self.SEGMENT_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_segment(self.SEGMENT_ID)
sot = self.operator_cloud.network.get_segment(self.SEGMENT_ID)
self.assertEqual(self.SEGMENT_ID, sot.id)
self.assertIsNone(sot.name)
self.assertEqual(self.NETWORK_ID, sot.network_id)
@ -90,10 +95,11 @@ class TestSegment(base.BaseFunctionalTest):
self.assertEqual(self.SEGMENTATION_ID, sot.segmentation_id)
def test_list(self):
ids = [o.id for o in self.conn.network.segments(name=None)]
ids = [o.id for o in self.operator_cloud.network.segments(name=None)]
self.assertIn(self.SEGMENT_ID, ids)
def test_update(self):
sot = self.conn.network.update_segment(self.SEGMENT_ID,
description='update')
self.assertEqual('update', sot.description)
sot = self.operator_cloud.network.update_segment(
self.SEGMENT_ID, description="update"
)
self.assertEqual("update", sot.description)

View File

@ -23,42 +23,71 @@ class TestServiceProfile(base.BaseFunctionalTest):
def setUp(self):
super(TestServiceProfile, self).setUp()
service_profiles = self.conn.network.create_service_profile(
description=self.SERVICE_PROFILE_DESCRIPTION,
metainfo=self.METAINFO,)
assert isinstance(service_profiles, _service_profile.ServiceProfile)
self.assertEqual(
self.SERVICE_PROFILE_DESCRIPTION,
service_profiles.description)
self.assertEqual(self.METAINFO, service_profiles.meta_info)
if not self.user_cloud._has_neutron_extension("flavors"):
self.skipTest("Neutron flavor extension is required for this test")
self.ID = service_profiles.id
if self.operator_cloud:
service_profiles = (
self.operator_cloud.network.create_service_profile(
description=self.SERVICE_PROFILE_DESCRIPTION,
metainfo=self.METAINFO,
)
)
assert isinstance(
service_profiles, _service_profile.ServiceProfile
)
self.assertEqual(
self.SERVICE_PROFILE_DESCRIPTION, service_profiles.description
)
self.assertEqual(self.METAINFO, service_profiles.meta_info)
self.ID = service_profiles.id
def tearDown(self):
service_profiles = self.conn.network.delete_service_profile(
self.ID,
ignore_missing=True)
self.assertIsNone(service_profiles)
if self.ID:
service_profiles = (
self.operator_cloud.network.delete_service_profile(
self.ID, ignore_missing=True
)
)
self.assertIsNone(service_profiles)
super(TestServiceProfile, self).tearDown()
def test_find(self):
service_profiles = self.conn.network.find_service_profile(
self.ID)
self.assertEqual(self.METAINFO,
service_profiles.meta_info)
self.user_cloud.network.find_service_profile(
name_or_id="not_existing",
ignore_missing=True)
if self.operator_cloud and self.ID:
service_profiles = self.operator_cloud.network \
.find_service_profile(self.ID)
self.assertEqual(self.METAINFO, service_profiles.meta_info)
def test_get(self):
service_profiles = self.conn.network.get_service_profile(self.ID)
if not self.ID:
self.skipTest("ServiceProfile was not created")
service_profiles = self.operator_cloud.network.get_service_profile(
self.ID)
self.assertEqual(self.METAINFO, service_profiles.meta_info)
self.assertEqual(self.SERVICE_PROFILE_DESCRIPTION,
service_profiles.description)
self.assertEqual(
self.SERVICE_PROFILE_DESCRIPTION, service_profiles.description
)
def test_update(self):
service_profiles = self.conn.network.update_service_profile(
self.ID,
description=self.UPDATE_DESCRIPTION)
if not self.ID:
self.skipTest("ServiceProfile was not created")
service_profiles = self.operator_cloud.network.update_service_profile(
self.ID, description=self.UPDATE_DESCRIPTION
)
self.assertEqual(self.UPDATE_DESCRIPTION, service_profiles.description)
def test_list(self):
metainfos = [f.meta_info for f in self.conn.network.service_profiles()]
self.assertIn(self.METAINFO, metainfos)
# Test in user scope
self.user_cloud.network.service_profiles()
# Test as operator
if self.operator_cloud:
metainfos = [
f.meta_info for f in
self.operator_cloud.network.service_profiles()
]
if self.ID:
self.assertIn(self.METAINFO, metainfos)

View File

@ -15,8 +15,9 @@ from openstack.tests.functional import base
class TestServiceProvider(base.BaseFunctionalTest):
def test_list(self):
providers = list(self.conn.network.service_providers())
providers = list(self.user_cloud.network.service_providers())
names = [o.name for o in providers]
service_types = [o.service_type for o in providers]
self.assertIn('ha', names)
self.assertIn('L3_ROUTER_NAT', service_types)
if self.user_cloud._has_neutron_extension("l3-ha"):
self.assertIn("ha", names)
self.assertIn("L3_ROUTER_NAT", service_types)

View File

@ -31,36 +31,38 @@ class TestSubnet(base.BaseFunctionalTest):
self.NET_NAME = self.getUniqueString()
self.SUB_NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
net = self.user_cloud.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
self.assertEqual(self.NET_NAME, net.name)
self.NET_ID = net.id
sub = self.conn.network.create_subnet(
sub = self.user_cloud.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=self.NET_ID,
cidr=self.CIDR,
dns_nameservers=self.DNS_SERVERS,
allocation_pools=self.POOL,
host_routes=self.ROUTES)
host_routes=self.ROUTES,
)
assert isinstance(sub, subnet.Subnet)
self.assertEqual(self.SUB_NAME, sub.name)
self.SUB_ID = sub.id
def tearDown(self):
sot = self.conn.network.delete_subnet(self.SUB_ID)
sot = self.user_cloud.network.delete_subnet(self.SUB_ID)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestSubnet, self).tearDown()
def test_find(self):
sot = self.conn.network.find_subnet(self.SUB_NAME)
sot = self.user_cloud.network.find_subnet(self.SUB_NAME)
self.assertEqual(self.SUB_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_subnet(self.SUB_ID)
sot = self.user_cloud.network.get_subnet(self.SUB_ID)
self.assertEqual(self.SUB_NAME, sot.name)
self.assertEqual(self.SUB_ID, sot.id)
self.assertEqual(self.DNS_SERVERS, sot.dns_nameservers)
@ -72,22 +74,23 @@ class TestSubnet(base.BaseFunctionalTest):
self.assertTrue(sot.is_dhcp_enabled)
def test_list(self):
names = [o.name for o in self.conn.network.subnets()]
names = [o.name for o in self.user_cloud.network.subnets()]
self.assertIn(self.SUB_NAME, names)
def test_update(self):
sot = self.conn.network.update_subnet(self.SUB_ID,
name=self.UPDATE_NAME)
sot = self.user_cloud.network.update_subnet(
self.SUB_ID, name=self.UPDATE_NAME
)
self.assertEqual(self.UPDATE_NAME, sot.name)
def test_set_tags(self):
sot = self.conn.network.get_subnet(self.SUB_ID)
sot = self.user_cloud.network.get_subnet(self.SUB_ID)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_subnet(self.SUB_ID)
self.assertEqual(['blue'], sot.tags)
self.user_cloud.network.set_tags(sot, ["blue"])
sot = self.user_cloud.network.get_subnet(self.SUB_ID)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_subnet(self.SUB_ID)
self.user_cloud.network.set_tags(sot, [])
sot = self.user_cloud.network.get_subnet(self.SUB_ID)
self.assertEqual([], sot.tags)

View File

@ -26,7 +26,7 @@ class TestSubnetFromSubnetPool(base.BaseFunctionalTest):
MAXIMUM_PREFIX_LENGTH = 32
SUBNET_PREFIX_LENGTH = 28
IP_VERSION = 4
PREFIXES = ['10.100.0.0/24']
PREFIXES = ["10.100.0.0/24"]
NET_ID = None
SUB_ID = None
SUB_POOL_ID = None
@ -37,41 +37,44 @@ class TestSubnetFromSubnetPool(base.BaseFunctionalTest):
self.SUB_NAME = self.getUniqueString()
self.SUB_POOL_NAME = self.getUniqueString()
sub_pool = self.conn.network.create_subnet_pool(
sub_pool = self.user_cloud.network.create_subnet_pool(
name=self.SUB_POOL_NAME,
min_prefixlen=self.MINIMUM_PREFIX_LENGTH,
default_prefixlen=self.DEFAULT_PREFIX_LENGTH,
max_prefixlen=self.MAXIMUM_PREFIX_LENGTH,
prefixes=self.PREFIXES)
prefixes=self.PREFIXES,
)
self.assertIsInstance(sub_pool, subnet_pool.SubnetPool)
self.assertEqual(self.SUB_POOL_NAME, sub_pool.name)
self.SUB_POOL_ID = sub_pool.id
net = self.conn.network.create_network(name=self.NET_NAME)
net = self.user_cloud.network.create_network(name=self.NET_NAME)
self.assertIsInstance(net, network.Network)
self.assertEqual(self.NET_NAME, net.name)
self.NET_ID = net.id
sub = self.conn.network.create_subnet(
sub = self.user_cloud.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=self.NET_ID,
prefixlen=self.SUBNET_PREFIX_LENGTH,
subnetpool_id=self.SUB_POOL_ID)
subnetpool_id=self.SUB_POOL_ID,
)
self.assertIsInstance(sub, subnet.Subnet)
self.assertEqual(self.SUB_NAME, sub.name)
self.SUB_ID = sub.id
def tearDown(self):
sot = self.conn.network.delete_subnet(self.SUB_ID)
sot = self.user_cloud.network.delete_subnet(self.SUB_ID)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID, ignore_missing=False)
sot = self.user_cloud.network.delete_network(
self.NET_ID, ignore_missing=False
)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet_pool(self.SUB_POOL_ID)
sot = self.user_cloud.network.delete_subnet_pool(self.SUB_POOL_ID)
self.assertIsNone(sot)
super(TestSubnetFromSubnetPool, self).tearDown()
def test_get(self):
sot = self.conn.network.get_subnet(self.SUB_ID)
sot = self.user_cloud.network.get_subnet(self.SUB_ID)
self.assertEqual(self.SUB_NAME, sot.name)
self.assertEqual(self.SUB_ID, sot.id)
self.assertEqual(self.CIDR, sot.cidr)

View File

@ -24,65 +24,63 @@ class TestSubnetPool(base.BaseFunctionalTest):
DEFAULT_QUOTA = 24
IS_SHARED = False
IP_VERSION = 4
PREFIXES = ['10.100.0.0/24', '10.101.0.0/24']
PREFIXES = ["10.100.0.0/24", "10.101.0.0/24"]
def setUp(self):
super(TestSubnetPool, self).setUp()
self.SUBNET_POOL_NAME = self.getUniqueString()
self.SUBNET_POOL_NAME_UPDATED = self.getUniqueString()
subnet_pool = self.conn.network.create_subnet_pool(
subnet_pool = self.user_cloud.network.create_subnet_pool(
name=self.SUBNET_POOL_NAME,
min_prefixlen=self.MINIMUM_PREFIX_LENGTH,
default_prefixlen=self.DEFAULT_PREFIX_LENGTH,
max_prefixlen=self.MAXIMUM_PREFIX_LENGTH,
default_quota=self.DEFAULT_QUOTA,
shared=self.IS_SHARED,
prefixes=self.PREFIXES)
prefixes=self.PREFIXES,
)
assert isinstance(subnet_pool, _subnet_pool.SubnetPool)
self.assertEqual(self.SUBNET_POOL_NAME, subnet_pool.name)
self.SUBNET_POOL_ID = subnet_pool.id
def tearDown(self):
sot = self.conn.network.delete_subnet_pool(self.SUBNET_POOL_ID)
sot = self.user_cloud.network.delete_subnet_pool(self.SUBNET_POOL_ID)
self.assertIsNone(sot)
super(TestSubnetPool, self).tearDown()
def test_find(self):
sot = self.conn.network.find_subnet_pool(self.SUBNET_POOL_NAME)
sot = self.user_cloud.network.find_subnet_pool(self.SUBNET_POOL_NAME)
self.assertEqual(self.SUBNET_POOL_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_subnet_pool(self.SUBNET_POOL_ID)
sot = self.user_cloud.network.get_subnet_pool(self.SUBNET_POOL_ID)
self.assertEqual(self.SUBNET_POOL_NAME, sot.name)
self.assertEqual(self.MINIMUM_PREFIX_LENGTH,
sot.minimum_prefix_length)
self.assertEqual(self.DEFAULT_PREFIX_LENGTH,
sot.default_prefix_length)
self.assertEqual(self.MAXIMUM_PREFIX_LENGTH,
sot.maximum_prefix_length)
self.assertEqual(self.MINIMUM_PREFIX_LENGTH, sot.minimum_prefix_length)
self.assertEqual(self.DEFAULT_PREFIX_LENGTH, sot.default_prefix_length)
self.assertEqual(self.MAXIMUM_PREFIX_LENGTH, sot.maximum_prefix_length)
self.assertEqual(self.DEFAULT_QUOTA, sot.default_quota)
self.assertEqual(self.IS_SHARED, sot.is_shared)
self.assertEqual(self.IP_VERSION, sot.ip_version)
self.assertEqual(self.PREFIXES, sot.prefixes)
def test_list(self):
names = [o.name for o in self.conn.network.subnet_pools()]
names = [o.name for o in self.user_cloud.network.subnet_pools()]
self.assertIn(self.SUBNET_POOL_NAME, names)
def test_update(self):
sot = self.conn.network.update_subnet_pool(
self.SUBNET_POOL_ID,
name=self.SUBNET_POOL_NAME_UPDATED)
sot = self.user_cloud.network.update_subnet_pool(
self.SUBNET_POOL_ID, name=self.SUBNET_POOL_NAME_UPDATED
)
self.assertEqual(self.SUBNET_POOL_NAME_UPDATED, sot.name)
def test_set_tags(self):
sot = self.conn.network.get_subnet_pool(self.SUBNET_POOL_ID)
sot = self.user_cloud.network.get_subnet_pool(self.SUBNET_POOL_ID)
self.assertEqual([], sot.tags)
self.conn.network.set_tags(sot, ['blue'])
sot = self.conn.network.get_subnet_pool(self.SUBNET_POOL_ID)
self.assertEqual(['blue'], sot.tags)
self.user_cloud.network.set_tags(sot, ["blue"])
sot = self.user_cloud.network.get_subnet_pool(self.SUBNET_POOL_ID)
self.assertEqual(["blue"], sot.tags)
self.conn.network.set_tags(sot, [])
sot = self.conn.network.get_subnet_pool(self.SUBNET_POOL_ID)
self.user_cloud.network.set_tags(sot, [])
sot = self.user_cloud.network.get_subnet_pool(self.SUBNET_POOL_ID)
self.assertEqual([], sot.tags)

View File

@ -25,67 +25,76 @@ class TestTrunk(base.BaseFunctionalTest):
super(TestTrunk, self).setUp()
# Skip the tests if trunk extension is not enabled.
if not self.conn.network.find_extension('trunk'):
self.skipTest('Network trunk extension disabled')
if not self.user_cloud.network.find_extension("trunk"):
self.skipTest("Network trunk extension disabled")
self.TRUNK_NAME = self.getUniqueString()
self.TRUNK_NAME_UPDATED = self.getUniqueString()
net = self.conn.network.create_network()
net = self.user_cloud.network.create_network()
assert isinstance(net, network.Network)
self.NET_ID = net.id
prt = self.conn.network.create_port(network_id=self.NET_ID)
prt = self.user_cloud.network.create_port(network_id=self.NET_ID)
assert isinstance(prt, port.Port)
self.PORT_ID = prt.id
self.ports_to_clean = [self.PORT_ID]
trunk = self.conn.network.create_trunk(
name=self.TRUNK_NAME,
port_id=self.PORT_ID)
trunk = self.user_cloud.network.create_trunk(
name=self.TRUNK_NAME, port_id=self.PORT_ID
)
assert isinstance(trunk, _trunk.Trunk)
self.TRUNK_ID = trunk.id
def tearDown(self):
self.conn.network.delete_trunk(self.TRUNK_ID, ignore_missing=False)
self.user_cloud.network.delete_trunk(
self.TRUNK_ID, ignore_missing=False
)
for port_id in self.ports_to_clean:
self.conn.network.delete_port(port_id, ignore_missing=False)
self.conn.network.delete_network(self.NET_ID, ignore_missing=False)
self.user_cloud.network.delete_port(port_id, ignore_missing=False)
self.user_cloud.network.delete_network(
self.NET_ID, ignore_missing=False
)
super(TestTrunk, self).tearDown()
def test_find(self):
sot = self.conn.network.find_trunk(self.TRUNK_NAME)
sot = self.user_cloud.network.find_trunk(self.TRUNK_NAME)
self.assertEqual(self.TRUNK_ID, sot.id)
def test_get(self):
sot = self.conn.network.get_trunk(self.TRUNK_ID)
sot = self.user_cloud.network.get_trunk(self.TRUNK_ID)
self.assertEqual(self.TRUNK_ID, sot.id)
self.assertEqual(self.TRUNK_NAME, sot.name)
def test_list(self):
ids = [o.id for o in self.conn.network.trunks()]
ids = [o.id for o in self.user_cloud.network.trunks()]
self.assertIn(self.TRUNK_ID, ids)
def test_update(self):
sot = self.conn.network.update_trunk(self.TRUNK_ID,
name=self.TRUNK_NAME_UPDATED)
sot = self.user_cloud.network.update_trunk(
self.TRUNK_ID, name=self.TRUNK_NAME_UPDATED
)
self.assertEqual(self.TRUNK_NAME_UPDATED, sot.name)
def test_subports(self):
port_for_subport = self.conn.network.create_port(
network_id=self.NET_ID)
port_for_subport = self.user_cloud.network.create_port(
network_id=self.NET_ID
)
self.ports_to_clean.append(port_for_subport.id)
subports = [{
'port_id': port_for_subport.id,
'segmentation_type': 'vlan',
'segmentation_id': 111
}]
subports = [
{
"port_id": port_for_subport.id,
"segmentation_type": "vlan",
"segmentation_id": 111,
}
]
sot = self.conn.network.get_trunk_subports(self.TRUNK_ID)
self.assertEqual({'sub_ports': []}, sot)
sot = self.user_cloud.network.get_trunk_subports(self.TRUNK_ID)
self.assertEqual({"sub_ports": []}, sot)
self.conn.network.add_trunk_subports(self.TRUNK_ID, subports)
sot = self.conn.network.get_trunk_subports(self.TRUNK_ID)
self.assertEqual({'sub_ports': subports}, sot)
self.user_cloud.network.add_trunk_subports(self.TRUNK_ID, subports)
sot = self.user_cloud.network.get_trunk_subports(self.TRUNK_ID)
self.assertEqual({"sub_ports": subports}, sot)
self.conn.network.delete_trunk_subports(
self.TRUNK_ID, [{'port_id': port_for_subport.id}])
sot = self.conn.network.get_trunk_subports(self.TRUNK_ID)
self.assertEqual({'sub_ports': []}, sot)
self.user_cloud.network.delete_trunk_subports(
self.TRUNK_ID, [{"port_id": port_for_subport.id}]
)
sot = self.user_cloud.network.get_trunk_subports(self.TRUNK_ID)
self.assertEqual({"sub_ports": []}, sot)

View File

@ -20,36 +20,43 @@ class TestVpnIkePolicy(base.BaseFunctionalTest):
def setUp(self):
super(TestVpnIkePolicy, self).setUp()
if not self.conn._has_neutron_extension('vpnaas_v2'):
self.skipTest('vpnaas_v2 service not supported by cloud')
self.IKE_POLICY_NAME = self.getUniqueString('ikepolicy')
self.UPDATE_NAME = self.getUniqueString('ikepolicy-updated')
policy = self.conn.network.create_vpn_ike_policy(
name=self.IKE_POLICY_NAME)
if not self.user_cloud._has_neutron_extension("vpnaas"):
self.skipTest("vpnaas service not supported by cloud")
self.IKEPOLICY_NAME = self.getUniqueString("ikepolicy")
self.UPDATE_NAME = self.getUniqueString("ikepolicy-updated")
policy = self.user_cloud.network.create_vpn_ike_policy(
name=self.IKEPOLICY_NAME
)
assert isinstance(policy, vpn_ike_policy.VpnIkePolicy)
self.assertEqual(self.IKE_POLICY_NAME, policy.name)
self.assertEqual(self.IKEPOLICY_NAME, policy.name)
self.ID = policy.id
def tearDown(self):
ike_policy = self.conn.network.\
delete_vpn_ike_policy(self.ID, ignore_missing=True)
self.assertIsNone(ike_policy)
ikepolicy = self.user_cloud.network.delete_vpn_ike_policy(
self.ID, ignore_missing=True
)
self.assertIsNone(ikepolicy)
super(TestVpnIkePolicy, self).tearDown()
def test_list(self):
policies = [f.name for f in self.conn.network.vpn_ikepolicies()]
self.assertIn(self.IKE_POLICY_NAME, policies)
policies = [
f.name for f in
self.user_cloud.network.vpn_ike_policies()]
self.assertIn(self.IKEPOLICY_NAME, policies)
def test_find(self):
policy = self.conn.network.find_vpn_ike_policy(self.IKE_POLICY_NAME)
policy = self.user_cloud.network.find_vpn_ike_policy(
self.IKEPOLICY_NAME
)
self.assertEqual(self.ID, policy.id)
def test_get(self):
policy = self.conn.network.get_vpn_ike_policy(self.ID)
self.assertEqual(self.IKE_POLICY_NAME, policy.name)
policy = self.user_cloud.network.get_vpn_ike_policy(self.ID)
self.assertEqual(self.IKEPOLICY_NAME, policy.name)
self.assertEqual(self.ID, policy.id)
def test_update(self):
policy = self.conn.network.update_vpn_ike_policy(
self.ID, name=self.UPDATE_NAME)
policy = self.user_cloud.network.update_vpn_ike_policy(
self.ID, name=self.UPDATE_NAME
)
self.assertEqual(self.UPDATE_NAME, policy.name)