More refactoring of jobs
Move more of db_upgrade into a generic shell script executor.

Change-Id: I71d4e4ac800d1b7dd8f66d00b2ef757d5c39d0ab
commit 81f87ed6d6
parent 333aa0590f
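
The shape of the refactor: Task.start_job() becomes a fixed lifecycle, ShellTask supplies generic checkout/run/parse/handle steps, and plugins override only the hooks they care about. A minimal sketch of a hypothetical plugin under that assumption (the class name, module path and extra-setup comment are illustrative, not part of this commit):

    # Hypothetical plugin -- illustrates the pattern this commit sets up.
    # The import path is assumed from the diff's use of models.ShellTask.
    from turbo_hipster.lib import models


    class MyPluginRunner(models.ShellTask):
        """ Inherits gearman handling, progress and result reporting;
        only the job steps are customised. """

        def do_job_steps(self):
            # Plugin-specific setup would go here...
            # ...then the generic shell-script steps run unchanged.
            super(MyPluginRunner, self).do_job_steps()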
@@ -30,15 +30,43 @@ class Task(object):
         self.global_config = global_config
         self.plugin_config = plugin_config
         self.job_name = job_name
+        self._reset()
 
         # Define the number of steps we will do to determine our progress.
         self.total_steps = 0
 
     def _reset(self):
         self.job = None
         self.job_arguments = None
         self.work_data = None
         self.cancelled = False
+        self.success = True
+        self.messages = []
+        self.current_step = 0
+        self.total_steps = 0
 
+    def start_job(self, job):
+        self._reset()
+        self.job = job
+
+        if self.job is not None:
+            try:
+                self.job_arguments = \
+                    json.loads(self.job.arguments.decode('utf-8'))
+                self.log.debug("Got job from ZUUL %s" % self.job_arguments)
+
+                # Send the initial WORK_DATA and WORK_STATUS packets
+                self._send_work_data()
+
+                # Execute the job steps
+                self.do_job_steps()
+
+                # Finally, send updated work data and completed packets
+                self._send_final_results()
+
+            except Exception as e:
+                self.log.exception('Exception handling job.')
+                if not self.cancelled:
+                    self.job.sendWorkException(str(e).encode('utf-8'))
+
     def stop_worker(self, number):
         # Check the number is for this job instance
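
Note: start_job() is now a template method -- decode the gearman arguments, send the opening packets, run do_job_steps(), then report results. A throwaway way to exercise it with a stub in place of a real gearman job (the stub class below is illustrative; real jobs come from the gearman worker):

    import json

    # Stub standing in for a gearman job object, providing only the
    # attributes and methods Task.start_job() actually uses.
    class StubJob(object):
        arguments = json.dumps(
            {'ZUUL_PROJECT': 'openstack/nova'}).encode('utf-8')
        unique = 'job-0001'

        def sendWorkData(self, data):
            print('WORK_DATA: %s' % data)

        def sendWorkComplete(self, data):
            print('WORK_COMPLETE: %s' % data)

        def sendWorkFail(self):
            print('WORK_FAIL')

        def sendWorkException(self, message):
            print('WORK_EXCEPTION: %s' % message)

    # task.start_job(StubJob()) then drives the full step pipeline.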
@@ -49,27 +77,6 @@ class Task(object):
         self.cancelled = True
         # TODO: Work out how to kill current step
 
-    @common.task_step
-    def _grab_patchset(self, job_args, job_log_file_path):
-        """ Checkout the reference into config['git_working_dir'] """
-
-        self.log.debug("Grab the patchset we want to test against")
-        local_path = os.path.join(self.global_config['git_working_dir'],
-                                  self.job_name, job_args['ZUUL_PROJECT'])
-        if not os.path.exists(local_path):
-            os.makedirs(local_path)
-
-        git_args = copy.deepcopy(job_args)
-        git_args['GIT_ORIGIN'] = 'git://git.openstack.org/'
-
-        cmd = os.path.join(os.path.join(os.path.dirname(__file__),
-                                        'gerrit-git-prep.sh'))
-        cmd += ' https://review.openstack.org'
-        cmd += ' http://zuul.rcbops.com'
-        utils.execute_to_log(cmd, job_log_file_path, env=git_args,
-                             cwd=local_path)
-        return local_path
-
     def _get_work_data(self):
         if self.work_data is None:
             hostname = os.uname()[1]
@@ -87,6 +94,15 @@ class Task(object):
                        json.dumps(self._get_work_data()))
         self.job.sendWorkData(json.dumps(self._get_work_data()))
 
+    def _send_final_results(self):
+        self._send_work_data()
+
+        if self.work_data['result'] == 'SUCCESS':
+            self.job.sendWorkComplete(
+                json.dumps(self._get_work_data()))
+        else:
+            self.job.sendWorkFail()
+
     def _do_next_step(self):
         """ Send a WORK_STATUS command to the gearman server.
         This can provide a progress bar. """
@@ -110,43 +126,58 @@ class ShellTask(Task):
         # Define the number of steps we will do to determine our progress.
         self.total_steps = 4
 
-    def start_job(self, job):
-        self.job = job
-        self.success = True
-        self.messages = []
+    def _reset(self):
+        super(ShellTask, self)._reset()
+        self.git_path = None
 
-        if self.job is not None:
-            try:
-                self.job_arguments = \
-                    json.loads(self.job.arguments.decode('utf-8'))
-                self.log.debug("Got job from ZUUL %s" % self.job_arguments)
-
-                # Send an initial WORK_DATA and WORK_STATUS packets
-                self._send_work_data()
-
-                # Step 1: Checkout updates from git!
-                self.git_path = self._grab_patchset(
-                    self.job_arguments,
-                    self.job_datasets[0]['job_log_file_path'])
-
-                # Step 3: execute shell script
-                # TODO
-
-                # Step 4: Analyse logs for errors
-                # TODO
-
-                # Step 5: handle the results (and upload etc)
-                # TODO
-
-                # Finally, send updated work data and completed packets
-                self._send_work_data()
-
-                if self.work_data['result'] is 'SUCCESS':
-                    self.job.sendWorkComplete(
-                        json.dumps(self._get_work_data()))
-                else:
-                    self.job.sendWorkFail()
-            except Exception as e:
-                self.log.exception('Exception handling log event.')
-                if not self.cancelled:
-                    self.job.sendWorkException(str(e).encode('utf-8'))
+    def do_job_steps(self):
+        # Step 1: Checkout updates from git
+        self._grab_patchset(self.job_arguments,
+                            self.job_datasets[0]['job_log_file_path'])
+
+        # Step 2: Run shell script
+        self._execute_script()
+
+        # Step 3: Analyse logs for errors
+        self._parse_and_check_results()
+
+        # Step 4: Handle the results (and upload etc)
+        self._handle_results()
+
+    @common.task_step
+    def _grab_patchset(self, job_args, job_log_file_path):
+        """ Checkout the reference into config['git_working_dir'] """
+
+        self.log.debug("Grab the patchset we want to test against")
+        local_path = os.path.join(self.global_config['git_working_dir'],
+                                  self.job_name, job_args['ZUUL_PROJECT'])
+        if not os.path.exists(local_path):
+            os.makedirs(local_path)
+
+        git_args = copy.deepcopy(job_args)
+        git_args['GIT_ORIGIN'] = 'git://git.openstack.org/'
+
+        cmd = os.path.join(os.path.join(os.path.dirname(__file__),
+                                        'gerrit-git-prep.sh'))
+        cmd += ' https://review.openstack.org'
+        cmd += ' http://zuul.rcbops.com'
+        utils.execute_to_log(cmd, job_log_file_path, env=git_args,
+                             cwd=local_path)
+        self.git_path = local_path
+        return local_path
+
+    @common.task_step
+    def _execute_script(self):
+        # Run script
+        self.script_return_code = 0
+
+    @common.task_step
+    def _parse_and_check_results(self):
+        if self.script_return_code > 0:
+            self.success = False
+            self.messages.append('Return code from test script was non-zero '
+                                 '(%d)' % self.script_return_code)
+
+    @common.task_step
+    def _handle_results(self):
+        pass
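
The step methods above all carry a common.task_step decorator that is defined outside this diff. A plausible sketch of its behaviour, assuming it simply advances the progress counter via _do_next_step() before each step runs (this body is an assumption, not the project's actual implementation):

    import functools

    def task_step(func):
        # Assumed: wrap a step method so WORK_STATUS progress is
        # reported (via Task._do_next_step) each time a step begins.
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            self._do_next_step()
            return func(self, *args, **kwargs)
        return wrapper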
@@ -32,7 +32,7 @@ MIGRATION_START_RE = re.compile('([0-9]+) -> ([0-9]+)\.\.\.$')
 MIGRATION_END_RE = re.compile('^done$')
 
 
-class Runner(models.Task):
+class Runner(models.ShellTask):
 
     """ This thread handles the actual sql-migration tests.
     It pulls in a gearman job from the build:gate-real-db-upgrade
@@ -50,53 +50,52 @@ class Runner(models.Task):
         # Define the number of steps we will do to determine our progress.
         self.total_steps = 5
 
-    def start_job(self, job):
-        self.job = job
-        self.success = True
-        self.messages = []
-
-        if self.job is not None:
-            try:
-                self.job_arguments = \
-                    json.loads(self.job.arguments.decode('utf-8'))
-                self.log.debug("Got job from ZUUL %s" % self.job_arguments)
-
-                # Send an initial WORK_DATA and WORK_STATUS packets
-                self._send_work_data()
-
-                # Step 1: Figure out which datasets to run
-                self.job_datasets = self._get_job_datasets()
-
-                # Step 2: Checkout updates from git!
-                self.git_path = self._grab_patchset(
-                    self.job_arguments,
-                    self.job_datasets[0]['job_log_file_path'])
-
-                # Step 3: Run migrations on datasets
-                if self._execute_migrations() > 0:
-                    self.success = False
-                    self.messages.append('Return code from test script was '
-                                         'non-zero')
-
-                # Step 4: Analyse logs for errors
-                self._check_all_dataset_logs_for_errors()
-
-                # Step 5: handle the results (and upload etc)
-                self._handle_results()
-
-                # Finally, send updated work data and completed packets
-                self._send_work_data()
-
-                if self.work_data['result'] is 'SUCCESS':
-                    self.job.sendWorkComplete(
-                        json.dumps(self._get_work_data()))
-                else:
-                    self.job.sendWorkFail()
-            except Exception as e:
-                self.log.exception('Exception handling log event.')
-                if not self.cancelled:
-                    self.job.sendWorkException(str(e).encode('utf-8'))
+    def do_job_steps(self):
+        # Step 1: Figure out which datasets to run
+        self.job_datasets = self._get_job_datasets()
+
+        # All other steps are common to running a shell script
+        super(Runner, self).do_job_steps()
+
+    @common.task_step
+    def _get_job_datasets(self):
+        """ Take the applicable datasets for this job and set them up in
+        self.job_datasets """
+
+        job_datasets = []
+        for dataset in self._get_datasets():
+            # Only load a dataset if it is the right project and we
+            # know how to process the upgrade
+            if (self.job_arguments['ZUUL_PROJECT'] ==
+                    dataset['config']['project'] and
+                    self._get_project_command(dataset['config']['type'])):
+                dataset['determined_path'] = utils.determine_job_identifier(
+                    self.job_arguments, self.plugin_config['function'],
+                    self.job.unique
+                )
+                dataset['job_log_file_path'] = os.path.join(
+                    self.global_config['jobs_working_dir'],
+                    dataset['determined_path'],
+                    dataset['name'] + '.log'
+                )
+                dataset['result'] = 'UNTESTED'
+                dataset['command'] = \
+                    self._get_project_command(dataset['config']['type'])
+
+                job_datasets.append(dataset)
+
+        return job_datasets
+
+    @common.task_step
+    def _execute_script(self):
+        # Run the migrations in place of a plain shell script
+        self.script_return_code = self._execute_migrations()
+
+    @common.task_step
+    def _parse_and_check_results(self):
+        super(Runner, self)._parse_and_check_results()
+        self._check_all_dataset_logs_for_errors()
 
     @common.task_step
     def _handle_results(self):
         """ pass over the results to handle_results.py for post-processing """
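
Note: Runner keeps total_steps = 5 where ShellTask uses 4, because it adds one decorated step of its own (_get_job_datasets) on top of the four generic ShellTask steps, so the WORK_STATUS progress numbers stay accurate.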
@@ -108,11 +107,8 @@ class Runner(models.Task):
         self.log.debug("Index URL found at %s" % index_url)
         self.work_data['url'] = index_url
 
-    @common.task_step
     def _check_all_dataset_logs_for_errors(self):
         self.log.debug('Check logs for errors')
-        self.success = True
-        self.messages = []
 
         for i, dataset in enumerate(self.job_datasets):
             success, messages = handle_results.check_log_file(
@@ -156,35 +152,6 @@ class Runner(models.Task):
 
         return self.datasets
 
-    @common.task_step
-    def _get_job_datasets(self):
-        """ Take the applicable datasets for this job and set them up in
-        self.job_datasets """
-
-        job_datasets = []
-        for dataset in self._get_datasets():
-            # Only load a dataset if it is the right project and we
-            # know how to process the upgrade
-            if (self.job_arguments['ZUUL_PROJECT'] ==
-                    dataset['config']['project'] and
-                    self._get_project_command(dataset['config']['type'])):
-                dataset['determined_path'] = utils.determine_job_identifier(
-                    self.job_arguments, self.plugin_config['function'],
-                    self.job.unique
-                )
-                dataset['job_log_file_path'] = os.path.join(
-                    self.global_config['jobs_working_dir'],
-                    dataset['determined_path'],
-                    dataset['name'] + '.log'
-                )
-                dataset['result'] = 'UNTESTED'
-                dataset['command'] = \
-                    self._get_project_command(dataset['config']['type'])
-
-                job_datasets.append(dataset)
-
-        return job_datasets
-
     def _get_project_command(self, db_type):
         command = (self.job_arguments['ZUUL_PROJECT'].split('/')[-1] + '_' +
                    db_type + '_migrations.sh')
@@ -193,7 +160,6 @@ class Runner(models.Task):
             return command
         return False
 
-    @common.task_step
     def _execute_migrations(self):
         """ Execute the migration on each dataset in datasets """
 
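
For reference, _get_project_command() builds the per-project script name from the Zuul project and the database type, returning it only if the script exists. A worked example of the mapping (values illustrative):

    # Mapping performed by _get_project_command:
    zuul_project = 'openstack/nova'   # example value only
    db_type = 'mysql'                 # example value only
    command = zuul_project.split('/')[-1] + '_' + db_type + '_migrations.sh'
    assert command == 'nova_mysql_migrations.sh'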