Merge "Add support for VFAT ConfigDrive"
This commit is contained in:
commit
75bd71d3a3
@ -29,6 +29,8 @@ opts = [
|
||||
help='Look for an ISO config drive in raw HDDs'),
|
||||
cfg.BoolOpt('config_drive_cdrom', default=True,
|
||||
help='Look for a config drive in the attached cdrom drives'),
|
||||
cfg.BoolOpt('config_drive_vfat', default=True,
|
||||
help='Look for a config drive in VFAT filesystems.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -49,9 +51,11 @@ class ConfigDriveService(baseopenstackservice.BaseOpenStackService):
|
||||
target_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
|
||||
|
||||
mgr = factory.get_config_drive_manager()
|
||||
found = mgr.get_config_drive_files(target_path,
|
||||
CONF.config_drive_raw_hhd,
|
||||
CONF.config_drive_cdrom)
|
||||
found = mgr.get_config_drive_files(
|
||||
target_path,
|
||||
check_raw_hhd=CONF.config_drive_raw_hhd,
|
||||
check_cdrom=CONF.config_drive_cdrom,
|
||||
check_vfat=CONF.config_drive_vfat)
|
||||
if found:
|
||||
self._metadata_path = target_path
|
||||
LOG.debug('Metadata copied to folder: \'%s\'' %
|
||||
|
@ -19,5 +19,5 @@ class BaseConfigDriveManager(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_config_drive_files(self, target_path, check_raw_hhd=True,
|
||||
check_cdrom=True):
|
||||
check_cdrom=True, check_vfat=True):
|
||||
pass
|
||||
|
@ -26,6 +26,7 @@ from cloudbaseinit.metadata.services.osconfigdrive import base
|
||||
from cloudbaseinit.openstack.common import log as logging
|
||||
from cloudbaseinit.osutils import factory as osutils_factory
|
||||
from cloudbaseinit.utils.windows import physical_disk
|
||||
from cloudbaseinit.utils.windows import vfat
|
||||
|
||||
opts = [
|
||||
cfg.StrOpt('bsdtar_path', default='bsdtar.exe',
|
||||
@ -144,10 +145,24 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
|
||||
phys_disk.close()
|
||||
return iso_disk_found
|
||||
|
||||
def _get_conf_drive_from_vfat(self, target_path):
|
||||
osutils = osutils_factory.get_os_utils()
|
||||
for drive_path in osutils.get_physical_disks():
|
||||
if vfat.is_vfat_drive(osutils, drive_path):
|
||||
LOG.info('Config Drive found on disk %r', drive_path)
|
||||
os.makedirs(target_path)
|
||||
vfat.copy_from_vfat_drive(osutils, drive_path, target_path)
|
||||
return True
|
||||
|
||||
def get_config_drive_files(self, target_path, check_raw_hhd=True,
|
||||
check_cdrom=True):
|
||||
check_cdrom=True, check_vfat=True):
|
||||
config_drive_found = False
|
||||
if check_raw_hhd:
|
||||
|
||||
if check_vfat:
|
||||
LOG.debug('Looking for Config Drive in VFAT filesystems')
|
||||
config_drive_found = self._get_conf_drive_from_vfat(target_path)
|
||||
|
||||
if not config_drive_found and check_raw_hhd:
|
||||
LOG.debug('Looking for Config Drive in raw HDDs')
|
||||
config_drive_found = self._get_conf_drive_from_raw_hdd(
|
||||
target_path)
|
||||
|
@ -23,6 +23,8 @@ except ImportError:
|
||||
from oslo.config import cfg
|
||||
|
||||
from cloudbaseinit import exception
|
||||
from cloudbaseinit.tests import testutils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
@ -265,22 +267,46 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
|
||||
'WindowsConfigDriveManager._get_conf_drive_from_raw_hdd')
|
||||
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
|
||||
'WindowsConfigDriveManager._get_conf_drive_from_cdrom_drive')
|
||||
def test_get_config_drive_files(self,
|
||||
mock_get_conf_drive_from_cdrom_drive,
|
||||
mock_get_conf_drive_from_raw_hdd):
|
||||
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
|
||||
'WindowsConfigDriveManager._get_conf_drive_from_vfat')
|
||||
def _test_get_config_drive_files(self,
|
||||
mock_get_conf_drive_from_vfat,
|
||||
mock_get_conf_drive_from_cdrom_drive,
|
||||
mock_get_conf_drive_from_raw_hdd,
|
||||
raw_hdd_found=False,
|
||||
cdrom_drive_found=False,
|
||||
vfat_found=False):
|
||||
|
||||
fake_path = os.path.join('fake', 'path')
|
||||
mock_get_conf_drive_from_raw_hdd.return_value = False
|
||||
mock_get_conf_drive_from_cdrom_drive.return_value = True
|
||||
mock_get_conf_drive_from_raw_hdd.return_value = raw_hdd_found
|
||||
mock_get_conf_drive_from_cdrom_drive.return_value = cdrom_drive_found
|
||||
mock_get_conf_drive_from_vfat.return_value = vfat_found
|
||||
|
||||
response = self._config_manager.get_config_drive_files(
|
||||
target_path=fake_path)
|
||||
|
||||
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(fake_path)
|
||||
mock_get_conf_drive_from_cdrom_drive.assert_called_once_with(
|
||||
fake_path)
|
||||
if vfat_found:
|
||||
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
|
||||
self.assertFalse(mock_get_conf_drive_from_raw_hdd.called)
|
||||
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
|
||||
elif cdrom_drive_found:
|
||||
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
|
||||
mock_get_conf_drive_from_cdrom_drive.assert_called_once_with(
|
||||
fake_path)
|
||||
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
|
||||
fake_path)
|
||||
elif raw_hdd_found:
|
||||
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
|
||||
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
|
||||
fake_path)
|
||||
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
|
||||
self.assertTrue(response)
|
||||
|
||||
def test_get_config_drive_files(self):
|
||||
self._test_get_config_drive_files(raw_hdd_found=True)
|
||||
self._test_get_config_drive_files(cdrom_drive_found=True)
|
||||
self._test_get_config_drive_files(vfat_found=True)
|
||||
|
||||
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
|
||||
'WindowsConfigDriveManager.'
|
||||
'_get_config_drive_cdrom_mount_point')
|
||||
@ -354,3 +380,43 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
|
||||
|
||||
def test_get_conf_drive_from_raw_hdd_no_drive_found(self):
|
||||
self._test_get_conf_drive_from_raw_hdd(found_drive=False)
|
||||
|
||||
@mock.patch('os.makedirs')
|
||||
@mock.patch('cloudbaseinit.utils.windows.vfat.copy_from_vfat_drive')
|
||||
@mock.patch('cloudbaseinit.utils.windows.vfat.is_vfat_drive')
|
||||
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
|
||||
def test_get_conf_drive_from_vfat(self, mock_get_os_utils,
|
||||
mock_is_vfat_drive,
|
||||
mock_copy_from_vfat_drive,
|
||||
mock_os_makedirs):
|
||||
|
||||
mock_osutils = mock_get_os_utils.return_value
|
||||
mock_osutils.get_physical_disks.return_value = (
|
||||
mock.sentinel.drive1,
|
||||
mock.sentinel.drive2,
|
||||
)
|
||||
mock_is_vfat_drive.side_effect = (None, True)
|
||||
|
||||
with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
|
||||
'osconfigdrive.windows') as snatcher:
|
||||
response = self._config_manager._get_conf_drive_from_vfat(
|
||||
mock.sentinel.target_path)
|
||||
|
||||
self.assertTrue(response)
|
||||
mock_osutils.get_physical_disks.assert_called_once_with()
|
||||
|
||||
expected_is_vfat_calls = [
|
||||
mock.call(mock_osutils, mock.sentinel.drive1),
|
||||
mock.call(mock_osutils, mock.sentinel.drive2),
|
||||
]
|
||||
self.assertEqual(expected_is_vfat_calls, mock_is_vfat_drive.mock_calls)
|
||||
mock_copy_from_vfat_drive.assert_called_once_with(
|
||||
mock_osutils,
|
||||
mock.sentinel.drive2,
|
||||
mock.sentinel.target_path)
|
||||
|
||||
expected_logging = [
|
||||
'Config Drive found on disk %r' % mock.sentinel.drive2,
|
||||
]
|
||||
self.assertEqual(expected_logging, snatcher.output)
|
||||
mock_os_makedirs.assert_called_once_with(mock.sentinel.target_path)
|
||||
|
@ -68,7 +68,10 @@ class ConfigDriveServiceTest(unittest.TestCase):
|
||||
mock_gettempdir.assert_called_once_with()
|
||||
mock_get_config_drive_manager.assert_called_once_with()
|
||||
mock_manager.get_config_drive_files.assert_called_once_with(
|
||||
fake_path, CONF.config_drive_raw_hhd, CONF.config_drive_cdrom)
|
||||
fake_path,
|
||||
check_raw_hhd=CONF.config_drive_raw_hhd,
|
||||
check_cdrom=CONF.config_drive_cdrom,
|
||||
check_vfat=CONF.config_drive_vfat)
|
||||
self.assertTrue(response)
|
||||
self.assertEqual(fake_path, self._config_drive._metadata_path)
|
||||
|
||||
|
112
cloudbaseinit/tests/utils/windows/test_vfat.py
Normal file
112
cloudbaseinit/tests/utils/windows/test_vfat.py
Normal file
@ -0,0 +1,112 @@
|
||||
# Copyright 2015 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
try:
|
||||
import unittest.mock as mock
|
||||
except ImportError:
|
||||
import mock
|
||||
|
||||
from cloudbaseinit import exception
|
||||
from cloudbaseinit.tests import testutils
|
||||
from cloudbaseinit.utils.windows import vfat
|
||||
|
||||
CONF = vfat.CONF
|
||||
|
||||
|
||||
class TestVfat(unittest.TestCase):
|
||||
|
||||
def _test_is_vfat_drive(self, execute_process_value,
|
||||
expected_logging,
|
||||
expected_response):
|
||||
|
||||
mock_osutils = mock.Mock()
|
||||
mock_osutils.execute_process.return_value = execute_process_value
|
||||
|
||||
with testutils.LogSnatcher('cloudbaseinit.utils.windows.'
|
||||
'vfat') as snatcher:
|
||||
with testutils.ConfPatcher('mtools_path', 'mtools_path'):
|
||||
|
||||
response = vfat.is_vfat_drive(mock_osutils,
|
||||
mock.sentinel.drive)
|
||||
|
||||
mdir = os.path.join(CONF.mtools_path, "mdir.exe")
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
[mdir, "-/", "-b", "-i", mock.sentinel.drive, "/"],
|
||||
shell=False)
|
||||
|
||||
self.assertEqual(expected_logging, snatcher.output)
|
||||
self.assertEqual(expected_response, response)
|
||||
|
||||
def test_is_vfat_drive_fails(self):
|
||||
expected_logging = [
|
||||
"%r is not a VFAT location." % mock.sentinel.drive,
|
||||
]
|
||||
execute_process_value = (None, None, 1)
|
||||
expected_response = None
|
||||
|
||||
self._test_is_vfat_drive(execute_process_value=execute_process_value,
|
||||
expected_logging=expected_logging,
|
||||
expected_response=expected_response)
|
||||
|
||||
def test_is_vfat_drive_works(self):
|
||||
mock_out = mock.Mock()
|
||||
expected_logging = []
|
||||
execute_process_value = (mock_out, None, 0)
|
||||
expected_response = True
|
||||
|
||||
self._test_is_vfat_drive(execute_process_value=execute_process_value,
|
||||
expected_logging=expected_logging,
|
||||
expected_response=expected_response)
|
||||
|
||||
@testutils.ConfPatcher('mtools_path', 'mtools_path')
|
||||
@mock.patch('os.chdir')
|
||||
def test_copy(self, mock_os_chdir):
|
||||
cwd = os.getcwd()
|
||||
mock_osutils = mock.Mock()
|
||||
|
||||
vfat.copy_from_vfat_drive(mock_osutils,
|
||||
mock.sentinel.drive,
|
||||
mock.sentinel.target_path)
|
||||
|
||||
mock_os_chdir_calls = [
|
||||
mock.call(mock.sentinel.target_path),
|
||||
mock.call(cwd),
|
||||
]
|
||||
self.assertEqual(mock_os_chdir_calls, mock_os_chdir.mock_calls)
|
||||
self.assertEqual(os.getcwd(), cwd)
|
||||
|
||||
mcopy = os.path.join(CONF.mtools_path, "mcopy.exe")
|
||||
mock_osutils.execute_process.assert_called_once_with(
|
||||
[mcopy, "-s", "-n", "-i", mock.sentinel.drive, "::/", "."],
|
||||
shell=False)
|
||||
|
||||
def test_is_vfat_drive_mtools_not_given(self):
|
||||
with self.assertRaises(exception.CloudbaseInitException) as cm:
|
||||
vfat.is_vfat_drive(mock.sentinel.osutils,
|
||||
mock.sentinel.target_path)
|
||||
expected_message = ('"mtools_path" needs to be provided in order '
|
||||
'to access VFAT drives')
|
||||
self.assertEqual(expected_message, str(cm.exception.args[0]))
|
||||
|
||||
def test_copy_from_vfat_drive_mtools_not_given(self):
|
||||
with self.assertRaises(exception.CloudbaseInitException) as cm:
|
||||
vfat.copy_from_vfat_drive(mock.sentinel.osutils,
|
||||
mock.sentinel.drive_path,
|
||||
mock.sentinel.target_path)
|
||||
expected_message = ('"mtools_path" needs to be provided in order '
|
||||
'to access VFAT drives')
|
||||
self.assertEqual(expected_message, str(cm.exception.args[0]))
|
68
cloudbaseinit/utils/windows/vfat.py
Normal file
68
cloudbaseinit/utils/windows/vfat.py
Normal file
@ -0,0 +1,68 @@
|
||||
# Copyright 2015 Cloudbase Solutions Srl
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
from cloudbaseinit import exception
|
||||
from cloudbaseinit.openstack.common import log as logging
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
|
||||
opts = [
|
||||
cfg.StrOpt('mtools_path', default=None,
|
||||
help='Path to "mtools" program suite, used for interacting '
|
||||
'with VFAT filesystems'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(opts)
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _check_mtools_path():
|
||||
if not CONF.mtools_path:
|
||||
raise exception.CloudbaseInitException(
|
||||
'"mtools_path" needs to be provided in order '
|
||||
'to access VFAT drives')
|
||||
|
||||
|
||||
def is_vfat_drive(osutils, drive_path):
|
||||
"""Check if the given drive contains a VFAT filesystem."""
|
||||
_check_mtools_path()
|
||||
mdir = os.path.join(CONF.mtools_path, "mdir.exe")
|
||||
args = [mdir, "-/", "-b", "-i", drive_path, "/"]
|
||||
_, _, exit_code = osutils.execute_process(args, shell=False)
|
||||
if exit_code:
|
||||
LOG.warning("%r is not a VFAT location.", drive_path)
|
||||
return
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def copy_from_vfat_drive(osutils, drive_path, target_path):
|
||||
"""Copy everything from the given VFAT drive into the given target."""
|
||||
_check_mtools_path()
|
||||
cwd = os.getcwd()
|
||||
try:
|
||||
os.chdir(target_path)
|
||||
|
||||
# A mcopy call looks like this:
|
||||
#
|
||||
# mcopy -n -i \\.\PHYSICALDRIVEx ::/file/path destination/path
|
||||
mcopy = os.path.join(CONF.mtools_path, "mcopy.exe")
|
||||
args = [mcopy, "-s", "-n", "-i", drive_path, "::/", "."]
|
||||
osutils.execute_process(args, shell=False)
|
||||
finally:
|
||||
os.chdir(cwd)
|
Loading…
x
Reference in New Issue
Block a user