diff --git a/cloudbaseinit/metadata/services/base.py b/cloudbaseinit/metadata/services/base.py
index c4b0735a..214b53fa 100644
--- a/cloudbaseinit/metadata/services/base.py
+++ b/cloudbaseinit/metadata/services/base.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,13 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
+
import abc
+import collections
import time
+import warnings
from oslo.config import cfg
from cloudbaseinit.openstack.common import log as logging
+
opts = [
cfg.IntOpt('retry_count', default=5,
help='Max. number of attempts for fetching metadata in '
@@ -35,6 +37,20 @@ CONF.register_opts(opts)
LOG = logging.getLogger(__name__)
+# Both the custom service(s) and the networking plugin
+# should know about the entries of these kind of objects.
+NetworkDetails = collections.namedtuple(
+ "NetworkDetails",
+ [
+ "mac",
+ "address",
+ "netmask",
+ "broadcast",
+ "gateway",
+ "dnsnameservers",
+ ]
+)
+
class NotExistingMetadataException(Exception):
pass
@@ -82,6 +98,7 @@ class BaseMetadataService(object):
pass
def get_content(self, name):
+ # this will also be deprecated due to `get_network_config`
pass
def get_user_data(self):
@@ -94,7 +111,17 @@ class BaseMetadataService(object):
pass
def get_network_config(self):
- pass
+ """Deprecated, use `get_network_details` instead."""
+ warnings.warn("deprecated method, use `get_network_details`",
+ DeprecationWarning)
+
+ def get_network_details(self):
+ """Return a list of `NetworkDetails` objects.
+
+ These objects provide details regarding static
+ network configuration, details which can be found
+ in the namedtuple defined above.
+ """
def get_admin_password(self):
pass
diff --git a/cloudbaseinit/osutils/base.py b/cloudbaseinit/osutils/base.py
index f22bf8f4..ace015ae 100644
--- a/cloudbaseinit/osutils/base.py
+++ b/cloudbaseinit/osutils/base.py
@@ -70,7 +70,7 @@ class BaseOSUtils(object):
def get_network_adapters(self):
raise NotImplementedError()
- def set_static_network_config(self, adapter_name, address, netmask,
+ def set_static_network_config(self, mac_address, address, netmask,
broadcast, gateway, dnsnameservers):
raise NotImplementedError()
diff --git a/cloudbaseinit/osutils/windows.py b/cloudbaseinit/osutils/windows.py
index 0b47cd2f..3fe2e199 100644
--- a/cloudbaseinit/osutils/windows.py
+++ b/cloudbaseinit/osutils/windows.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -15,33 +13,33 @@
# under the License.
import ctypes
+from ctypes import wintypes
import os
import re
-import six
import time
+
+import six
+from six.moves import winreg
+from win32com import client
import win32process
import win32security
import wmi
-from ctypes import windll
-from ctypes import wintypes
-from six.moves import winreg
-from win32com import client
-
from cloudbaseinit import exception
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.osutils import base
from cloudbaseinit.utils.windows import network
+
LOG = logging.getLogger(__name__)
-advapi32 = windll.advapi32
-kernel32 = windll.kernel32
-netapi32 = windll.netapi32
-userenv = windll.userenv
-iphlpapi = windll.iphlpapi
-Ws2_32 = windll.Ws2_32
-setupapi = windll.setupapi
+advapi32 = ctypes.windll.advapi32
+kernel32 = ctypes.windll.kernel32
+netapi32 = ctypes.windll.netapi32
+userenv = ctypes.windll.userenv
+iphlpapi = ctypes.windll.iphlpapi
+Ws2_32 = ctypes.windll.Ws2_32
+setupapi = ctypes.windll.setupapi
msvcrt = ctypes.cdll.msvcrt
@@ -454,7 +452,7 @@ class WindowsUtils(base.BaseOSUtils):
raise exception.CloudbaseInitException("Cannot set host name")
def get_network_adapters(self):
- l = []
+ """Return available adapters as a list of tuples of (name, mac)."""
conn = wmi.WMI(moniker='//./root/cimv2')
# Get Ethernet adapters only
wql = ('SELECT * FROM Win32_NetworkAdapter WHERE '
@@ -464,9 +462,7 @@ class WindowsUtils(base.BaseOSUtils):
wql += ' AND PhysicalAdapter = True'
q = conn.query(wql)
- for r in q:
- l.append(r.Name)
- return l
+ return [(r.Name, r.MACAddress) for r in q]
def get_dhcp_hosts_in_use(self):
dhcp_hosts = []
@@ -524,14 +520,12 @@ class WindowsUtils(base.BaseOSUtils):
'value "%(mtu)s" failed' % {'mac_address': mac_address,
'mtu': mtu})
- def set_static_network_config(self, adapter_name, address, netmask,
+ def set_static_network_config(self, mac_address, address, netmask,
broadcast, gateway, dnsnameservers):
conn = wmi.WMI(moniker='//./root/cimv2')
- adapter_name_san = self._sanitize_wmi_input(adapter_name)
- q = conn.query('SELECT * FROM Win32_NetworkAdapter WHERE '
- 'MACAddress IS NOT NULL AND '
- 'Name = \'%s\'' % adapter_name_san)
+ q = conn.query("SELECT * FROM Win32_NetworkAdapter WHERE "
+ "MACAddress = '{}'".format(mac_address))
if not len(q):
raise exception.CloudbaseInitException(
"Network adapter not found")
diff --git a/cloudbaseinit/plugins/windows/networkconfig.py b/cloudbaseinit/plugins/windows/networkconfig.py
index 64037bb3..fcca5520 100644
--- a/cloudbaseinit/plugins/windows/networkconfig.py
+++ b/cloudbaseinit/plugins/windows/networkconfig.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,75 +12,132 @@
# License for the specific language governing permissions and limitations
# under the License.
+
import re
from oslo.config import cfg
from cloudbaseinit import exception
+from cloudbaseinit.metadata.services import base as service_base
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.osutils import factory as osutils_factory
-from cloudbaseinit.plugins import base
+from cloudbaseinit.plugins import base as plugin_base
+
LOG = logging.getLogger(__name__)
opts = [
cfg.StrOpt('network_adapter', default=None, help='Network adapter to '
'configure. If not specified, the first available ethernet '
- 'adapter will be chosen'),
+ 'adapter will be chosen.\n'
+ 'WARNING: This option is deprecated and will be removed soon.'),
]
CONF = cfg.CONF
CONF.register_opts(opts)
-class NetworkConfigPlugin(base.BasePlugin):
+class NetworkConfigPlugin(plugin_base.BasePlugin):
+
def execute(self, service, shared_data):
- network_config = service.get_network_config()
- if not network_config:
- return (base.PLUGIN_EXECUTION_DONE, False)
-
- if 'content_path' not in network_config:
- return (base.PLUGIN_EXECUTION_DONE, False)
-
- content_path = network_config['content_path']
- content_name = content_path.rsplit('/', 1)[-1]
- debian_network_conf = service.get_content(content_name)
-
- LOG.debug('network config content:\n%s' % debian_network_conf)
-
- # TODO(alexpilotti): implement a proper grammar
- m = re.search(r'iface eth0 inet static\s+'
- r'address\s+(?P
[^\s]+)\s+'
- r'netmask\s+(?P[^\s]+)\s+'
- r'broadcast\s+(?P[^\s]+)\s+'
- r'gateway\s+(?P[^\s]+)\s+'
- r'dns\-nameservers\s+(?P[^\r\n]+)\s+',
- debian_network_conf)
- if not m:
- raise exception.CloudbaseInitException(
- "network_config format not recognized")
-
- address = m.group('address')
- netmask = m.group('netmask')
- broadcast = m.group('broadcast')
- gateway = m.group('gateway')
- dnsnameservers = m.group('dnsnameservers').strip().split(' ')
+ # FIXME(cpoieana): `network_config` is deprecated
+ # * refactor all services by providing NetworkDetails objects *
+ # Also, the old method is not supporting multiple NICs.
osutils = osutils_factory.get_os_utils()
+ network_details = service.get_network_details()
+ if not network_details:
+ network_config = service.get_network_config()
+ if not network_config:
+ return (plugin_base.PLUGIN_EXECUTION_DONE, False)
- network_adapter_name = CONF.network_adapter
- if not network_adapter_name:
- # Get the first available one
- available_adapters = osutils.get_network_adapters()
- if not len(available_adapters):
+ # ---- BEGIN deprecated code ----
+ if not network_details:
+ if 'content_path' not in network_config:
+ return (plugin_base.PLUGIN_EXECUTION_DONE, False)
+
+ content_path = network_config['content_path']
+ content_name = content_path.rsplit('/', 1)[-1]
+ debian_network_conf = service.get_content(content_name)
+
+ LOG.debug('network config content:\n%s' % debian_network_conf)
+
+ # TODO(alexpilotti): implement a proper grammar
+ m = re.search(r'iface eth0 inet static\s+'
+ r'address\s+(?P[^\s]+)\s+'
+ r'netmask\s+(?P[^\s]+)\s+'
+ r'broadcast\s+(?P[^\s]+)\s+'
+ r'gateway\s+(?P[^\s]+)\s+'
+ r'dns\-nameservers\s+'
+ r'(?P[^\r\n]+)\s+',
+ debian_network_conf)
+ if not m:
raise exception.CloudbaseInitException(
- "No network adapter available")
- network_adapter_name = available_adapters[0]
+ "network_config format not recognized")
- LOG.info('Configuring network adapter: \'%s\'' % network_adapter_name)
+ mac = None
+ network_adapters = osutils.get_network_adapters()
+ if network_adapters:
+ adapter_name = CONF.network_adapter
+ if adapter_name:
+ # configure with the specified one
+ for network_adapter in network_adapters:
+ if network_adapter[0] == adapter_name:
+ mac = network_adapter[1]
+ break
+ else:
+ # configure with the first one
+ mac = network_adapters[0][1]
+ network_details = [
+ service_base.NetworkDetails(
+ mac,
+ m.group('address'),
+ m.group('netmask'),
+ m.group('broadcast'),
+ m.group('gateway'),
+ m.group('dnsnameservers').strip().split(' ')
+ )
+ ]
+ # ---- END deprecated code ----
- reboot_required = osutils.set_static_network_config(
- network_adapter_name, address, netmask, broadcast,
- gateway, dnsnameservers)
+ # check NICs' type and save them by MAC
+ macnics = {}
+ for nic in network_details:
+ if not isinstance(nic, service_base.NetworkDetails):
+ raise exception.CloudbaseInitException(
+ "invalid NetworkDetails object {!r}"
+ .format(type(nic))
+ )
+ # assuming that the MAC address is unique
+ macnics[nic.mac] = nic
+ # try configuring all the available adapters
+ adapter_macs = [pair[1] for pair in
+ osutils.get_network_adapters()]
+ if not adapter_macs:
+ raise exception.CloudbaseInitException(
+ "no network adapters available")
+ # configure each one
+ reboot_required = False
+ configured = False
+ for mac in adapter_macs:
+ nic = macnics.pop(mac, None)
+ if not nic:
+ LOG.warn("Missing details for adapter %s", mac)
+ continue
+ LOG.info("Configuring network adapter %s", mac)
+ reboot = osutils.set_static_network_config(
+ mac,
+ nic.address,
+ nic.netmask,
+ nic.broadcast,
+ nic.gateway,
+ nic.dnsnameservers
+ )
+ reboot_required = reboot or reboot_required
+ configured = True
+ for mac in macnics:
+ LOG.warn("Details not used for adapter %s", mac)
+ if not configured:
+ LOG.error("No adapters were configured")
- return (base.PLUGIN_EXECUTION_DONE, reboot_required)
+ return (plugin_base.PLUGIN_EXECUTION_DONE, reboot_required)
diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py
index 90b23302..5b4af119 100644
--- a/cloudbaseinit/tests/osutils/test_windows.py
+++ b/cloudbaseinit/tests/osutils/test_windows.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,16 +12,18 @@
# License for the specific language governing permissions and limitations
# under the License.
+
import importlib
-import mock
import os
-import six
import unittest
+import mock
from oslo.config import cfg
+import six
from cloudbaseinit import exception
+
CONF = cfg.CONF
@@ -517,7 +517,8 @@ class WindowsUtilsTest(unittest.TestCase):
response = self._winutils.get_network_adapters()
conn.return_value.query.assert_called_with(wql)
- self.assertEqual([mock_response.Name], response)
+ self.assertEqual([(mock_response.Name, mock_response.MACAddress)],
+ response)
def test_get_network_adapters(self):
self._test_get_network_adapters(False)
@@ -532,7 +533,7 @@ class WindowsUtilsTest(unittest.TestCase):
ret_val2=None, ret_val3=None):
conn = self._wmi_mock.WMI
address = '10.10.10.10'
- adapter_name = 'adapter_name'
+ mac_address = '54:EE:75:19:F4:61'
broadcast = '0.0.0.0'
dns_list = ['8.8.8.8']
@@ -540,10 +541,9 @@ class WindowsUtilsTest(unittest.TestCase):
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
- adapter_name, address, self._NETMASK,
+ mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
else:
- mock_sanitize_wmi_input.return_value = adapter_name
conn.return_value.query.return_value = adapter
adapter_config = adapter[0].associators()[0]
adapter_config.EnableStatic.return_value = ret_val1
@@ -555,26 +555,26 @@ class WindowsUtilsTest(unittest.TestCase):
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
- adapter_name, address, self._NETMASK,
+ mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
elif ret_val2[0] > 1:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
- adapter_name, address, self._NETMASK,
+ mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
elif ret_val3[0] > 1:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_static_network_config,
- adapter_name, address, self._NETMASK,
+ mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
else:
response = self._winutils.set_static_network_config(
- adapter_name, address, self._NETMASK,
+ mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list)
if ret_val1[0] or ret_val2[0] or ret_val3[0] == 1:
@@ -588,14 +588,12 @@ class WindowsUtilsTest(unittest.TestCase):
adapter_config.SetDNSServerSearchOrder.assert_called_with(
dns_list)
- self._winutils._sanitize_wmi_input.assert_called_with(
- adapter_name)
adapter[0].associators.assert_called_with(
wmi_result_class='Win32_NetworkAdapterConfiguration')
conn.return_value.query.assert_called_with(
- 'SELECT * FROM Win32_NetworkAdapter WHERE MACAddress IS '
- 'NOT NULL AND Name = \'%(adapter_name_san)s\'' %
- {'adapter_name_san': adapter_name})
+ "SELECT * FROM Win32_NetworkAdapter WHERE "
+ "MACAddress = '{}'".format(mac_address)
+ )
def test_set_static_network_config(self):
adapter = mock.MagicMock()
diff --git a/cloudbaseinit/tests/plugins/windows/test_networkconfig.py b/cloudbaseinit/tests/plugins/windows/test_networkconfig.py
index 02d72d59..18801154 100644
--- a/cloudbaseinit/tests/plugins/windows/test_networkconfig.py
+++ b/cloudbaseinit/tests/plugins/windows/test_networkconfig.py
@@ -1,5 +1,3 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,21 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
-import mock
+
import re
import unittest
+import mock
from oslo.config import cfg
from cloudbaseinit import exception
-from cloudbaseinit.plugins import base
+from cloudbaseinit.metadata.services import base as service_base
+from cloudbaseinit.plugins import base as plugin_base
from cloudbaseinit.plugins.windows import networkconfig
from cloudbaseinit.tests.metadata import fake_json_response
+
CONF = cfg.CONF
-class NetworkConfigPluginPluginTests(unittest.TestCase):
+class TestNetworkConfigPlugin(unittest.TestCase):
def setUp(self):
self._network_plugin = networkconfig.NetworkConfigPlugin()
@@ -36,50 +37,139 @@ class NetworkConfigPluginPluginTests(unittest.TestCase):
'2013-04-04')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
- def _test_execute(self, mock_get_os_utils, search_result, no_adapters):
- CONF.set_override('network_adapter', 'fake adapter')
+ def _test_execute(self, mock_get_os_utils,
+ search_result=mock.MagicMock(),
+ no_adapter_name=False, no_adapters=False,
+ using_content=0, details_list=None,
+ missing_content_path=False):
+ fake_adapter = ("fake_name_0", "fake_mac_0")
mock_service = mock.MagicMock()
mock_osutils = mock.MagicMock()
+ mock_ndetails = mock.Mock()
re.search = mock.MagicMock(return_value=search_result)
fake_shared_data = 'fake shared data'
network_config = self.fake_data['network_config']
- mock_service.get_network_config.return_value = network_config
- mock_service.get_content.return_value = search_result
+ if not details_list:
+ details_list = [None] * 6
+ details_list[0] = fake_adapter[1] # set MAC for matching
+ if no_adapter_name: # nothing provided in the config file
+ CONF.set_override("network_adapter", None)
+ else:
+ CONF.set_override("network_adapter", fake_adapter[0])
+ mock_osutils.get_network_adapters.return_value = [
+ fake_adapter,
+ # and other adapters
+ ("name1", "mac1"),
+ ("name2", "mac2")
+ ]
mock_get_os_utils.return_value = mock_osutils
mock_osutils.set_static_network_config.return_value = False
- if search_result is None:
+ # service method setup
+ methods = ["get_network_config", "get_content", "get_network_details"]
+ for method in methods:
+ mock_method = getattr(mock_service, method)
+ mock_method.return_value = None
+ if using_content == 1:
+ mock_service.get_network_config.return_value = network_config
+ mock_service.get_content.return_value = search_result
+
+ elif using_content == 2:
+ mock_service.get_network_details.return_value = [mock_ndetails]
+ # actual tests
+ if search_result is None and using_content == 1:
self.assertRaises(exception.CloudbaseInitException,
self._network_plugin.execute,
mock_service, fake_shared_data)
- elif no_adapters:
- CONF.set_override('network_adapter', None)
+ return
+ if no_adapters:
mock_osutils.get_network_adapters.return_value = []
self.assertRaises(exception.CloudbaseInitException,
self._network_plugin.execute,
mock_service, fake_shared_data)
-
- else:
+ return
+ attrs = [
+ "address",
+ "netmask",
+ "broadcast",
+ "gateway",
+ "dnsnameservers",
+ ]
+ if using_content == 0:
response = self._network_plugin.execute(mock_service,
fake_shared_data)
-
- mock_service.get_network_config.assert_called_once_with()
- mock_service.get_content.assert_called_once_with(
- network_config['content_path'])
+ elif using_content == 1:
+ if missing_content_path:
+ mock_service.get_network_config.return_value.pop(
+ "content_path", None
+ )
+ response = self._network_plugin.execute(mock_service,
+ fake_shared_data)
+ if not missing_content_path:
+ mock_service.get_network_config.assert_called_once_with()
+ mock_service.get_content.assert_called_once_with(
+ network_config['content_path'])
+ adapters = mock_osutils.get_network_adapters()
+ if CONF.network_adapter:
+ mac = [pair[1] for pair in adapters
+ if pair == fake_adapter][0]
+ else:
+ mac = adapters[0][1]
+ (
+ address,
+ netmask,
+ broadcast,
+ gateway,
+ dnsnameserver
+ ) = map(search_result.group, attrs)
+ dnsnameservers = dnsnameserver.strip().split(" ")
+ elif using_content == 2:
+ with self.assertRaises(exception.CloudbaseInitException):
+ self._network_plugin.execute(mock_service,
+ fake_shared_data)
+ mock_service.get_network_details.reset_mock()
+ mock_ndetails = service_base.NetworkDetails(*details_list)
+ mock_service.get_network_details.return_value = [mock_ndetails]
+ response = self._network_plugin.execute(mock_service,
+ fake_shared_data)
+ mock_service.get_network_details.assert_called_once_with()
+ mac = mock_ndetails.mac
+ (
+ address,
+ netmask,
+ broadcast,
+ gateway,
+ dnsnameservers
+ ) = map(lambda attr: getattr(mock_ndetails, attr), attrs)
+ if using_content in (1, 2) and not missing_content_path:
mock_osutils.set_static_network_config.assert_called_once_with(
- 'fake adapter', search_result.group('address'),
- search_result.group('netmask'),
- search_result.group('broadcast'),
- search_result.group('gateway'),
- search_result.group('dnsnameservers').strip().split(' '))
- self.assertEqual((base.PLUGIN_EXECUTION_DONE, False), response)
+ mac,
+ address,
+ netmask,
+ broadcast,
+ gateway,
+ dnsnameservers
+ )
+ self.assertEqual((plugin_base.PLUGIN_EXECUTION_DONE, False),
+ response)
def test_execute(self):
- m = mock.MagicMock()
- self._test_execute(search_result=m, no_adapters=False)
+ self._test_execute(using_content=1)
+
+ def test_execute_missing_content_path(self):
+ self._test_execute(using_content=1, missing_content_path=True)
def test_execute_no_debian(self):
- self._test_execute(search_result=None, no_adapters=False)
+ self._test_execute(search_result=None, using_content=1)
- def test_execute_no_adapters(self):
- m = mock.MagicMock()
- self._test_execute(search_result=m, no_adapters=True)
+ def test_execute_no_adapter_name(self):
+ self._test_execute(no_adapter_name=True, using_content=1)
+
+ def test_execute_no_adapter_name_or_adapters(self):
+ self._test_execute(no_adapter_name=True, no_adapters=True,
+ using_content=1)
+
+ def test_execute_network_details(self):
+ self._test_execute(using_content=2)
+
+ def test_execute_no_config_or_details(self):
+ self._test_execute(using_content=0)