
FakeServer is a Fake of a novaclient object. Use actual json instead. Speaking of - transform the dict through json dump/load so that it's unicode where it's supposed to be. This makes diffs easier to make. Change-Id: I45c1136078f604c73c1fea54a9eb52bbbc8c69b5
764 lines
29 KiB
Python
764 lines
29 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
|
|
import copy
|
|
|
|
import shade
|
|
from shade import meta
|
|
from shade.tests.unit import base
|
|
from shade.tests import fakes
|
|
|
|
# TODO(mordred): Make a fakes.make_fake_nova_security_group and a
|
|
# fakes.make_fake_nova_security_group and remove all uses of
|
|
# meta.obj_to_munch here. Also, we have hardcoded id names -
|
|
# move those to using a getUniqueString() value.
|
|
|
|
neutron_grp_obj = fakes.FakeSecgroup(
|
|
id='1',
|
|
name='neutron-sec-group',
|
|
description='Test Neutron security group',
|
|
rules=[
|
|
dict(id='1', port_range_min=80, port_range_max=81,
|
|
protocol='tcp', remote_ip_prefix='0.0.0.0/0')
|
|
]
|
|
)
|
|
|
|
|
|
nova_grp_obj = fakes.FakeSecgroup(
|
|
id='2',
|
|
name='nova-sec-group',
|
|
description='Test Nova security group #1',
|
|
rules=[
|
|
dict(id='2', from_port=8000, to_port=8001, ip_protocol='tcp',
|
|
ip_range=dict(cidr='0.0.0.0/0'), parent_group_id=None)
|
|
]
|
|
)
|
|
|
|
|
|
# Neutron returns dicts instead of objects, so the dict versions should
|
|
# be used as expected return values from neutron API methods.
|
|
neutron_grp_dict = meta.obj_to_munch(neutron_grp_obj)
|
|
nova_grp_dict = meta.obj_to_munch(nova_grp_obj)
|
|
|
|
|
|
class TestSecurityGroups(base.RequestsMockTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestSecurityGroups, self).setUp()
|
|
self.has_neutron = True
|
|
|
|
def fake_has_service(*args, **kwargs):
|
|
return self.has_neutron
|
|
self.cloud.has_service = fake_has_service
|
|
|
|
def test_list_security_groups_neutron(self):
|
|
project_id = 42
|
|
self.cloud.secgroup_source = 'neutron'
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json'],
|
|
qs_elements=["project_id=%s" % project_id]),
|
|
json={'security_groups': [neutron_grp_dict]})
|
|
])
|
|
self.cloud.list_security_groups(filters={'project_id': project_id})
|
|
self.assert_calls()
|
|
|
|
def test_list_security_groups_nova(self):
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups?project_id=42'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': []}),
|
|
])
|
|
self.cloud.secgroup_source = 'nova'
|
|
self.has_neutron = False
|
|
self.cloud.list_security_groups(filters={'project_id': 42})
|
|
|
|
self.assert_calls()
|
|
|
|
def test_list_security_groups_none(self):
|
|
|
|
self.cloud.secgroup_source = None
|
|
self.has_neutron = False
|
|
self.assertRaises(shade.OpenStackCloudUnavailableFeature,
|
|
self.cloud.list_security_groups)
|
|
|
|
def test_delete_security_group_neutron(self):
|
|
sg_id = neutron_grp_dict['id']
|
|
self.cloud.secgroup_source = 'neutron'
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]}),
|
|
dict(method='DELETE',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups', '%s.json' % sg_id]),
|
|
json={})
|
|
])
|
|
self.assertTrue(self.cloud.delete_security_group('1'))
|
|
self.assert_calls()
|
|
|
|
def test_delete_security_group_nova(self):
|
|
self.cloud.secgroup_source = 'nova'
|
|
self.has_neutron = False
|
|
nova_return = [nova_grp_dict]
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': nova_return}),
|
|
dict(method='DELETE',
|
|
uri='{endpoint}/os-security-groups/2'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT)),
|
|
])
|
|
self.cloud.delete_security_group('2')
|
|
self.assert_calls()
|
|
|
|
def test_delete_security_group_neutron_not_found(self):
|
|
self.cloud.secgroup_source = 'neutron'
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]})
|
|
])
|
|
self.assertFalse(self.cloud.delete_security_group('10'))
|
|
self.assert_calls()
|
|
|
|
def test_delete_security_group_nova_not_found(self):
|
|
self.cloud.secgroup_source = 'nova'
|
|
self.has_neutron = False
|
|
nova_return = [nova_grp_dict]
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': nova_return}),
|
|
])
|
|
self.assertFalse(self.cloud.delete_security_group('doesNotExist'))
|
|
|
|
def test_delete_security_group_none(self):
|
|
self.cloud.secgroup_source = None
|
|
self.assertRaises(shade.OpenStackCloudUnavailableFeature,
|
|
self.cloud.delete_security_group,
|
|
'doesNotExist')
|
|
|
|
def test_create_security_group_neutron(self):
|
|
self.cloud.secgroup_source = 'neutron'
|
|
group_name = self.getUniqueString()
|
|
group_desc = 'security group from test_create_security_group_neutron'
|
|
new_group = meta.obj_to_munch(
|
|
fakes.FakeSecgroup(
|
|
id='2',
|
|
name=group_name,
|
|
description=group_desc,
|
|
rules=[]))
|
|
self.register_uris([
|
|
dict(method='POST',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_group': new_group},
|
|
validate=dict(
|
|
json={'security_group': {
|
|
'name': group_name,
|
|
'description': group_desc
|
|
}}))
|
|
])
|
|
|
|
r = self.cloud.create_security_group(group_name, group_desc)
|
|
self.assertEqual(group_name, r['name'])
|
|
self.assertEqual(group_desc, r['description'])
|
|
|
|
self.assert_calls()
|
|
|
|
def test_create_security_group_neutron_specific_tenant(self):
|
|
self.cloud.secgroup_source = 'neutron'
|
|
project_id = "861808a93da0484ea1767967c4df8a23"
|
|
group_name = self.getUniqueString()
|
|
group_desc = 'security group from' \
|
|
' test_create_security_group_neutron_specific_tenant'
|
|
new_group = meta.obj_to_munch(
|
|
fakes.FakeSecgroup(
|
|
id='2',
|
|
name=group_name,
|
|
description=group_desc,
|
|
project_id=project_id,
|
|
rules=[]))
|
|
self.register_uris([
|
|
dict(method='POST',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_group': new_group},
|
|
validate=dict(
|
|
json={'security_group': {
|
|
'name': group_name,
|
|
'description': group_desc,
|
|
'tenant_id': project_id
|
|
}}))
|
|
])
|
|
|
|
r = self.cloud.create_security_group(
|
|
group_name,
|
|
group_desc,
|
|
project_id
|
|
)
|
|
self.assertEqual(group_name, r['name'])
|
|
self.assertEqual(group_desc, r['description'])
|
|
self.assertEqual(project_id, r['tenant_id'])
|
|
|
|
self.assert_calls()
|
|
|
|
def test_create_security_group_nova(self):
|
|
group_name = self.getUniqueString()
|
|
self.has_neutron = False
|
|
group_desc = 'security group from test_create_security_group_neutron'
|
|
new_group = meta.obj_to_munch(
|
|
fakes.FakeSecgroup(
|
|
id='2',
|
|
name=group_name,
|
|
description=group_desc,
|
|
rules=[]))
|
|
self.register_uris([
|
|
dict(method='POST',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_group': new_group},
|
|
validate=dict(json={
|
|
'security_group': {
|
|
'name': group_name,
|
|
'description': group_desc
|
|
}})),
|
|
])
|
|
|
|
self.cloud.secgroup_source = 'nova'
|
|
r = self.cloud.create_security_group(group_name, group_desc)
|
|
self.assertEqual(group_name, r['name'])
|
|
self.assertEqual(group_desc, r['description'])
|
|
|
|
self.assert_calls()
|
|
|
|
def test_create_security_group_none(self):
|
|
self.cloud.secgroup_source = None
|
|
self.has_neutron = False
|
|
self.assertRaises(shade.OpenStackCloudUnavailableFeature,
|
|
self.cloud.create_security_group,
|
|
'', '')
|
|
|
|
def test_update_security_group_neutron(self):
|
|
self.cloud.secgroup_source = 'neutron'
|
|
new_name = self.getUniqueString()
|
|
sg_id = neutron_grp_obj.id
|
|
update_return = meta.obj_to_munch(neutron_grp_obj)
|
|
update_return['name'] = new_name
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]}),
|
|
dict(method='PUT',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups', '%s.json' % sg_id]),
|
|
json={'security_group': update_return},
|
|
validate=dict(json={
|
|
'security_group': {'name': new_name}}))
|
|
])
|
|
r = self.cloud.update_security_group(neutron_grp_obj.id, name=new_name)
|
|
self.assertEqual(r['name'], new_name)
|
|
self.assert_calls()
|
|
|
|
def test_update_security_group_nova(self):
|
|
self.has_neutron = False
|
|
new_name = self.getUniqueString()
|
|
self.cloud.secgroup_source = 'nova'
|
|
nova_return = [nova_grp_dict]
|
|
update_return = meta.obj_to_munch(nova_grp_obj)
|
|
update_return['name'] = new_name
|
|
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': nova_return}),
|
|
dict(method='PUT',
|
|
uri='{endpoint}/os-security-groups/2'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_group': update_return}),
|
|
])
|
|
|
|
r = self.cloud.update_security_group(nova_grp_obj.id, name=new_name)
|
|
self.assertEqual(r['name'], new_name)
|
|
self.assert_calls()
|
|
|
|
def test_update_security_group_bad_kwarg(self):
|
|
self.assertRaises(TypeError,
|
|
self.cloud.update_security_group,
|
|
'doesNotExist', bad_arg='')
|
|
|
|
def test_create_security_group_rule_neutron(self):
|
|
self.cloud.secgroup_source = 'neutron'
|
|
args = dict(
|
|
port_range_min=-1,
|
|
port_range_max=40000,
|
|
protocol='tcp',
|
|
remote_ip_prefix='0.0.0.0/0',
|
|
remote_group_id='456',
|
|
direction='egress',
|
|
ethertype='IPv6'
|
|
)
|
|
expected_args = copy.copy(args)
|
|
# For neutron, -1 port should be converted to None
|
|
expected_args['port_range_min'] = None
|
|
expected_args['security_group_id'] = neutron_grp_dict['id']
|
|
|
|
expected_new_rule = copy.copy(expected_args)
|
|
expected_new_rule['id'] = '1234'
|
|
expected_new_rule['tenant_id'] = ''
|
|
expected_new_rule['project_id'] = expected_new_rule['tenant_id']
|
|
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]}),
|
|
dict(method='POST',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-group-rules.json']),
|
|
json={'security_group_rule': expected_new_rule},
|
|
validate=dict(json={
|
|
'security_group_rule': expected_args}))
|
|
])
|
|
new_rule = self.cloud.create_security_group_rule(
|
|
secgroup_name_or_id=neutron_grp_dict['id'], **args)
|
|
# NOTE(slaweq): don't check location and properties in new rule
|
|
new_rule.pop("location")
|
|
new_rule.pop("properties")
|
|
self.assertEqual(expected_new_rule, new_rule)
|
|
self.assert_calls()
|
|
|
|
def test_create_security_group_rule_neutron_specific_tenant(self):
|
|
self.cloud.secgroup_source = 'neutron'
|
|
args = dict(
|
|
port_range_min=-1,
|
|
port_range_max=40000,
|
|
protocol='tcp',
|
|
remote_ip_prefix='0.0.0.0/0',
|
|
remote_group_id='456',
|
|
direction='egress',
|
|
ethertype='IPv6',
|
|
project_id='861808a93da0484ea1767967c4df8a23'
|
|
)
|
|
expected_args = copy.copy(args)
|
|
# For neutron, -1 port should be converted to None
|
|
expected_args['port_range_min'] = None
|
|
expected_args['security_group_id'] = neutron_grp_dict['id']
|
|
expected_args['tenant_id'] = expected_args['project_id']
|
|
expected_args.pop('project_id')
|
|
|
|
expected_new_rule = copy.copy(expected_args)
|
|
expected_new_rule['id'] = '1234'
|
|
expected_new_rule['project_id'] = expected_new_rule['tenant_id']
|
|
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]}),
|
|
dict(method='POST',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-group-rules.json']),
|
|
json={'security_group_rule': expected_new_rule},
|
|
validate=dict(json={
|
|
'security_group_rule': expected_args}))
|
|
])
|
|
new_rule = self.cloud.create_security_group_rule(
|
|
secgroup_name_or_id=neutron_grp_dict['id'], ** args)
|
|
# NOTE(slaweq): don't check location and properties in new rule
|
|
new_rule.pop("location")
|
|
new_rule.pop("properties")
|
|
self.assertEqual(expected_new_rule, new_rule)
|
|
self.assert_calls()
|
|
|
|
def test_create_security_group_rule_nova(self):
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
|
|
nova_return = [nova_grp_dict]
|
|
|
|
new_rule = meta.obj_to_munch(fakes.FakeNovaSecgroupRule(
|
|
id='xyz', from_port=1, to_port=2000, ip_protocol='tcp',
|
|
cidr='1.2.3.4/32'))
|
|
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': nova_return}),
|
|
dict(method='POST',
|
|
uri='{endpoint}/os-security-group-rules'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_group_rule': new_rule},
|
|
validate=dict(json={
|
|
"security_group_rule": {
|
|
"from_port": 1,
|
|
"ip_protocol": "tcp",
|
|
"to_port": 2000,
|
|
"parent_group_id": "2",
|
|
"cidr": "1.2.3.4/32",
|
|
"group_id": "123"}})),
|
|
])
|
|
|
|
self.cloud.create_security_group_rule(
|
|
'2', port_range_min=1, port_range_max=2000, protocol='tcp',
|
|
remote_ip_prefix='1.2.3.4/32', remote_group_id='123')
|
|
|
|
self.assert_calls()
|
|
|
|
def test_create_security_group_rule_nova_no_ports(self):
|
|
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
|
|
new_rule = fakes.FakeNovaSecgroupRule(
|
|
id='xyz', from_port=1, to_port=65535, ip_protocol='tcp',
|
|
cidr='1.2.3.4/32')
|
|
|
|
nova_return = [nova_grp_dict]
|
|
|
|
new_rule = meta.obj_to_munch(fakes.FakeNovaSecgroupRule(
|
|
id='xyz', from_port=1, to_port=65535, ip_protocol='tcp',
|
|
cidr='1.2.3.4/32'))
|
|
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': nova_return}),
|
|
dict(method='POST',
|
|
uri='{endpoint}/os-security-group-rules'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_group_rule': new_rule},
|
|
validate=dict(json={
|
|
"security_group_rule": {
|
|
"from_port": 1,
|
|
"ip_protocol": "tcp",
|
|
"to_port": 65535,
|
|
"parent_group_id": "2",
|
|
"cidr": "1.2.3.4/32",
|
|
"group_id": "123"}})),
|
|
])
|
|
|
|
self.cloud.create_security_group_rule(
|
|
'2', protocol='tcp',
|
|
remote_ip_prefix='1.2.3.4/32', remote_group_id='123')
|
|
|
|
self.assert_calls()
|
|
|
|
def test_create_security_group_rule_none(self):
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = None
|
|
self.assertRaises(shade.OpenStackCloudUnavailableFeature,
|
|
self.cloud.create_security_group_rule,
|
|
'')
|
|
|
|
def test_delete_security_group_rule_neutron(self):
|
|
rule_id = "xyz"
|
|
self.cloud.secgroup_source = 'neutron'
|
|
self.register_uris([
|
|
dict(method='DELETE',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-group-rules',
|
|
'%s.json' % rule_id]),
|
|
json={})
|
|
])
|
|
self.assertTrue(self.cloud.delete_security_group_rule(rule_id))
|
|
self.assert_calls()
|
|
|
|
def test_delete_security_group_rule_nova(self):
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
self.register_uris([
|
|
dict(method='DELETE',
|
|
uri='{endpoint}/os-security-group-rules/xyz'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT)),
|
|
])
|
|
r = self.cloud.delete_security_group_rule('xyz')
|
|
self.assertTrue(r)
|
|
self.assert_calls()
|
|
|
|
def test_delete_security_group_rule_none(self):
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = None
|
|
self.assertRaises(shade.OpenStackCloudUnavailableFeature,
|
|
self.cloud.delete_security_group_rule,
|
|
'')
|
|
|
|
def test_delete_security_group_rule_not_found(self):
|
|
rule_id = "doesNotExist"
|
|
self.cloud.secgroup_source = 'neutron'
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]})
|
|
])
|
|
self.assertFalse(self.cloud.delete_security_group(rule_id))
|
|
self.assert_calls()
|
|
|
|
def test_delete_security_group_rule_not_found_nova(self):
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': [nova_grp_dict]}),
|
|
])
|
|
r = self.cloud.delete_security_group('doesNotExist')
|
|
self.assertFalse(r)
|
|
|
|
self.assert_calls()
|
|
|
|
def test_nova_egress_security_group_rule(self):
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': [nova_grp_dict]}),
|
|
])
|
|
self.assertRaises(shade.OpenStackCloudException,
|
|
self.cloud.create_security_group_rule,
|
|
secgroup_name_or_id='nova-sec-group',
|
|
direction='egress')
|
|
|
|
self.assert_calls()
|
|
|
|
def test_list_server_security_groups_nova(self):
|
|
self.has_neutron = False
|
|
|
|
server = dict(id='server_id')
|
|
|
|
self.register_uris([
|
|
dict(
|
|
method='GET',
|
|
uri='{endpoint}/servers/{id}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT,
|
|
id='server_id'),
|
|
json={'security_groups': [nova_grp_dict]}),
|
|
])
|
|
groups = self.cloud.list_server_security_groups(server)
|
|
self.assertIn('location', groups[0])
|
|
self.assertEqual(
|
|
groups[0]['security_group_rules'][0]['remote_ip_prefix'],
|
|
nova_grp_dict['rules'][0]['ip_range']['cidr'])
|
|
|
|
self.assert_calls()
|
|
|
|
def test_list_server_security_groups_bad_source(self):
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'invalid'
|
|
server = dict(id='server_id')
|
|
ret = self.cloud.list_server_security_groups(server)
|
|
self.assertEqual([], ret)
|
|
|
|
def test_add_security_group_to_server_nova(self):
|
|
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
|
|
self.register_uris([
|
|
dict(
|
|
method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT,
|
|
id='server_id'),
|
|
json={'security_groups': [nova_grp_dict]}),
|
|
dict(
|
|
method='POST',
|
|
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
|
|
validate={'addSecurityGroup': {'name': 'nova-sec-group'}},
|
|
status_code=202,
|
|
),
|
|
])
|
|
|
|
ret = self.cloud.add_server_security_groups(
|
|
dict(id='1234'), 'nova-sec-group')
|
|
|
|
self.assertTrue(ret)
|
|
|
|
self.assert_calls()
|
|
|
|
def test_add_security_group_to_server_neutron(self):
|
|
# fake to get server by name, server-name must match
|
|
fake_server = fakes.make_fake_server('1234', 'server-name', 'ACTIVE')
|
|
|
|
# use neutron for secgroup list and return an existing fake
|
|
self.cloud.secgroup_source = 'neutron'
|
|
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'compute', 'public',
|
|
append=['servers', 'detail']),
|
|
json={'servers': [meta.obj_to_munch(fake_server)]}),
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]}),
|
|
dict(method='POST',
|
|
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
|
|
validate={'addSecurityGroup': {'name': 'neutron-sec-group'}},
|
|
status_code=202),
|
|
])
|
|
|
|
self.assertTrue(self.cloud.add_server_security_groups(
|
|
'server-name', 'neutron-sec-group'))
|
|
self.assert_calls()
|
|
|
|
def test_remove_security_group_from_server_nova(self):
|
|
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
|
|
self.register_uris([
|
|
dict(
|
|
method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': [nova_grp_dict]}),
|
|
dict(
|
|
method='POST',
|
|
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
|
|
validate={'removeSecurityGroup': {'name': 'nova-sec-group'}},
|
|
),
|
|
])
|
|
|
|
ret = self.cloud.remove_server_security_groups(
|
|
dict(id='1234'), 'nova-sec-group')
|
|
self.assertTrue(ret)
|
|
|
|
self.assert_calls()
|
|
|
|
def test_remove_security_group_from_server_neutron(self):
|
|
# fake to get server by name, server-name must match
|
|
fake_server = fakes.make_fake_server('1234', 'server-name', 'ACTIVE')
|
|
|
|
# use neutron for secgroup list and return an existing fake
|
|
self.cloud.secgroup_source = 'neutron'
|
|
|
|
validate = {'removeSecurityGroup': {'name': 'neutron-sec-group'}}
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'compute', 'public',
|
|
append=['servers', 'detail']),
|
|
json={'servers': [meta.obj_to_munch(fake_server)]}),
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]}),
|
|
dict(method='POST',
|
|
uri='%s/servers/%s/action' % (fakes.COMPUTE_ENDPOINT, '1234'),
|
|
validate=validate),
|
|
])
|
|
|
|
self.assertTrue(self.cloud.remove_server_security_groups(
|
|
'server-name', 'neutron-sec-group'))
|
|
self.assert_calls()
|
|
|
|
def test_add_bad_security_group_to_server_nova(self):
|
|
# fake to get server by name, server-name must match
|
|
fake_server = fakes.make_fake_server('1234', 'server-name', 'ACTIVE')
|
|
|
|
# use nova for secgroup list and return an existing fake
|
|
self.has_neutron = False
|
|
self.cloud.secgroup_source = 'nova'
|
|
self.register_uris([
|
|
dict(
|
|
method='GET',
|
|
uri='{endpoint}/servers/detail'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'servers': [fake_server]}),
|
|
dict(
|
|
method='GET',
|
|
uri='{endpoint}/os-security-groups'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'security_groups': [nova_grp_dict]}),
|
|
])
|
|
|
|
ret = self.cloud.add_server_security_groups('server-name',
|
|
'unknown-sec-group')
|
|
self.assertFalse(ret)
|
|
|
|
self.assert_calls()
|
|
|
|
def test_add_bad_security_group_to_server_neutron(self):
|
|
# fake to get server by name, server-name must match
|
|
fake_server = fakes.make_fake_server('1234', 'server-name', 'ACTIVE')
|
|
|
|
# use neutron for secgroup list and return an existing fake
|
|
self.cloud.secgroup_source = 'neutron'
|
|
|
|
self.register_uris([
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'compute', 'public',
|
|
append=['servers', 'detail']),
|
|
json={'servers': [meta.obj_to_munch(fake_server)]}),
|
|
dict(method='GET',
|
|
uri=self.get_mock_url(
|
|
'network', 'public',
|
|
append=['v2.0', 'security-groups.json']),
|
|
json={'security_groups': [neutron_grp_dict]})
|
|
])
|
|
self.assertFalse(self.cloud.add_server_security_groups(
|
|
'server-name', 'unknown-sec-group'))
|
|
self.assert_calls()
|
|
|
|
def test_add_security_group_to_bad_server(self):
|
|
# fake to get server by name, server-name must match
|
|
fake_server = fakes.make_fake_server('1234', 'server-name', 'ACTIVE')
|
|
|
|
self.register_uris([
|
|
dict(
|
|
method='GET',
|
|
uri='{endpoint}/servers/detail'.format(
|
|
endpoint=fakes.COMPUTE_ENDPOINT),
|
|
json={'servers': [fake_server]}),
|
|
])
|
|
|
|
ret = self.cloud.add_server_security_groups('unknown-server-name',
|
|
'nova-sec-group')
|
|
self.assertFalse(ret)
|
|
|
|
self.assert_calls()
|