Merge "Replace dict.get(key) in drivers unit tests"
This commit is contained in:
commit
b83040bd30
@ -135,10 +135,10 @@ class TestAgentMethods(db_base.DbTestCase):
|
||||
self.node.instance_info['image_source'])
|
||||
glance_mock.return_value.swift_temp_url.assert_called_once_with(
|
||||
image_info)
|
||||
image_type = task.node.instance_info.get('image_type')
|
||||
image_type = task.node.instance_info['image_type']
|
||||
self.assertEqual('partition', image_type)
|
||||
self.assertEqual('kernel', info.get('kernel'))
|
||||
self.assertEqual('ramdisk', info.get('ramdisk'))
|
||||
self.assertEqual('kernel', info['kernel'])
|
||||
self.assertEqual('ramdisk', info['ramdisk'])
|
||||
self.assertEqual(expected_i_info, info)
|
||||
parse_instance_info_mock.assert_called_once_with(task.node)
|
||||
|
||||
@ -209,7 +209,7 @@ class TestAgentMethods(db_base.DbTestCase):
|
||||
info['image_url'])
|
||||
validate_href_mock.assert_called_once_with(
|
||||
mock.ANY, 'http://image-ref')
|
||||
self.assertEqual('partition', info.get('image_type'))
|
||||
self.assertEqual('partition', info['image_type'])
|
||||
self.assertEqual(expected_i_info, info)
|
||||
parse_instance_info_mock.assert_called_once_with(task.node)
|
||||
|
||||
@ -771,8 +771,8 @@ class TestAgentVendor(db_base.DbTestCase):
|
||||
self.assertFalse(prepare_mock.called)
|
||||
self.assertEqual(states.ACTIVE, task.node.provision_state)
|
||||
self.assertEqual(states.NOSTATE, task.node.target_provision_state)
|
||||
driver_int_info = task.node.driver_internal_info
|
||||
self.assertIsNone(driver_int_info.get('root_uuid_or_disk_id'))
|
||||
self.assertNotIn('root_uuid_or_disk_id',
|
||||
task.node.driver_internal_info)
|
||||
self.assertFalse(uuid_mock.called)
|
||||
|
||||
@mock.patch.object(deploy_utils, 'get_boot_mode_for_deploy', autospec=True)
|
||||
@ -817,8 +817,8 @@ class TestAgentVendor(db_base.DbTestCase):
|
||||
self.assertEqual(states.ACTIVE, task.node.provision_state)
|
||||
self.assertEqual(states.NOSTATE, task.node.target_provision_state)
|
||||
driver_int_info = task.node.driver_internal_info
|
||||
self.assertEqual(driver_int_info.get('root_uuid_or_disk_id'),
|
||||
'root_uuid')
|
||||
self.assertEqual('root_uuid',
|
||||
driver_int_info['root_uuid_or_disk_id']),
|
||||
uuid_mock.assert_called_once_with(self.passthru, task, 'root_uuid')
|
||||
boot_mode_mock.assert_called_once_with(task.node)
|
||||
|
||||
@ -856,8 +856,8 @@ class TestAgentVendor(db_base.DbTestCase):
|
||||
self.assertFalse(prepare_mock.called)
|
||||
power_off_mock.assert_called_once_with(task.node)
|
||||
check_deploy_mock.assert_called_once_with(mock.ANY, task.node)
|
||||
driver_int_info = task.node.driver_internal_info
|
||||
self.assertIsNone(driver_int_info.get('root_uuid_or_disk_id'))
|
||||
self.assertNotIn('root_uuid_or_disk_id',
|
||||
task.node.driver_internal_info)
|
||||
|
||||
get_power_state_mock.assert_called_once_with(task)
|
||||
node_power_action_mock.assert_called_once_with(
|
||||
|
@ -946,8 +946,7 @@ class TestBaseAgentVendor(db_base.DbTestCase):
|
||||
shared=False) as task:
|
||||
self.passthru._cleaning_reboot(task)
|
||||
mock_reboot.assert_called_once_with(task, states.REBOOT)
|
||||
self.assertTrue(task.node.driver_internal_info.get(
|
||||
'cleaning_reboot'))
|
||||
self.assertTrue(task.node.driver_internal_info['cleaning_reboot'])
|
||||
|
||||
@mock.patch.object(manager_utils, 'cleaning_error_handler', autospec=True)
|
||||
@mock.patch.object(manager_utils, 'node_power_action', autospec=True)
|
||||
|
@ -309,8 +309,7 @@ class TestAgentClient(base.TestCase):
|
||||
'step': step,
|
||||
'node': self.node.as_dict(),
|
||||
'ports': [],
|
||||
'clean_version': self.node.driver_internal_info.get(
|
||||
'hardware_manager_version')
|
||||
'clean_version': None
|
||||
}
|
||||
self.client.execute_clean_step(step,
|
||||
self.node,
|
||||
|
@ -97,7 +97,7 @@ class ConsoleUtilsTestCase(db_base.DbTestCase):
|
||||
mock_dir.return_value = tempfile.gettempdir()
|
||||
expected_path = '%(tempdir)s/%(uuid)s.pid' % {
|
||||
'tempdir': mock_dir.return_value,
|
||||
'uuid': self.info.get('uuid')}
|
||||
'uuid': self.info['uuid']}
|
||||
path = console_utils._get_console_pid_file(self.info['uuid'])
|
||||
self.assertEqual(expected_path, path)
|
||||
mock_dir.assert_called_once_with()
|
||||
|
@ -48,11 +48,11 @@ class IBootPrivateMethodTestCase(db_base.DbTestCase):
|
||||
driver='fake_iboot',
|
||||
driver_info=INFO_DICT)
|
||||
info = iboot._parse_driver_info(node)
|
||||
self.assertIsNotNone(info.get('address'))
|
||||
self.assertIsNotNone(info.get('username'))
|
||||
self.assertIsNotNone(info.get('password'))
|
||||
self.assertIsNotNone(info.get('port'))
|
||||
self.assertIsNotNone(info.get('relay_id'))
|
||||
self.assertIsNotNone(info['address'])
|
||||
self.assertIsNotNone(info['username'])
|
||||
self.assertIsNotNone(info['password'])
|
||||
self.assertIsNotNone(info['port'])
|
||||
self.assertIsNotNone(info['relay_id'])
|
||||
|
||||
def test__parse_driver_info_good_with_explicit_port(self):
|
||||
info = dict(INFO_DICT)
|
||||
@ -62,7 +62,7 @@ class IBootPrivateMethodTestCase(db_base.DbTestCase):
|
||||
driver='fake_iboot',
|
||||
driver_info=info)
|
||||
info = iboot._parse_driver_info(node)
|
||||
self.assertEqual(1234, info.get('port'))
|
||||
self.assertEqual(1234, info['port'])
|
||||
|
||||
def test__parse_driver_info_good_with_explicit_relay_id(self):
|
||||
info = dict(INFO_DICT)
|
||||
@ -72,7 +72,7 @@ class IBootPrivateMethodTestCase(db_base.DbTestCase):
|
||||
driver='fake_iboot',
|
||||
driver_info=info)
|
||||
info = iboot._parse_driver_info(node)
|
||||
self.assertEqual(2, info.get('relay_id'))
|
||||
self.assertEqual(2, info['relay_id'])
|
||||
|
||||
def test__parse_driver_info_missing_address(self):
|
||||
info = dict(INFO_DICT)
|
||||
|
@ -51,11 +51,11 @@ class IPMINativePrivateMethodTestCase(db_base.DbTestCase):
|
||||
|
||||
def test__parse_driver_info(self):
|
||||
# make sure we get back the expected things
|
||||
self.assertIsNotNone(self.info.get('address'))
|
||||
self.assertIsNotNone(self.info.get('username'))
|
||||
self.assertIsNotNone(self.info.get('password'))
|
||||
self.assertIsNotNone(self.info.get('uuid'))
|
||||
self.assertIsNotNone(self.info.get('force_boot_device'))
|
||||
self.assertIsNotNone(self.info['address'])
|
||||
self.assertIsNotNone(self.info['username'])
|
||||
self.assertIsNotNone(self.info['password'])
|
||||
self.assertIsNotNone(self.info['uuid'])
|
||||
self.assertIsNotNone(self.info['force_boot_device'])
|
||||
|
||||
# make sure error is raised when info, eg. username, is missing
|
||||
info = dict(INFO_DICT)
|
||||
|
@ -348,7 +348,7 @@ class IPMIToolPrivateMethodTestCase(db_base.DbTestCase):
|
||||
self.assertFalse(os.path.isfile(pw_file))
|
||||
|
||||
def test__make_password_file_str_password(self, mock_sleep):
|
||||
self._test__make_password_file(mock_sleep, self.info.get('password'))
|
||||
self._test__make_password_file(mock_sleep, self.info['password'])
|
||||
|
||||
def test__make_password_file_with_numeric_password(self, mock_sleep):
|
||||
self._test__make_password_file(mock_sleep, 12345)
|
||||
@ -395,7 +395,7 @@ class IPMIToolPrivateMethodTestCase(db_base.DbTestCase):
|
||||
# make sure we get back the expected things
|
||||
_OPTIONS = ['address', 'username', 'password', 'uuid']
|
||||
for option in _OPTIONS:
|
||||
self.assertIsNotNone(self.info.get(option))
|
||||
self.assertIsNotNone(self.info[option])
|
||||
|
||||
info = dict(INFO_DICT)
|
||||
|
||||
|
@ -129,9 +129,9 @@ class PXEPrivateMethodsTestCase(db_base.DbTestCase):
|
||||
self.assertEqual(expected_info, image_info)
|
||||
self.assertFalse(show_mock.called)
|
||||
self.assertEqual('instance_kernel_uuid',
|
||||
self.node.instance_info.get('kernel'))
|
||||
self.node.instance_info['kernel'])
|
||||
self.assertEqual('instance_ramdisk_uuid',
|
||||
self.node.instance_info.get('ramdisk'))
|
||||
self.node.instance_info['ramdisk'])
|
||||
|
||||
def test__get_instance_image_info(self):
|
||||
# Tests when 'is_whole_disk_image' exists in driver_internal_info
|
||||
|
@ -87,11 +87,11 @@ class SeaMicroValidateParametersTestCase(db_base.DbTestCase):
|
||||
driver='fake_seamicro',
|
||||
driver_info=INFO_DICT)
|
||||
info = seamicro._parse_driver_info(node)
|
||||
self.assertIsNotNone(info.get('api_endpoint'))
|
||||
self.assertIsNotNone(info.get('username'))
|
||||
self.assertIsNotNone(info.get('password'))
|
||||
self.assertIsNotNone(info.get('server_id'))
|
||||
self.assertIsNotNone(info.get('uuid'))
|
||||
self.assertIsNotNone(info['api_endpoint'])
|
||||
self.assertIsNotNone(info['username'])
|
||||
self.assertIsNotNone(info['password'])
|
||||
self.assertIsNotNone(info['server_id'])
|
||||
self.assertIsNotNone(info['uuid'])
|
||||
|
||||
def test__parse_driver_info_missing_api_endpoint(self):
|
||||
# make sure error is raised when info is missing
|
||||
|
@ -212,71 +212,69 @@ class SNMPValidateParametersTestCase(db_base.DbTestCase):
|
||||
# Make sure we get back the expected things.
|
||||
node = self._get_test_node(INFO_DICT)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual(INFO_DICT['snmp_driver'], info.get('driver'))
|
||||
self.assertEqual(INFO_DICT['snmp_address'], info.get('address'))
|
||||
self.assertEqual(INFO_DICT['snmp_port'], str(info.get('port')))
|
||||
self.assertEqual(INFO_DICT['snmp_outlet'], info.get('outlet'))
|
||||
self.assertEqual(INFO_DICT['snmp_version'], info.get('version'))
|
||||
self.assertEqual(INFO_DICT.get('snmp_community'),
|
||||
info.get('community'))
|
||||
self.assertEqual(INFO_DICT.get('snmp_security'),
|
||||
info.get('security'))
|
||||
self.assertEqual(INFO_DICT['snmp_driver'], info['driver'])
|
||||
self.assertEqual(INFO_DICT['snmp_address'], info['address'])
|
||||
self.assertEqual(INFO_DICT['snmp_port'], str(info['port']))
|
||||
self.assertEqual(INFO_DICT['snmp_outlet'], info['outlet'])
|
||||
self.assertEqual(INFO_DICT['snmp_version'], info['version'])
|
||||
self.assertEqual(INFO_DICT['snmp_community'], info['community'])
|
||||
self.assertNotIn('security', info)
|
||||
|
||||
def test__parse_driver_info_apc(self):
|
||||
# Make sure the APC driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='apc')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('apc', info.get('driver'))
|
||||
self.assertEqual('apc', info['driver'])
|
||||
|
||||
def test__parse_driver_info_apc_masterswitch(self):
|
||||
# Make sure the APC driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='apc_masterswitch')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('apc_masterswitch', info.get('driver'))
|
||||
self.assertEqual('apc_masterswitch', info['driver'])
|
||||
|
||||
def test__parse_driver_info_apc_masterswitchplus(self):
|
||||
# Make sure the APC driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='apc_masterswitchplus')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('apc_masterswitchplus', info.get('driver'))
|
||||
self.assertEqual('apc_masterswitchplus', info['driver'])
|
||||
|
||||
def test__parse_driver_info_apc_rackpdu(self):
|
||||
# Make sure the APC driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='apc_rackpdu')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('apc_rackpdu', info.get('driver'))
|
||||
self.assertEqual('apc_rackpdu', info['driver'])
|
||||
|
||||
def test__parse_driver_info_aten(self):
|
||||
# Make sure the Aten driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='aten')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('aten', info.get('driver'))
|
||||
self.assertEqual('aten', info['driver'])
|
||||
|
||||
def test__parse_driver_info_cyberpower(self):
|
||||
# Make sure the CyberPower driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='cyberpower')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('cyberpower', info.get('driver'))
|
||||
self.assertEqual('cyberpower', info['driver'])
|
||||
|
||||
def test__parse_driver_info_eatonpower(self):
|
||||
# Make sure the Eaton Power driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='eatonpower')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('eatonpower', info.get('driver'))
|
||||
self.assertEqual('eatonpower', info['driver'])
|
||||
|
||||
def test__parse_driver_info_teltronix(self):
|
||||
# Make sure the Teltronix driver type is parsed.
|
||||
info = db_utils.get_test_snmp_info(snmp_driver='teltronix')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('teltronix', info.get('driver'))
|
||||
self.assertEqual('teltronix', info['driver'])
|
||||
|
||||
def test__parse_driver_info_snmp_v1(self):
|
||||
# Make sure SNMPv1 is parsed with a community string.
|
||||
@ -284,8 +282,8 @@ class SNMPValidateParametersTestCase(db_base.DbTestCase):
|
||||
snmp_community='public')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('1', info.get('version'))
|
||||
self.assertEqual('public', info.get('community'))
|
||||
self.assertEqual('1', info['version'])
|
||||
self.assertEqual('public', info['community'])
|
||||
|
||||
def test__parse_driver_info_snmp_v2c(self):
|
||||
# Make sure SNMPv2c is parsed with a community string.
|
||||
@ -293,8 +291,8 @@ class SNMPValidateParametersTestCase(db_base.DbTestCase):
|
||||
snmp_community='private')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('2c', info.get('version'))
|
||||
self.assertEqual('private', info.get('community'))
|
||||
self.assertEqual('2c', info['version'])
|
||||
self.assertEqual('private', info['community'])
|
||||
|
||||
def test__parse_driver_info_snmp_v3(self):
|
||||
# Make sure SNMPv3 is parsed with a security string.
|
||||
@ -302,8 +300,8 @@ class SNMPValidateParametersTestCase(db_base.DbTestCase):
|
||||
snmp_security='pass')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('3', info.get('version'))
|
||||
self.assertEqual('pass', info.get('security'))
|
||||
self.assertEqual('3', info['version'])
|
||||
self.assertEqual('pass', info['security'])
|
||||
|
||||
def test__parse_driver_info_snmp_port_default(self):
|
||||
# Make sure default SNMP UDP port numbers are correct
|
||||
@ -311,14 +309,14 @@ class SNMPValidateParametersTestCase(db_base.DbTestCase):
|
||||
del info['snmp_port']
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual(161, info.get('port'))
|
||||
self.assertEqual(161, info['port'])
|
||||
|
||||
def test__parse_driver_info_snmp_port(self):
|
||||
# Make sure non-default SNMP UDP port numbers can be configured
|
||||
info = db_utils.get_test_snmp_info(snmp_port='10161')
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual(10161, info.get('port'))
|
||||
self.assertEqual(10161, info['port'])
|
||||
|
||||
def test__parse_driver_info_missing_driver(self):
|
||||
# Make sure exception is raised when the driver type is missing.
|
||||
@ -361,8 +359,8 @@ class SNMPValidateParametersTestCase(db_base.DbTestCase):
|
||||
del info['snmp_version']
|
||||
node = self._get_test_node(info)
|
||||
info = snmp._parse_driver_info(node)
|
||||
self.assertEqual('1', info.get('version'))
|
||||
self.assertEqual(INFO_DICT['snmp_community'], info.get('community'))
|
||||
self.assertEqual('1', info['version'])
|
||||
self.assertEqual(INFO_DICT['snmp_community'], info['community'])
|
||||
|
||||
def test__parse_driver_info_invalid_version(self):
|
||||
# Make sure exception is raised when version is invalid.
|
||||
|
@ -50,13 +50,13 @@ class SSHValidateParametersTestCase(db_base.DbTestCase):
|
||||
driver='fake_ssh',
|
||||
driver_info=db_utils.get_test_ssh_info('password'))
|
||||
info = ssh._parse_driver_info(node)
|
||||
self.assertIsNotNone(info.get('host'))
|
||||
self.assertIsNotNone(info.get('username'))
|
||||
self.assertIsNotNone(info.get('password'))
|
||||
self.assertIsNotNone(info.get('port'))
|
||||
self.assertIsNotNone(info.get('virt_type'))
|
||||
self.assertIsNotNone(info.get('cmd_set'))
|
||||
self.assertIsNotNone(info.get('uuid'))
|
||||
self.assertIsNotNone(info['host'])
|
||||
self.assertIsNotNone(info['username'])
|
||||
self.assertIsNotNone(info['password'])
|
||||
self.assertIsNotNone(info['port'])
|
||||
self.assertIsNotNone(info['virt_type'])
|
||||
self.assertIsNotNone(info['cmd_set'])
|
||||
self.assertIsNotNone(info['uuid'])
|
||||
|
||||
def test__parse_driver_info_good_key(self):
|
||||
# make sure we get back the expected things
|
||||
@ -65,13 +65,13 @@ class SSHValidateParametersTestCase(db_base.DbTestCase):
|
||||
driver='fake_ssh',
|
||||
driver_info=db_utils.get_test_ssh_info('key'))
|
||||
info = ssh._parse_driver_info(node)
|
||||
self.assertIsNotNone(info.get('host'))
|
||||
self.assertIsNotNone(info.get('username'))
|
||||
self.assertIsNotNone(info.get('key_contents'))
|
||||
self.assertIsNotNone(info.get('port'))
|
||||
self.assertIsNotNone(info.get('virt_type'))
|
||||
self.assertIsNotNone(info.get('cmd_set'))
|
||||
self.assertIsNotNone(info.get('uuid'))
|
||||
self.assertIsNotNone(info['host'])
|
||||
self.assertIsNotNone(info['username'])
|
||||
self.assertIsNotNone(info['key_contents'])
|
||||
self.assertIsNotNone(info['port'])
|
||||
self.assertIsNotNone(info['virt_type'])
|
||||
self.assertIsNotNone(info['cmd_set'])
|
||||
self.assertIsNotNone(info['uuid'])
|
||||
|
||||
def test__parse_driver_info_good_file(self):
|
||||
# make sure we get back the expected things
|
||||
@ -85,13 +85,13 @@ class SSHValidateParametersTestCase(db_base.DbTestCase):
|
||||
driver='fake_ssh',
|
||||
driver_info=d_info)
|
||||
info = ssh._parse_driver_info(node)
|
||||
self.assertIsNotNone(info.get('host'))
|
||||
self.assertIsNotNone(info.get('username'))
|
||||
self.assertIsNotNone(info.get('key_filename'))
|
||||
self.assertIsNotNone(info.get('port'))
|
||||
self.assertIsNotNone(info.get('virt_type'))
|
||||
self.assertIsNotNone(info.get('cmd_set'))
|
||||
self.assertIsNotNone(info.get('uuid'))
|
||||
self.assertIsNotNone(info['host'])
|
||||
self.assertIsNotNone(info['username'])
|
||||
self.assertIsNotNone(info['key_filename'])
|
||||
self.assertIsNotNone(info['port'])
|
||||
self.assertIsNotNone(info['virt_type'])
|
||||
self.assertIsNotNone(info['cmd_set'])
|
||||
self.assertIsNotNone(info['uuid'])
|
||||
|
||||
def test__parse_driver_info_bad_file(self):
|
||||
# A filename that doesn't exist errors.
|
||||
|
@ -161,8 +161,7 @@ class UtilsTestCase(db_base.DbTestCase):
|
||||
task.node.refresh()
|
||||
self.assertEqual(
|
||||
False,
|
||||
task.node.driver_internal_info.get('is_next_boot_persistent')
|
||||
)
|
||||
task.node.driver_internal_info['is_next_boot_persistent'])
|
||||
|
||||
def test_capabilities_to_dict(self):
|
||||
capabilities_more_than_one_item = 'a:b,c:d'
|
||||
|
Loading…
x
Reference in New Issue
Block a user