blob: 7ff5b2396cbf2002df8aa2bfb850f23d84092010 [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Event handlers."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import datetime
import logging
from lucifer import autotest
from lucifer import jobx
logger = logging.getLogger(__name__)
class EventHandler(object):
"""Event handling dispatcher.
Event handlers are implemented as methods named _handle_<event value>.
Each handler method must handle its exceptions accordingly. If an
exception escapes, the job dies on the spot.
Instances have one public attribute completed. completed is set to
True once the final COMPLETED event is received and the handler
finishes.
"""
def __init__(self, metrics, job, autoserv_exit):
"""Initialize instance.
@param metrics: Metrics instance
@param job: frontend.afe.models.Job instance to own
@param hqes: list of HostQueueEntry instances for the job
@param autoserv_exit: autoserv exit status
"""
self.completed = False
self._metrics = metrics
self._job = job
# TODO(crbug.com/748234): autoserv not implemented yet.
self._autoserv_exit = autoserv_exit
def __call__(self, event, msg):
logger.debug('Received event %r with message %r', event.name, msg)
method_name = '_handle_%s' % event.value
try:
handler = getattr(self, method_name)
except AttributeError:
raise NotImplementedError('%s is not implemented for handling %s',
method_name, event.name)
handler(msg)
def _handle_starting(self, msg):
# TODO(crbug.com/748234): No event update needed yet.
pass
def _handle_gathering(self, msg):
# TODO(crbug.com/794779): monitor_db leaves HQEs in GATHERING
pass
def _handle_x_tests_done(self, msg):
"""Taken from GatherLogsTask.epilog."""
autoserv_exit, failures = (int(x) for x in msg.split(','))
logger.debug('Got autoserv_exit=%d, failures=%d',
autoserv_exit, failures)
success = (autoserv_exit == 0 and failures == 0)
reset_after_failure = not self._job.run_reset and not success
hqes = self._job.hostqueueentry_set.all().prefetch_related('host')
if self._should_reboot_duts(autoserv_exit, failures,
reset_after_failure):
logger.debug('Creating cleanup jobs for hosts')
for entry in hqes:
self._handle_host_needs_cleanup(entry.host.hostname)
else:
logger.debug('Not creating cleanup jobs for hosts')
for entry in hqes:
self._handle_host_ready(entry.host.hostname)
if not reset_after_failure:
logger.debug('Skipping reset because reset_after_failure is False')
return
logger.debug('Creating reset jobs for hosts')
self._metrics.send_reset_after_failure(autoserv_exit, failures)
for entry in hqes:
self._handle_host_needs_reset(entry.host.hostname)
def _handle_parsing(self, _msg):
models = autotest.load('frontend.afe.models')
self._job.hostqueueentry_set.all().update(
status=models.HostQueueEntry.Status.PARSING)
def _handle_completed(self, _msg):
models = autotest.load('frontend.afe.models')
final_status = self._final_status()
for hqe in self._job.hostqueueentry_set.all():
self._set_completed_status(hqe, final_status)
if final_status is not models.HostQueueEntry.Status.ABORTED:
_stop_prejob_hqes(self._job)
if self._job.shard_id is not None:
# If shard_id is None, the job will be synced back to the master
self._job.shard_id = None
self._job.save()
self.completed = True
def _handle_host_ready(self, msg):
models = autotest.load('frontend.afe.models')
(models.Host.objects.filter(hostname=msg)
.update(status=models.Host.Status.READY))
def _handle_host_needs_cleanup(self, msg):
models = autotest.load('frontend.afe.models')
host = models.Host.objects.get(hostname=msg)
models.SpecialTask.objects.create(
host_id=host.id,
task=models.SpecialTask.Task.CLEANUP,
requested_by=models.User.objects.get(login=self._job.owner))
def _handle_host_needs_reset(self, msg):
models = autotest.load('frontend.afe.models')
host = models.Host.objects.get(hostname=msg)
models.SpecialTask.objects.create(
host_id=host.id,
task=models.SpecialTask.Task.RESET,
requested_by=models.User.objects.get(login=self._job.owner))
def _should_reboot_duts(self, autoserv_exit, failures, reset_after_failure):
models = autotest.load('frontend.afe.models')
reboot_after = self._job.reboot_after
if self._final_status() == models.HostQueueEntry.Status.ABORTED:
logger.debug('Should reboot because reboot_after=ABORTED')
return True
elif reboot_after == models.Job.RebootAfter.ALWAYS:
logger.debug('Should reboot because reboot_after=ALWAYS')
return True
elif (reboot_after == models.Job.RebootAfter.IF_ALL_TESTS_PASSED
and autoserv_exit == 0 and failures == 0):
logger.debug('Should reboot because'
' reboot_after=IF_ALL_TESTS_PASSED')
return True
else:
return failures > 0 and not reset_after_failure
def _final_status(self):
models = autotest.load('frontend.afe.models')
Status = models.HostQueueEntry.Status
if jobx.is_aborted(self._job):
return Status.ABORTED
if self._autoserv_exit == 0:
return Status.COMPLETED
return Status.FAILED
def _set_completed_status(self, hqe, status):
"""Set completed status of HQE.
This is a cleaned up version of the one in scheduler_models to work
with Django models.
"""
hqe.status = status
hqe.active = False
hqe.complete = True
if hqe.started_on:
hqe.finished_on = datetime.datetime.now()
hqe.save()
self._metrics.send_hqe_completion(hqe)
self._metrics.send_hqe_duration(hqe)
class Metrics(object):
"""Class for sending job metrics."""
def __init__(self):
# Metrics
metrics = autotest.chromite_load('metrics')
self._hqe_completion_metric = metrics.Counter(
'chromeos/autotest/scheduler/hqe_completion_count')
self._reset_after_failure_metric = metrics.Counter(
'chromeos/autotest/scheduler/postjob_tasks/'
'reset_after_failure')
def send_hqe_completion(self, hqe):
"""Send ts_mon metrics for HQE completion."""
fields = {
'status': hqe.status.lower(),
'board': 'NO_HOST',
'pool': 'NO_HOST',
}
if hqe.host:
labellib = autotest.load('utils.labellib')
labels = labellib.LabelsMapping.from_host(hqe.host)
fields['board'] = labels.get('board', '')
fields['pool'] = labels.get('pool', '')
self._hqe_completion_metric.increment(fields=fields)
def send_hqe_duration(self, hqe):
"""Send CloudTrace metrics for HQE duration."""
if not (hqe.started_on and hqe.finished_on):
return
scheduler_models = autotest.load('scheduler.scheduler_models')
cloud_trace = autotest.chromite_load('cloud_trace')
types = autotest.deps_load('google.protobuf.internal.well_known_types')
hqe_trace_id = scheduler_models.hqe_trace_id
span = cloud_trace.Span(
'HQE', spanId='0', traceId=hqe_trace_id(hqe.id))
span.startTime = types.Timestamp()
span.startTime.FromDatetime(hqe.started_on)
span.endTime = types.Timestamp()
span.endTime.FromDatetime(hqe.finished_on)
cloud_trace.LogSpan(span)
def send_reset_after_failure(self, autoserv_exit, failures):
"""Send reset_after_failure metric."""
self._reset_after_failure_metric.increment(
fields={'autoserv_process_success': autoserv_exit == 0,
# Yes, this is a boolean
'num_tests_failed': failures > 0})
def _stop_prejob_hqes(job):
"""Stop pending HQEs for a job (for synch_count)."""
models = autotest.load('frontend.afe.models')
HQEStatus = models.HostQueueEntry.Status
HostStatus = models.Host.Status
not_yet_run = _get_prejob_hqes(job)
if not_yet_run.count() == job.synch_count:
return
entries_to_stop = _get_prejob_hqes(job, include_active=False)
for hqe in entries_to_stop:
if hqe.status == HQEStatus.PENDING:
hqe.host.status = HostStatus.READY
hqe.host.save()
hqe.status = HQEStatus.STOPPED
hqe.save()
def _get_prejob_hqes(job, include_active=True):
"""Return a queryset of not run HQEs for the job (for synch_count)."""
models = autotest.load('frontend.afe.models')
if include_active:
statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
else:
statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
return models.HostQueueEntry.objects.filter(
job=job, status__in=statuses)