From 4836eecd60d7509ffa1b909d20c3cde739e0365d Mon Sep 17 00:00:00 2001 From: Lars Kellogg-Stedman Date: Tue, 22 Apr 2014 21:20:48 -0400 Subject: [PATCH] Introduce support for mocking command output This change rewrites FakePopen to permit registering commands and synthetic output such that client code can call subprocess.Popen() and receive required output without actually executing commands. With this change, it is possible to mock most of the output require during a packstack run. Change-Id: If81f71cfe89a61e25fb62c97ed5b3759039a0bfb --- tests/installer/test_run_setup.py | 16 ++++-- tests/installer/test_utils.py | 4 +- tests/test_base.py | 87 +++++++++++++++++++++++++++---- 3 files changed, 92 insertions(+), 15 deletions(-) diff --git a/tests/installer/test_run_setup.py b/tests/installer/test_run_setup.py index 6c6e977ec..8e91bfa00 100644 --- a/tests/installer/test_run_setup.py +++ b/tests/installer/test_run_setup.py @@ -23,6 +23,7 @@ from unittest import TestCase from packstack.modules import ospluginutils, puppet from packstack.installer import run_setup, basedefs +from packstack.installer.utils import shell from ..test_base import PackstackTestCaseMixin, FakePopen @@ -45,9 +46,15 @@ class CommandLineTestCase(PackstackTestCaseMixin, TestCase): """ # we need following to pass manage_epel(enabled=1) and # manage_rdo(havana-6.noarch\nenabled=0) functions - fake = FakePopen() - fake.stdout = 'havana-6.noarch\nenabled=0enabled=1' - subprocess.Popen = fake + subprocess.Popen = FakePopen + FakePopen.register('cat /etc/resolv.conf | grep nameserver', + stdout='nameserver 127.0.0.1') + FakePopen.register("rpm -q rdo-release " + "--qf='%{version}-%{release}.%{arch}\n'", + stdout='icehouse-2.noarch\n') + FakePopen.register_as_script('yum-config-manager --enable ' + 'openstack-icehouse', + stdout='[openstack-icehouse]\nenabled=1') # create a dummy public key dummy_public_key = os.path.join(self.tempdir, 'id_rsa.pub') @@ -56,7 +63,8 @@ class CommandLineTestCase(PackstackTestCaseMixin, TestCase): # Save sys.argv and replace it with the args we want optparse to use orig_argv = sys.argv - sys.argv = ['packstack', '--ssh-public-key=%s' % dummy_public_key, + sys.argv = ['packstack', '--debug', + '--ssh-public-key=%s' % dummy_public_key, '--install-hosts=127.0.0.1', '--os-swift-install=y', '--nagios-install=y', '--use-epel=y'] diff --git a/tests/installer/test_utils.py b/tests/installer/test_utils.py index 4d46e2b49..9508018dd 100644 --- a/tests/installer/test_utils.py +++ b/tests/installer/test_utils.py @@ -23,7 +23,7 @@ import shutil import tempfile from unittest import TestCase -from ..test_base import PackstackTestCaseMixin +from ..test_base import PackstackTestCaseMixin, FakePopen from packstack.installer.utils import * from packstack.installer.utils.strings import STR_MASK from packstack.installer.exceptions import ExecuteRuntimeError @@ -36,6 +36,8 @@ class ParameterTestCase(PackstackTestCaseMixin, TestCase): def setUp(self): # Creating a temp directory that can be used by tests self.tempdir = tempfile.mkdtemp() + FakePopen.register('echo "this is test"', + stdout='this is test') def tearDown(self): # remove the temp directory diff --git a/tests/test_base.py b/tests/test_base.py index 753b6b6de..1045699eb 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -18,20 +18,87 @@ import shutil import tempfile import subprocess +import logging +import re + +from packstack.installer.utils.shell import block_fmt +from packstack.installer.exceptions import (ScriptRuntimeError, + NetworkError) +from packstack.installer.utils.strings import mask_string + +LOG = logging.getLogger(__name__) class FakePopen(object): - def __init__(self, returncode=0): - self.returncode = returncode - self.stdout = self.stderr = self.data = "" + '''The FakePopen class replaces subprocess.Popen. Instead of actually + executing commands, it permits the caller to register a list of + commands the output to produce using the FakePopen.register and + FakePopen.register_as_script method. By default, FakePopen will return + empty stdout and stderr and a successful (0) returncode. + ''' - def __call__(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs - return self + cmd_registry = {} + script_registry = {} - def communicate(self, data=None): - self.data += data or '' + @classmethod + def register(cls, args, stdout='', stderr='', returncode=0): + '''Register a fake command.''' + if isinstance(args, list): + args = tuple(args) + cls.cmd_registry[args] = {'stdout': stdout, + 'stderr': stderr, + 'returncode': returncode} + + @classmethod + def register_as_script(cls, args, stdout='', stderr='', returncode=0): + '''Register a fake script.''' + if isinstance(args, list): + args = '\n'.join(args) + prefix = "function t(){ exit $? ; } \n trap t ERR \n" + args = prefix + args + cls.script_registry[args] = {'stdout': stdout, + 'stderr': stderr, + 'returncode': returncode} + + def __init__(self, args, **kwargs): + script = ["ssh", "-o", "StrictHostKeyChecking=no", + "-o", "UserKnownHostsFile=/dev/null"] + if args[-1] == "bash -x" and args[:5] == script: + self._init_as_script(args, **kwargs) + else: + self._init_as_cmd(args, **kwargs) + + def _init_as_cmd(self, args, **kwargs): + self._is_script = False + if isinstance(args, list): + args = tuple(args) + cmd = ' '.join(args) + else: + cmd = args + + if args in self.cmd_registry: + this = self.cmd_registry[args] + else: + LOG.warn('call to unregistered command: %s', cmd) + this = {'stdout': '', 'stderr': '', 'returncode': 0} + + self.stdout = this['stdout'] + self.stderr = this['stderr'] + self.returncode = this['returncode'] + + def _init_as_script(self, args, **kwargs): + self._is_script = True + + def communicate(self, input=None): + if self._is_script: + if input in self.script_registry: + this = self.script_registry[input] + else: + LOG.warn('call to unregistered script: %s', input) + this = {'stdout': '', 'stderr': '', 'returncode': 0} + self.stdout = this['stdout'] + self.stderr = this['stderr'] + self.returncode = this['returncode'] return self.stdout, self.stderr @@ -46,7 +113,7 @@ class PackstackTestCaseMixin(object): # some plugins call popen, we're replacing it for tests self._Popen = subprocess.Popen - self.fake_popen = subprocess.Popen = FakePopen() + self.fake_popen = subprocess.Popen = FakePopen def tearDown(self): # remove the temp directory