OSC plugin command for cluster-run

Change-Id: I9530f3ca9a679f011c94019af2fe185172ff90ad
This commit is contained in:
tengqm 2016-08-29 04:32:48 -04:00
parent a5f8b227e3
commit 6b812ff405
3 changed files with 389 additions and 3 deletions

View File

@ -11,12 +11,13 @@
# under the License.
import copy
import mock
import six
import subprocess
import mock
from openstack.cluster.v1 import cluster as sdk_cluster
from openstack import exceptions as sdk_exc
from osc_lib import exceptions as exc
import six
from senlinclient.tests.unit.v1 import fakes
from senlinclient.v1 import cluster as osc_cluster
@ -782,7 +783,7 @@ class TestClusterRecover(TestCluster):
self.mock_client.recover_cluster = mock.Mock(
return_value=self.response)
def test_cluster_recoverk(self):
def test_cluster_recover(self):
arglist = ['cluster1', 'cluster2', 'cluster3']
parsed_args = self.check_parser(self.cmd, arglist, [])
self.cmd.take_action(parsed_args)
@ -833,3 +834,181 @@ class TestClusterCollect(TestCluster):
self.mock_client.collect_cluster_attrs.assert_called_once_with(
'cluster1', 'path.to.attr')
self.assertEqual(['node_id', 'attr_value'], columns)
class TestClusterRun(TestCluster):
attrs = [
mock.Mock(node_id="NODE_ID1",
attr_value={"addresses": 'ADDRESS CONTENT 1'}),
mock.Mock(node_id="NODE_ID2",
attr_value={"addresses": 'ADDRESS CONTENT 2'})
]
def setUp(self):
super(TestClusterRun, self).setUp()
self.cmd = osc_cluster.ClusterRun(self.app, None)
self.mock_client.collect_cluster_attrs = mock.Mock(
return_value=self.attrs)
@mock.patch('subprocess.Popen')
def test__run_script(self, mock_proc):
x_proc = mock.Mock(returncode=0)
x_stdout = 'OUTPUT'
x_stderr = 'ERROR'
x_proc.communicate.return_value = (x_stdout, x_stderr)
mock_proc.return_value = x_proc
addr = {
'private': [
{
'OS-EXT-IPS:type': 'floating',
'version': 4,
'addr': '1.2.3.4',
}
]
}
output = {}
self.cmd._run_script('NODE_ID', addr, 'private', 'floating', 22,
'john', False, 'identity_path', 'echo foo',
'-f bar',
output=output)
mock_proc.assert_called_once_with(
['ssh', '-4', '-p22', '-i identity_path', '-f bar', 'john@1.2.3.4',
'echo foo'],
stdout=subprocess.PIPE)
self.assertEqual(
{'status': 'SUCCEEDED (0)', 'output': 'OUTPUT', 'error': 'ERROR'},
output)
def test__run_script_network_not_found(self):
addr = {'foo': 'bar'}
output = {}
self.cmd._run_script('NODE_ID', addr, 'private', 'floating', 22,
'john', False, 'identity_path', 'echo foo',
'-f bar',
output=output)
self.assertEqual(
{'status': 'FAILED',
'error': "Node 'NODE_ID' is not attached to network 'private'."
},
output)
def test__run_script_more_than_one_network(self):
addr = {'foo': 'bar', 'koo': 'tar'}
output = {}
self.cmd._run_script('NODE_ID', addr, '', 'floating', 22, 'john',
False, 'identity_path', 'echo foo', '-f bar',
output=output)
self.assertEqual(
{'status': 'FAILED',
'error': "Node 'NODE_ID' is attached to more than one "
"network. Please pick the network to use."},
output)
def test__run_script_no_network(self):
addr = {}
output = {}
self.cmd._run_script('NODE_ID', addr, '', 'floating', 22, 'john',
False, 'identity_path', 'echo foo', '-f bar',
output=output)
self.assertEqual(
{'status': 'FAILED',
'error': "Node 'NODE_ID' is not attached to any network."},
output)
def test__run_script_no_matching_address(self):
addr = {
'private': [
{
'OS-EXT-IPS:type': 'fixed',
'version': 4,
'addr': '1.2.3.4',
}
]
}
output = {}
self.cmd._run_script('NODE_ID', addr, 'private', 'floating', 22,
'john', False, 'identity_path', 'echo foo',
'-f bar',
output=output)
self.assertEqual(
{'status': 'FAILED',
'error': "No address that matches network 'private' and "
"type 'floating' of IPv4 has been found for node "
"'NODE_ID'."},
output)
def test__run_script_more_than_one_address(self):
addr = {
'private': [
{
'OS-EXT-IPS:type': 'fixed',
'version': 4,
'addr': '1.2.3.4',
},
{
'OS-EXT-IPS:type': 'fixed',
'version': 4,
'addr': '5.6.7.8',
},
]
}
output = {}
self.cmd._run_script('NODE_ID', addr, 'private', 'fixed', 22, 'john',
False, 'identity_path', 'echo foo', '-f bar',
output=output)
self.assertEqual(
{'status': 'FAILED',
'error': "More than one IPv4 fixed address found."},
output)
@mock.patch('threading.Thread')
@mock.patch.object(osc_cluster.ClusterRun, '_run_script')
def test_cluster_run(self, mock_script, mock_thread):
arglist = [
'--port', '22',
'--address-type', 'fixed',
'--network', 'private',
'--user', 'root',
'--identity-file', 'path-to-identity',
'--ssh-options', '-f boo',
'--script', 'script-file',
'cluster1'
]
parsed_args = self.check_parser(self.cmd, arglist, [])
th1 = mock.Mock()
th2 = mock.Mock()
mock_thread.side_effect = [th1, th2]
fake_script = 'blah blah'
with mock.patch('senlinclient.v1.cluster.open',
mock.mock_open(read_data=fake_script)) as mock_open:
self.cmd.take_action(parsed_args)
self.mock_client.collect_cluster_attrs.assert_called_once_with(
'cluster1', 'details')
mock_open.assert_called_once_with('script-file', 'r')
mock_thread.assert_has_calls([
mock.call(target=mock_script,
args=('NODE_ID1', 'ADDRESS CONTENT 1', 'private',
'fixed', 22, 'root', False, 'path-to-identity',
'blah blah', '-f boo'),
kwargs={'output': {}}),
mock.call(target=mock_script,
args=('NODE_ID2', 'ADDRESS CONTENT 2', 'private',
'fixed', 22, 'root', False, 'path-to-identity',
'blah blah', '-f boo'),
kwargs={'output': {}})
])
th1.start.assert_called_once_with()
th2.start.assert_called_once_with()
th1.join.assert_called_once_with()
th2.join.assert_called_once_with()

View File

@ -13,12 +13,16 @@
"""Clustering v1 cluster action implementations"""
import logging
import subprocess
import sys
import threading
import time
from openstack import exceptions as sdk_exc
from osc_lib.command import command
from osc_lib import exceptions as exc
from osc_lib import utils
import six
from senlinclient.common.i18n import _
from senlinclient.common.i18n import _LI
@ -806,3 +810,205 @@ class ClusterCollect(command.Lister):
return (columns,
(utils.get_item_properties(a, columns, formatters=formatters)
for a in attrs))
class ClusterRun(command.Command):
"""Run scripts on cluster."""
log = logging.getLogger(__name__ + ".ClusterRun")
def get_parser(self, prog_name):
parser = super(ClusterRun, self).get_parser(prog_name)
parser.add_argument(
'--port',
metavar='<port>',
type=int,
default=22,
help=_('The TCP port to use for SSH connection')
)
parser.add_argument(
'--address-type',
metavar='<address_type>',
default='floating',
help=_("The type of IP address to use. Possible values include "
"'fixed' and 'floating' (the default)")
)
parser.add_argument(
'--network',
metavar='<network>',
default='',
help=_("The network to use for SSH connection")
)
parser.add_argument(
'--ipv6',
action="store_true",
default=False,
help=_("Whether the IPv6 address should be used for SSH. Default "
"to use IPv4 address.")
)
parser.add_argument(
'--user',
metavar='<user>',
default='root',
help=_("The login name to use for SSH connection. Default to "
"'root'.")
)
parser.add_argument(
'--identity-file',
metavar='<identity_file>',
help=_("The private key file to use, same as the '-i' SSH option")
)
parser.add_argument(
'--ssh-options',
metavar='<ssh_options>',
default="",
help=_("Extra options to pass to SSH. See: man ssh.")
)
parser.add_argument(
'--script',
metavar='<script>',
required=True,
help=_("Path name of the script file to run")
)
parser.add_argument(
'cluster',
metavar='<cluster>',
help=_('ID or name of cluster(s) to operate on.')
)
return parser
def take_action(self, args):
self.log.debug("take_action(%s)", args)
service = self.app.client_manager.clustering
if '@' in args.cluster:
user, cluster = args.cluster.split('@', 1)
args.user = user
args.cluster = cluster
try:
attributes = service.collect_cluster_attrs(args.cluster, 'details')
except sdk_exc.ResourceNotFound:
raise exc.CommandError(_("Cluster not found: %s") % args.cluster)
script = None
try:
f = open(args.script, 'r')
script = f.read()
except Exception:
raise exc.CommandError(_("Cound not open script file: %s") %
args.script)
tasks = dict()
for attr in attributes:
node_id = attr.node_id
addr = attr.attr_value['addresses']
output = dict()
th = threading.Thread(
target=self._run_script,
args=(node_id, addr, args.network, args.address_type,
args.port, args.user, args.ipv6, args.identity_file,
script, args.ssh_options),
kwargs={'output': output})
th.start()
tasks[th] = (node_id, output)
for t in tasks:
t.join()
for t in tasks:
node_id, result = tasks[t]
print("node: %s" % node_id)
print("status: %s" % result.get('status'))
if "reason" in result:
print("reason: %s" % result.get('reason'))
if "output" in result:
print("output:\n%s" % result.get('output'))
if "error" in result:
print("error:\n%s" % result.get('error'))
def _run_script(self, node_id, addr, net, addr_type, port, user, ipv6,
identity_file, script, options, output=None):
version = 6 if ipv6 else 4
# Select the network to use.
if net:
addresses = addr.get(net)
if not addresses:
output['status'] = _('FAILED')
output['error'] = _("Node '%(node)s' is not attached to "
"network '%(net)s'.") % {'node': node_id,
'net': net}
return
else:
# network not specified
if len(addr) > 1:
output['status'] = _('FAILED')
output['error'] = _("Node '%(node)s' is attached to more "
"than one network. Please pick the "
"network to use.") % {'node': node_id}
return
elif not addr:
output['status'] = _('FAILED')
output['error'] = _("Node '%(node)s' is not attached to any "
"network.") % {'node': node_id}
return
else:
addresses = list(six.itervalues(addr))[0]
# Select the address in the selected network.
# If the extension is not present, we assume the address to be
# floating.
matching_addresses = []
for a in addresses:
a_type = a.get('OS-EXT-IPS:type', 'floating')
a_version = a.get('version')
if (a_version == version and a_type == addr_type):
matching_addresses.append(a.get('addr'))
if not matching_addresses:
output['status'] = _('FAILED')
output['error'] = _("No address that matches network '%(net)s' "
"and type '%(type)s' of IPv%(ver)s has been "
"found for node '%(node)s'."
) % {'net': net, 'type': addr_type,
'ver': version, 'node': node_id}
return
if len(matching_addresses) > 1:
output['status'] = _('FAILED')
output['error'] = _("More than one IPv%(ver)s %(type)s address "
"found.") % {'ver': version,
'type': addr_type}
return
ip_address = str(matching_addresses[0])
identity = '-i %s' % identity_file if identity_file else ''
cmd = [
'ssh',
'-%d' % version,
'-p%d' % port,
identity,
options,
'%s@%s' % (user, ip_address),
'%s' % script
]
self.log.debug("%s" % cmd)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
while proc.returncode is None:
time.sleep(1)
if proc.returncode == 0:
output['status'] = _('SUCCEEDED (0)')
output['output'] = stdout
if stderr:
output['error'] = stderr
else:
output['status'] = _('FAILED (%d)') % proc.returncode
output['output'] = stdout
if stderr:
output['error'] = stderr

View File

@ -82,6 +82,7 @@ openstack.clustering.v1 =
cluster_show = senlinclient.v1.cluster:ShowCluster
cluster_update = senlinclient.v1.cluster:UpdateCluster
cluster_collect = senlinclient.v1.cluster:ClusterCollect
cluster_run = senlinclient.v1.cluster:ClusterRun
[global]
setup-hooks =