blob: 73a1cb9a4c2e589dcf7a5836b0093ef52c9a6c31 [file] [log] [blame] [edit]
#pylint: disable-msg=C0111
"""
Postjob task.
Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""
import os
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils
try:
from chromite.lib import metrics
except ImportError:
metrics = utils.metrics_mock
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
class PostJobTask(agent_task.AgentTask):
def __init__(self, queue_entries, log_file_name):
super(PostJobTask, self).__init__(log_file_name=log_file_name)
self.queue_entries = queue_entries
self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
self._autoserv_monitor.attach_to_existing_process(
self._working_directory())
def _command_line(self):
# Do we need testing_mode?
return self._generate_command(
self._drone_manager.absolute_path(self._working_directory()))
def _generate_command(self, results_dir):
raise NotImplementedError('Subclasses must override this')
@property
def owner_username(self):
return self.queue_entries[0].job.owner
def _working_directory(self):
return self._get_consistent_execution_path(self.queue_entries)
def _paired_with_monitor(self):
return self._autoserv_monitor
def _job_was_aborted(self):
was_aborted = None
for queue_entry in self.queue_entries:
queue_entry.update_from_database()
if was_aborted is None: # first queue entry
was_aborted = bool(queue_entry.aborted)
elif was_aborted != bool(queue_entry.aborted): # subsequent entries
entries = ['%s (aborted: %s)' % (entry, entry.aborted)
for entry in self.queue_entries]
email_manager.manager.enqueue_notify_email(
'Inconsistent abort state',
'Queue entries have inconsistent abort state:\n' +
'\n'.join(entries))
# don't crash here, just assume true
return True
return was_aborted
def _final_status(self):
if self._job_was_aborted():
return models.HostQueueEntry.Status.ABORTED
# we'll use a PidfileRunMonitor to read the autoserv exit status
if self._autoserv_monitor.exit_code() == 0:
return models.HostQueueEntry.Status.COMPLETED
return models.HostQueueEntry.Status.FAILED
def _set_all_statuses(self, status):
for queue_entry in self.queue_entries:
queue_entry.set_status(status)
def abort(self):
# override AgentTask.abort() to avoid killing the process and ending
# the task. post-job tasks continue when the job is aborted.
pass
def _pidfile_label(self):
# '.autoserv_execute' -> 'autoserv'
return self._pidfile_name()[1:-len('_execute')]
class SelfThrottledPostJobTask(PostJobTask):
"""
PostJobTask that maintains its own process limit.
We throttle tasks like parsing because we don't want them to
hold up tests. At the same time we don't wish to build up load
that will take forever to parse.
"""
_num_running_processes = 0
# Last known limit of max processes, used to check whether
# max processes config has been changed.
_last_known_max_processes = 0
# Whether an email should be sent to notifiy process limit being hit.
_notification_on = True
# Once process limit is hit, an email will be sent.
# To prevent spams, do not send another email until
# it drops to lower than the following level.
REVIVE_NOTIFICATION_THRESHOLD = 0.80
@classmethod
def _gauge_metrics(cls):
"""Report to monarch the number of running processes."""
m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
m.set(cls._num_running_processes, fields={'task_name': cls.__name__})
@classmethod
def _increment_running_processes(cls):
cls._num_running_processes += 1
cls._gauge_metrics()
@classmethod
def _decrement_running_processes(cls):
cls._num_running_processes -= 1
cls._gauge_metrics()
@classmethod
def _max_processes(cls):
raise NotImplementedError
@classmethod
def _can_run_new_process(cls):
return cls._num_running_processes < cls._max_processes()
def _process_started(self):
return bool(self.monitor)
def tick(self):
# override tick to keep trying to start until the process count goes
# down and we can, at which point we revert to default behavior
if self._process_started():
super(SelfThrottledPostJobTask, self).tick()
else:
self._try_starting_process()
def run(self):
# override run() to not actually run unless we can
self._try_starting_process()
@classmethod
def _notify_process_limit_hit(cls):
"""Send an email to notify that process limit is hit."""
if cls._notification_on:
subject = '%s: hitting max process limit.' % cls.__name__
message = ('Running processes/Max processes: %d/%d'
% (cls._num_running_processes, cls._max_processes()))
email_manager.manager.enqueue_notify_email(subject, message)
cls._notification_on = False
@classmethod
def _reset_notification_switch_if_necessary(cls):
"""Reset _notification_on if necessary.
Set _notification_on to True on the following cases:
1) If the limit of max processes configuration changes;
2) If _notification_on is False and the number of running processes
drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.
"""
if cls._last_known_max_processes != cls._max_processes():
cls._notification_on = True
cls._last_known_max_processes = cls._max_processes()
return
percentage = float(cls._num_running_processes) / cls._max_processes()
if (not cls._notification_on and
percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
cls._notification_on = True
def _try_starting_process(self):
self._reset_notification_switch_if_necessary()
if not self._can_run_new_process():
self._notify_process_limit_hit()
return
# actually run the command
super(SelfThrottledPostJobTask, self).run()
if self._process_started():
self._increment_running_processes()
def finished(self, success):
super(SelfThrottledPostJobTask, self).finished(success)
if self._process_started():
self._decrement_running_processes()
class GatherLogsTask(PostJobTask):
"""
Task responsible for
* gathering uncollected logs (if Autoserv crashed hard or was killed)
* copying logs to the results repository
* spawning CleanupTasks for hosts, if necessary
* spawning a FinalReparseTask for the job
* setting the final status of the host, directly or through a cleanup
"""
def __init__(self, queue_entries, recover_run_monitor=None):
self._job = queue_entries[0].job
super(GatherLogsTask, self).__init__(
queue_entries, log_file_name='.collect_crashinfo.log')
self._set_ids(queue_entries=queue_entries)
# TODO: Refactor into autoserv_utils. crbug.com/243090
def _generate_command(self, results_dir):
host_list = ','.join(queue_entry.host.hostname
for queue_entry in self.queue_entries)
return [autoserv_utils.autoserv_path , '-p',
'--pidfile-label=%s' % self._pidfile_label(),
'--use-existing-results', '--collect-crashinfo',
'-m', host_list, '-r', results_dir]
@property
def num_processes(self):
return len(self.queue_entries)
def _pidfile_name(self):
return drone_manager.CRASHINFO_PID_FILE
def prolog(self):
self._check_queue_entry_statuses(
self.queue_entries,
allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
allowed_host_statuses=(models.Host.Status.RUNNING,))
super(GatherLogsTask, self).prolog()
def epilog(self):
super(GatherLogsTask, self).epilog()
self._parse_results(self.queue_entries)
final_success, num_tests_failed = self._get_monitor_info()
reset_after_failure = (
not self._job.run_reset and (
not final_success or num_tests_failed > 0))
self._reboot_hosts(final_success, num_tests_failed, reset_after_failure)
if reset_after_failure:
m = metrics.Counter('chromeos/autotest/scheduler/postjob_tasks/'
'reset_after_failure')
m.increment(fields={'autoserv_process_success': final_success,
'num_tests_failed': num_tests_failed > 0})
self._reset_after_failure()
def _get_monitor_info(self):
"""Read monitor info from pidfile.
@returns: a tuple including
final_success: whether the monitor is successfully finished;
num_tests_failed: how many failed tests in the process.
"""
if self._autoserv_monitor.has_process():
final_success = (self._final_status() ==
models.HostQueueEntry.Status.COMPLETED)
num_tests_failed = self._autoserv_monitor.num_tests_failed()
else:
final_success = False
num_tests_failed = 0
return final_success, num_tests_failed
def _reboot_hosts(self, final_success, num_tests_failed,
reset_after_failure):
"""Reboot hosts by scheduling a CLEANUP task on host if needed.
@param final_success: whether the monitor successfully exits.
@param num_tests_failed: how many failed tests in total.
@param reset_after_failure: whether to schedule RESET task later.
"""
reboot_after = self._job.reboot_after
do_reboot = (
# always reboot after aborted jobs
self._final_status() == models.HostQueueEntry.Status.ABORTED
or reboot_after == model_attributes.RebootAfter.ALWAYS
or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
and final_success and num_tests_failed == 0)
or (num_tests_failed > 0 and not reset_after_failure))
for queue_entry in self.queue_entries:
if do_reboot:
# don't pass the queue entry to the CleanupTask. if the cleanup
# fails, the job doesn't care -- it's over.
models.SpecialTask.objects.create(
host=models.Host.objects.get(id=queue_entry.host.id),
task=models.SpecialTask.Task.CLEANUP,
requested_by=self._job.owner_model())
else:
queue_entry.host.set_status(models.Host.Status.READY)
def _reset_after_failure(self):
"""Queue a RESET job for the host if job fails.
The current hqe entry is not passed into the RESET job.
"""
for queue_entry in self.queue_entries:
models.SpecialTask.objects.create(
host=models.Host.objects.get(id=queue_entry.host.id),
task=models.SpecialTask.Task.RESET,
requested_by=self._job.owner_model())
def run(self):
autoserv_exit_code = self._autoserv_monitor.exit_code()
# only run if Autoserv exited due to some signal. if we have no exit
# code, assume something bad (and signal-like) happened.
if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
super(GatherLogsTask, self).run()
else:
self.finished(True)
class FinalReparseTask(SelfThrottledPostJobTask):
def __init__(self, queue_entries):
super(FinalReparseTask, self).__init__(queue_entries,
log_file_name='.parse.log')
# don't use _set_ids, since we don't want to set the host_ids
self.queue_entry_ids = [entry.id for entry in queue_entries]
def _generate_command(self, results_dir):
return [_parser_path, '--detach', '--write-pidfile',
'--record-duration', '--suite-report', '-l', '2', '-r', '-o',
results_dir]
@property
def num_processes(self):
return 0 # don't include parser processes in accounting
def _pidfile_name(self):
return drone_manager.PARSER_PID_FILE
@classmethod
def _max_processes(cls):
return scheduler_config.config.max_parse_processes
def prolog(self):
self._check_queue_entry_statuses(
self.queue_entries,
allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
super(FinalReparseTask, self).prolog()
def epilog(self):
super(FinalReparseTask, self).epilog()
self._set_all_statuses(self._final_status())