Interface allows ips only in the same network

This commit is contained in:
Rajaram Mallya 2011-11-18 16:04:04 +05:30
parent 394e9fd6e6
commit 643e2c30a0
5 changed files with 131 additions and 52 deletions

View File

@ -780,8 +780,19 @@ class Interface(ModelBase):
return data
def allow_ip(self, ip):
if self._ip_cannot_be_allowed(ip):
err_msg = _("Ip %s cannot be allowed on interface %s as "
"interface is not configured "
"for ip's network") % (ip.address,
self.virtual_interface_id)
raise IpNotAllowedOnInterfaceError(err_msg)
db_api.save_allowed_ip(self.id, ip.id)
def _ip_cannot_be_allowed(self, ip):
return (self.plugged_in_network_id() is None
or self.plugged_in_network_id() != ip.ip_block.network_id)
def disallow_ip(self, ip):
db_api.remove_allowed_ip(interface_id=self.id, ip_address_id=ip.id)
@ -1027,9 +1038,9 @@ class IpAllocationNotAllowedError(exception.MelangeError):
message = _("Ip Block can not allocate address")
class InvalidTenantError(exception.MelangeError):
class IpNotAllowedOnInterfaceError(exception.MelangeError):
message = _("Cannot access other tenant's block")
message = _("Ip cannot be allowed on interface")
class InvalidModelError(exception.MelangeError):

View File

@ -34,6 +34,10 @@ class BaseController(wsgi.Controller):
exception.NoMoreAddressesError,
models.AddressDoesNotBelongError,
models.AddressLockedError,
models.IpAllocationNotAllowedError,
models.IpNotAllowedOnInterfaceError,
models.NoMoreMacAddressesError,
models.AddressDisallowedByPolicyError,
],
webob.exc.HTTPBadRequest: [
models.InvalidModelError,

View File

@ -29,10 +29,12 @@ class IpBlockFactory(factory.Factory):
dns1 = "8.8.8.8"
dns2 = "8.8.4.4"
tenant_id = "tenant_id"
network_id = "network1234"
class PublicIpBlockFactory(IpBlockFactory):
type = "public"
network_id = "public_network_id"
class PrivateIpBlockFactory(IpBlockFactory):

View File

@ -363,6 +363,7 @@ class TestIpBlock(tests.BaseTest):
def test_subnet_creates_child_block_with_the_given_params(self):
ip_block = factory_models.PrivateIpBlockFactory(cidr="10.0.0.0/28",
network_id="1",
tenant_id="2")
subnet = ip_block.subnet("10.0.0.0/29",
@ -428,9 +429,9 @@ class TestIpBlock(tests.BaseTest):
gateway=None)
self.assertEqual(block.gateway, "10.0.0.1")
block = factory_models.IpBlockFactory(cidr="10.0.0.0/24",
gateway="10.0.0.10")
self.assertEqual(block.gateway, "10.0.0.10")
block = factory_models.IpBlockFactory(cidr="20.0.0.0/24",
gateway="20.0.0.10")
self.assertEqual(block.gateway, "20.0.0.10")
def test_gateway_ip_is_not_auto_set_if_ip_block_has_only_one_ip(self):
ipv4_block = factory_models.IpBlockFactory(cidr="10.0.0.0/32",
@ -460,7 +461,7 @@ class TestIpBlock(tests.BaseTest):
def test_find_ip_block(self):
block1 = factory_models.PrivateIpBlockFactory(cidr="10.0.0.1/8")
factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/8")
factory_models.PrivateIpBlockFactory(cidr="30.1.1.1/8")
found_block = models.IpBlock.find(block1.id)
@ -932,7 +933,7 @@ class TestIpBlock(tests.BaseTest):
def test_delete_all_deallocated_ips_after_default_of_two_days(self):
ip_block1 = factory_models.PrivateIpBlockFactory(cidr="10.0.1.1/24")
ip_block2 = factory_models.PrivateIpBlockFactory(cidr="10.0.1.1/24")
ip_block2 = factory_models.PrivateIpBlockFactory(cidr="20.0.1.1/24")
current_time = datetime.datetime(2050, 1, 1)
two_days_before = current_time - datetime.timedelta(days=2)
ip1 = _allocate_ip(ip_block1)
@ -1028,7 +1029,7 @@ class TestIpAddress(tests.BaseTest):
def test_address_for_a_ip_block_is_unique(self):
block1 = factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/24")
block2 = factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/24")
block2 = factory_models.PrivateIpBlockFactory(cidr="20.1.1.1/24")
block1_ip = factory_models.IpAddressFactory(address="10.1.1.3",
ip_block_id=block1.id)
interface = factory_models.InterfaceFactory()
@ -2190,10 +2191,23 @@ class TestInterface(tests.BaseTest):
class TestAllowedIp(tests.BaseTest):
def _ip_on_network(self, network_id):
block = factory_models.IpBlockFactory(
network_id=network_id)
return block.allocate_ip(factory_models.InterfaceFactory())
def _plug_interface_into_network(self, network_id, interface):
factory_models.IpBlockFactory(
network_id=network_id).allocate_ip(interface)
def test_allow_ip_on_an_interface(self):
interface = factory_models.InterfaceFactory()
ip1 = factory_models.IpAddressFactory()
ip2 = factory_models.IpAddressFactory()
ip_on_interface = factory_models.IpBlockFactory(
network_id="x").allocate_ip(interface)
ip1 = self._ip_on_network("x")
ip2 = self._ip_on_network("x")
noise_ip1 = factory_models.IpAddressFactory()
noise_ip2 = factory_models.IpAddressFactory()
@ -2201,27 +2215,35 @@ class TestAllowedIp(tests.BaseTest):
interface.allow_ip(ip2)
actual_allowed_ips = interface.ips_allowed()
self.assertModelsEqual(actual_allowed_ips, [ip1, ip2])
self.assertModelsEqual(actual_allowed_ips, [ip1, ip2, ip_on_interface])
def test_disallow_an_ip_on_an_interface(self):
interface1 = factory_models.InterfaceFactory()
ip_on_interface1 = factory_models.IpBlockFactory(
network_id="A").allocate_ip(interface1)
interface2 = factory_models.InterfaceFactory()
ip1 = factory_models.IpAddressFactory()
ip2 = factory_models.IpAddressFactory()
ip3 = factory_models.IpAddressFactory()
noise_ip1 = factory_models.IpAddressFactory()
noise_ip2 = factory_models.IpAddressFactory()
ip_on_interface2 = factory_models.IpBlockFactory(
network_id="A").allocate_ip(interface2)
ip1 = self._ip_on_network("A")
ip2 = self._ip_on_network("A")
ip3 = self._ip_on_network("A")
noise_ip1 = self._ip_on_network("A")
noise_ip2 = self._ip_on_network("A")
interface1.allow_ip(ip1)
interface1.allow_ip(ip2)
interface1.allow_ip(ip3)
interface2.allow_ip(ip3)
interface1.disallow_ip(ip2)
self.assertModelsEqual(interface1.ips_allowed(), [ip1, ip3])
self.assertModelsEqual(interface1.ips_allowed(),
[ip1, ip3, ip_on_interface1])
interface1.disallow_ip(ip3)
self.assertModelsEqual(interface1.ips_allowed(), [ip1])
self.assertModelsEqual(interface2.ips_allowed(), [ip3])
self.assertModelsEqual(interface1.ips_allowed(),
[ip1, ip_on_interface1])
self.assertModelsEqual(interface2.ips_allowed(),
[ip3, ip_on_interface2])
def test_allocating_ips_allows_the_ip_on_the_interface(self):
interface = factory_models.InterfaceFactory()
@ -2233,21 +2255,24 @@ class TestAllowedIp(tests.BaseTest):
def test_deallocating_ip_disallows_that_ip_on_interface(self):
interface = factory_models.InterfaceFactory()
block = factory_models.IpBlockFactory()
block = factory_models.IpBlockFactory(network_id="xyz")
ip = block.allocate_ip(interface=interface)
other_interface = factory_models.InterfaceFactory()
block.allocate_ip(interface=other_interface)
other_interface.allow_ip(ip)
block.deallocate_ip(ip.address)
self.assertEqual(interface.ips_allowed(), [])
self.assertEqual(other_interface.ips_allowed(), [ip])
self.assertEqual(other_interface.find_allowed_ip(ip.address), ip)
def test_deallocating_allowed_ip_only_disassociates_from_interface(self):
interface = factory_models.InterfaceFactory()
block = factory_models.IpBlockFactory()
block = factory_models.IpBlockFactory(network_id="net123")
ip = block.allocate_ip(interface=interface)
other_interface = factory_models.InterfaceFactory()
self._plug_interface_into_network("net123", other_interface)
other_interface.allow_ip(ip)
current_time = datetime.datetime.now()
two_days_before = current_time - datetime.timedelta(days=2)
@ -2266,7 +2291,6 @@ class TestAllowedIp(tests.BaseTest):
interface = factory_models.InterfaceFactory()
block = factory_models.IpBlockFactory()
ip = block.allocate_ip(interface=interface)
interface.allow_ip(ip)
self.assertEqual(interface.ips_allowed(), [ip])
@ -2283,8 +2307,9 @@ class TestAllowedIp(tests.BaseTest):
def test_find_allowed_ip(self):
interface = factory_models.InterfaceFactory()
ip1 = factory_models.IpAddressFactory()
ip2 = factory_models.IpAddressFactory()
self._plug_interface_into_network("xyz", interface)
ip1 = self._ip_on_network("xyz")
ip2 = self._ip_on_network("xyz")
interface.allow_ip(ip1)
interface.allow_ip(ip2)
@ -2294,8 +2319,9 @@ class TestAllowedIp(tests.BaseTest):
def test_find_allowed_ip_raises_model_not_found(self):
interface = factory_models.InterfaceFactory(
virtual_interface_id="vif_1")
ip1 = factory_models.IpAddressFactory()
ip2 = factory_models.IpAddressFactory()
self._plug_interface_into_network("AAA", interface)
ip1 = self._ip_on_network("AAA")
ip2 = self._ip_on_network("AAA")
unshared_ip = factory_models.IpAddressFactory()
interface.allow_ip(ip1)
interface.allow_ip(ip2)
@ -2306,6 +2332,35 @@ class TestAllowedIp(tests.BaseTest):
interface.find_allowed_ip,
unshared_ip.address)
def test_cannot_allow_ip_when_interface_is_pluged_into_other_network(self):
interface_plugged_into_net1 = factory_models.InterfaceFactory(
virtual_interface_id="viffy")
net1_block = factory_models.IpBlockFactory(network_id="1")
net1_ip = net1_block.allocate_ip(interface_plugged_into_net1)
net2_block = factory_models.IpBlockFactory(network_id="2")
net2_ip = net2_block.allocate_ip(factory_models.InterfaceFactory())
err_msg = ("Ip %s cannot be allowed on interface viffy "
"as interface is not configured "
"for ip's network") % net2_ip.address
self.assertRaisesExcMessage(models.IpNotAllowedOnInterfaceError,
err_msg,
interface_plugged_into_net1.allow_ip,
net2_ip)
def test_cannot_allow_ip_if_interface_isnt_plugged_into_any_network(self):
unplugged_interface = factory_models.InterfaceFactory(
virtual_interface_id="vif_id")
ip = factory_models.IpAddressFactory()
err_msg = ("Ip %s cannot be allowed on interface vif_id "
"as interface is not configured "
"for ip's network") % ip.address
self.assertRaisesExcMessage(models.IpNotAllowedOnInterfaceError,
err_msg,
unplugged_interface.allow_ip,
ip)
def _allocate_ip(block, interface=None, **kwargs):
interface = interface or factory_models.InterfaceFactory()

View File

@ -306,11 +306,12 @@ class TestIpBlockController(BaseTestController):
self.assertEqual(response.status, "404 Not Found")
def test_index_scoped_by_tenant(self):
ip_block1 = factory_models.PrivateIpBlockFactory(cidr="10.0.0.1/8",
ip_block1 = factory_models.PrivateIpBlockFactory(cidr="10.0.0.1/24",
tenant_id='999')
ip_block2 = factory_models.PrivateIpBlockFactory(cidr="10.0.0.2/8",
ip_block2 = factory_models.PrivateIpBlockFactory(cidr="20.0.0.2/24",
tenant_id='999')
factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/2",
factory_models.PrivateIpBlockFactory(cidr="30.1.1.1/2",
network_id="blah",
tenant_id='987')
response = self.app.get("/ipam/tenants/999/ip_blocks")
@ -352,6 +353,7 @@ class TestSubnetController(BaseTestController):
def test_create(self):
parent = factory_models.IpBlockFactory(cidr="10.0.0.0/28",
network_id="2",
tenant_id="123")
response = self.app.post_json(self._subnets_path(parent),
@ -985,8 +987,8 @@ class TestInsideGlobalsController(BaseTestController):
local_block = factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/8")
global_block = factory_models.PublicIpBlockFactory(cidr="192.1.1.1/8")
[local_ip], global_ips = _allocate_ips((local_block, 1),
(global_block, 5))
[[local_ip]] = _allocate_ips((local_block, 1))
[global_ips] = _allocate_ips((global_block, 5))
local_ip.add_inside_globals(global_ips)
response = self.app.get("{0}?limit=2&marker={1}".
@ -1170,8 +1172,8 @@ class TestInsideLocalsController(BaseTestController):
local_block = factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/24")
global_block = factory_models.PublicIpBlockFactory(cidr="77.1.1.1/24")
[global_ip], local_ips = _allocate_ips((global_block, 1),
(local_block, 5))
[[global_ip]] = _allocate_ips((global_block, 1))
[local_ips] = _allocate_ips((local_block, 5))
global_ip.add_inside_locals(local_ips)
response = self.app.get(self._nat_path(global_block,
@ -1183,8 +1185,8 @@ class TestInsideLocalsController(BaseTestController):
local_block = factory_models.PrivateIpBlockFactory(cidr="10.1.1.1/24")
global_block = factory_models.PublicIpBlockFactory(cidr="77.1.1.1/24")
[global_ip], local_ips = _allocate_ips((global_block, 1),
(local_block, 5))
[[global_ip]] = _allocate_ips((global_block, 1))
[local_ips] = _allocate_ips((local_block, 5))
global_ip.add_inside_locals(local_ips)
response = self.app.get("{0}?limit=2&marker={1}".
@ -2235,22 +2237,22 @@ class TestInterfaceAllowedIpsController(BaseTestController):
def test_index(self):
interface = factory_models.InterfaceFactory(
tenant_id="tnt_id", virtual_interface_id="vif_id")
noise_interface = factory_models.InterfaceFactory()
ip1 = factory_models.IpAddressFactory()
ip2 = factory_models.IpAddressFactory()
ip3 = factory_models.IpAddressFactory()
ip4 = factory_models.IpAddressFactory()
ip_factory = factory_models.IpAddressFactory
block_factory = factory_models.IpBlockFactory
ip_on_interface = block_factory(network_id="1").allocate_ip(interface)
ip1 = ip_factory(ip_block_id=block_factory(network_id="1").id)
ip2 = ip_factory(ip_block_id=block_factory(network_id="1").id)
ip3 = ip_factory(ip_block_id=block_factory(network_id="1").id)
ip4 = ip_factory(ip_block_id=block_factory(network_id="1").id)
interface.allow_ip(ip1)
interface.allow_ip(ip2)
interface.allow_ip(ip3)
noise_interface.allow_ip(ip3)
noise_interface.allow_ip(ip4)
response = self.app.get(
"/ipam/tenants/tnt_id/interfaces/vif_id/allowed_ips")
self.assertItemsEqual(response.json['ip_addresses'],
_data([ip1, ip2, ip3]))
_data([ip1, ip2, ip3, ip_on_interface]))
def test_index_returns_404_when_interface_doesnt_exist(self):
noise_interface = factory_models.InterfaceFactory(
@ -2277,6 +2279,9 @@ class TestInterfaceAllowedIpsController(BaseTestController):
def test_create(self):
interface = factory_models.InterfaceFactory(
tenant_id="tnt_id", virtual_interface_id="vif_id")
block = factory_models.IpBlockFactory(network_id="net123")
ip_on_interface = block.allocate_ip(interface)
block = factory_models.IpBlockFactory(network_id="net123")
ip = block.allocate_ip(factory_models.InterfaceFactory(
tenant_id="tnt_id"))
@ -2330,12 +2335,13 @@ class TestInterfaceAllowedIpsController(BaseTestController):
interface = factory_models.InterfaceFactory(
tenant_id="tnt_id", virtual_interface_id="vif_id")
block = factory_models.IpBlockFactory(network_id="net123")
ip = block.allocate_ip(factory_models.InterfaceFactory())
interface.allow_ip(ip)
ip_on_interface = block.allocate_ip(interface)
allowed_ip = block.allocate_ip(factory_models.InterfaceFactory())
interface.allow_ip(allowed_ip)
self.app.delete("/ipam/tenants/tnt_id/interfaces/vif_id/allowed_ips/%s"
% ip.address)
self.assertEqual(interface.ips_allowed(), [])
% allowed_ip.address)
self.assertEqual(interface.ips_allowed(), [ip_on_interface])
def test_delete_fails_for_non_existent_interface(self):
noise_interface = factory_models.InterfaceFactory(
@ -2366,14 +2372,15 @@ class TestInterfaceAllowedIpsController(BaseTestController):
interface = factory_models.InterfaceFactory(
tenant_id="tnt_id", virtual_interface_id="vif_id")
block = factory_models.IpBlockFactory(network_id="net123")
ip = block.allocate_ip(factory_models.InterfaceFactory())
interface.allow_ip(ip)
ip_on_interface = block.allocate_ip(interface)
allowed_ip = block.allocate_ip(factory_models.InterfaceFactory())
interface.allow_ip(allowed_ip)
response = self.app.get("/ipam/tenants/tnt_id/interfaces/vif_id/"
"allowed_ips/%s" % ip.address)
"allowed_ips/%s" % allowed_ip.address)
self.assertEqual(response.status_int, 200)
self.assertEqual(response.json['ip_address'], _data(ip))
self.assertEqual(response.json['ip_address'], _data(allowed_ip))
def test_show_raises_404_when_allowed_address_doesnt_exist(self):
factory_models.InterfaceFactory(