diff --git a/cloudbaseinit/tests/metadata/services/osconfigdrive/test_factory.py b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_factory.py index 99347f13..7b9637f6 100644 --- a/cloudbaseinit/tests/metadata/services/osconfigdrive/test_factory.py +++ b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_factory.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. - import mock import sys import unittest @@ -24,17 +23,27 @@ from cloudbaseinit.metadata.services.osconfigdrive import factory class ClassloaderTest(unittest.TestCase): + def setUp(self): + self.original_platform = sys.platform + + def tearDown(self): + sys.platform = self.original_platform + @mock.patch('cloudbaseinit.utils.classloader.ClassLoader.load_class') def _test_get_config_drive_manager(self, mock_load_class, platform): sys.platform = platform + if platform is not "win32": self.assertRaises(NotImplementedError, factory.get_config_drive_manager) + else: response = factory.get_config_drive_manager() + mock_load_class.assert_called_once_with( 'cloudbaseinit.metadata.services.osconfigdrive.' 'windows.WindowsConfigDriveManager') + self.assertIsNotNone(response) def test_get_config_drive_manager(self): diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py index 82d2b12f..162cc2c1 100644 --- a/cloudbaseinit/tests/osutils/test_windows.py +++ b/cloudbaseinit/tests/osutils/test_windows.py @@ -15,28 +15,19 @@ # under the License. import ctypes +import importlib import mock import os -import sys import unittest from oslo.config import cfg from cloudbaseinit.utils import s -if sys.platform == 'win32': - import win32process - import win32security - - from ctypes import windll - from ctypes import wintypes - - from cloudbaseinit.osutils import windows as windows_utils CONF = cfg.CONF -@unittest.skipUnless(sys.platform == "win32", "requires Windows") class WindowsUtilsTest(unittest.TestCase): '''Tests for the windows utils class''' @@ -49,36 +40,67 @@ class WindowsUtilsTest(unittest.TestCase): _USERNAME = 'Admin' def setUp(self): - self._winutils = windows_utils.WindowsUtils() - self._conn = mock.MagicMock() + self._pywintypes_mock = mock.MagicMock() + self._win32com_mock = mock.MagicMock() + self._win32process_mock = mock.MagicMock() + self._win32security_mock = mock.MagicMock() + self._wmi_mock = mock.MagicMock() + self._moves_mock = mock.MagicMock() + self._xmlrpc_client_mock = mock.MagicMock() + self._ctypes_mock = mock.MagicMock() + + self._module_patcher = mock.patch.dict( + 'sys.modules', + {'win32com': self._win32com_mock, + 'win32process': self._win32process_mock, + 'win32security': self._win32security_mock, + 'wmi': self._wmi_mock, + 'six.moves': self._moves_mock, + 'six.moves.xmlrpc_client': self._xmlrpc_client_mock, + 'ctypes': self._ctypes_mock, + 'pywintypes': self._pywintypes_mock}) + + self._module_patcher.start() + self.windows_utils = importlib.import_module( + "cloudbaseinit.osutils.windows") + + self._winreg_mock = self._moves_mock.winreg + self._windll_mock = self._ctypes_mock.windll + self._wintypes_mock = self._ctypes_mock.wintypes + self._client_mock = self._win32com_mock.client + self.windows_utils.WindowsError = mock.MagicMock() + + self._winutils = self.windows_utils.WindowsUtils() + + def tearDown(self): + self._module_patcher.stop() def test_enable_shutdown_privilege(self): fake_process = mock.MagicMock() fake_token = True - private_LUID = 'fakeid' - win32process.GetCurrentProcess = mock.MagicMock( - return_value=fake_process) - win32security.OpenProcessToken = mock.MagicMock( - return_value=fake_token) - win32security.LookupPrivilegeValue = mock.MagicMock( - return_value=private_LUID) - win32security.AdjustTokenPrivileges = mock.MagicMock() + LUID = 'fakeid' + self._win32process_mock.GetCurrentProcess.return_value = fake_process + self._win32security_mock.OpenProcessToken.return_value = fake_token + self._win32security_mock.LookupPrivilegeValue.return_value = LUID + self._winutils._enable_shutdown_privilege() - privilege = [(private_LUID, win32security.SE_PRIVILEGE_ENABLED)] - win32security.AdjustTokenPrivileges.assert_called_with( + + privilege = [(LUID, + self._win32security_mock.SE_PRIVILEGE_ENABLED)] + self._win32security_mock.AdjustTokenPrivileges.assert_called_with( fake_token, False, privilege) - win32security.OpenProcessToken.assert_called_with( - fake_process, - win32security.TOKEN_ADJUST_PRIVILEGES | - win32security.TOKEN_QUERY) + self._win32security_mock.OpenProcessToken.assert_called_with( + fake_process, self._win32security_mock.TOKEN_ADJUST_PRIVILEGES | + self._win32security_mock.TOKEN_QUERY) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._enable_shutdown_privilege') def _test_reboot(self, mock_enable_shutdown_privilege, ret_value): - windll.advapi32.InitiateSystemShutdownW = mock.MagicMock( + advapi32 = self._windll_mock.advapi32 + advapi32.InitiateSystemShutdownW = mock.MagicMock( return_value=ret_value) if not ret_value: @@ -86,7 +108,7 @@ class WindowsUtilsTest(unittest.TestCase): else: self._winutils.reboot() - windll.advapi32.InitiateSystemShutdownW.assert_called_with( + advapi32.InitiateSystemShutdownW.assert_called_with( 0, "Cloudbase-Init reboot", 0, True, True) @@ -97,49 +119,48 @@ class WindowsUtilsTest(unittest.TestCase): def test_reboot_failed(self): self._test_reboot(ret_value=None) - @mock.patch('wmi.WMI') @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._sanitize_wmi_input') - def _test_get_user_wmi_object(self, mock_sanitize_wmi_input, mock_WMI, - returnvalue): - mock_WMI.return_value = self._conn + def _test_get_user_wmi_object(self, mock_sanitize_wmi_input, return_value): + conn = self._wmi_mock.WMI mock_sanitize_wmi_input.return_value = self._USERNAME - self._conn.query.return_value = returnvalue + conn.return_value.query.return_value = return_value + response = self._winutils._get_user_wmi_object(self._USERNAME) - self._conn.query.assert_called_with("SELECT * FROM Win32_Account " - "where name = \'%s\'" % - self._USERNAME) + + conn.return_value.query.assert_called_with( + "SELECT * FROM Win32_Account where name = \'%s\'" % self._USERNAME) mock_sanitize_wmi_input.assert_called_with(self._USERNAME) - mock_WMI.assert_called_with(moniker='//./root/cimv2') - if returnvalue: + conn.assert_called_with(moniker='//./root/cimv2') + if return_value: self.assertTrue(response is not None) else: self.assertTrue(response is None) def test_get_user_wmi_object(self): caption = 'fake' - self._test_get_user_wmi_object(returnvalue=caption) + self._test_get_user_wmi_object(return_value=caption) def test_no_user_wmi_object(self): empty_caption = '' - self._test_get_user_wmi_object(returnvalue=empty_caption) + self._test_get_user_wmi_object(return_value=empty_caption) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._get_user_wmi_object') - def _test_user_exists(self, mock_get_user_wmi_object, returnvalue): - mock_get_user_wmi_object.return_value = returnvalue - response = self._winutils.user_exists(returnvalue) - mock_get_user_wmi_object.assert_called_with(returnvalue) - if returnvalue: + def _test_user_exists(self, mock_get_user_wmi_object, return_value): + mock_get_user_wmi_object.return_value = return_value + response = self._winutils.user_exists(return_value) + mock_get_user_wmi_object.assert_called_with(return_value) + if return_value: self.assertTrue(response) else: self.assertFalse(response) def test_user_exists(self): - self._test_user_exists(returnvalue=self._USERNAME) + self._test_user_exists(return_value=self._USERNAME) def test_username_does_not_exist(self): - self._test_user_exists(returnvalue=None) + self._test_user_exists(return_value=None) def test_sanitize_wmi_input(self): unsanitised = ' \' ' @@ -163,7 +184,9 @@ class WindowsUtilsTest(unittest.TestCase): args = ['NET', 'USER', self._USERNAME, self._PASSWORD] if create: args.append('/ADD') + mock_execute_process.return_value = (None, None, ret_value) + if not ret_value: self._winutils._create_or_change_user(self._USERNAME, self._PASSWORD, create, @@ -174,6 +197,7 @@ class WindowsUtilsTest(unittest.TestCase): self.assertRaises( Exception, self._winutils._create_or_change_user, self._USERNAME, self._PASSWORD, create, password_expires) + mock_execute_process.assert_called_with(args) def test_create_user_and_add_password_expire_true(self): @@ -209,8 +233,10 @@ class WindowsUtilsTest(unittest.TestCase): def _test_set_user_password_expiration(self, mock_get_user_wmi_object, fake_obj): mock_get_user_wmi_object.return_value = fake_obj + response = self._winutils._set_user_password_expiration( self._USERNAME, True) + if fake_obj: self.assertTrue(fake_obj.PasswordExpires) self.assertTrue(response) @@ -231,16 +257,14 @@ class WindowsUtilsTest(unittest.TestCase): cchReferencedDomainName = mock.Mock() domainName = mock.Mock() sidNameUse = mock.Mock() + advapi32 = self._windll_mock.advapi32 - ctypes.create_string_buffer = mock.MagicMock(return_value=sid) - ctypes.sizeof = mock.MagicMock(return_value=size) - wintypes.DWORD = mock.MagicMock(return_value=cchReferencedDomainName) - ctypes.create_unicode_buffer = mock.MagicMock(return_value=domainName) + self._ctypes_mock.create_string_buffer.return_value = sid + self._ctypes_mock.sizeof.return_value = size + self._wintypes_mock.DWORD.return_value = cchReferencedDomainName + self._ctypes_mock.create_unicode_buffer.return_value = domainName - ctypes.byref = mock.MagicMock() - - windll.advapi32.LookupAccountNameW = mock.MagicMock( - return_value=ret_val) + advapi32.LookupAccountNameW.return_value = ret_val if ret_val is None: self.assertRaises( Exception, self._winutils._get_user_sid_and_domain, @@ -248,10 +272,11 @@ class WindowsUtilsTest(unittest.TestCase): else: response = self._winutils._get_user_sid_and_domain(self._USERNAME) - windll.advapi32.LookupAccountNameW.assert_called_with( - 0, s.unicode(self._USERNAME), sid, ctypes.byref(cbSid), - domainName, ctypes.byref(cchReferencedDomainName), - ctypes.byref(sidNameUse)) + advapi32.LookupAccountNameW.assert_called_with( + 0, s.unicode(self._USERNAME), sid, + self._ctypes_mock.byref(cbSid), domainName, + self._ctypes_mock.byref(cchReferencedDomainName), + self._ctypes_mock.byref(sidNameUse)) self.assertEqual(response, (sid, domainName.value)) def test_get_user_sid_and_domain(self): @@ -261,24 +286,32 @@ class WindowsUtilsTest(unittest.TestCase): def test_get_user_sid_and_domain_no_return_value(self): self._test_get_user_sid_and_domain(ret_val=None) - def _test_add_user_to_local_group(self, ret_value): - windows_utils.Win32_LOCALGROUP_MEMBERS_INFO_3 = mock.MagicMock() - lmi = windows_utils.Win32_LOCALGROUP_MEMBERS_INFO_3() + @mock.patch('cloudbaseinit.osutils.windows' + '.Win32_LOCALGROUP_MEMBERS_INFO_3') + def _test_add_user_to_local_group(self, + mock_Win32_LOCALGROUP_MEMBERS_INFO_3, + ret_value): + lmi = mock_Win32_LOCALGROUP_MEMBERS_INFO_3() group_name = 'Admins' + netapi32 = self._windll_mock.netapi32 - windll.netapi32.NetLocalGroupAddMembers = mock.MagicMock( - return_value=ret_value) + netapi32.NetLocalGroupAddMembers.return_value = ret_value - if ret_value is not 0: + is_in_alias = ret_value != self._winutils.ERROR_MEMBER_IN_ALIAS + + if ret_value is not 0 and is_in_alias: self.assertRaises( Exception, self._winutils.add_user_to_local_group, self._USERNAME, group_name) else: - ctypes.addressof = mock.MagicMock() self._winutils.add_user_to_local_group(self._USERNAME, group_name) - windll.netapi32.NetLocalGroupAddMembers.assert_called_with( - 0, s.unicode(group_name), 3, ctypes.addressof(lmi), 1) + + netapi32.NetLocalGroupAddMembers.assert_called_with( + 0, s.unicode(group_name), 3, + self._ctypes_mock.addressof.return_value, 1) + + self._ctypes_mock.addressof.assert_called_once_with(lmi) self.assertEqual(lmi.lgrmi3_domainandname, s.unicode(self._USERNAME)) @@ -311,11 +344,15 @@ class WindowsUtilsTest(unittest.TestCase): r = mock.Mock() if not fail: mock_get_user_wmi_object.return_value = None + response = self._winutils.get_user_sid(self._USERNAME) + self.assertTrue(response is None) else: mock_get_user_wmi_object.return_value = r + response = self._winutils.get_user_sid(self._USERNAME) + self.assertTrue(response is not None) mock_get_user_wmi_object.assert_called_with(self._USERNAME) @@ -325,12 +362,17 @@ class WindowsUtilsTest(unittest.TestCase): def test_get_user_sid_fail(self): self._test_get_user_sid(fail=True) - def _test_create_user_logon_session(self, logon, loaduser, + @mock.patch('cloudbaseinit.osutils.windows.Win32_PROFILEINFO') + def _test_create_user_logon_session(self, mock_Win32_PROFILEINFO, logon, + loaduser, load_profile=True): - wintypes.HANDLE = mock.MagicMock() - pi = windows_utils.Win32_PROFILEINFO() - windll.advapi32.LogonUserW = mock.MagicMock(return_value=logon) - ctypes.byref = mock.MagicMock() + self._wintypes_mock.HANDLE = mock.MagicMock() + pi = self.windows_utils.Win32_PROFILEINFO() + advapi32 = self._windll_mock.advapi32 + userenv = self._windll_mock.userenv + kernel32 = self._windll_mock.kernel32 + + advapi32.LogonUserW.return_value = logon if not logon: self.assertRaises( @@ -339,17 +381,21 @@ class WindowsUtilsTest(unittest.TestCase): load_profile=load_profile) elif load_profile and not loaduser: - windll.userenv.LoadUserProfileW = mock.MagicMock( - return_value=None) - windll.kernel32.CloseHandle = mock.MagicMock(return_value=None) + userenv.LoadUserProfileW.return_value = None + kernel32.CloseHandle.return_value = None + self.assertRaises(Exception, self._winutils.create_user_logon_session, self._USERNAME, self._PASSWORD, domain='.', load_profile=load_profile) - windll.userenv.LoadUserProfileW.assert_called_with( - wintypes.HANDLE(), ctypes.byref(pi)) - windll.kernel32.CloseHandle.assert_called_with(wintypes.HANDLE()) + userenv.LoadUserProfileW.assert_called_with( + self._wintypes_mock.HANDLE.return_value, + self._ctypes_mock.byref.return_value) + self._ctypes_mock.byref.assert_called_with(pi) + + kernel32.CloseHandle.assert_called_with( + self._wintypes_mock.HANDLE.return_value) elif not load_profile: response = self._winutils.create_user_logon_session( @@ -358,17 +404,17 @@ class WindowsUtilsTest(unittest.TestCase): self.assertTrue(response is not None) else: size = 1024 - windll.userenv.LoadUserProfileW = mock.MagicMock() - ctypes.sizeof = mock.MagicMock(return_value=size) - windows_utils.Win32_PROFILEINFO = mock.MagicMock( - return_value=loaduser) + self._ctypes_mock.sizeof.return_value = size + + mock_Win32_PROFILEINFO.return_value = loaduser response = self._winutils.create_user_logon_session( self._USERNAME, self._PASSWORD, domain='.', load_profile=load_profile) - windll.userenv.LoadUserProfileW.assert_called_with( - wintypes.HANDLE(), ctypes.byref(pi)) + userenv.LoadUserProfileW.assert_called_with( + self._wintypes_mock.HANDLE.return_value, + self._ctypes_mock.byref.return_value) self.assertTrue(response is not None) def test_create_user_logon_session_fail_load_false(self): @@ -397,18 +443,22 @@ class WindowsUtilsTest(unittest.TestCase): def test_close_user_logon_session(self): token = mock.Mock() - windll.kernel32.CloseHandle = mock.MagicMock() + self._windll_mock.kernel32.CloseHandle = mock.MagicMock() + self._winutils.close_user_logon_session(token) - windll.kernel32.CloseHandle.assert_called_with(token) + + self._windll_mock.kernel32.CloseHandle.assert_called_with(token) @mock.patch('ctypes.windll.kernel32.SetComputerNameExW') def _test_set_host_name(self, mock_SetComputerNameExW, ret_value): mock_SetComputerNameExW.return_value = ret_value + if not ret_value: self.assertRaises(Exception, self._winutils.set_host_name, 'fake name') else: self._winutils.set_host_name('fake name') + mock_SetComputerNameExW.assert_called_with( self._winutils.ComputerNamePhysicalDnsHostname, s.unicode('fake name')) @@ -421,23 +471,21 @@ class WindowsUtilsTest(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '.get_user_sid') - @mock.patch('six.moves.winreg.OpenKey') - @mock.patch('six.moves.winreg.QueryValueEx') - @mock.patch('six.moves.winreg.HKEY_LOCAL_MACHINE') - def _test_get_user_home(self, user_sid, mock_hklm, mock_query_value_ex, - mock_open_key, mock_get_user_sid): + def _test_get_user_home(self, user_sid, mock_get_user_sid): mock_get_user_sid.return_value = user_sid response = self._winutils.get_user_home(self._USERNAME) if user_sid: mock_get_user_sid.assert_called_with(self._USERNAME) - mock_open_key.assert_called_with( - mock_hklm, 'SOFTWARE\\Microsoft\\Windows ' - 'NT\\CurrentVersion\\ProfileList\\%s' % mock_get_user_sid()) + self._winreg_mock.OpenKey.assert_called_with( + self._winreg_mock.HKEY_LOCAL_MACHINE, + 'SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\' + 'ProfileList\\%s' % mock_get_user_sid()) self.assertTrue(response is not None) - mock_query_value_ex.assert_called_with( - mock_open_key().__enter__(), 'ProfileImagePath') + self._winreg_mock.QueryValueEx.assert_called_with( + self._winreg_mock.OpenKey.return_value.__enter__.return_value, + 'ProfileImagePath') else: self.assertTrue(response is None) @@ -450,12 +498,10 @@ class WindowsUtilsTest(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '.check_os_version') - @mock.patch('wmi.WMI') - def _test_get_network_adapters(self, is_xp_2003, mock_WMI, - mock_check_os_version): - mock_WMI.return_value = self._conn + def _test_get_network_adapters(self, is_xp_2003, mock_check_os_version): + conn = self._wmi_mock.WMI mock_response = mock.MagicMock() - self._conn.query.return_value = [mock_response] + conn.return_value.query.return_value = [mock_response] mock_check_os_version.return_value = not is_xp_2003 @@ -466,7 +512,7 @@ class WindowsUtilsTest(unittest.TestCase): wql += ' AND PhysicalAdapter = True' response = self._winutils.get_network_adapters() - self._conn.query.assert_called_with(wql) + conn.return_value.query.assert_called_with(wql) self.assertEqual(response, [mock_response.Name]) def test_get_network_adapters(self): @@ -477,12 +523,10 @@ class WindowsUtilsTest(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._sanitize_wmi_input') - @mock.patch('wmi.WMI') - def _test_set_static_network_config(self, mock_wmi, - mock_sanitize_wmi_input, + def _test_set_static_network_config(self, mock_sanitize_wmi_input, adapter, ret_val1=None, ret_val2=None, ret_val3=None): - mock_wmi.return_value = self._conn + conn = self._wmi_mock.WMI address = '10.10.10.10' adapter_name = 'adapter_name' broadcast = '0.0.0.0' @@ -495,13 +539,13 @@ class WindowsUtilsTest(unittest.TestCase): broadcast, self._GATEWAY, dns_list) else: mock_sanitize_wmi_input.return_value = adapter_name - self._conn.query.return_value = adapter + conn.return_value.query.return_value = adapter adapter_config = adapter[0].associators()[0] - adapter_config.EnableStatic = mock.MagicMock(return_value=ret_val1) - adapter_config.SetGateways = mock.MagicMock(return_value=ret_val2) - adapter_config.SetDNSServerSearchOrder = mock.MagicMock( - return_value=ret_val3) - adapter.__len__ = mock.MagicMock(return_value=1) + adapter_config.EnableStatic.return_value = ret_val1 + adapter_config.SetGateways.return_value = ret_val2 + adapter_config.SetDNSServerSearchOrder.return_value = ret_val3 + adapter.__len__.return_value = 1 + if ret_val1[0] > 1: self.assertRaises( Exception, self._winutils.set_static_network_config, @@ -524,6 +568,7 @@ class WindowsUtilsTest(unittest.TestCase): response = self._winutils.set_static_network_config( adapter_name, address, self._NETMASK, broadcast, self._GATEWAY, dns_list) + if ret_val1[0] or ret_val2[0] or ret_val3[0] == 1: self.assertTrue(response) else: @@ -539,7 +584,7 @@ class WindowsUtilsTest(unittest.TestCase): adapter_name) adapter[0].associators.assert_called_with( wmi_result_class='Win32_NetworkAdapterConfiguration') - self._conn.query.assert_called_with( + conn.return_value.query.assert_called_with( 'SELECT * FROM Win32_NetworkAdapter WHERE MACAddress IS ' 'NOT NULL AND Name = \'%(adapter_name_san)s\'' % {'adapter_name_san': adapter_name}) @@ -607,15 +652,7 @@ class WindowsUtilsTest(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._get_config_key_name') - @mock.patch('six.moves.winreg.CreateKey') - @mock.patch('six.moves.winreg.SetValueEx') - @mock.patch('six.moves.winreg.REG_DWORD') - @mock.patch('six.moves.winreg.REG_SZ') - @mock.patch('six.moves.winreg.HKEY_LOCAL_MACHINE') - def _test_set_config_value(self, value, mock_hklm, mock_reg_sz, - mock_reg_dword, mock_set_value_ex, - mock_create_key, mock_get_config_key_name): - key = mock.MagicMock() + def _test_set_config_value(self, value, mock_get_config_key_name): key_name = (self._winutils._config_key + self._SECTION + '\\' + self._CONFIG_NAME) mock_get_config_key_name.return_value = key_name @@ -623,23 +660,19 @@ class WindowsUtilsTest(unittest.TestCase): self._winutils.set_config_value(self._CONFIG_NAME, value, self._SECTION) - mock_create_key.__enter__.return_value = key - with mock_create_key as m: - assert m == key + key = self._winreg_mock.CreateKey.return_value.__enter__.return_value - mock_create_key.__enter__.assert_called_with() - mock_create_key.__exit__.assert_called_with(None, None, None) - mock_create_key.assert_called_with(mock_hklm, - key_name) + self._winreg_mock.CreateKey.assert_called_with( + self._winreg_mock.HKEY_LOCAL_MACHINE, key_name) mock_get_config_key_name.assert_called_with(self._SECTION) + if type(value) == int: - mock_set_value_ex.assert_called_with( - mock_create_key().__enter__(), self._CONFIG_NAME, 0, - mock_reg_dword, value) + self._winreg_mock.SetValueEx.assert_called_with( + key, self._CONFIG_NAME, 0, self._winreg_mock.REG_DWORD, value) + else: - mock_set_value_ex.assert_called_with( - mock_create_key().__enter__(), self._CONFIG_NAME, 0, - mock_reg_sz, value) + self._winreg_mock.SetValueEx.assert_called_with( + key, self._CONFIG_NAME, 0, self._winreg_mock.REG_SZ, value) def test_set_config_value_int(self): self._test_set_config_value(1) @@ -649,36 +682,35 @@ class WindowsUtilsTest(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._get_config_key_name') - @mock.patch('six.moves.winreg.OpenKey') - @mock.patch('six.moves.winreg.QueryValueEx') - @mock.patch('six.moves.winreg.REG_DWORD') - @mock.patch('six.moves.winreg.REG_SZ') - @mock.patch('six.moves.winreg.HKEY_LOCAL_MACHINE') - def _test_get_config_value(self, value, mock_hklm, mock_reg_sz, - mock_reg_dword, mock_query_value_ex, - mock_open_key, mock_get_config_key_name): + def _test_get_config_value(self, value, mock_get_config_key_name): key_name = self._winutils._config_key + self._SECTION + '\\' key_name += self._CONFIG_NAME if type(value) == int: - regtype = mock_reg_dword + regtype = self._winreg_mock.REG_DWORD else: - regtype = mock_reg_sz + regtype = self._winreg_mock.REG_SZ - mock_query_value_ex.return_value = (value, regtype) + self._winreg_mock.QueryValueEx.return_value = (value, regtype) if value is None: - mock_get_config_key_name.side_effect = [WindowsError] - self.assertRaises(WindowsError, self._winutils.get_config_value, - self._CONFIG_NAME, None) - else: - mock_get_config_key_name.return_value = key_name + mock_get_config_key_name.side_effect = [ + self.windows_utils.WindowsError] response = self._winutils.get_config_value(self._CONFIG_NAME, self._SECTION) - mock_open_key.assert_called_with(mock_hklm, key_name) + self.assertEqual(response, None) + + else: + mock_get_config_key_name.return_value = key_name + + response = self._winutils.get_config_value(self._CONFIG_NAME, + self._SECTION) + + self._winreg_mock.OpenKey.assert_called_with( + self._winreg_mock.HKEY_LOCAL_MACHINE, key_name) mock_get_config_key_name.assert_called_with(self._SECTION) - mock_query_value_ex.assert_called_with( - mock_open_key().__enter__(), self._CONFIG_NAME) + self._winreg_mock.QueryValueEx.assert_called_with( + self._winreg_mock.OpenKey().__enter__(), self._CONFIG_NAME) self.assertEqual(response, value) def test_get_config_value_type_int(self): @@ -690,45 +722,42 @@ class WindowsUtilsTest(unittest.TestCase): def test_get_config_value_type_error(self): self._test_get_config_value(None) - @mock.patch('six.moves.winreg.OpenKey') - @mock.patch('six.moves.winreg.QueryValueEx') - @mock.patch('six.moves.winreg.KEY_READ') - @mock.patch('six.moves.winreg.HKEY_LOCAL_MACHINE') @mock.patch('time.sleep') - def _test_wait_for_boot_completion(self, ret_val, mock_sleep, mock_hklm, - mock_key_read, mock_query_value_ex, - mock_open_key): - key = mock.MagicMock() - mock_query_value_ex.side_effect = ret_val + def _test_wait_for_boot_completion(self, ret_val, mock_sleep): + self._winreg_mock.QueryValueEx.side_effect = [ret_val] self._winutils.wait_for_boot_completion() - mock_open_key.__enter__.return_value = key - mock_open_key.assert_called_with( - mock_hklm, "SYSTEM\\Setup\\Status\\SysprepStatus", 0, - mock_key_read) + key = self._winreg_mock.OpenKey.return_value.__enter__.return_value + self._winreg_mock.OpenKey.assert_called_with( + self._winreg_mock.HKEY_LOCAL_MACHINE, + "SYSTEM\\Setup\\Status\\SysprepStatus", 0, + self._winreg_mock.KEY_READ) - mock_query_value_ex.assert_called_with( - mock_open_key().__enter__(), "GeneralizationState") + self._winreg_mock.QueryValueEx.assert_called_with( + key, "GeneralizationState") def test_wait_for_boot_completion(self): - ret_val = [[7]] + ret_val = [7] self._test_wait_for_boot_completion(ret_val) - @mock.patch('wmi.WMI') - def test_get_service(self, mock_WMI): - mock_WMI.return_value = self._conn - self._conn.Win32_Service.return_value = ['fake name'] + def test_get_service(self): + conn = self._wmi_mock.WMI + conn.return_value.Win32_Service.return_value = ['fake name'] + response = self._winutils._get_service('fake name') - mock_WMI.assert_called_with(moniker='//./root/cimv2') - self._conn.Win32_Service.assert_called_with(Name='fake name') + + conn.assert_called_with(moniker='//./root/cimv2') + conn.return_value.Win32_Service.assert_called_with(Name='fake name') self.assertEqual(response, 'fake name') @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._get_service') def test_check_service_exists(self, mock_get_service): mock_get_service.return_value = 'not None' + response = self._winutils.check_service_exists('fake name') + self.assertEqual(response, True) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' @@ -736,7 +765,9 @@ class WindowsUtilsTest(unittest.TestCase): def test_get_service_status(self, mock_get_service): mock_service = mock.MagicMock() mock_get_service.return_value = mock_service + response = self._winutils.get_service_status('fake name') + self.assertEqual(response, mock_service.State) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' @@ -744,7 +775,9 @@ class WindowsUtilsTest(unittest.TestCase): def test_get_service_start_mode(self, mock_get_service): mock_service = mock.MagicMock() mock_get_service.return_value = mock_service + response = self._winutils.get_service_start_mode('fake name') + self.assertEqual(response, mock_service.StartMode) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' @@ -753,12 +786,14 @@ class WindowsUtilsTest(unittest.TestCase): mock_service = mock.MagicMock() mock_get_service.return_value = mock_service mock_service.ChangeStartMode.return_value = (ret_val,) + if ret_val != 0: self.assertRaises(Exception, self._winutils.set_service_start_mode, 'fake name', 'fake mode') else: self._winutils.set_service_start_mode('fake name', 'fake mode') + mock_service.ChangeStartMode.assert_called_once_with('fake mode') def test_set_service_start_mode(self): @@ -773,12 +808,14 @@ class WindowsUtilsTest(unittest.TestCase): mock_service = mock.MagicMock() mock_get_service.return_value = mock_service mock_service.StartService.return_value = (ret_val,) + if ret_val != 0: self.assertRaises(Exception, self._winutils.start_service, 'fake name') else: self._winutils.start_service('fake name') + mock_service.StartService.assert_called_once_with() def test_start_service(self): @@ -793,12 +830,14 @@ class WindowsUtilsTest(unittest.TestCase): mock_service = mock.MagicMock() mock_get_service.return_value = mock_service mock_service.StopService.return_value = (ret_val,) + if ret_val != 0: self.assertRaises(Exception, self._winutils.stop_service, 'fake name') else: self._winutils.stop_service('fake name') + mock_service.StopService.assert_called_once_with() def test_stop_service(self): @@ -840,7 +879,9 @@ class WindowsUtilsTest(unittest.TestCase): def _test_check_static_route_exists(self, mock_get_ipv4_routing_table, routing_table): mock_get_ipv4_routing_table.return_value = [routing_table] + response = self._winutils.check_static_route_exists(self._DESTINATION) + mock_get_ipv4_routing_table.assert_called_once_with() if routing_table[0] == self._DESTINATION: self.assertTrue(response) @@ -856,8 +897,7 @@ class WindowsUtilsTest(unittest.TestCase): routing_table = ['0.0.0.0', '1.1.1.1', self._GATEWAY, '8.8.8.8'] self._test_check_static_route_exists(routing_table=routing_table) - @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' - '.execute_process') + @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils.execute_process') def _test_add_static_route(self, mock_execute_process, err): next_hop = '10.10.10.10' interface_index = 1 @@ -865,10 +905,12 @@ class WindowsUtilsTest(unittest.TestCase): args = ['ROUTE', 'ADD', self._DESTINATION, 'MASK', self._NETMASK, next_hop] mock_execute_process.return_value = (None, err, None) + if err: self.assertRaises(Exception, self._winutils.add_static_route, self._DESTINATION, self._NETMASK, next_hop, interface_index, metric) + else: self._winutils.add_static_route(self._DESTINATION, self._NETMASK, next_hop, interface_index, metric) @@ -880,33 +922,30 @@ class WindowsUtilsTest(unittest.TestCase): def test_add_static_route_fail(self): self._test_add_static_route(err=None) - @mock.patch('ctypes.sizeof') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.windll.kernel32.VerSetConditionMask') - @mock.patch('ctypes.windll.kernel32.VerifyVersionInfoW') - @mock.patch('ctypes.windll.kernel32.GetLastError') - def _test_check_os_version(self, mock_GetLastError, - mock_VerifyVersionInfoW, - mock_VerSetConditionMask, mock_byref, - mock_sizeof, ret_value, error_value=None): - mock_VerSetConditionMask.return_value = 2 - mock_VerifyVersionInfoW.return_value = ret_value - mock_GetLastError.return_value = error_value + def _test_check_os_version(self, ret_value, error_value=None): + self._windll_mock.kernel32.VerSetConditionMask.return_value = 2 + self._windll_mock.kernel32.VerifyVersionInfoW.return_value = ret_value + self._windll_mock.kernel32.GetLastError.return_value = error_value + old_version = self._winutils.ERROR_OLD_WIN_VERSION + if error_value and error_value is not old_version: self.assertRaises(Exception, self._winutils.check_os_version, 3, 1, 2) - mock_GetLastError.assert_called_once_with() + self._windll_mock.kernel32.GetLastError.assert_called_once_with() + else: response = self._winutils.check_os_version(3, 1, 2) - mock_sizeof.assert_called_once_with( - windows_utils.Win32_OSVERSIONINFOEX_W) - self.assertEqual(mock_VerSetConditionMask.call_count, 3) - mock_VerifyVersionInfoW.assert_called_with(mock_byref(), - 1 | 2 | 3 | 7, - 2) + + self._ctypes_mock.sizeof.assert_called_once_with( + self.windows_utils.Win32_OSVERSIONINFOEX_W) + self.assertEqual( + self._windll_mock.kernel32.VerSetConditionMask.call_count, 3) + self._windll_mock.kernel32.VerifyVersionInfoW.assert_called_with( + self._ctypes_mock.byref(), 1 | 2 | 3 | 7, 2) + if error_value is old_version: - mock_GetLastError.assert_called_with() + self._windll_mock.kernel32.GetLastError.assert_called_with() self.assertEqual(response, False) else: self.assertEqual(response, True) @@ -926,17 +965,19 @@ class WindowsUtilsTest(unittest.TestCase): label = mock.MagicMock() max_label_size = 261 drive = 'Fake_drive' - ctypes.create_unicode_buffer = mock.MagicMock(return_value=label) - ctypes.windll.kernel32.GetVolumeInformationW = mock.MagicMock( - return_value=ret_val) + self._ctypes_mock.create_unicode_buffer.return_value = label + self._windll_mock.kernel32.GetVolumeInformationW.return_value = ret_val + response = self._winutils.get_volume_label(drive) + if ret_val: self.assertTrue(response is not None) else: self.assertTrue(response is None) - ctypes.create_unicode_buffer.assert_called_with(max_label_size) - ctypes.windll.kernel32.GetVolumeInformationW.assert_called_with( + self._ctypes_mock.create_unicode_buffer.assert_called_with( + max_label_size) + self._windll_mock.kernel32.GetVolumeInformationW.assert_called_with( drive, label, max_label_size, 0, 0, 0, 0, 0) def test_get_volume_label(self): @@ -953,24 +994,28 @@ class WindowsUtilsTest(unittest.TestCase): length = 14 mock_search.return_value = True mock_generate_random_password.return_value = 'Passw0rd' + response = self._winutils.generate_random_password(length) + mock_generate_random_password.assert_called_once_with(length) self.assertEqual(response, 'Passw0rd') - @mock.patch('ctypes.create_unicode_buffer') - @mock.patch('ctypes.windll.kernel32.GetLogicalDriveStringsW') - def _test_get_logical_drives(self, mock_GetLogicalDriveStringsW, - mock_create_unicode_buffer, buf_length): + def _test_get_logical_drives(self, buf_length): mock_buf = mock.MagicMock() mock_buf.__getitem__.side_effect = ['1', '\x00'] - mock_create_unicode_buffer.return_value = mock_buf - mock_GetLogicalDriveStringsW.return_value = buf_length + mock_get_drives = self._windll_mock.kernel32.GetLogicalDriveStringsW + + self._ctypes_mock.create_unicode_buffer.return_value = mock_buf + mock_get_drives.return_value = buf_length + if buf_length is None: self.assertRaises(Exception, self._winutils._get_logical_drives) + else: response = self._winutils._get_logical_drives() - mock_create_unicode_buffer.assert_called_with(261) - mock_GetLogicalDriveStringsW.assert_called_with(260, mock_buf) + + self._ctypes_mock.create_unicode_buffer.assert_called_with(261) + mock_get_drives.assert_called_with(260, mock_buf) self.assertEqual(response, ['1']) def test_get_logical_drives_exception(self): @@ -985,7 +1030,9 @@ class WindowsUtilsTest(unittest.TestCase): def test_get_cdrom_drives(self, mock_kernel32, mock_get_logical_drives): mock_get_logical_drives.return_value = ['drive'] mock_kernel32.GetDriveTypeW.return_value = self._winutils.DRIVE_CDROM + response = self._winutils.get_cdrom_drives() + mock_get_logical_drives.assert_called_with() self.assertEqual(response, ['drive']) @@ -993,28 +1040,25 @@ class WindowsUtilsTest(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.kernel32') @mock.patch('cloudbaseinit.osutils.windows.setupapi') @mock.patch('cloudbaseinit.osutils.windows.Win32_STORAGE_DEVICE_NUMBER') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.sizeof') - @mock.patch('ctypes.wintypes.DWORD') - @mock.patch('ctypes.cast') - @mock.patch('ctypes.POINTER') - def _test_get_physical_disks(self, mock_POINTER, mock_cast, - mock_DWORD, mock_sizeof, mock_byref, - mock_sdn, mock_setupapi, mock_kernel32, + def _test_get_physical_disks(self, mock_sdn, mock_setupapi, mock_kernel32, mock_msvcrt, handle_disks, last_error, interface_detail, disk_handle, io_control): sizeof_calls = [mock.call( - windows_utils.Win32_SP_DEVICE_INTERFACE_DATA), + self.windows_utils.Win32_SP_DEVICE_INTERFACE_DATA), mock.call(mock_sdn())] - device_interfaces_calls = [mock.call(handle_disks, None, mock_byref(), - 0, mock_byref()), - mock.call(handle_disks, None, mock_byref(), - 1, mock_byref())] - cast_calls = [mock.call(), - mock.call(mock_msvcrt.malloc(), mock_POINTER()), - mock.call(mock_cast().contents.DevicePath, - wintypes.LPWSTR)] + device_interfaces_calls = [ + mock.call( + handle_disks, None, self._ctypes_mock.byref.return_value, 0, + self._ctypes_mock.byref.return_value), + mock.call(handle_disks, None, + self._ctypes_mock.byref.return_value, 1, + self._ctypes_mock.byref.return_value)] + device_path = self._ctypes_mock.cast.return_value.contents.DevicePath + cast_calls = [mock.call(mock_msvcrt.malloc(), + self._ctypes_mock.POINTER.return_value), + mock.call(device_path, + self._wintypes_mock.LPWSTR)] mock_setup_interface = mock_setupapi.SetupDiGetDeviceInterfaceDetailW @@ -1036,36 +1080,50 @@ class WindowsUtilsTest(unittest.TestCase): else: response = self._winutils.get_physical_disks() - self.assertEqual(mock_sizeof.call_args_list, sizeof_calls) + + self.assertEqual(self._ctypes_mock.sizeof.call_args_list, + sizeof_calls) + self.assertEqual( mock_setupapi.SetupDiEnumDeviceInterfaces.call_args_list, device_interfaces_calls) if not interface_detail: mock_kernel32.GetLastError.assert_called_once_with() - mock_POINTER.assert_called_with( - windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W) - mock_msvcrt.malloc.assert_called_with(mock_DWORD()) + self._ctypes_mock.POINTER.assert_called_with( + self.windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W) + mock_msvcrt.malloc.assert_called_with( + self._wintypes_mock.DWORD.return_value) - self.assertEqual(mock_cast.call_args_list, cast_calls) + self.assertEqual(self._ctypes_mock.cast.call_args_list, + cast_calls) + + mock_setup_interface.assert_called_with( + handle_disks, self._ctypes_mock.byref.return_value, + self._ctypes_mock.cast.return_value, + self._wintypes_mock.DWORD.return_value, + None, None) - mock_setup_interface.assert_called_with(handle_disks, mock_byref(), - mock_cast(), mock_DWORD(), - None, None) mock_kernel32.CreateFileW.assert_called_with( - mock_cast().value, 0, self._winutils.FILE_SHARE_READ, None, + self._ctypes_mock.cast.return_value.value, 0, + self._winutils.FILE_SHARE_READ, None, self._winutils.OPEN_EXISTING, 0, 0) mock_sdn.assert_called_with() mock_kernel32.DeviceIoControl.assert_called_with( disk_handle, self._winutils.IOCTL_STORAGE_GET_DEVICE_NUMBER, - None, 0, mock_byref(), mock_sizeof(), mock_byref(), None) + None, 0, self._ctypes_mock.byref.return_value, + self._ctypes_mock.sizeof.return_value, + self._ctypes_mock.byref.return_value, None) + self.assertEqual(response, ["\\\\.\PHYSICALDRIVE1"]) + mock_setupapi.SetupDiDestroyDeviceInfoList.assert_called_once_with( handle_disks) mock_setupapi.SetupDiGetClassDevsW.assert_called_once_with( - mock_byref(), None, None, self._winutils.DIGCF_PRESENT | + self._ctypes_mock.byref.return_value, None, None, + self._winutils.DIGCF_PRESENT | self._winutils.DIGCF_DEVICEINTERFACE) def test_get_physical_disks(self): @@ -1110,41 +1168,45 @@ class WindowsUtilsTest(unittest.TestCase): interface_detail='fake interface detail', disk_handle=mock_disk_handle, io_control=True) - @mock.patch('win32com.client.Dispatch') @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol') - def _test_firewall_create_rule(self, mock_get_fw_protocol, mock_Dispatch): - self._winutils.firewall_create_rule( - name='fake name', port=9999, protocol=self._winutils.PROTOCOL_TCP) + def test_firewall_create_rule(self, mock_get_fw_protocol): expected = [mock.call("HNetCfg.FWOpenPort"), mock.call("HNetCfg.FwMgr")] - self.assertEqual(mock_Dispatch.call_args_list, expected) + + self._winutils.firewall_create_rule( + name='fake name', port=9999, protocol=self._winutils.PROTOCOL_TCP) + + self.assertEqual(self._client_mock.Dispatch.call_args_list, expected) mock_get_fw_protocol.assert_called_once_with( self._winutils.PROTOCOL_TCP) - @mock.patch('win32com.client.Dispatch') @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol') - def test_firewall_remove_rule(self, mock_get_fw_protocol, mock_Dispatch): + def test_firewall_remove_rule(self, mock_get_fw_protocol): self._winutils.firewall_remove_rule( name='fake name', port=9999, protocol=self._winutils.PROTOCOL_TCP) - mock_Dispatch.assert_called_once_with("HNetCfg.FwMgr") + + self._client_mock.Dispatch.assert_called_once_with("HNetCfg.FwMgr") mock_get_fw_protocol.assert_called_once_with( self._winutils.PROTOCOL_TCP) - @mock.patch('ctypes.wintypes.BOOL') @mock.patch('cloudbaseinit.osutils.windows.kernel32.IsWow64Process') @mock.patch('cloudbaseinit.osutils.windows.kernel32.GetCurrentProcess') - @mock.patch('ctypes.byref') - def _test_is_wow64(self, mock_byref, mock_GetCurrentProcess, - mock_IsWow64Process, mock_BOOL, ret_val): + def _test_is_wow64(self, mock_GetCurrentProcess, + mock_IsWow64Process, ret_val): mock_IsWow64Process.return_value = ret_val - mock_BOOL().value = ret_val + self._wintypes_mock.BOOL.return_value.value = ret_val + if ret_val is False: self.assertRaises(Exception, self._winutils.is_wow64) + else: response = self._winutils.is_wow64() - mock_byref.assert_called_once_with(mock_BOOL()) + self._ctypes_mock.byref.assert_called_once_with( + self._wintypes_mock.BOOL.return_value) + mock_IsWow64Process.assert_called_once_with( - mock_GetCurrentProcess(), mock_byref()) + mock_GetCurrentProcess(), self._ctypes_mock.byref.return_value) + self.assertEqual(response, True) def test_is_wow64(self): @@ -1156,13 +1218,17 @@ class WindowsUtilsTest(unittest.TestCase): @mock.patch('os.path.expandvars') def test_get_system32_dir(self, mock_expandvars): mock_expandvars.return_value = 'fake_system32' + response = self._winutils.get_system32_dir() + self.assertEqual(response, 'fake_system32') @mock.patch('os.path.expandvars') def test_get_sysnative_dir(self, mock_expandvars): mock_expandvars.return_value = 'fake_sysnative' + response = self._winutils.get_sysnative_dir() + self.assertEqual(response, 'fake_sysnative') @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' @@ -1172,7 +1238,9 @@ class WindowsUtilsTest(unittest.TestCase): mock_get_sysnative_dir): mock_get_sysnative_dir.return_value = 'fake_sysnative' mock_isdir.return_value = True + response = self._winutils.check_sysnative_dir_exists() + self.assertEqual(response, True) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' @@ -1195,12 +1263,15 @@ class WindowsUtilsTest(unittest.TestCase): 'powershell.exe') args = [fake_path, '-ExecutionPolicy', 'RemoteSigned', '-NonInteractive', '-File', 'fake_script_path'] + response = self._winutils.execute_powershell_script( script_path='fake_script_path') + if ret_val is True: mock_get_sysnative_dir.assert_called_once_with() else: mock_get_system32_dir.assert_called_once_with() + mock_execute_process.assert_called_with(args, False) self.assertEqual(response, mock_execute_process()) @@ -1217,7 +1288,9 @@ class WindowsUtilsTest(unittest.TestCase): net_addr["dhcp_server"] = 'fake dhcp server' net_addr["dhcp_enabled"] = True mock_get_adapter_addresses.return_value = [net_addr] + response = self._winutils.get_dhcp_hosts_in_use() + mock_get_adapter_addresses.assert_called() self.assertEqual(response, [('fake mac address', 'fake dhcp server')]) @@ -1234,18 +1307,21 @@ class WindowsUtilsTest(unittest.TestCase): mock_get_sysnative_dir, mock_check_sysnative_dir_exists, sysnative, ret_val): - mock_check_sysnative_dir_exists.return_value = sysnative fake_base_dir = 'fake_dir' + + mock_check_sysnative_dir_exists.return_value = sysnative mock_execute_process.return_value = (None, None, ret_val) w32tm_path = os.path.join(fake_base_dir, "w32tm.exe") mock_get_sysnative_dir.return_value = fake_base_dir mock_get_system32_dir.return_value = fake_base_dir + args = [w32tm_path, '/config', '/manualpeerlist:%s' % 'fake ntp host', '/syncfromflags:manual', '/update'] if ret_val: self.assertRaises(Exception, self._winutils.set_ntp_client_config, args, False) + else: self._winutils.set_ntp_client_config(ntp_host='fake ntp host') @@ -1253,6 +1329,7 @@ class WindowsUtilsTest(unittest.TestCase): mock_get_sysnative_dir.assert_called_once_with() else: mock_get_system32_dir.assert_called_once_with() + mock_execute_process.assert_called_once_with(args, False) def test_set_ntp_client_config_sysnative_true(self): diff --git a/cloudbaseinit/tests/utils/test_log.py b/cloudbaseinit/tests/utils/test_log.py index 05e3c354..d5c03c1b 100644 --- a/cloudbaseinit/tests/utils/test_log.py +++ b/cloudbaseinit/tests/utils/test_log.py @@ -14,51 +14,68 @@ # License for the specific language governing permissions and limitations # under the License. +import importlib import mock import unittest from oslo.config import cfg -from cloudbaseinit.utils import log - CONF = cfg.CONF class SerialPortHandlerTests(unittest.TestCase): - @mock.patch('serial.Serial') - def setUp(self, mock_Serial): + def setUp(self): + self._serial = mock.MagicMock() + self._module_patcher = mock.patch.dict( + 'sys.modules', + {'serial': self._serial}) + + self._module_patcher.start() + + self.log = importlib.import_module("cloudbaseinit.utils.log") + CONF.set_override('logging_serial_port_settings', "COM1,115200,N,8") - self._serial_port_handler = log.SerialPortHandler() + self._serial_port_handler = self.log.SerialPortHandler() self._serial_port_handler._port = mock.MagicMock() - @mock.patch('serial.Serial') - def test_init(self, mock_Serial): - mock_Serial().isOpen.return_value = False - log.SerialPortHandler() + def tearDown(self): + self._module_patcher.stop() + + def test_init(self): + mock_Serial = self._serial.Serial + mock_Serial.return_value.isOpen.return_value = False + + self.log.SerialPortHandler() + mock_Serial.assert_called_with(bytesize=8, baudrate=115200, port='COM1', parity='N') - mock_Serial().isOpen.assert_called_once_with() - mock_Serial().open.assert_called_once_with() + mock_Serial.return_value.isOpen.assert_called_with() + mock_Serial.return_value.open.assert_called_once_with() def test_close(self): self._serial_port_handler._port.isOpen.return_value = True + self._serial_port_handler.close() + self._serial_port_handler._port.isOpen.assert_called_once_with() self._serial_port_handler._port.close.assert_called_once_with() + @mock.patch('cloudbaseinit.openstack.common.log.setup') + @mock.patch('cloudbaseinit.openstack.common.log.getLogger') + @mock.patch('cloudbaseinit.utils.log.SerialPortHandler') + @mock.patch('cloudbaseinit.openstack.common.log.ContextFormatter') + def test_setup(self, mock_ContextFormatter, mock_SerialPortHandler, + mock_getLogger, mock_setup): -@mock.patch('cloudbaseinit.openstack.common.log.setup') -@mock.patch('cloudbaseinit.openstack.common.log.getLogger') -@mock.patch('cloudbaseinit.utils.log.SerialPortHandler') -@mock.patch('cloudbaseinit.openstack.common.log.ContextFormatter') -def test_setup(mock_ContextFormatter, mock_SerialPortHandler, mock_getLogger, - mock_setup): - log.setup(product_name='fake name') - mock_setup.assert_called_once_with('fake name') - mock_getLogger.assert_called_once_with('fake name') - mock_getLogger().logger.addHandler.assert_called_once_with( - mock_SerialPortHandler()) - mock_ContextFormatter.assert_called_once_with( - project='fake name', datefmt=CONF.log_date_format) - mock_SerialPortHandler().setFormatter.assert_called_once_with( - mock_ContextFormatter()) + self.log.setup(product_name='fake name') + + mock_setup.assert_called_once_with('fake name') + mock_getLogger.assert_called_once_with('fake name') + mock_getLogger().logger.addHandler.assert_called_once_with( + mock_SerialPortHandler()) + + mock_ContextFormatter.assert_called_once_with( + project='fake name', datefmt=CONF.log_date_format) + + mock_SerialPortHandler().setFormatter.assert_called_once_with( + mock_ContextFormatter()) diff --git a/cloudbaseinit/tests/utils/windows/test_x509.py b/cloudbaseinit/tests/utils/windows/test_x509.py index 3075ce2d..0b442368 100644 --- a/cloudbaseinit/tests/utils/windows/test_x509.py +++ b/cloudbaseinit/tests/utils/windows/test_x509.py @@ -14,64 +14,68 @@ # License for the specific language governing permissions and limitations # under the License. +import importlib import mock -import sys import unittest -from oslo.config import cfg - from cloudbaseinit.utils import s from cloudbaseinit.utils import x509constants -if sys.platform == 'win32': - from cloudbaseinit.utils.windows import cryptoapi - from cloudbaseinit.utils.windows import x509 -CONF = cfg.CONF - - -@unittest.skipUnless(sys.platform == "win32", "requires Windows") class CryptoAPICertManagerTests(unittest.TestCase): def setUp(self): - self._x509 = x509.CryptoAPICertManager() + self._ctypes = mock.MagicMock() + + self._module_patcher = mock.patch.dict( + 'sys.modules', {'ctypes': self._ctypes}) + + self._module_patcher.start() + + self.x509 = importlib.import_module("cloudbaseinit.utils.windows.x509") + self._x509_manager = self.x509.CryptoAPICertManager() + + def tearDown(self): + self._module_patcher.stop() @mock.patch('cloudbaseinit.utils.windows.x509.free') - @mock.patch('ctypes.c_ubyte') - @mock.patch('ctypes.POINTER') - @mock.patch('ctypes.cast') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.wintypes.DWORD') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' 'CertGetCertificateContextProperty') def _test_get_cert_thumprint(self, mock_CertGetCertificateContextProperty, - mock_DWORD, mock_byref, mock_malloc, - mock_cast, mock_POINTER, mock_c_ubyte, - mock_free, ret_val): + mock_malloc, mock_free, ret_val): + mock_DWORD = self._ctypes.wintypes.DWORD + mock_cast = self._ctypes.cast + mock_POINTER = self._ctypes.POINTER + mock_byref = self._ctypes.byref mock_pointer = mock.MagicMock() fake_cert_context_p = 'fake context' - mock_DWORD().value = 10 + mock_DWORD.return_value.value = 10 mock_CertGetCertificateContextProperty.return_value = ret_val mock_POINTER.return_value = mock_pointer - mock_cast().contents = [16] + mock_cast.return_value.contents = [16] + if not ret_val: - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509._get_cert_thumprint, + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager._get_cert_thumprint, fake_cert_context_p) else: expected = [mock.call(fake_cert_context_p, - cryptoapi.CERT_SHA1_HASH_PROP_ID, - None, mock_byref()), + self.x509.cryptoapi.CERT_SHA1_HASH_PROP_ID, + None, mock_byref.return_value), mock.call(fake_cert_context_p, - cryptoapi.CERT_SHA1_HASH_PROP_ID, - mock_malloc(), mock_byref())] - response = self._x509._get_cert_thumprint(fake_cert_context_p) + self.x509.cryptoapi.CERT_SHA1_HASH_PROP_ID, + mock_malloc.return_value, + mock_byref.return_value)] + + response = self._x509_manager._get_cert_thumprint( + fake_cert_context_p) + self.assertEqual( mock_CertGetCertificateContextProperty.call_args_list, expected) - mock_malloc.assert_called_with(mock_DWORD()) + mock_malloc.assert_called_with(mock_DWORD.return_value) mock_cast.assert_called_with(mock_malloc(), mock_pointer) mock_free.assert_called_with(mock_malloc()) self.assertEqual(response, '10') @@ -86,31 +90,35 @@ class CryptoAPICertManagerTests(unittest.TestCase): @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptReleaseContext') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptGenKey') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.CryptAcquireContext') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.wintypes.HANDLE') - def _test_generate_key(self, mock_HANDLE, mock_byref, - mock_CryptAcquireContext, mock_CryptGenKey, + def _test_generate_key(self, mock_CryptAcquireContext, mock_CryptGenKey, mock_CryptReleaseContext, mock_CryptDestroyKey, acquired_context, generate_key_ret_val): + + mock_HANDLE = self._ctypes.wintypes.HANDLE + mock_byref = self._ctypes.byref + mock_CryptAcquireContext.return_value = acquired_context mock_CryptGenKey.return_value = generate_key_ret_val + if not acquired_context: - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509._generate_key, + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager._generate_key, 'fake container', True) else: - if generate_key_ret_val is None: - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509._generate_key, 'fake container', - True) + if not generate_key_ret_val: + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager._generate_key, + 'fake container', True) else: - self._x509._generate_key('fake container', True) + self._x509_manager._generate_key('fake container', True) + mock_CryptAcquireContext.assert_called_with( mock_byref(), 'fake container', None, - cryptoapi.PROV_RSA_FULL, cryptoapi.CRYPT_MACHINE_KEYSET) - mock_CryptGenKey.assert_called_with(mock_HANDLE(), - cryptoapi.AT_SIGNATURE, - 0x08000000, mock_HANDLE()) + self.x509.cryptoapi.PROV_RSA_FULL, + self.x509.cryptoapi.CRYPT_MACHINE_KEYSET) + mock_CryptGenKey.assert_called_with( + mock_HANDLE(), self.x509.cryptoapi.AT_SIGNATURE, + 0x08000000, mock_HANDLE()) mock_CryptDestroyKey.assert_called_once_with( mock_HANDLE()) mock_CryptReleaseContext.assert_called_once_with( @@ -130,16 +138,12 @@ class CryptoAPICertManagerTests(unittest.TestCase): @mock.patch('cloudbaseinit.utils.windows.x509.free') @mock.patch('copy.copy') - @mock.patch('ctypes.byref') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') - @mock.patch('ctypes.POINTER') - @mock.patch('ctypes.cast') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._generate_key') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._get_cert_thumprint') @mock.patch('uuid.uuid4') - @mock.patch('ctypes.wintypes.DWORD') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' 'CertStrToName') @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' @@ -174,13 +178,16 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CRYPT_ALGORITHM_IDENTIFIER, mock_CRYPT_KEY_PROV_INFO, mock_CRYPTOAPI_BLOB, - mock_CertStrToName, mock_DWORD, + mock_CertStrToName, mock_uuid4, mock_get_cert_thumprint, - mock_generate_key, mock_cast, - mock_POINTER, mock_malloc, mock_byref, + mock_generate_key, mock_malloc, mock_copy, mock_free, certstr, - certificate, enhanced_key, - store_handle, context_to_store): + certificate, enhanced_key, store_handle, + context_to_store): + + mock_POINTER = self._ctypes.POINTER + mock_byref = self._ctypes.byref + mock_cast = self._ctypes.cast mock_uuid4.return_value = 'fake_name' mock_CertCreateSelfSignCertificate.return_value = certificate @@ -190,11 +197,12 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertAddCertificateContextToStore.return_value = context_to_store if (certstr is None or certificate is None or enhanced_key is None or store_handle is None or context_to_store is None): - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509.create_self_signed_cert, - 'fake subject', 10, True, x509.STORE_NAME_MY) + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager.create_self_signed_cert, + 'fake subject', 10, True, + self.x509.STORE_NAME_MY) else: - response = self._x509.create_self_signed_cert( + response = self._x509_manager.create_self_signed_cert( subject='fake subject') mock_cast.assert_called_with(mock_malloc(), mock_POINTER()) mock_CRYPTOAPI_BLOB.assert_called_once_with() @@ -208,11 +216,11 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_byref(), mock_byref(), mock_byref(), None) mock_CertAddEnhancedKeyUsageIdentifier.assert_called_with( mock_CertCreateSelfSignCertificate(), - cryptoapi.szOID_PKIX_KP_SERVER_AUTH) + self.x509.cryptoapi.szOID_PKIX_KP_SERVER_AUTH) mock_CertOpenStore.assert_called_with( - cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, - cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, - s.unicode(x509.STORE_NAME_MY)) + self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, + self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, + s.unicode(self.x509.STORE_NAME_MY)) mock_get_cert_thumprint.assert_called_once_with( mock_CertCreateSelfSignCertificate()) @@ -272,7 +280,7 @@ class CryptoAPICertManagerTests(unittest.TestCase): fake_cert_data += x509constants.PEM_HEADER + '\n' fake_cert_data += 'fake cert' + '\n' fake_cert_data += x509constants.PEM_FOOTER - response = self._x509._get_cert_base64(fake_cert_data) + response = self._x509_manager._get_cert_base64(fake_cert_data) self.assertEqual(response, 'fake cert') @mock.patch('cloudbaseinit.utils.windows.x509.free') @@ -292,15 +300,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): 'CryptStringToBinaryA') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._get_cert_base64') - @mock.patch('ctypes.POINTER') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') - @mock.patch('ctypes.cast') - @mock.patch('ctypes.byref') - @mock.patch('ctypes.wintypes.DWORD') - @mock.patch('ctypes.create_unicode_buffer') - def _test_import_cert(self, mock_create_unicode_buffer, mock_DWORD, - mock_byref, mock_cast, - mock_malloc, mock_POINTER, mock_get_cert_base64, + def _test_import_cert(self, mock_malloc, mock_get_cert_base64, mock_CryptStringToBinaryA, mock_CertOpenStore, mock_CertAddEncodedCertificateToStore, mock_CertGetNameString, @@ -308,6 +309,13 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertCloseStore, mock_get_cert_thumprint, mock_free, crypttstr, store_handle, add_enc_cert, upn_len): + mock_POINTER = self._ctypes.POINTER + mock_cast = self._ctypes.cast + mock_byref = self._ctypes.byref + mock_DWORD = self._ctypes.wintypes.DWORD + + mock_create_unicode_buffer = self._ctypes.create_unicode_buffer + fake_cert_data = '' fake_cert_data += x509constants.PEM_HEADER + '\n' fake_cert_data += 'fake cert' + '\n' @@ -319,45 +327,55 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertGetNameString.side_effect = [2, upn_len] expected = [mock.call('fake cert', len('fake cert'), - cryptoapi.CRYPT_STRING_BASE64, None, + self.x509.cryptoapi.CRYPT_STRING_BASE64, None, mock_byref(), None, None), mock.call('fake cert', len('fake cert'), - cryptoapi.CRYPT_STRING_BASE64, mock_cast(), - mock_byref(), None, None)] - expected2 = [mock.call(mock_POINTER()(), cryptoapi.CERT_NAME_UPN_TYPE, + self.x509.cryptoapi.CRYPT_STRING_BASE64, + mock_cast(), mock_byref(), None, None)] + expected2 = [mock.call(mock_POINTER()(), + self.x509.cryptoapi.CERT_NAME_UPN_TYPE, 0, None, None, 0), - mock.call(mock_POINTER()(), cryptoapi.CERT_NAME_UPN_TYPE, + mock.call(mock_POINTER()(), + self.x509.cryptoapi.CERT_NAME_UPN_TYPE, 0, None, mock_create_unicode_buffer(), 2)] if (not crypttstr or store_handle is None or add_enc_cert is None or upn_len != 2): - self.assertRaises(cryptoapi.CryptoAPIException, - self._x509.import_cert, fake_cert_data, True, - x509.STORE_NAME_MY) + self.assertRaises(self.x509.cryptoapi.CryptoAPIException, + self._x509_manager.import_cert, fake_cert_data, + True, self.x509.STORE_NAME_MY) else: - response = self._x509.import_cert(fake_cert_data) + response = self._x509_manager.import_cert(fake_cert_data) + mock_cast.assert_called_with(mock_malloc(), mock_POINTER()) self.assertEqual(mock_CryptStringToBinaryA.call_args_list, expected) mock_CertOpenStore.assert_called_with( - cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, - cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, - s.unicode(x509.STORE_NAME_MY)) + self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, + self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE, + s.unicode(self.x509.STORE_NAME_MY)) + mock_CertAddEncodedCertificateToStore.assert_called_with( mock_CertOpenStore(), - cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING, + self.x509.cryptoapi.X509_ASN_ENCODING | + self.x509.cryptoapi.PKCS_7_ASN_ENCODING, mock_cast(), mock_DWORD(), - cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, mock_byref()) + self.x509.cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, + mock_byref()) + mock_create_unicode_buffer.assert_called_with(2) self.assertEqual(mock_CertGetNameString.call_args_list, expected2) mock_get_cert_thumprint.assert_called_once_with(mock_POINTER()()) + mock_CertFreeCertificateContext.assert_called_once_with( mock_POINTER()()) mock_CertCloseStore.assert_called_once_with( mock_CertOpenStore(), 0) + mock_free.assert_called_once_with(mock_cast()) self.assertEqual(response, (mock_get_cert_thumprint(), mock_create_unicode_buffer().value)) + mock_get_cert_base64.assert_called_with(fake_cert_data) def test_import_cert(self):