blob: c616f4176fb8d2e92ddbdb937bee062c065658fc [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Event handlers."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import datetime
import logging
import time
from lucifer import autotest
from lucifer import jobx
logger = logging.getLogger(__name__)
class EventHandler(object):
    """Event handling dispatcher.

    Event handlers are implemented as methods named _handle_<event value>.
    Each handler method must handle its exceptions accordingly.  If an
    exception escapes, the job dies on the spot.

    Instances have one public attribute completed.  completed is set to
    True once the final COMPLETED event is received and the handler
    finishes.
    """

    def __init__(self, metrics, job, autoserv_exit, results_dir):
        """Initialize instance.

        @param metrics: Metrics instance
        @param job: frontend.afe.models.Job instance to own
        @param autoserv_exit: autoserv exit status
        @param results_dir: Job results directory
        """
        self.completed = False
        self._metrics = metrics
        self._job = job
        # TODO(crbug.com/748234): autoserv not implemented yet.
        self._autoserv_exit = autoserv_exit
        self._results_dir = results_dir

    def __call__(self, event, msg):
        """Dispatch an event to its _handle_<event value> method.

        The handler call is wrapped in _retry_db_errors.

        @param event: event object; its name and value attributes are used
        @param msg: message accompanying the event, passed to the handler
        @raises NotImplementedError: if no handler method exists for event
        """
        logger.debug('Received event %r with message %r', event.name, msg)
        method_name = '_handle_%s' % event.value
        try:
            handler = getattr(self, method_name)
        except AttributeError:
            # Interpolate here: exception constructors do not apply
            # %-formatting to their arguments the way logging calls do.
            raise NotImplementedError(
                '%s is not implemented for handling %s'
                % (method_name, event.name))
        _retry_db_errors(lambda: handler(msg))

    def _handle_starting(self, msg):
        """Handle the starting event (currently a no-op)."""
        # TODO(crbug.com/748234): No event update needed yet.
        pass

    def _handle_running(self, _msg):
        """Mark all of the job's HQEs as running, stamping started_on."""
        models = autotest.load('frontend.afe.models')
        self._job.hostqueueentry_set.all().update(
            status=models.HostQueueEntry.Status.RUNNING,
            started_on=datetime.datetime.now())

    def _handle_gathering(self, _msg):
        """Mark all of the job's HQEs as gathering."""
        models = autotest.load('frontend.afe.models')
        self._job.hostqueueentry_set.all().update(
            status=models.HostQueueEntry.Status.GATHERING)

    def _handle_parsing(self, _msg):
        """Mark all of the job's HQEs as parsing."""
        models = autotest.load('frontend.afe.models')
        self._job.hostqueueentry_set.all().update(
            status=models.HostQueueEntry.Status.PARSING)

    def _handle_aborted(self, _msg):
        """Abort each of the job's HQEs, then write abort keyvals/status."""
        for hqe in self._job.hostqueueentry_set.all().prefetch_related('host'):
            _mark_hqe_aborted(hqe)
        jobx.write_aborted_keyvals_and_status(self._job, self._results_dir)

    def _handle_completed(self, _msg):
        """Finish the job and flag this handler as completed."""
        self._mark_job_complete()
        self.completed = True

    def _handle_test_passed(self, msg):
        """Record a zero autoserv exit status when msg is 'autoserv'."""
        if msg == 'autoserv':
            self._autoserv_exit = 0

    def _handle_test_failed(self, msg):
        """Record a failing autoserv exit status when msg is 'autoserv'."""
        if msg == 'autoserv':
            self._autoserv_exit = 1

    def _handle_host_running(self, msg):
        """Mark the host as running and dirty, and send its status metric.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        host.status = models.Host.Status.RUNNING
        host.dirty = 1
        host.save(update_fields=['status', 'dirty'])
        self._metrics.send_host_status(host)

    def _handle_host_ready(self, msg):
        """Mark the host as ready and send its status metric.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        host.status = models.Host.Status.READY
        host.save(update_fields=['status'])
        self._metrics.send_host_status(host)

    def _handle_host_needs_cleanup(self, msg):
        """Create a cleanup special task for the host.

        The task is requested on behalf of the job's owner.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        models.SpecialTask.objects.create(
            host_id=host.id,
            task=models.SpecialTask.Task.CLEANUP,
            requested_by=models.User.objects.get(login=self._job.owner))

    def _handle_host_needs_reset(self, msg):
        """Create a reset special task for the host.

        The task is requested on behalf of the job's owner.

        @param msg: hostname of the host
        """
        models = autotest.load('frontend.afe.models')
        host = models.Host.objects.get(hostname=msg)
        models.SpecialTask.objects.create(
            host_id=host.id,
            task=models.SpecialTask.Task.RESET,
            requested_by=models.User.objects.get(login=self._job.owner))

    def _handle_x_tests_done(self, msg):
        """Schedule post-job host handling once tests are done.

        Taken from GatherLogsTask.epilog.

        @param msg: string of the form '<autoserv_exit>,<failures>', both
                    integers
        """
        autoserv_exit, failures = (int(x) for x in msg.split(','))
        logger.debug('Got autoserv_exit=%d, failures=%d',
                     autoserv_exit, failures)
        success = (autoserv_exit == 0 and failures == 0)
        reset_after_failure = not self._job.run_reset and not success
        hqes = self._job.hostqueueentry_set.all().prefetch_related('host')
        if self._should_reboot_duts(autoserv_exit, failures,
                                    reset_after_failure):
            logger.debug('Creating cleanup jobs for hosts')
            for entry in hqes:
                self._handle_host_needs_cleanup(entry.host.hostname)
        else:
            logger.debug('Not creating cleanup jobs for hosts')
            for entry in hqes:
                self._handle_host_ready(entry.host.hostname)
        if not reset_after_failure:
            logger.debug('Skipping reset because reset_after_failure is False')
            return
        logger.debug('Creating reset jobs for hosts')
        self._metrics.send_reset_after_failure(autoserv_exit, failures)
        for entry in hqes:
            self._handle_host_needs_reset(entry.host.hostname)

    def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure):
        """Return whether cleanup tasks should be created for the hosts.

        @param autoserv_exit: autoserv exit status
        @param failures: number of failed tests
        @param reset_after_failure: whether a reset will follow a failure
        """
        models = autotest.load('frontend.afe.models')
        reboot_after = self._job.reboot_after
        if self._final_status() == models.HostQueueEntry.Status.ABORTED:
            # NOTE(review): this branch fires on an aborted final status,
            # not on a reboot_after setting; the message text is kept as is.
            logger.debug('Should reboot because reboot_after=ABORTED')
            return True
        elif reboot_after == models.Job.RebootAfter.ALWAYS:
            logger.debug('Should reboot because reboot_after=ALWAYS')
            return True
        elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED
              and autoserv_exit == 0 and failures == 0):
            logger.debug('Should reboot because'
                         ' reboot_after=IF_ALL_TESTS_PASSED')
            return True
        else:
            return failures > 0 and not reset_after_failure

    def _mark_job_complete(self):
        """Perform Autotest operations needed for job completion."""
        final_status = self._final_status()
        self._mark_hqes_complete(final_status)
        self._stop_job_if_necessary(final_status)
        self._release_job_if_sharded()

    def _mark_hqes_complete(self, final_status):
        """Perform Autotest HQE operations needed for job completion."""
        for hqe in self._job.hostqueueentry_set.all():
            self._set_completed_status(hqe, final_status)

    def _stop_job_if_necessary(self, final_status):
        """Equivalent to scheduler.modes.Job.stop_if_necessary().

        The name isn't informative, but this will stop pre-job tasks as
        necessary.
        """
        models = autotest.load('frontend.afe.models')
        # Compare by value: identity comparison of status values only works
        # when both sides happen to be the very same object.
        if final_status != models.HostQueueEntry.Status.ABORTED:
            _stop_prejob_hqes(self._job)

    def _release_job_if_sharded(self):
        """Clear the job's shard_id, if it has one."""
        if self._job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master
            self._job.shard_id = None
            self._job.save(update_fields=['shard_id'])

    def _final_status(self):
        """Return the final HQE status for the job.

        ABORTED if the job is aborted, COMPLETED if the autoserv exit
        status is 0, FAILED otherwise.
        """
        models = autotest.load('frontend.afe.models')
        Status = models.HostQueueEntry.Status
        if jobx.is_aborted(self._job):
            return Status.ABORTED
        if self._autoserv_exit == 0:
            return Status.COMPLETED
        return Status.FAILED

    def _set_completed_status(self, hqe, status):
        """Set completed status of HQE.

        This is a cleaned up version of the one in scheduler_models to work
        with Django models.

        @param hqe: HostQueueEntry instance to update
        @param status: final status to store on the HQE
        """
        hqe.status = status
        hqe.active = False
        hqe.complete = True
        # finished_on is only stamped for entries that actually started.
        if hqe.started_on:
            hqe.finished_on = datetime.datetime.now()
        hqe.save(update_fields=['status', 'active', 'complete', 'finished_on'])
        self._metrics.send_hqe_completion(hqe)
        self._metrics.send_hqe_duration(hqe)
class Metrics(object):
    """Class for sending job metrics."""

    def __init__(self):
        """Create the metric objects used by the send_* methods."""
        metrics = autotest.chromite_load('metrics')
        self._hqe_completion_metric = metrics.Counter(
            'chromeos/autotest/scheduler/hqe_completion_count')
        self._reset_after_failure_metric = metrics.Counter(
            'chromeos/autotest/scheduler/postjob_tasks/'
            'reset_after_failure')
        self._host_status_metric = metrics.Boolean(
            'chromeos/autotest/dut_status', reset_after=True)

    def send_host_status(self, host):
        """Send ts_mon metrics for host status.

        @param host: frontend.afe.models.Host instance
        """
        labellib = autotest.load('utils.labellib')
        labels = labellib.LabelsMapping.from_host(host)
        # Use .get() with an empty default, as send_hqe_completion does, so
        # a host lacking a board or model label does not raise KeyError.
        base_fields = {
            'dut_host_name': host.hostname,
            'board': labels.get('board', ''),
            'model': labels.get('model', ''),
        }
        # As each device switches state, indicate that it is not in any
        # other state.  This allows Monarch queries to avoid double counting
        # when additional points are added by the Window Align operation.
        for s in host.Status.names:
            # Give every point its own dict so no mutable state is shared
            # between successive set() calls.
            fields = dict(base_fields, status=s)
            self._host_status_metric.set(s == host.status, fields=fields)

    def send_hqe_completion(self, hqe):
        """Send ts_mon metrics for HQE completion.

        @param hqe: HostQueueEntry instance; board and pool are reported as
                    'NO_HOST' when it has no host
        """
        fields = {
            'status': hqe.status.lower(),
            'board': 'NO_HOST',
            'pool': 'NO_HOST',
        }
        if hqe.host:
            labellib = autotest.load('utils.labellib')
            labels = labellib.LabelsMapping.from_host(hqe.host)
            fields['board'] = labels.get('board', '')
            fields['pool'] = labels.get('pool', '')
        self._hqe_completion_metric.increment(fields=fields)

    def send_hqe_duration(self, hqe):
        """Send CloudTrace metrics for HQE duration.

        Nothing is sent unless both started_on and finished_on are set.

        @param hqe: HostQueueEntry instance
        """
        if not (hqe.started_on and hqe.finished_on):
            return
        scheduler_models = autotest.load('scheduler.scheduler_models')
        cloud_trace = autotest.chromite_load('cloud_trace')
        types = autotest.deps_load('google.protobuf.internal.well_known_types')
        hqe_trace_id = scheduler_models.hqe_trace_id
        span = cloud_trace.Span(
            'HQE', spanId='0', traceId=hqe_trace_id(hqe.id))
        span.startTime = types.Timestamp()
        span.startTime.FromDatetime(hqe.started_on)
        span.endTime = types.Timestamp()
        span.endTime.FromDatetime(hqe.finished_on)
        cloud_trace.LogSpan(span)

    def send_reset_after_failure(self, autoserv_exit, failures):
        """Send reset_after_failure metric.

        @param autoserv_exit: autoserv exit status
        @param failures: number of failed tests
        """
        self._reset_after_failure_metric.increment(
            fields={'autoserv_process_success': autoserv_exit == 0,
                    # Yes, this is a boolean
                    'num_tests_failed': failures > 0})
def _mark_hqe_aborted(hqe):
    """Abort a single HQE, marking its host ready where applicable.

    The HQE's host is read (and possibly saved) here, so prefetch it when
    possible.

    This logic is from scheduler_models.HostQueueEntry.abort().

    @param hqe: HostQueueEntry instance to abort
    """
    afe_models = autotest.load('frontend.afe.models')
    db_transaction = autotest.deps_load('django.db.transaction')
    hqe_status = afe_models.HostQueueEntry.Status
    with db_transaction.commit_on_success():
        current = hqe.status
        # Entries that are already gathering or parsing are left untouched.
        if current in (hqe_status.GATHERING, hqe_status.PARSING):
            return
        if current in (hqe_status.STARTING, hqe_status.PENDING,
                       hqe_status.RUNNING):
            # Without a host there is nothing to update; the entry itself
            # is also left as is in that case.
            if hqe.host is None:
                return
            hqe.host.status = afe_models.Host.Status.READY
            hqe.host.save(update_fields=['status'])
        hqe.status = hqe_status.ABORTED
        hqe.save(update_fields=['status'])
def _stop_prejob_hqes(job):
    """Stop the job's idle pre-job HQEs (for synch_count).

    Does nothing when the number of not-yet-run HQEs equals the job's
    synch_count.  Otherwise every idle pre-job HQE is set to stopped, and
    the host of each pending one is set back to ready.

    @param job: job whose HQEs should be stopped
    """
    afe_models = autotest.load('frontend.afe.models')
    hqe_statuses = afe_models.HostQueueEntry.Status
    host_statuses = afe_models.Host.Status
    if _get_prejob_hqes(job).count() == job.synch_count:
        return
    for entry in _get_prejob_hqes(job, include_active=False):
        if entry.status == hqe_statuses.PENDING:
            entry.host.status = host_statuses.READY
            entry.host.save(update_fields=['status'])
        entry.status = hqe_statuses.STOPPED
        entry.save(update_fields=['status'])
def _get_prejob_hqes(job, include_active=True):
    """Return a queryset of the job's HQEs that have not run yet.

    Used for synch_count handling.

    @param job: job whose HQEs are queried
    @param include_active: if true, match PRE_JOB_STATUSES; otherwise match
                           only IDLE_PRE_JOB_STATUSES
    """
    afe_models = autotest.load('frontend.afe.models')
    hqe_model = afe_models.HostQueueEntry
    wanted = (hqe_model.PRE_JOB_STATUSES if include_active
              else hqe_model.IDLE_PRE_JOB_STATUSES)
    return hqe_model.objects.filter(job=job, status__in=list(wanted))
def _retry_db_errors(func, max_retries=10, delay_secs=5):
    """Call func, retrying multiple times if database errors are raised.

    crbug.com/863504

    The return value of func is discarded.

    @param func: callable taking no arguments
    @param max_retries: number of retries after the first attempt; negative
                        values are treated as 0
    @param delay_secs: seconds to sleep before each retry
    @raises: the last database error, once all retries are used up
    """
    django = autotest.deps_load('django')
    MySQLdb = autotest.deps_load('MySQLdb')
    max_retries = max(max_retries, 0)
    # n ... 0 means n + 1 tries, or 1 try plus n retries
    # range() rather than xrange() so this also runs on Python 3; the
    # sequence is tiny, so materializing it on Python 2 costs nothing.
    for retries_left in range(max_retries, -1, -1):
        try:
            func()
        except (django.db.utils.DatabaseError, MySQLdb.OperationalError) as e:
            if retries_left == 0:
                raise
            logger.debug('Got database error %s, retrying', e)
            # Close the connection before trying again (presumably so the
            # next attempt opens a fresh one -- confirm against Django).
            django.db.close_connection()
            time.sleep(delay_secs)
        else:
            break