From 2026ee8e0f766f91cf62904ceca3de7382eea787 Mon Sep 17 00:00:00 2001 From: Claudiu Popa Date: Mon, 23 Feb 2015 13:11:57 +0200 Subject: [PATCH] Add support for VFAT ConfigDrive In order to provide support for VFAT ConfigDrive, this patch uses a couple of tools from the mtools package, mdir for listing VFAT filesystems, respectively mcopy, for copying files from such filesystems. The first step when retrieving the metadata from a ConfigDrive is now the lookup for VFAT filesystems. If any such filesystem is found, the metadata will be taken from there. Change-Id: I2fa9c96ce22936ca3d8d8fb9f55ca1ab29ce1d1d --- .../metadata/services/configdrive.py | 10 +- .../metadata/services/osconfigdrive/base.py | 2 +- .../services/osconfigdrive/windows.py | 19 ++- .../services/osconfigdrive/test_windows.py | 82 +++++++++++-- .../metadata/services/test_configdrive.py | 5 +- .../tests/utils/windows/test_vfat.py | 112 ++++++++++++++++++ cloudbaseinit/utils/windows/vfat.py | 68 +++++++++++ 7 files changed, 283 insertions(+), 15 deletions(-) create mode 100644 cloudbaseinit/tests/utils/windows/test_vfat.py create mode 100644 cloudbaseinit/utils/windows/vfat.py diff --git a/cloudbaseinit/metadata/services/configdrive.py b/cloudbaseinit/metadata/services/configdrive.py index 0ab3914e..3cd7824b 100644 --- a/cloudbaseinit/metadata/services/configdrive.py +++ b/cloudbaseinit/metadata/services/configdrive.py @@ -29,6 +29,8 @@ opts = [ help='Look for an ISO config drive in raw HDDs'), cfg.BoolOpt('config_drive_cdrom', default=True, help='Look for a config drive in the attached cdrom drives'), + cfg.BoolOpt('config_drive_vfat', default=True, + help='Look for a config drive in VFAT filesystems.'), ] CONF = cfg.CONF @@ -49,9 +51,11 @@ class ConfigDriveService(baseopenstackservice.BaseOpenStackService): target_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) mgr = factory.get_config_drive_manager() - found = mgr.get_config_drive_files(target_path, - CONF.config_drive_raw_hhd, - CONF.config_drive_cdrom) + found = mgr.get_config_drive_files( + target_path, + check_raw_hhd=CONF.config_drive_raw_hhd, + check_cdrom=CONF.config_drive_cdrom, + check_vfat=CONF.config_drive_vfat) if found: self._metadata_path = target_path LOG.debug('Metadata copied to folder: \'%s\'' % diff --git a/cloudbaseinit/metadata/services/osconfigdrive/base.py b/cloudbaseinit/metadata/services/osconfigdrive/base.py index 71009a59..fcd064fc 100644 --- a/cloudbaseinit/metadata/services/osconfigdrive/base.py +++ b/cloudbaseinit/metadata/services/osconfigdrive/base.py @@ -19,5 +19,5 @@ class BaseConfigDriveManager(object): @abc.abstractmethod def get_config_drive_files(self, target_path, check_raw_hhd=True, - check_cdrom=True): + check_cdrom=True, check_vfat=True): pass diff --git a/cloudbaseinit/metadata/services/osconfigdrive/windows.py b/cloudbaseinit/metadata/services/osconfigdrive/windows.py index 161310dd..ac588c25 100644 --- a/cloudbaseinit/metadata/services/osconfigdrive/windows.py +++ b/cloudbaseinit/metadata/services/osconfigdrive/windows.py @@ -26,6 +26,7 @@ from cloudbaseinit.metadata.services.osconfigdrive import base from cloudbaseinit.openstack.common import log as logging from cloudbaseinit.osutils import factory as osutils_factory from cloudbaseinit.utils.windows import physical_disk +from cloudbaseinit.utils.windows import vfat opts = [ cfg.StrOpt('bsdtar_path', default='bsdtar.exe', @@ -144,10 +145,24 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager): phys_disk.close() return iso_disk_found + def _get_conf_drive_from_vfat(self, target_path): + osutils = osutils_factory.get_os_utils() + for drive_path in osutils.get_physical_disks(): + if vfat.is_vfat_drive(osutils, drive_path): + LOG.info('Config Drive found on disk %r', drive_path) + os.makedirs(target_path) + vfat.copy_from_vfat_drive(osutils, drive_path, target_path) + return True + def get_config_drive_files(self, target_path, check_raw_hhd=True, - check_cdrom=True): + check_cdrom=True, check_vfat=True): config_drive_found = False - if check_raw_hhd: + + if check_vfat: + LOG.debug('Looking for Config Drive in VFAT filesystems') + config_drive_found = self._get_conf_drive_from_vfat(target_path) + + if not config_drive_found and check_raw_hhd: LOG.debug('Looking for Config Drive in raw HDDs') config_drive_found = self._get_conf_drive_from_raw_hdd( target_path) diff --git a/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py index c60ac1c3..6e17a069 100644 --- a/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py +++ b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py @@ -23,6 +23,8 @@ except ImportError: from oslo.config import cfg from cloudbaseinit import exception +from cloudbaseinit.tests import testutils + CONF = cfg.CONF @@ -265,22 +267,46 @@ class TestWindowsConfigDriveManager(unittest.TestCase): 'WindowsConfigDriveManager._get_conf_drive_from_raw_hdd') @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager._get_conf_drive_from_cdrom_drive') - def test_get_config_drive_files(self, - mock_get_conf_drive_from_cdrom_drive, - mock_get_conf_drive_from_raw_hdd): + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._get_conf_drive_from_vfat') + def _test_get_config_drive_files(self, + mock_get_conf_drive_from_vfat, + mock_get_conf_drive_from_cdrom_drive, + mock_get_conf_drive_from_raw_hdd, + raw_hdd_found=False, + cdrom_drive_found=False, + vfat_found=False): fake_path = os.path.join('fake', 'path') - mock_get_conf_drive_from_raw_hdd.return_value = False - mock_get_conf_drive_from_cdrom_drive.return_value = True + mock_get_conf_drive_from_raw_hdd.return_value = raw_hdd_found + mock_get_conf_drive_from_cdrom_drive.return_value = cdrom_drive_found + mock_get_conf_drive_from_vfat.return_value = vfat_found response = self._config_manager.get_config_drive_files( target_path=fake_path) - mock_get_conf_drive_from_raw_hdd.assert_called_once_with(fake_path) - mock_get_conf_drive_from_cdrom_drive.assert_called_once_with( - fake_path) + if vfat_found: + mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path) + self.assertFalse(mock_get_conf_drive_from_raw_hdd.called) + self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called) + elif cdrom_drive_found: + mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path) + mock_get_conf_drive_from_cdrom_drive.assert_called_once_with( + fake_path) + mock_get_conf_drive_from_raw_hdd.assert_called_once_with( + fake_path) + elif raw_hdd_found: + mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path) + mock_get_conf_drive_from_raw_hdd.assert_called_once_with( + fake_path) + self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called) self.assertTrue(response) + def test_get_config_drive_files(self): + self._test_get_config_drive_files(raw_hdd_found=True) + self._test_get_config_drive_files(cdrom_drive_found=True) + self._test_get_config_drive_files(vfat_found=True) + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' 'WindowsConfigDriveManager.' '_get_config_drive_cdrom_mount_point') @@ -354,3 +380,43 @@ class TestWindowsConfigDriveManager(unittest.TestCase): def test_get_conf_drive_from_raw_hdd_no_drive_found(self): self._test_get_conf_drive_from_raw_hdd(found_drive=False) + + @mock.patch('os.makedirs') + @mock.patch('cloudbaseinit.utils.windows.vfat.copy_from_vfat_drive') + @mock.patch('cloudbaseinit.utils.windows.vfat.is_vfat_drive') + @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') + def test_get_conf_drive_from_vfat(self, mock_get_os_utils, + mock_is_vfat_drive, + mock_copy_from_vfat_drive, + mock_os_makedirs): + + mock_osutils = mock_get_os_utils.return_value + mock_osutils.get_physical_disks.return_value = ( + mock.sentinel.drive1, + mock.sentinel.drive2, + ) + mock_is_vfat_drive.side_effect = (None, True) + + with testutils.LogSnatcher('cloudbaseinit.metadata.services.' + 'osconfigdrive.windows') as snatcher: + response = self._config_manager._get_conf_drive_from_vfat( + mock.sentinel.target_path) + + self.assertTrue(response) + mock_osutils.get_physical_disks.assert_called_once_with() + + expected_is_vfat_calls = [ + mock.call(mock_osutils, mock.sentinel.drive1), + mock.call(mock_osutils, mock.sentinel.drive2), + ] + self.assertEqual(expected_is_vfat_calls, mock_is_vfat_drive.mock_calls) + mock_copy_from_vfat_drive.assert_called_once_with( + mock_osutils, + mock.sentinel.drive2, + mock.sentinel.target_path) + + expected_logging = [ + 'Config Drive found on disk %r' % mock.sentinel.drive2, + ] + self.assertEqual(expected_logging, snatcher.output) + mock_os_makedirs.assert_called_once_with(mock.sentinel.target_path) diff --git a/cloudbaseinit/tests/metadata/services/test_configdrive.py b/cloudbaseinit/tests/metadata/services/test_configdrive.py index 24325e43..349ab1b7 100644 --- a/cloudbaseinit/tests/metadata/services/test_configdrive.py +++ b/cloudbaseinit/tests/metadata/services/test_configdrive.py @@ -68,7 +68,10 @@ class ConfigDriveServiceTest(unittest.TestCase): mock_gettempdir.assert_called_once_with() mock_get_config_drive_manager.assert_called_once_with() mock_manager.get_config_drive_files.assert_called_once_with( - fake_path, CONF.config_drive_raw_hhd, CONF.config_drive_cdrom) + fake_path, + check_raw_hhd=CONF.config_drive_raw_hhd, + check_cdrom=CONF.config_drive_cdrom, + check_vfat=CONF.config_drive_vfat) self.assertTrue(response) self.assertEqual(fake_path, self._config_drive._metadata_path) diff --git a/cloudbaseinit/tests/utils/windows/test_vfat.py b/cloudbaseinit/tests/utils/windows/test_vfat.py new file mode 100644 index 00000000..b1b1578a --- /dev/null +++ b/cloudbaseinit/tests/utils/windows/test_vfat.py @@ -0,0 +1,112 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock + +from cloudbaseinit import exception +from cloudbaseinit.tests import testutils +from cloudbaseinit.utils.windows import vfat + +CONF = vfat.CONF + + +class TestVfat(unittest.TestCase): + + def _test_is_vfat_drive(self, execute_process_value, + expected_logging, + expected_response): + + mock_osutils = mock.Mock() + mock_osutils.execute_process.return_value = execute_process_value + + with testutils.LogSnatcher('cloudbaseinit.utils.windows.' + 'vfat') as snatcher: + with testutils.ConfPatcher('mtools_path', 'mtools_path'): + + response = vfat.is_vfat_drive(mock_osutils, + mock.sentinel.drive) + + mdir = os.path.join(CONF.mtools_path, "mdir.exe") + mock_osutils.execute_process.assert_called_once_with( + [mdir, "-/", "-b", "-i", mock.sentinel.drive, "/"], + shell=False) + + self.assertEqual(expected_logging, snatcher.output) + self.assertEqual(expected_response, response) + + def test_is_vfat_drive_fails(self): + expected_logging = [ + "%r is not a VFAT location." % mock.sentinel.drive, + ] + execute_process_value = (None, None, 1) + expected_response = None + + self._test_is_vfat_drive(execute_process_value=execute_process_value, + expected_logging=expected_logging, + expected_response=expected_response) + + def test_is_vfat_drive_works(self): + mock_out = mock.Mock() + expected_logging = [] + execute_process_value = (mock_out, None, 0) + expected_response = True + + self._test_is_vfat_drive(execute_process_value=execute_process_value, + expected_logging=expected_logging, + expected_response=expected_response) + + @testutils.ConfPatcher('mtools_path', 'mtools_path') + @mock.patch('os.chdir') + def test_copy(self, mock_os_chdir): + cwd = os.getcwd() + mock_osutils = mock.Mock() + + vfat.copy_from_vfat_drive(mock_osutils, + mock.sentinel.drive, + mock.sentinel.target_path) + + mock_os_chdir_calls = [ + mock.call(mock.sentinel.target_path), + mock.call(cwd), + ] + self.assertEqual(mock_os_chdir_calls, mock_os_chdir.mock_calls) + self.assertEqual(os.getcwd(), cwd) + + mcopy = os.path.join(CONF.mtools_path, "mcopy.exe") + mock_osutils.execute_process.assert_called_once_with( + [mcopy, "-s", "-n", "-i", mock.sentinel.drive, "::/", "."], + shell=False) + + def test_is_vfat_drive_mtools_not_given(self): + with self.assertRaises(exception.CloudbaseInitException) as cm: + vfat.is_vfat_drive(mock.sentinel.osutils, + mock.sentinel.target_path) + expected_message = ('"mtools_path" needs to be provided in order ' + 'to access VFAT drives') + self.assertEqual(expected_message, str(cm.exception.args[0])) + + def test_copy_from_vfat_drive_mtools_not_given(self): + with self.assertRaises(exception.CloudbaseInitException) as cm: + vfat.copy_from_vfat_drive(mock.sentinel.osutils, + mock.sentinel.drive_path, + mock.sentinel.target_path) + expected_message = ('"mtools_path" needs to be provided in order ' + 'to access VFAT drives') + self.assertEqual(expected_message, str(cm.exception.args[0])) diff --git a/cloudbaseinit/utils/windows/vfat.py b/cloudbaseinit/utils/windows/vfat.py new file mode 100644 index 00000000..e6bea79c --- /dev/null +++ b/cloudbaseinit/utils/windows/vfat.py @@ -0,0 +1,68 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from cloudbaseinit import exception +from cloudbaseinit.openstack.common import log as logging + +from oslo.config import cfg + + +opts = [ + cfg.StrOpt('mtools_path', default=None, + help='Path to "mtools" program suite, used for interacting ' + 'with VFAT filesystems'), +] + +CONF = cfg.CONF +CONF.register_opts(opts) +LOG = logging.getLogger(__name__) + + +def _check_mtools_path(): + if not CONF.mtools_path: + raise exception.CloudbaseInitException( + '"mtools_path" needs to be provided in order ' + 'to access VFAT drives') + + +def is_vfat_drive(osutils, drive_path): + """Check if the given drive contains a VFAT filesystem.""" + _check_mtools_path() + mdir = os.path.join(CONF.mtools_path, "mdir.exe") + args = [mdir, "-/", "-b", "-i", drive_path, "/"] + _, _, exit_code = osutils.execute_process(args, shell=False) + if exit_code: + LOG.warning("%r is not a VFAT location.", drive_path) + return + + return True + + +def copy_from_vfat_drive(osutils, drive_path, target_path): + """Copy everything from the given VFAT drive into the given target.""" + _check_mtools_path() + cwd = os.getcwd() + try: + os.chdir(target_path) + + # A mcopy call looks like this: + # + # mcopy -n -i \\.\PHYSICALDRIVEx ::/file/path destination/path + mcopy = os.path.join(CONF.mtools_path, "mcopy.exe") + args = [mcopy, "-s", "-n", "-i", drive_path, "::/", "."] + osutils.execute_process(args, shell=False) + finally: + os.chdir(cwd)