NSX|V3+P: Validate allowed address pairs ipv6 cidr
Change-Id: Ib9085da9ff64c81d45d7e2a2c1a5542ab69bcaa9
This commit is contained in:
parent
2918ce6e52
commit
4c18521905
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ipaddress
|
||||
from unittest import mock
|
||||
|
||||
import decorator
|
||||
@ -328,8 +329,19 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
|
||||
|
||||
for pair in address_pairs:
|
||||
ip = pair.get('ip_address')
|
||||
# Validate ipv4 cidrs (No limitation on ipv6):
|
||||
if ':' not in ip:
|
||||
if ':' in ip:
|
||||
# Validate ipv6 cidrs:
|
||||
ip_split = ip.split('/')
|
||||
if len(ip_split) > 1 and ip_split[1] != '128':
|
||||
try:
|
||||
ipaddress.ip_network(ip)
|
||||
except ValueError:
|
||||
# This means the host bits are set
|
||||
err_msg = (_("Allowed address pairs Cidr %s cannot "
|
||||
"have host bits set") % ip)
|
||||
raise n_exc.InvalidInput(error_message=err_msg)
|
||||
else:
|
||||
# Validate ipv4 cidrs (No limitation on ipv6):
|
||||
if len(ip.split('/')) > 1 and ip.split('/')[1] != '32':
|
||||
LOG.error("Cidr %s is not supported in allowed address "
|
||||
"pairs", ip)
|
||||
|
@ -60,6 +60,7 @@ class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
|
||||
|
||||
def test_create_port_allowed_address_pairs_v6(self):
|
||||
with self.network() as net:
|
||||
# Single IPv6
|
||||
address_pairs = [{'ip_address': '1001::12'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
@ -70,6 +71,25 @@ class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
|
||||
address_pairs)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
# IPv6 cidr
|
||||
address_pairs = [{'ip_address': '1001::/64'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
allowed_address_pairs=address_pairs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
address_pairs[0]['mac_address'] = port['port']['mac_address']
|
||||
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
|
||||
address_pairs)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
# Illegal IPv6 cidr
|
||||
address_pairs = [{'ip_address': '1001::12/64'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
allowed_address_pairs=address_pairs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
self.assertIn('NeutronError', port)
|
||||
|
||||
def test_update_add_bad_address_pairs_with_cidr(self):
|
||||
with self.network() as net:
|
||||
res = self._create_port(self.fmt, net['network']['id'])
|
||||
@ -116,6 +136,7 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
|
||||
|
||||
def test_create_port_allowed_address_pairs_v6(self):
|
||||
with self.network() as net:
|
||||
# Single IPv6 address
|
||||
address_pairs = [{'ip_address': '1001::12'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
@ -126,6 +147,25 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
|
||||
address_pairs)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
# IPv6 cidr
|
||||
address_pairs = [{'ip_address': '1001::/64'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
allowed_address_pairs=address_pairs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
address_pairs[0]['mac_address'] = port['port']['mac_address']
|
||||
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
|
||||
address_pairs)
|
||||
self._delete('ports', port['port']['id'])
|
||||
|
||||
# Illegal IPv6 cidr
|
||||
address_pairs = [{'ip_address': '1001::12/64'}]
|
||||
res = self._create_port(self.fmt, net['network']['id'],
|
||||
arg_list=(addr_apidef.ADDRESS_PAIRS,),
|
||||
allowed_address_pairs=address_pairs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
self.assertIn('NeutronError', port)
|
||||
|
||||
def test_update_add_bad_address_pairs_with_cidr(self):
|
||||
with self.network() as net:
|
||||
res = self._create_port(self.fmt, net['network']['id'])
|
||||
|
Loading…
x
Reference in New Issue
Block a user