Cleanup Ansible executor
This commit is contained in:
parent
87d0a2c708
commit
dbf3187d2f
@ -1,5 +1,3 @@
|
||||
# Copyright (c) 2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
@ -14,7 +12,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
from collections import namedtuple
|
||||
import copy
|
||||
|
||||
from ansible.executor import task_queue_manager
|
||||
from ansible import inventory
|
||||
@ -28,13 +25,14 @@ from os_failures import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
STATUS_OK = 'OK'
|
||||
STATUS_FAILED = 'FAILED'
|
||||
STATUS_UNREACHABLE = 'UNREACHABLE'
|
||||
STATUS_SKIPPED = 'SKIPPED'
|
||||
|
||||
DEFAULT_ERROR_STATUSES = [STATUS_FAILED, STATUS_UNREACHABLE]
|
||||
DEFAULT_ERROR_STATUSES = {STATUS_FAILED, STATUS_UNREACHABLE}
|
||||
|
||||
SSH_COMMON_ARGS = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no'
|
||||
|
||||
|
||||
class AnsibleExecutionException(Exception):
|
||||
@ -45,19 +43,8 @@ class AnsibleExecutionUnreachable(AnsibleExecutionException):
|
||||
pass
|
||||
|
||||
|
||||
def _light_rec(result):
|
||||
for r in result:
|
||||
c = copy.deepcopy(r)
|
||||
if 'records' in c:
|
||||
del c['records']
|
||||
if 'series' in c:
|
||||
del c['series']
|
||||
yield c
|
||||
|
||||
|
||||
def _log_result(result):
|
||||
# todo check current log level before doing heavy things
|
||||
LOG.debug('Execution result (filtered): %s', list(_light_rec(result)))
|
||||
AnsibleExecutionRecord = namedtuple('AnsibleExecutionRecord',
|
||||
['host', 'status', 'task', 'payload'])
|
||||
|
||||
|
||||
class MyCallback(callback_pkg.CallbackBase):
|
||||
@ -71,10 +58,9 @@ class MyCallback(callback_pkg.CallbackBase):
|
||||
self.storage = storage
|
||||
|
||||
def _store(self, result, status):
|
||||
record = dict(host=result._host.get_name(),
|
||||
status=status,
|
||||
task=result._task.get_name(),
|
||||
payload=result._result)
|
||||
record = AnsibleExecutionRecord(
|
||||
host=result._host.get_name(), status=status,
|
||||
task=result._task.get_name(), payload=result._result)
|
||||
self.storage.append(record)
|
||||
|
||||
def v2_runner_on_failed(self, result, ignore_errors=False):
|
||||
@ -96,25 +82,28 @@ class MyCallback(callback_pkg.CallbackBase):
|
||||
|
||||
Options = namedtuple('Options',
|
||||
['connection', 'password', 'module_path', 'forks',
|
||||
'remote_user',
|
||||
'private_key_file', 'ssh_common_args', 'ssh_extra_args',
|
||||
'sftp_extra_args', 'scp_extra_args', 'become',
|
||||
'become_method', 'become_user', 'verbosity', 'check'])
|
||||
'remote_user', 'private_key_file',
|
||||
'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
|
||||
'scp_extra_args', 'become', 'become_method',
|
||||
'become_user', 'verbosity', 'check'])
|
||||
|
||||
|
||||
class AnsibleRunner(object):
|
||||
def __init__(self, remote_user='root', password=None, forks=100,
|
||||
ssh_common_args=None):
|
||||
ssh_common_args=''):
|
||||
super(AnsibleRunner, self).__init__()
|
||||
|
||||
module_path = utils.resolve_relative_path(
|
||||
'os_failures/ansible/modules')
|
||||
ssh_common_args = ' '.join([ssh_common_args, SSH_COMMON_ARGS])
|
||||
|
||||
self.options = Options(
|
||||
connection='smart', password=password, module_path=module_path,
|
||||
forks=forks, remote_user=remote_user, private_key_file=None,
|
||||
ssh_common_args=ssh_common_args, ssh_extra_args=None, sftp_extra_args=None,
|
||||
scp_extra_args=None, become=None, become_method='sudo',
|
||||
become_user='root', verbosity=100, check=False)
|
||||
ssh_common_args=ssh_common_args, ssh_extra_args=None,
|
||||
sftp_extra_args=None, scp_extra_args=None,
|
||||
become=None, become_method='sudo', become_user='root',
|
||||
verbosity=100, check=False)
|
||||
|
||||
def _run_play(self, play_source):
|
||||
LOG.debug('Running play: %s', play_source)
|
||||
@ -153,11 +142,11 @@ class AnsibleRunner(object):
|
||||
if tqm is not None:
|
||||
tqm.cleanup()
|
||||
|
||||
_log_result(storage)
|
||||
LOG.debug('Execution result: %s', storage)
|
||||
|
||||
return storage
|
||||
|
||||
def run(self, playbook):
|
||||
def run_playbook(self, playbook):
|
||||
result = []
|
||||
|
||||
for play_source in playbook:
|
||||
@ -168,22 +157,33 @@ class AnsibleRunner(object):
|
||||
return result
|
||||
|
||||
def execute(self, hosts, task, raise_on_statuses=DEFAULT_ERROR_STATUSES):
|
||||
"""
|
||||
Executes the task on every host from the list. Raises exception if any
|
||||
of the commands fails with one of specified statuses.
|
||||
:param hosts: list of host addresses
|
||||
:param task: Ansible task
|
||||
:param raise_on_statuses: raise exception if any of commands return
|
||||
any of these statuses
|
||||
:return: execution result, type AnsibleExecutionRecord
|
||||
"""
|
||||
LOG.debug('Executing task: %s on hosts: %s', task, hosts)
|
||||
|
||||
task_play = {'hosts': hosts, 'tasks': [task]}
|
||||
result = self.run([task_play])
|
||||
result = self.run_playbook([task_play])
|
||||
|
||||
if raise_on_statuses:
|
||||
errors = []
|
||||
only_unreachable = True
|
||||
|
||||
for r in result:
|
||||
if r['status'] in raise_on_statuses:
|
||||
if r['status'] != STATUS_UNREACHABLE:
|
||||
if r.status in raise_on_statuses:
|
||||
if r.status != STATUS_UNREACHABLE:
|
||||
only_unreachable = False
|
||||
errors.append(r)
|
||||
|
||||
if errors:
|
||||
msg = 'Execution failed: %s' % ', '.join((
|
||||
'(host: %s, status: %s)' % (r['host'], r['status']))
|
||||
'(host: %s, status: %s)' % (r.host, r.status))
|
||||
for r in errors)
|
||||
ek = (AnsibleExecutionUnreachable if only_unreachable
|
||||
else AnsibleExecutionException)
|
@ -3,7 +3,7 @@ import json
|
||||
import random
|
||||
import six
|
||||
|
||||
from os_failures.ansible import runner
|
||||
from os_failures.ansible import executor
|
||||
from os_failures.api import cloud_management
|
||||
from os_failures.api import node_collection
|
||||
from os_failures.api import service
|
||||
@ -86,10 +86,10 @@ class FuelManagement(cloud_management.CloudManagement):
|
||||
self.username = params['username']
|
||||
self.password = params['password']
|
||||
|
||||
self.master_node_executor = runner.AnsibleRunner(
|
||||
self.master_node_executor = executor.AnsibleRunner(
|
||||
remote_user=self.username)
|
||||
|
||||
self.cloud_executor = runner.AnsibleRunner(
|
||||
self.cloud_executor = executor.AnsibleRunner(
|
||||
remote_user=self.username,
|
||||
ssh_common_args='-o ProxyCommand="ssh -W %%h:%%p %s@%s"' %
|
||||
(self.username, self.master_node_address))
|
||||
@ -105,7 +105,7 @@ class FuelManagement(cloud_management.CloudManagement):
|
||||
def get_cloud_hosts(self):
|
||||
task = {'command': 'fuel2 node list -f json'}
|
||||
r = self.execute_on_master_node(task)
|
||||
return json.loads(r[0]['payload']['stdout'])
|
||||
return json.loads(r[0].payload['stdout'])
|
||||
|
||||
def execute_on_master_node(self, task):
|
||||
return self.master_node_executor.execute([self.master_node_address], task)
|
||||
|
@ -1,4 +1,4 @@
|
||||
from os_failures.ansible import runner
|
||||
from os_failures.ansible import executor
|
||||
from os_failures.api import power_management
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ class KVM(power_management.PowerManagement):
|
||||
self.username = params.get('username')
|
||||
self.password = params.get('password')
|
||||
|
||||
self.executor = runner.AnsibleRunner(remote_user=self.username)
|
||||
self.executor = executor.AnsibleRunner(remote_user=self.username)
|
||||
|
||||
def poweroff(self, hosts):
|
||||
print('Power off hosts %s', hosts)
|
||||
|
Loading…
x
Reference in New Issue
Block a user