# Copyright 2013 Cloudbase Solutions Srl
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import unittest

try:
    import unittest.mock as mock
except ImportError:
    import mock


from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import constant
from cloudbaseinit.plugins.common import constants
from cloudbaseinit.plugins.common import setuserpassword
from cloudbaseinit.tests.metadata import fake_json_response
from cloudbaseinit.tests import testutils

CONF = cloudbaseinit_conf.CONF


class SetUserPasswordPluginTests(unittest.TestCase):

    def setUp(self):
        self._setpassword_plugin = setuserpassword.SetUserPasswordPlugin()
        self.fake_data = fake_json_response.get_fake_metadata_json(
            '2013-04-04')

    @mock.patch('cloudbaseinit.utils.crypt.CryptManager'
                '.public_encrypt')
    def test_encrypt_password(self, mock_public_encrypt):
        fake_ssh_pub_key = 'ssh-rsa key'
        fake_password = 'fake password'
        fake_encrypt_pwd = 'encrypted password'
        mock_public_encrypt.return_value = fake_encrypt_pwd
        response = self._setpassword_plugin._encrypt_password(
            fake_ssh_pub_key, fake_password)

        self.assertEqual(fake_encrypt_pwd, response)

    def _test_get_password(self, inject_password):
        shared_data = {}
        expected_password = 'Passw0rd'
        if not inject_password:
            # The password should be the one created by
            # CreateUser plugin.
            shared_data[constants.SHARED_DATA_PASSWORD] = (
                mock.sentinel.create_user_password)

        mock_service = mock.MagicMock()
        mock_service.get_admin_password.return_value = expected_password

        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            with testutils.ConfPatcher('inject_user_password',
                                       inject_password):
                response = self._setpassword_plugin._get_password(
                    mock_service, shared_data)

        expected_logging = [
            'Using admin_pass metadata user password. '
            'Consider changing it as soon as possible'
        ]
        if inject_password:
            self.assertEqual(expected_logging, snatcher.output)
            mock_service.get_admin_password.assert_called_with()
            expected_password = (expected_password, True)
        else:
            self.assertFalse(mock_service.get_admin_password.called)
            expected_password = (mock.sentinel.create_user_password, False)

        self.assertEqual(expected_password, response)

    def test_get_password_inject_true(self):
        self._test_get_password(inject_password=True)

    def test_get_password_inject_false(self):
        self._test_get_password(inject_password=False)

    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._encrypt_password')
    def _test_set_metadata_password(self, mock_encrypt_password,
                                    ssh_pub_key):
        fake_passw0rd = 'fake Passw0rd'
        mock_service = mock.MagicMock()
        mock_encrypt_password.return_value = 'encrypted password'
        mock_service.post_password.return_value = 'value'
        mock_service.can_post_password = True
        mock_service.get_user_pwd_encryption_key.return_value = ssh_pub_key
        mock_service.is_password_set = False
        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            response = self._setpassword_plugin._set_metadata_password(
                fake_passw0rd, mock_service)

        expected_logging = []
        if ssh_pub_key is None:
            expected_logging = [
                'No SSH public key available for password encryption'
            ]
            self.assertTrue(response)
        else:
            mock_encrypt_password.assert_called_once_with(ssh_pub_key,
                                                          fake_passw0rd)
            mock_service.post_password.assert_called_with(
                'encrypted password')
            self.assertEqual('value', response)
        self.assertEqual(expected_logging, snatcher.output)

    def test_set_metadata_password_with_ssh_key(self):
        fake_key = 'fake key'
        self._test_set_metadata_password(ssh_pub_key=fake_key)

    def test_set_metadata_password_no_ssh_key(self):
        self._test_set_metadata_password(ssh_pub_key=None)

    def test_set_metadata_password_already_set(self):
        mock_service = mock.MagicMock()
        mock_service.is_password_set = True
        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            response = self._setpassword_plugin._set_metadata_password(
                mock.sentinel.fake_password, mock_service)

        self.assertTrue(response)
        expected_logging = ['User\'s password already set in the '
                            'instance metadata and it cannot be '
                            'updated in the instance metadata']
        self.assertEqual(expected_logging, snatcher.output)

    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._change_logon_behaviour')
    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._get_password')
    def _test_set_password(self, mock_get_password,
                           mock_change_logon_behaviour,
                           password, can_update_password,
                           is_password_changed, max_password_length=20,
                           injected=False):
        expected_password = password
        expected_logging = []
        user = 'fake_user'

        mock_get_password.return_value = (password, injected)

        mock_service = mock.MagicMock()
        mock_osutils = mock.MagicMock()
        if not password:
            expected_password = "*" * CONF.user_password_length

        mock_osutils.generate_random_password.return_value = expected_password
        mock_service.can_update_password = can_update_password
        mock_service.is_password_changed.return_value = is_password_changed

        with testutils.ConfPatcher('user_password_length',
                                   max_password_length):
            with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                       'setuserpassword') as snatcher:
                response = self._setpassword_plugin._set_password(
                    mock_service, mock_osutils, user,
                    mock.sentinel.shared_data)

        if can_update_password and not is_password_changed:
            expected_logging.append('Updating password is not required.')
            expected_password = None

        if not password:
            expected_logging.append('Generating a random user password')

        if not can_update_password or is_password_changed:
            mock_get_password.assert_called_once_with(
                mock_service, mock.sentinel.shared_data)

        self.assertEqual(expected_password, response)
        self.assertEqual(expected_logging, snatcher.output)
        if password and can_update_password and is_password_changed:
            mock_change_logon_behaviour.assert_called_once_with(
                user, password_injected=injected)

    def test_set_password(self):
        self._test_set_password(password='Password',
                                can_update_password=False,
                                is_password_changed=False)
        self._test_set_password(password=None,
                                can_update_password=False,
                                is_password_changed=False,
                                max_password_length=25)
        self._test_set_password(password=None,
                                can_update_password=False,
                                is_password_changed=False,
                                max_password_length=10)
        self._test_set_password(password='Password',
                                can_update_password=True,
                                is_password_changed=True)
        self._test_set_password(password='Password',
                                can_update_password=True,
                                is_password_changed=False)

    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._set_password')
    @mock.patch('cloudbaseinit.plugins.common.setuserpassword.'
                'SetUserPasswordPlugin._set_metadata_password')
    @mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
    def _test_execute(self, mock_get_os_utils, mock_set_metadata_password,
                      mock_set_password, is_password_set,
                      can_post_password, can_update_password=False):
        mock_service = mock.MagicMock()
        mock_osutils = mock.MagicMock()
        fake_shared_data = mock.MagicMock()
        fake_shared_data.get.return_value = 'fake username'
        mock_service.is_password_set = is_password_set
        mock_service.can_post_password = can_post_password
        mock_service.can_update_password = can_update_password
        mock_get_os_utils.return_value = mock_osutils
        mock_osutils.user_exists.return_value = True
        mock_set_password.return_value = 'fake password'

        with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
                                   'setuserpassword') as snatcher:
            response = self._setpassword_plugin.execute(mock_service,
                                                        fake_shared_data)
        mock_get_os_utils.assert_called_once_with()
        fake_shared_data.get.assert_called_with(
            constants.SHARED_DATA_USERNAME, CONF.username)
        mock_osutils.user_exists.assert_called_once_with('fake username')
        mock_set_password.assert_called_once_with(mock_service, mock_osutils,
                                                  'fake username',
                                                  fake_shared_data)

        expected_logging = [
            "Password succesfully updated for user fake username",
        ]
        if can_post_password:
            mock_set_metadata_password.assert_called_once_with('fake password',
                                                               mock_service)
        else:
            expected_logging.append("Cannot set the password in the metadata "
                                    "as it is not supported by this service")
            self.assertFalse(mock_set_metadata_password.called)

        if can_update_password:
            self.assertEqual((2, False), response)
        else:
            self.assertEqual((1, False), response)

        self.assertEqual(expected_logging, snatcher.output)

    def test_execute(self):
        self._test_execute(is_password_set=False, can_post_password=False)
        self._test_execute(is_password_set=True, can_post_password=True)
        self._test_execute(is_password_set=False, can_post_password=True)
        self._test_execute(is_password_set=True, can_post_password=True,
                           can_update_password=True)

    @mock.patch.object(setuserpassword.osutils_factory, 'get_os_utils')
    @testutils.ConfPatcher('first_logon_behaviour',
                           constant.NEVER_CHANGE)
    def test_logon_behaviour_never_change(self, mock_get_os_utils):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username)

        self.assertFalse(mock_get_os_utils.called)

    @testutils.ConfPatcher('first_logon_behaviour',
                           constant.ALWAYS_CHANGE)
    @mock.patch.object(setuserpassword, 'osutils_factory')
    def test_logon_behaviour_always(self, mock_factory):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username)

        mock_get_os_utils = mock_factory.get_os_utils
        self.assertTrue(mock_get_os_utils.called)
        osutils = mock_get_os_utils.return_value
        osutils.change_password_next_logon.assert_called_once_with(
            mock.sentinel.username)

    @testutils.ConfPatcher('first_logon_behaviour',
                           constant.CLEAR_TEXT_INJECTED_ONLY)
    @mock.patch.object(setuserpassword, 'osutils_factory')
    def test_change_logon_behaviour_clear_text_password_not_injected(
            self, mock_factory):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username,
            password_injected=False)

        mock_get_os_utils = mock_factory.get_os_utils
        self.assertFalse(mock_get_os_utils.called)

    @testutils.ConfPatcher('first_logon_behaviour',
                           constant.CLEAR_TEXT_INJECTED_ONLY)
    @mock.patch.object(setuserpassword, 'osutils_factory')
    def test_logon_behaviour_clear_text_password_injected(
            self, mock_factory):
        self._setpassword_plugin._change_logon_behaviour(
            mock.sentinel.username,
            password_injected=True)

        mock_get_os_utils = mock_factory.get_os_utils
        self.assertTrue(mock_get_os_utils.called)
        osutils = mock_get_os_utils.return_value
        osutils.change_password_next_logon.assert_called_once_with(
            mock.sentinel.username)