
Adjust unit tests and filters in tox.ini Change-Id: I05a3a1b37d0ce3cf33b3b5b9637350f0fd1f0afb
772 lines
34 KiB
Python
772 lines
34 KiB
Python
# coding=utf-8
|
|
|
|
# Copyright 2014 International Business Machines Corporation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Test class for console_utils driver module."""
|
|
|
|
import errno
|
|
import fcntl
|
|
import ipaddress
|
|
import os
|
|
import random
|
|
import signal
|
|
import socket
|
|
import string
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
from unittest import mock
|
|
|
|
from ironic_lib import utils as ironic_utils
|
|
from oslo_config import cfg
|
|
from oslo_service import loopingcall
|
|
import psutil
|
|
|
|
from ironic.common import exception
|
|
from ironic.drivers.modules import console_utils
|
|
from ironic.drivers.modules import ipmitool as ipmi
|
|
from ironic.tests.unit.db import base as db_base
|
|
from ironic.tests.unit.db import utils as db_utils
|
|
from ironic.tests.unit.objects import utils as obj_utils
|
|
|
|
|
|
CONF = cfg.CONF
|
|
|
|
INFO_DICT = db_utils.get_test_ipmi_info()
|
|
|
|
|
|
class ConsoleUtilsTestCase(db_base.DbTestCase):
|
|
|
|
def setUp(self):
|
|
super(ConsoleUtilsTestCase, self).setUp()
|
|
self.node = obj_utils.get_test_node(
|
|
self.context,
|
|
driver_info=INFO_DICT)
|
|
self.info = ipmi._parse_driver_info(self.node)
|
|
self.mock_stdout = tempfile.NamedTemporaryFile(delete=False)
|
|
self.mock_stderr = tempfile.NamedTemporaryFile(delete=False)
|
|
|
|
def tearDown(self):
|
|
super(ConsoleUtilsTestCase, self).tearDown()
|
|
self.mock_stdout.close()
|
|
self.mock_stderr.close()
|
|
os.remove(self.mock_stdout.name)
|
|
os.remove(self.mock_stderr.name)
|
|
|
|
def test__get_console_pid_dir(self):
|
|
pid_dir = '/tmp/pid_dir'
|
|
self.config(terminal_pid_dir=pid_dir, group='console')
|
|
dir = console_utils._get_console_pid_dir()
|
|
self.assertEqual(pid_dir, dir)
|
|
|
|
def test__get_console_pid_dir_tempdir(self):
|
|
self.config(tempdir='/tmp/fake_dir')
|
|
dir = console_utils._get_console_pid_dir()
|
|
self.assertEqual(CONF.tempdir, dir)
|
|
|
|
@mock.patch.object(os, 'makedirs', autospec=True)
|
|
@mock.patch.object(os.path, 'exists', autospec=True)
|
|
def test__ensure_console_pid_dir_exists(self, mock_path_exists,
|
|
mock_makedirs):
|
|
mock_path_exists.return_value = True
|
|
mock_makedirs.side_effect = OSError
|
|
pid_dir = console_utils._get_console_pid_dir()
|
|
|
|
console_utils._ensure_console_pid_dir_exists()
|
|
|
|
mock_path_exists.assert_called_once_with(pid_dir)
|
|
self.assertFalse(mock_makedirs.called)
|
|
|
|
@mock.patch.object(os, 'makedirs', autospec=True)
|
|
@mock.patch.object(os.path, 'exists', autospec=True)
|
|
def test__ensure_console_pid_dir_exists_fail(self, mock_path_exists,
|
|
mock_makedirs):
|
|
mock_path_exists.return_value = False
|
|
mock_makedirs.side_effect = OSError
|
|
pid_dir = console_utils._get_console_pid_dir()
|
|
|
|
self.assertRaises(exception.ConsoleError,
|
|
console_utils._ensure_console_pid_dir_exists)
|
|
|
|
mock_path_exists.assert_called_once_with(pid_dir)
|
|
mock_makedirs.assert_called_once_with(pid_dir)
|
|
|
|
@mock.patch.object(console_utils, '_get_console_pid_dir', autospec=True)
|
|
def test__get_console_pid_file(self, mock_dir):
|
|
mock_dir.return_value = tempfile.gettempdir()
|
|
expected_path = '%(tempdir)s/%(uuid)s.pid' % {
|
|
'tempdir': mock_dir.return_value,
|
|
'uuid': self.info['uuid']}
|
|
path = console_utils._get_console_pid_file(self.info['uuid'])
|
|
self.assertEqual(expected_path, path)
|
|
mock_dir.assert_called_once_with()
|
|
|
|
@mock.patch.object(console_utils, 'open',
|
|
mock.mock_open(read_data='12345\n'))
|
|
@mock.patch.object(console_utils, '_get_console_pid_file', autospec=True)
|
|
def test__get_console_pid(self, mock_pid_file):
|
|
tmp_file_handle = tempfile.NamedTemporaryFile()
|
|
tmp_file = tmp_file_handle.name
|
|
|
|
mock_pid_file.return_value = tmp_file
|
|
|
|
pid = console_utils._get_console_pid(self.info['uuid'])
|
|
|
|
mock_pid_file.assert_called_once_with(self.info['uuid'])
|
|
self.assertEqual(pid, 12345)
|
|
|
|
@mock.patch.object(console_utils, 'open',
|
|
mock.mock_open(read_data='Hello World\n'))
|
|
@mock.patch.object(console_utils, '_get_console_pid_file', autospec=True)
|
|
def test__get_console_pid_not_a_num(self, mock_pid_file):
|
|
tmp_file_handle = tempfile.NamedTemporaryFile()
|
|
tmp_file = tmp_file_handle.name
|
|
|
|
mock_pid_file.return_value = tmp_file
|
|
|
|
self.assertRaises(exception.NoConsolePid,
|
|
console_utils._get_console_pid,
|
|
self.info['uuid'])
|
|
mock_pid_file.assert_called_once_with(self.info['uuid'])
|
|
|
|
def test__get_console_pid_file_not_found(self):
|
|
self.assertRaises(exception.NoConsolePid,
|
|
console_utils._get_console_pid,
|
|
self.info['uuid'])
|
|
|
|
@mock.patch.object(ironic_utils, 'unlink_without_raise', autospec=True)
|
|
@mock.patch.object(os, 'kill', autospec=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid', autospec=True)
|
|
def test__stop_console(self, mock_pid, mock_kill, mock_unlink):
|
|
pid_file = console_utils._get_console_pid_file(self.info['uuid'])
|
|
mock_pid.return_value = 12345
|
|
|
|
console_utils._stop_console(self.info['uuid'])
|
|
|
|
mock_pid.assert_called_once_with(self.info['uuid'])
|
|
|
|
# a check if process still exist (signal 0) in a loop
|
|
mock_kill.assert_any_call(mock_pid.return_value, signal.SIG_DFL)
|
|
# and that it receives the SIGTERM
|
|
mock_kill.assert_any_call(mock_pid.return_value, signal.SIGTERM)
|
|
mock_unlink.assert_called_once_with(pid_file)
|
|
|
|
@mock.patch.object(ironic_utils, 'unlink_without_raise', autospec=True)
|
|
@mock.patch.object(os, 'kill', autospec=True)
|
|
@mock.patch.object(psutil, 'pid_exists', autospec=True, return_value=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid', autospec=True)
|
|
def test__stop_console_forced_kill(self, mock_pid, mock_psutil, mock_kill,
|
|
mock_unlink):
|
|
pid_file = console_utils._get_console_pid_file(self.info['uuid'])
|
|
mock_pid.return_value = 12345
|
|
|
|
console_utils._stop_console(self.info['uuid'])
|
|
|
|
mock_pid.assert_called_once_with(self.info['uuid'])
|
|
|
|
# Make sure console process receives hard SIGKILL
|
|
mock_kill.assert_any_call(mock_pid.return_value, signal.SIGKILL)
|
|
mock_unlink.assert_called_once_with(pid_file)
|
|
|
|
@mock.patch.object(ironic_utils, 'unlink_without_raise', autospec=True)
|
|
@mock.patch.object(os, 'kill', autospec=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid', autospec=True)
|
|
def test__stop_console_nopid(self, mock_pid, mock_kill, mock_unlink):
|
|
pid_file = console_utils._get_console_pid_file(self.info['uuid'])
|
|
mock_pid.side_effect = exception.NoConsolePid(pid_path="/tmp/blah")
|
|
|
|
self.assertRaises(exception.NoConsolePid,
|
|
console_utils._stop_console,
|
|
self.info['uuid'])
|
|
|
|
mock_pid.assert_called_once_with(self.info['uuid'])
|
|
self.assertFalse(mock_kill.called)
|
|
mock_unlink.assert_called_once_with(pid_file)
|
|
|
|
@mock.patch.object(ironic_utils, 'unlink_without_raise', autospec=True)
|
|
@mock.patch.object(os, 'kill', autospec=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid', autospec=True)
|
|
def test__stop_console_shellinabox_not_running(self, mock_pid,
|
|
mock_kill, mock_unlink):
|
|
pid_file = console_utils._get_console_pid_file(self.info['uuid'])
|
|
mock_pid.return_value = 12345
|
|
mock_kill.side_effect = OSError(errno.ESRCH, 'message')
|
|
|
|
console_utils._stop_console(self.info['uuid'])
|
|
|
|
mock_pid.assert_called_once_with(self.info['uuid'])
|
|
mock_kill.assert_called_once_with(mock_pid.return_value,
|
|
signal.SIGTERM)
|
|
mock_unlink.assert_called_once_with(pid_file)
|
|
|
|
@mock.patch.object(ironic_utils, 'unlink_without_raise', autospec=True)
|
|
@mock.patch.object(os, 'kill', autospec=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid', autospec=True)
|
|
def test__stop_console_exception(self, mock_pid, mock_kill, mock_unlink):
|
|
pid_file = console_utils._get_console_pid_file(self.info['uuid'])
|
|
mock_pid.return_value = 12345
|
|
mock_kill.side_effect = OSError(2, 'message')
|
|
|
|
self.assertRaises(exception.ConsoleError,
|
|
console_utils._stop_console,
|
|
self.info['uuid'])
|
|
|
|
mock_pid.assert_called_once_with(self.info['uuid'])
|
|
mock_kill.assert_called_once_with(mock_pid.return_value,
|
|
signal.SIGTERM)
|
|
mock_unlink.assert_called_once_with(pid_file)
|
|
|
|
def _get_shellinabox_console(self, scheme):
|
|
generated_url = (
|
|
console_utils.get_shellinabox_console_url(self.info['port']))
|
|
console_host = CONF.my_ip
|
|
if ipaddress.ip_address(console_host).version == 6:
|
|
console_host = '[%s]' % console_host
|
|
http_url = "%s://%s:%s" % (scheme, console_host, self.info['port'])
|
|
self.assertEqual(http_url, generated_url)
|
|
|
|
def test_get_shellinabox_console_url(self):
|
|
self._get_shellinabox_console('http')
|
|
|
|
def test_get_shellinabox_console_https_url(self):
|
|
# specify terminal_cert_dir in /etc/ironic/ironic.conf
|
|
self.config(terminal_cert_dir='/tmp', group='console')
|
|
# use https
|
|
self._get_shellinabox_console('https')
|
|
|
|
def test_make_persistent_password_file(self):
|
|
filepath = '%(tempdir)s/%(node_uuid)s' % {
|
|
'tempdir': tempfile.gettempdir(),
|
|
'node_uuid': self.info['uuid']}
|
|
password = ''.join([random.choice(string.ascii_letters)
|
|
for n in range(16)])
|
|
console_utils.make_persistent_password_file(filepath, password)
|
|
# make sure file exists
|
|
self.assertTrue(os.path.exists(filepath))
|
|
# make sure the content is correct
|
|
with open(filepath) as file:
|
|
content = file.read()
|
|
self.assertEqual(password, content)
|
|
# delete the file
|
|
os.unlink(filepath)
|
|
|
|
@mock.patch.object(os, 'chmod', autospec=True)
|
|
def test_make_persistent_password_file_fail(self, mock_chmod):
|
|
mock_chmod.side_effect = IOError()
|
|
filepath = '%(tempdir)s/%(node_uuid)s' % {
|
|
'tempdir': tempfile.gettempdir(),
|
|
'node_uuid': self.info['uuid']}
|
|
self.assertRaises(exception.PasswordFileFailedToCreate,
|
|
console_utils.make_persistent_password_file,
|
|
filepath,
|
|
'password')
|
|
|
|
@mock.patch.object(fcntl, 'fcntl', autospec=True)
|
|
@mock.patch.object(console_utils, 'open',
|
|
mock.mock_open(read_data='12345\n'))
|
|
@mock.patch.object(os.path, 'exists', autospec=True)
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(psutil, 'pid_exists', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_shellinabox_console(self, mock_stop,
|
|
mock_dir_exists,
|
|
mock_pid_exists,
|
|
mock_popen,
|
|
mock_path_exists, mock_fcntl):
|
|
mock_popen.return_value.poll.return_value = 0
|
|
|
|
self.mock_stdout.write(b'0')
|
|
self.mock_stdout.seek(0)
|
|
mock_popen.return_value.stdout = self.mock_stdout
|
|
|
|
self.mock_stderr.write(b'1')
|
|
self.mock_stderr.seek(0)
|
|
mock_popen.return_value.stderr = self.mock_stderr
|
|
|
|
mock_pid_exists.return_value = True
|
|
mock_path_exists.return_value = True
|
|
|
|
console_utils.start_shellinabox_console(self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
mock_pid_exists.assert_called_once_with(12345)
|
|
mock_popen.assert_called_once_with(mock.ANY,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
mock_popen.return_value.poll.assert_called_once_with()
|
|
|
|
@mock.patch.object(fcntl, 'fcntl', autospec=True)
|
|
@mock.patch.object(console_utils, 'open',
|
|
mock.mock_open(read_data='12345\n'))
|
|
@mock.patch.object(os.path, 'exists', autospec=True)
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(psutil, 'pid_exists', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_shellinabox_console_nopid(self, mock_stop,
|
|
mock_dir_exists,
|
|
mock_pid_exists,
|
|
mock_popen,
|
|
mock_path_exists, mock_fcntl):
|
|
# no existing PID file before starting
|
|
mock_stop.side_effect = exception.NoConsolePid('/tmp/blah')
|
|
mock_popen.return_value.poll.return_value = 0
|
|
|
|
self.mock_stdout.write(b'0')
|
|
self.mock_stdout.seek(0)
|
|
mock_popen.return_value.stdout = self.mock_stdout
|
|
|
|
self.mock_stderr.write(b'1')
|
|
self.mock_stderr.seek(0)
|
|
mock_popen.return_value.stderr = self.mock_stderr
|
|
|
|
mock_pid_exists.return_value = True
|
|
mock_path_exists.return_value = True
|
|
|
|
console_utils.start_shellinabox_console(self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
mock_pid_exists.assert_called_once_with(12345)
|
|
mock_popen.assert_called_once_with(mock.ANY,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
mock_popen.return_value.poll.assert_called_once_with()
|
|
|
|
@mock.patch.object(time, 'sleep', autospec=True)
|
|
@mock.patch.object(os, 'read', autospec=True)
|
|
@mock.patch.object(fcntl, 'fcntl', autospec=True)
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_shellinabox_console_fail(
|
|
self, mock_stop, mock_dir_exists, mock_popen, mock_fcntl,
|
|
mock_os_read, mock_sleep):
|
|
mock_popen.return_value.poll.return_value = 1
|
|
|
|
self.mock_stdout.write(b'0')
|
|
self.mock_stdout.seek(0)
|
|
mock_popen.return_value.stdout = self.mock_stdout
|
|
|
|
self.mock_stderr.write(b'1')
|
|
self.mock_stderr.seek(0)
|
|
mock_popen.return_value.stderr = self.mock_stderr
|
|
|
|
err_output = b'error output'
|
|
mock_os_read.side_effect = [err_output] * 2 + [OSError] * 2
|
|
mock_fcntl.side_effect = [1, mock.Mock()] * 2
|
|
|
|
self.assertRaisesRegex(
|
|
exception.ConsoleSubprocessFailed, "Stdout: %r" % err_output,
|
|
console_utils.start_shellinabox_console, self.info['uuid'],
|
|
self.info['port'], 'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_sleep.assert_has_calls([mock.call(1), mock.call(1)])
|
|
mock_dir_exists.assert_called_once_with()
|
|
for obj in (self.mock_stdout, self.mock_stderr):
|
|
mock_fcntl.assert_has_calls([
|
|
mock.call(obj, fcntl.F_GETFL),
|
|
mock.call(obj, fcntl.F_SETFL, 1 | os.O_NONBLOCK)])
|
|
mock_popen.assert_called_once_with(mock.ANY,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
mock_popen.return_value.poll.assert_called_with()
|
|
|
|
@mock.patch.object(fcntl, 'fcntl', autospec=True)
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_shellinabox_console_timeout(
|
|
self, mock_stop, mock_dir_exists, mock_popen, mock_fcntl):
|
|
self.config(subprocess_timeout=0, group='console')
|
|
self.config(subprocess_checking_interval=0, group='console')
|
|
mock_popen.return_value.poll.return_value = None
|
|
|
|
self.mock_stdout.write(b'0')
|
|
self.mock_stdout.seek(0)
|
|
mock_popen.return_value.stdout = self.mock_stdout
|
|
|
|
self.mock_stderr.write(b'1')
|
|
self.mock_stderr.seek(0)
|
|
mock_popen.return_value.stderr = self.mock_stderr
|
|
|
|
self.assertRaisesRegex(
|
|
exception.ConsoleSubprocessFailed, 'Timeout or error',
|
|
console_utils.start_shellinabox_console, self.info['uuid'],
|
|
self.info['port'], 'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
mock_popen.assert_called_once_with(mock.ANY,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
mock_popen.return_value.poll.assert_called_with()
|
|
self.assertEqual(0, mock_popen.return_value.communicate.call_count)
|
|
|
|
@mock.patch.object(time, 'sleep', autospec=True)
|
|
@mock.patch.object(os, 'read', autospec=True)
|
|
@mock.patch.object(fcntl, 'fcntl', autospec=True)
|
|
@mock.patch.object(console_utils, 'open',
|
|
mock.mock_open(read_data='12345\n'))
|
|
@mock.patch.object(os.path, 'exists', autospec=True)
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(psutil, 'pid_exists', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_shellinabox_console_fail_no_pid(
|
|
self, mock_stop, mock_dir_exists, mock_pid_exists, mock_popen,
|
|
mock_path_exists, mock_fcntl, mock_os_read, mock_sleep):
|
|
mock_popen.return_value.poll.return_value = 0
|
|
|
|
self.mock_stdout.write(b'0')
|
|
self.mock_stdout.seek(0)
|
|
mock_popen.return_value.stdout = self.mock_stdout
|
|
|
|
self.mock_stderr.write(b'1')
|
|
self.mock_stderr.seek(0)
|
|
mock_popen.return_value.stderr = self.mock_stderr
|
|
|
|
mock_pid_exists.return_value = False
|
|
mock_os_read.side_effect = [b'error output'] * 2 + [OSError] * 2
|
|
mock_fcntl.side_effect = [1, mock.Mock()] * 2
|
|
mock_path_exists.return_value = True
|
|
|
|
self.assertRaises(exception.ConsoleSubprocessFailed,
|
|
console_utils.start_shellinabox_console,
|
|
self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_sleep.assert_has_calls([mock.call(1), mock.call(1)])
|
|
mock_dir_exists.assert_called_once_with()
|
|
for obj in (self.mock_stdout, self.mock_stderr):
|
|
mock_fcntl.assert_has_calls([
|
|
mock.call(obj, fcntl.F_GETFL),
|
|
mock.call(obj, fcntl.F_SETFL, 1 | os.O_NONBLOCK)])
|
|
mock_pid_exists.assert_called_with(12345)
|
|
mock_popen.assert_called_once_with(mock.ANY,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
mock_popen.return_value.poll.assert_called_with()
|
|
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_shellinabox_console_fail_nopiddir(self, mock_stop,
|
|
mock_dir_exists,
|
|
mock_popen):
|
|
mock_dir_exists.side_effect = exception.ConsoleError(message='fail')
|
|
mock_popen.return_value.poll.return_value = 0
|
|
|
|
self.assertRaises(exception.ConsoleError,
|
|
console_utils.start_shellinabox_console,
|
|
self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
self.assertFalse(mock_popen.called)
|
|
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_stop_shellinabox_console(self, mock_stop):
|
|
|
|
console_utils.stop_shellinabox_console(self.info['uuid'])
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_stop_shellinabox_console_fail_nopid(self, mock_stop):
|
|
mock_stop.side_effect = exception.NoConsolePid('/tmp/blah')
|
|
|
|
console_utils.stop_shellinabox_console(self.info['uuid'])
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
|
|
def test_get_socat_console_url_tcp(self):
|
|
self.config(my_ip="10.0.0.1")
|
|
url = console_utils.get_socat_console_url(self.info['port'])
|
|
self.assertEqual("tcp://10.0.0.1:%s" % self.info['port'], url)
|
|
|
|
def test_get_socat_console_url_tcp6(self):
|
|
self.config(my_ip='::1')
|
|
url = console_utils.get_socat_console_url(self.info['port'])
|
|
self.assertEqual("tcp://[::1]:%s" % self.info['port'], url)
|
|
|
|
def test_get_socat_console_url_tcp_with_address_conf(self):
|
|
self.config(socat_address="10.0.0.1", group='console')
|
|
url = console_utils.get_socat_console_url(self.info['port'])
|
|
self.assertEqual("tcp://10.0.0.1:%s" % self.info['port'], url)
|
|
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid_file', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
@mock.patch.object(loopingcall.FixedIntervalLoopingCall, 'start',
|
|
autospec=True)
|
|
def _test_start_socat_console_check_arg(self, mock_timer_start,
|
|
mock_stop, mock_dir_exists,
|
|
mock_get_pid, mock_popen):
|
|
mock_timer_start.return_value = mock.Mock()
|
|
mock_get_pid.return_value = '/tmp/%s.pid' % self.info['uuid']
|
|
|
|
console_utils.start_socat_console(self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
mock_get_pid.assert_called_once_with(self.info['uuid'])
|
|
mock_timer_start.assert_called_once_with(mock.ANY, interval=mock.ANY)
|
|
mock_popen.assert_called_once_with(mock.ANY, stderr=subprocess.PIPE)
|
|
return mock_popen.call_args[0][0]
|
|
|
|
def test_start_socat_console_check_arg_default_timeout(self):
|
|
args = self._test_start_socat_console_check_arg()
|
|
self.assertIn('-T600', args)
|
|
|
|
def test_start_socat_console_check_arg_timeout(self):
|
|
self.config(terminal_timeout=1, group='console')
|
|
args = self._test_start_socat_console_check_arg()
|
|
self.assertIn('-T1', args)
|
|
|
|
def test_start_socat_console_check_arg_timeout_disabled(self):
|
|
self.config(terminal_timeout=0, group='console')
|
|
args = self._test_start_socat_console_check_arg()
|
|
self.assertNotIn('-T0', args)
|
|
|
|
def test_start_socat_console_check_arg_bind_addr_default_ipv4(self):
|
|
self.config(my_ip='10.0.0.1')
|
|
args = self._test_start_socat_console_check_arg()
|
|
self.assertIn('TCP4-LISTEN:%s,bind=10.0.0.1,reuseaddr,fork,'
|
|
'max-children=1' %
|
|
self.info['port'], args)
|
|
|
|
def test_start_socat_console_check_arg_bind_addr_ipv4(self):
|
|
self.config(socat_address='10.0.0.1', group='console')
|
|
args = self._test_start_socat_console_check_arg()
|
|
self.assertIn('TCP4-LISTEN:%s,bind=10.0.0.1,reuseaddr,fork,'
|
|
'max-children=1' %
|
|
self.info['port'], args)
|
|
|
|
@mock.patch.object(os.path, 'exists', autospec=True)
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(psutil, 'pid_exists', autospec=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_socat_console(self, mock_stop,
|
|
mock_dir_exists,
|
|
mock_get_pid,
|
|
mock_pid_exists,
|
|
mock_popen,
|
|
mock_path_exists):
|
|
mock_popen.return_value.pid = 23456
|
|
mock_popen.return_value.poll.return_value = None
|
|
mock_popen.return_value.communicate.return_value = (None, None)
|
|
|
|
mock_get_pid.return_value = 23456
|
|
mock_path_exists.return_value = True
|
|
|
|
console_utils.start_socat_console(self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
mock_get_pid.assert_called_with(self.info['uuid'])
|
|
mock_path_exists.assert_called_with(mock.ANY)
|
|
mock_popen.assert_called_once_with(mock.ANY, stderr=subprocess.PIPE)
|
|
|
|
@mock.patch.object(os.path, 'exists', autospec=True)
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(psutil, 'pid_exists', autospec=True)
|
|
@mock.patch.object(console_utils, '_get_console_pid', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_socat_console_nopid(self, mock_stop,
|
|
mock_dir_exists,
|
|
mock_get_pid,
|
|
mock_pid_exists,
|
|
mock_popen,
|
|
mock_path_exists):
|
|
# no existing PID file before starting
|
|
mock_stop.side_effect = exception.NoConsolePid('/tmp/blah')
|
|
mock_popen.return_value.pid = 23456
|
|
mock_popen.return_value.poll.return_value = None
|
|
mock_popen.return_value.communicate.return_value = (None, None)
|
|
|
|
mock_get_pid.return_value = 23456
|
|
mock_path_exists.return_value = True
|
|
|
|
console_utils.start_socat_console(self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
mock_get_pid.assert_called_with(self.info['uuid'])
|
|
mock_path_exists.assert_called_with(mock.ANY)
|
|
mock_popen.assert_called_once_with(mock.ANY, stderr=subprocess.PIPE)
|
|
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_socat_console_fail(self, mock_stop, mock_dir_exists,
|
|
mock_popen):
|
|
mock_popen.side_effect = OSError()
|
|
mock_popen.return_value.pid = 23456
|
|
mock_popen.return_value.poll.return_value = 1
|
|
mock_popen.return_value.communicate.return_value = (None, 'error')
|
|
|
|
self.assertRaises(exception.ConsoleSubprocessFailed,
|
|
console_utils.start_socat_console,
|
|
self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
mock_popen.assert_called_once_with(mock.ANY, stderr=subprocess.PIPE)
|
|
|
|
@mock.patch.object(subprocess, 'Popen', autospec=True)
|
|
@mock.patch.object(console_utils, '_ensure_console_pid_dir_exists',
|
|
autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_start_socat_console_fail_nopiddir(self, mock_stop,
|
|
mock_dir_exists,
|
|
mock_popen):
|
|
mock_dir_exists.side_effect = exception.ConsoleError(message='fail')
|
|
|
|
self.assertRaises(exception.ConsoleError,
|
|
console_utils.start_socat_console,
|
|
self.info['uuid'],
|
|
self.info['port'],
|
|
'ls&')
|
|
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
mock_dir_exists.assert_called_once_with()
|
|
self.assertEqual(0, mock_popen.call_count)
|
|
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_stop_socat_console(self, mock_stop):
|
|
console_utils.stop_socat_console(self.info['uuid'])
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
|
|
@mock.patch.object(console_utils.LOG, 'warning', autospec=True)
|
|
@mock.patch.object(console_utils, '_stop_console', autospec=True)
|
|
def test_stop_socat_console_fail_nopid(self, mock_stop, mock_log_warning):
|
|
mock_stop.side_effect = exception.NoConsolePid('/tmp/blah')
|
|
console_utils.stop_socat_console(self.info['uuid'])
|
|
mock_stop.assert_called_once_with(self.info['uuid'])
|
|
# LOG.warning() is called when _stop_console() raises NoConsolePid
|
|
self.assertTrue(mock_log_warning.called)
|
|
|
|
def test_valid_console_port_range(self):
|
|
self.config(port_range='10000:20000', group='console')
|
|
start, stop = console_utils._get_port_range()
|
|
self.assertEqual((start, stop), (10000, 20000))
|
|
|
|
def test_invalid_console_port_range(self):
|
|
self.config(port_range='20000:10000', group='console')
|
|
self.assertRaises(exception.InvalidParameterValue,
|
|
console_utils._get_port_range)
|
|
|
|
@mock.patch.object(console_utils, 'ALLOCATED_PORTS', autospec=True)
|
|
@mock.patch.object(console_utils, '_verify_port', autospec=True)
|
|
def test_allocate_port_success(self, mock_verify, mock_ports):
|
|
self.config(port_range='10000:10001', group='console')
|
|
port = console_utils.acquire_port()
|
|
mock_verify.assert_called_once_with(10000, host=None)
|
|
self.assertEqual(port, 10000)
|
|
mock_ports.add.assert_called_once_with(10000)
|
|
|
|
@mock.patch.object(console_utils, 'ALLOCATED_PORTS', autospec=True)
|
|
@mock.patch.object(console_utils, '_verify_port', autospec=True)
|
|
def test_allocate_port_range_retry(self, mock_verify, mock_ports):
|
|
self.config(port_range='10000:10003', group='console')
|
|
mock_verify.side_effect = (exception.Conflict, exception.Conflict,
|
|
None)
|
|
port = console_utils.acquire_port()
|
|
verify_calls = [mock.call(10000, host=None),
|
|
mock.call(10001, host=None),
|
|
mock.call(10002, host=None)]
|
|
mock_verify.assert_has_calls(verify_calls)
|
|
self.assertEqual(port, 10002)
|
|
mock_ports.add.assert_called_once_with(10002)
|
|
|
|
@mock.patch.object(console_utils, 'ALLOCATED_PORTS', autospec=True)
|
|
@mock.patch.object(console_utils, '_verify_port', autospec=True)
|
|
def test_allocate_port_no_free_ports(self, mock_verify, mock_ports):
|
|
self.config(port_range='10000:10005', group='console')
|
|
mock_verify.side_effect = exception.Conflict
|
|
self.assertRaises(exception.NoFreeIPMITerminalPorts,
|
|
console_utils.acquire_port)
|
|
verify_calls = [mock.call(p, host=None) for p in range(10000, 10005)]
|
|
mock_verify.assert_has_calls(verify_calls)
|
|
|
|
@mock.patch.object(socket, 'socket', autospec=True)
|
|
def test__verify_port_default(self, mock_socket):
|
|
self.config(host='localhost.localdomain')
|
|
mock_sock = mock.MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
console_utils._verify_port(10000)
|
|
mock_sock.bind.assert_called_once_with(('localhost.localdomain',
|
|
10000))
|
|
|
|
@mock.patch.object(socket, 'socket', autospec=True)
|
|
def test__verify_port_hostname(self, mock_socket):
|
|
mock_sock = mock.MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
console_utils._verify_port(10000, host='localhost.localdomain')
|
|
mock_socket.assert_called_once_with()
|
|
mock_sock.bind.assert_called_once_with(('localhost.localdomain',
|
|
10000))
|
|
|
|
@mock.patch.object(socket, 'socket', autospec=True)
|
|
def test__verify_port_ipv4(self, mock_socket):
|
|
mock_sock = mock.MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
console_utils._verify_port(10000, host='1.2.3.4')
|
|
mock_socket.assert_called_once_with()
|
|
mock_sock.bind.assert_called_once_with(('1.2.3.4', 10000))
|
|
|
|
@mock.patch.object(socket, 'socket', autospec=True)
|
|
def test__verify_port_ipv6(self, mock_socket):
|
|
mock_sock = mock.MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
console_utils._verify_port(10000, host='2001:dead:beef::1')
|
|
mock_socket.assert_called_once_with(socket.AF_INET6)
|
|
mock_sock.bind.assert_called_once_with(('2001:dead:beef::1', 10000))
|