Add support for update password

If the metadata provider can update the password, the SetUserPassword
plugin will run at every boot and will set the new password only
if it exists.

Change-Id: Ifde8893da7cc2f4452b6cae5cf28b8000a0847ee
This commit is contained in:
Alexandru Coman 2015-03-20 17:27:12 +02:00
parent 4e7e8ac867
commit 40cc7b8fdb
3 changed files with 127 additions and 56 deletions

View File

@ -136,3 +136,30 @@ class BaseMetadataService(object):
def cleanup(self):
pass
@property
def can_update_password(self):
"""The ability to update password of the metadata provider.
If :meth:`~can_update_password` is True, plugins can check
periodically (e.g. at every boot) if the password changed.
:rtype: bool
.. notes:
The password will be updated only if the
:meth:`~is_password_changed` returns True.
"""
return False
def is_password_changed(self):
"""Check if the metadata provider has a new password for this
instance.
:rtype: bool
.. notes:
This method will be used only when :meth:`~can_update_password`
is True.
"""
return False

View File

@ -60,11 +60,6 @@ class SetUserPasswordPlugin(base.BasePlugin):
'changing it as soon as possible')
else:
password = shared_data.get(constants.SHARED_DATA_PASSWORD)
if not password:
LOG.debug('Generating a random user password')
# Generate a random password
maximum_length = osutils.get_maximum_password_length()
password = osutils.generate_random_password(maximum_length)
return password
@ -84,8 +79,27 @@ class SetUserPasswordPlugin(base.BasePlugin):
return True
def _set_password(self, service, osutils, user_name, shared_data):
"""Change the password for the received username if it is required.
The used password can be the one received from the metadata provider,
if it does exist, or a random one will be generated.
.. notes:
This method has a different behaviour depending on the value of
:meth:`~can_update password` if this is True the password will
be set only if the :meth:`~is_password_changed` is also True.
"""
if service.can_update_password and not service.is_password_changed():
LOG.info('Updating password is not required.')
return None
password = self._get_password(service, osutils, shared_data)
LOG.info('Setting the user\'s password')
if not password:
LOG.debug('Generating a random user password')
maximum_length = osutils.get_maximum_password_length()
password = osutils.generate_random_password(
maximum_length)
osutils.set_user_password(user_name, password)
return password
@ -99,14 +113,22 @@ class SetUserPasswordPlugin(base.BasePlugin):
if osutils.user_exists(user_name):
password = self._set_password(service, osutils,
user_name, shared_data)
LOG.info('Password succesfully updated for user %s' % user_name)
if password:
LOG.info('Password succesfully updated for user %s' %
user_name)
# TODO(alexpilotti): encrypt with DPAPI
shared_data[constants.SHARED_DATA_PASSWORD] = password
if not service.can_post_password:
LOG.info('Cannot set the password in the metadata as it is '
'not supported by this service')
LOG.info('Cannot set the password in the metadata as it '
'is not supported by this service')
else:
self._set_metadata_password(password, service)
if service.can_update_password:
# If the metadata provider can update the password, the plugin
# must run at every boot in order to update the password if
# it was changed.
return (base.PLUGIN_EXECUTE_ON_NEXT_BOOT, False)
else:
return (base.PLUGIN_EXECUTION_DONE, False)

View File

@ -71,11 +71,10 @@ class SetUserPasswordPluginTests(unittest.TestCase):
def test_get_ssh_plublic_key_no_pub_keys(self):
self._test_get_ssh_public_key(data_exists=False)
def _test_get_password(self, inject_password, generate_password):
def _test_get_password(self, inject_password):
shared_data = {}
reuse_password = not generate_password and not inject_password
expected_password = 'Passw0rd'
if reuse_password:
if not inject_password:
# The password should be the one created by
# CreateUser plugin.
shared_data[constants.SHARED_DATA_PASSWORD] = (
@ -84,7 +83,6 @@ class SetUserPasswordPluginTests(unittest.TestCase):
mock_service = mock.MagicMock()
mock_osutils = mock.MagicMock()
mock_service.get_admin_password.return_value = expected_password
mock_osutils.generate_random_password.return_value = expected_password
with testutils.ConfPatcher('inject_user_password', inject_password):
response = self._setpassword_plugin._get_password(mock_service,
@ -92,31 +90,17 @@ class SetUserPasswordPluginTests(unittest.TestCase):
shared_data)
if inject_password:
mock_service.get_admin_password.assert_called_with()
elif reuse_password:
self.assertFalse(mock_service.get_admin_password.called)
self.assertFalse(mock_osutils.generate_random_password.called)
expected_password = mock.sentinel.create_user_password
else:
mock_osutils.get_maximum_password_length.assert_called_once_with()
mock_osutils.generate_random_password.assert_called_once_with(
mock_osutils.get_maximum_password_length())
self.assertFalse(mock_service.get_admin_password.called)
expected_password = mock.sentinel.create_user_password
self.assertEqual(expected_password, response)
def test_get_password_inject_true(self):
self._test_get_password(generate_password=False,
inject_password=True)
self._test_get_password(inject_password=True)
def test_get_password_inject_false(self):
self._test_get_password(generate_password=False,
inject_password=False)
def test_get_password_get_from_create_user_plugin(self):
self._test_get_password(inject_password=False,
generate_password=False)
def test_get_password_generate(self):
self._test_get_password(inject_password=False,
generate_password=True)
self._test_get_password(inject_password=False)
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._get_ssh_public_key')
@ -174,23 +158,54 @@ class SetUserPasswordPluginTests(unittest.TestCase):
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._get_password')
def test_set_password(self, mock_get_password):
def _test_set_password(self, mock_get_password, password,
can_update_password, is_password_changed):
expected_password = password
expected_logging = []
mock_get_password.return_value = password
mock_service = mock.MagicMock()
mock_osutils = mock.MagicMock()
mock_get_password.return_value = 'fake password'
mock_osutils.get_maximum_password_length.return_value = None
mock_osutils.generate_random_password.return_value = 'fake-password'
mock_service.can_update_password = can_update_password
mock_service.is_password_changed.return_value = is_password_changed
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'setuserpassword') as snatcher:
response = self._setpassword_plugin._set_password(
mock_service,
mock_osutils,
'fake user',
mock_service, mock_osutils, 'fake_user',
mock.sentinel.shared_data)
if can_update_password and not is_password_changed:
expected_logging.append('Updating password is not required.')
expected_password = None
if not password:
expected_logging.append('Generating a random user password')
expected_password = 'fake-password'
if not can_update_password or is_password_changed:
mock_get_password.assert_called_once_with(
mock_service,
mock_osutils,
mock.sentinel.shared_data)
mock_osutils.set_user_password.assert_called_once_with(
'fake user',
'fake password')
self.assertEqual(response, 'fake password')
mock_service, mock_osutils, mock.sentinel.shared_data)
self.assertEqual(expected_password, response)
self.assertEqual(expected_logging, snatcher.output)
def test_set_password(self):
self._test_set_password(password='Password',
can_update_password=False,
is_password_changed=False)
self._test_set_password(password=None,
can_update_password=False,
is_password_changed=False)
self._test_set_password(password='Password',
can_update_password=True,
is_password_changed=True)
self._test_set_password(password='Password',
can_update_password=True,
is_password_changed=False)
@mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin._set_password')
@ -198,14 +213,15 @@ class SetUserPasswordPluginTests(unittest.TestCase):
'SetUserPasswordPlugin._set_metadata_password')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def _test_execute(self, mock_get_os_utils, mock_set_metadata_password,
mock_set_password, is_password_set=False,
can_post_password=True):
mock_set_password, is_password_set,
can_post_password, can_update_password=False):
mock_service = mock.MagicMock()
mock_osutils = mock.MagicMock()
fake_shared_data = mock.MagicMock()
fake_shared_data.get.return_value = 'fake username'
mock_service.is_password_set = is_password_set
mock_service.can_post_password = can_post_password
mock_service.can_update_password = can_update_password
mock_get_os_utils.return_value = mock_osutils
mock_osutils.user_exists.return_value = True
mock_set_password.return_value = 'fake password'
@ -233,10 +249,16 @@ class SetUserPasswordPluginTests(unittest.TestCase):
"as it is not supported by this service")
self.assertFalse(mock_set_metadata_password.called)
if can_update_password:
self.assertEqual((2, False), response)
else:
self.assertEqual((1, False), response)
self.assertEqual(expected_logging, snatcher.output)
def test_execute(self):
self._test_execute(is_password_set=False, can_post_password=False)
self._test_execute(is_password_set=True, can_post_password=True)
self._test_execute(is_password_set=False, can_post_password=True)
self._test_execute(is_password_set=True, can_post_password=True,
can_update_password=True)