diff --git a/cloudbaseinit/metadata/services/osconfigdrive/windows.py b/cloudbaseinit/metadata/services/osconfigdrive/windows.py index bbf1eb5d..c7639af8 100644 --- a/cloudbaseinit/metadata/services/osconfigdrive/windows.py +++ b/cloudbaseinit/metadata/services/osconfigdrive/windows.py @@ -63,7 +63,6 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager): def _get_iso_disk_size(self, phys_disk): geom = phys_disk.get_geometry() - if geom.MediaType != physical_disk.Win32_DiskGeometry.FixedMedia: return None @@ -74,9 +73,9 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager): id_off = 1 volume_size_off = 80 block_size_off = 128 - iso_id = 'CD001' + iso_id = b'CD001' - offset = boot_record_off / geom.BytesPerSector * geom.BytesPerSector + offset = boot_record_off // geom.BytesPerSector * geom.BytesPerSector bytes_to_read = geom.BytesPerSector if disk_size <= offset + bytes_to_read: @@ -102,7 +101,7 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager): geom = phys_disk.get_geometry() offset = 0 # Get a multiple of the sector byte size - bytes_to_read = 4096 / geom.BytesPerSector * geom.BytesPerSector + bytes_to_read = 4096 // geom.BytesPerSector * geom.BytesPerSector while offset < iso_file_size: phys_disk.seek(offset) diff --git a/cloudbaseinit/osutils/windows.py b/cloudbaseinit/osutils/windows.py index 07433b69..94aa6ace 100644 --- a/cloudbaseinit/osutils/windows.py +++ b/cloudbaseinit/osutils/windows.py @@ -381,12 +381,17 @@ class WindowsUtils(base.BaseOSUtils): raise exception.CloudbaseInitException( "Setting password expiration failed: %s" % ex.args[2]) + @staticmethod + def _get_cch_referenced_domain_name(domain_name): + return wintypes.DWORD( + ctypes.sizeof(domain_name) // ctypes.sizeof(wintypes.WCHAR)) + def _get_user_sid_and_domain(self, username): sid = ctypes.create_string_buffer(1024) cbSid = wintypes.DWORD(ctypes.sizeof(sid)) domainName = ctypes.create_unicode_buffer(1024) - cchReferencedDomainName = wintypes.DWORD( - ctypes.sizeof(domainName) / ctypes.sizeof(wintypes.WCHAR)) + cchReferencedDomainName = self._get_cch_referenced_domain_name( + domainName) sidNameUse = wintypes.DWORD() ret_val = advapi32.LookupAccountNameW( diff --git a/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py index 633df925..548a4f5b 100644 --- a/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py +++ b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py @@ -96,8 +96,8 @@ class TestWindowsConfigDriveManager(unittest.TestCase): @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager._c_char_array_to_c_ushort') def _test_get_iso_disk_size(self, mock_c_char_array_to_c_ushort, - media_type, value, iso_id): - + media_type, value, iso_id, + bytes_per_sector=2): if media_type == "fixed": media_type = self.physical_disk.Win32_DiskGeometry.FixedMedia @@ -115,7 +115,7 @@ class TestWindowsConfigDriveManager(unittest.TestCase): mock_geom.Cylinders = value mock_geom.TracksPerCylinder = 2 mock_geom.SectorsPerTrack = 2 - mock_geom.BytesPerSector = 2 + mock_geom.BytesPerSector = bytes_per_sector mock_phys_disk.read.return_value = (mock_buff, 'fake value') @@ -125,7 +125,7 @@ class TestWindowsConfigDriveManager(unittest.TestCase): disk_size = mock_geom.Cylinders * mock_geom.TracksPerCylinder * \ mock_geom.SectorsPerTrack * mock_geom.BytesPerSector - offset = boot_record_off / mock_geom.BytesPerSector * \ + offset = boot_record_off // mock_geom.BytesPerSector * \ mock_geom.BytesPerSector buf_off_volume = boot_record_off - offset + volume_size_off @@ -134,7 +134,6 @@ class TestWindowsConfigDriveManager(unittest.TestCase): response = self._config_manager._get_iso_disk_size(mock_phys_disk) mock_phys_disk.get_geometry.assert_called_once_with() - if media_type != self.physical_disk.Win32_DiskGeometry.FixedMedia: self.assertIsNone(response) @@ -173,7 +172,12 @@ class TestWindowsConfigDriveManager(unittest.TestCase): media_type="fixed", value=100, iso_id='other id') - def test_write_iso_file(self): + def test_test_get_iso_disk_size_bigger_offset(self): + self._test_get_iso_disk_size( + media_type="other media", value=100, iso_id='other id', + bytes_per_sector=2000) + + def test_write_iso_file(self, bytes_per_sector=40): mock_buff = mock.MagicMock() mock_geom = mock.MagicMock() mock_geom.BytesPerSector = 2 @@ -187,10 +191,16 @@ class TestWindowsConfigDriveManager(unittest.TestCase): with mock.patch('six.moves.builtins.open', mock.mock_open(), create=True) as f: self._config_manager._write_iso_file(mock_phys_disk, fake_path, - 10) - f().write.assert_called_once_with(mock_buff) - mock_phys_disk.seek.assert_called_once_with(0) - mock_phys_disk.read.assert_called_once_with(10) + 4098) + f().write.assert_called_with(mock_buff) + + seek_calls = [mock.call(value) for value in range(0, 4098, 10)] + read_calls = ( + [mock.call(4096)] + + [mock.call(value) for value in range(4088, 0, -10)] + ) + self.assertEqual(seek_calls, mock_phys_disk.seek.mock_calls) + self.assertEqual(read_calls, mock_phys_disk.read.mock_calls) @mock.patch('os.makedirs') def _test_extract_iso_files(self, mock_makedirs, exit_code): diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py index cf69fa28..64c2717e 100644 --- a/cloudbaseinit/tests/osutils/test_windows.py +++ b/cloudbaseinit/tests/osutils/test_windows.py @@ -237,7 +237,10 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase): sanitised = ' \\" ' self.assertEqual(sanitised, response) - def _test_get_user_sid_and_domain(self, ret_val, last_error=None): + @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils.' + '_get_cch_referenced_domain_name') + def _test_get_user_sid_and_domain(self, mock_cch_referenced, + ret_val, last_error=None): cbSid = mock.Mock() sid = mock.Mock() size = 1024 @@ -245,13 +248,14 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase): domainName = mock.Mock() sidNameUse = mock.Mock() advapi32 = self._windll_mock.advapi32 + mock_cch_referenced.return_value = cchReferencedDomainName self._ctypes_mock.create_string_buffer.return_value = sid self._ctypes_mock.sizeof.return_value = size - self._wintypes_mock.DWORD.return_value = cchReferencedDomainName self._ctypes_mock.create_unicode_buffer.return_value = domainName advapi32.LookupAccountNameW.return_value = ret_val + if ret_val is None: with self.assert_raises_windows_message( "Cannot get user SID: %r", @@ -267,6 +271,17 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase): self._ctypes_mock.byref(cchReferencedDomainName), self._ctypes_mock.byref(sidNameUse)) self.assertEqual((sid, domainName.value), response) + mock_cch_referenced.assert_called_once_with( + self._ctypes_mock.create_unicode_buffer.return_value) + + def test_get_cch_referenced_domain_name(self): + self._ctypes_mock.sizeof.side_effect = [42, 24] + + result = self._winutils._get_cch_referenced_domain_name( + mock.sentinel.domain_name) + + self._wintypes_mock.DWORD.assert_called_once_with(42 // 24) + self.assertEqual(result, self._wintypes_mock.DWORD.return_value) def test_get_user_sid_and_domain(self): fake_obj = mock.Mock()