| # Copyright 2017 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Monitor jobs and abort them as necessary. |
| |
| This daemon does a number of upkeep tasks: |
| |
| * When a process owning a job crashes, job_aborter will mark the job as |
| aborted in the database and clean up its lease files. |
| |
| * When a job is marked aborted in the database, job_aborter will signal |
| the process owning the job to abort. |
| |
| See also http://goto.google.com/monitor_db_per_job_refactor |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import argparse |
| import logging |
| import sys |
| import time |
| |
| from lucifer import autotest |
| from lucifer import handoffs |
| from lucifer import leasing |
| from lucifer import loglib |
| |
# Module-level logger; logging output is configured in main() via
# loglib.configure_logging_with_args().
logger = logging.getLogger(__name__)
| |
| |
def main(args):
    """Run the job_aborter daemon.

    Parses the command line, configures logging, and then runs the
    monitoring loop inside the ts_mon global state.  The monitoring loop
    is never expected to return normally.

    @param args: list of command line args (without the program name)
    @raises AssertionError: if the main loop ever returns normally.
    """
    parser = argparse.ArgumentParser(prog='job_aborter', description=__doc__)
    parser.add_argument('--jobdir', required=True)
    loglib.add_logging_options(parser)
    args = parser.parse_args(args)
    loglib.configure_logging_with_args(parser, args)
    logger.info('Starting with args: %r', args)

    autotest.monkeypatch()
    ts_mon_config = autotest.chromite_load('ts_mon_config')
    with ts_mon_config.SetupTsMonGlobalState('job_aborter'):
        _main_loop(jobdir=args.jobdir)
    # _main_loop() loops forever and only leaves via an exception.  Use an
    # explicit raise instead of ``assert False`` so this invariant check is
    # not stripped when Python runs with -O.
    raise AssertionError('job_aborter main loop exited unexpectedly')
| |
| |
def _main_loop(jobdir):
    """Run job_aborter upkeep forever.

    Every iteration sends a tick metric, performs one upkeep pass over
    jobdir, commits the Django transaction, and then sleeps 20 seconds.

    @param jobdir: directory scanned for job leases.
    """
    django_transaction = autotest.deps_load('django.db.transaction')

    @django_transaction.commit_manually
    def _commit():
        """Flush transaction https://stackoverflow.com/questions/3346124/"""
        django_transaction.commit()

    job_metrics = _Metrics()
    job_metrics.send_starting()
    while 1:
        logger.debug('Tick')
        job_metrics.send_tick()
        _main_loop_body(job_metrics, jobdir)
        # Presumably ends the open transaction so the next pass reads fresh
        # database rows; see the StackOverflow link on _commit.
        _commit()
        time.sleep(20)
| |
| |
def _main_loop_body(metrics, jobdir):
    """Perform a single upkeep pass.

    Builds a mapping of unexpired leases in jobdir, then handles expired
    handoffs, timed-out jobs, and jobs marked aborting.  It also runs the
    special-task hook and removes expired lease files.

    @param metrics: _Metrics instance.
    @param jobdir: directory scanned for job leases.
    """
    active_leases = {}
    for lease in leasing.leases_iter(jobdir):
        if lease.expired():
            continue
        active_leases[lease.id] = lease

    _mark_expired_jobs_failed(metrics, active_leases)
    _abort_timed_out_jobs(active_leases)
    _abort_jobs_marked_aborting(active_leases)
    _abort_special_tasks_marked_aborted()
    _clean_up_expired_leases(jobdir)
    # TODO(crbug.com/748234): abort_jobs_past_max_runtime goes into lucifer
| |
| |
def _mark_expired_jobs_failed(metrics, active_leases):
    """Mark expired jobs failed.

    A job is expired when it has an incomplete JobHandoff but no active
    lease, meaning it was handed off to a job_reporter that has since
    crashed.  The handoffs for all such jobs are passed to
    handoffs.clean_up() and handoffs.mark_complete(), and the number of
    expired jobs is reported via metrics.

    @param metrics: _Metrics instance.
    @param active_leases: dict mapping job ids to Leases.
    """
    logger.debug('Looking for expired jobs')
    expired_ids = []
    for handoff in handoffs.incomplete():
        job_id = handoff.job_id
        logger.debug('Found handoff: %d', job_id)
        if job_id in active_leases:
            continue
        logger.info('Handoff %d is missing active lease; cleaning up',
                    job_id)
        expired_ids.append(job_id)
    # NOTE(review): active_leases is built by the caller before this query
    # runs.  A handoff whose job_reporter has not created its lease yet
    # would be treated as expired here.  Confirm that the lease is always
    # visible before the handoff is.
    handoffs.clean_up(expired_ids)
    handoffs.mark_complete(expired_ids)
    metrics.send_expired_jobs(len(expired_ids))
| |
| |
def _abort_timed_out_jobs(active_leases):
    """Send abort to timed out jobs that still hold an active lease.

    Timed-out jobs with no active lease are skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    for timed_out_job in _timed_out_jobs_queryset():
        job_id = timed_out_job.id
        if job_id not in active_leases:
            continue
        logger.info('Job %d is timed out; aborting', job_id)
        active_leases[job_id].maybe_abort()
| |
| |
def _abort_jobs_marked_aborting(active_leases):
    """Send abort to leased jobs marked aborting in the Autotest database.

    Aborting jobs with no active lease are skipped.

    @param active_leases: dict mapping job ids to Leases.
    """
    for aborting_job in _aborting_jobs_queryset():
        job_id = aborting_job.id
        if job_id not in active_leases:
            continue
        logger.info('Job %d is marked for aborting; aborting', job_id)
        active_leases[job_id].maybe_abort()
| |
| |
| def _abort_special_tasks_marked_aborted(): |
| # TODO(crbug.com/748234): Special tasks not implemented yet. This |
| # would abort jobs running on the behalf of special tasks and thus |
| # need to check a different database table. |
| pass |
| |
| |
def _clean_up_expired_leases(jobdir):
    """Remove the files belonging to expired leases in jobdir.

    Only active leases matter here, so the files left behind by expired
    leases are stale and can be removed.

    @param jobdir: directory scanned for job leases.
    """
    expired_leases = (
        lease for lease in leasing.leases_iter(jobdir) if lease.expired()
    )
    for stale_lease in expired_leases:
        stale_lease.cleanup()
| |
| |
def _timed_out_jobs_queryset():
    """Return a QuerySet of Jobs that have run past their timeout.

    A job qualifies when it has an incomplete HostQueueEntry and its
    created_on time plus timeout_mins minutes is earlier than NOW().

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    incomplete_jobs = models.Job.objects.filter(hostqueueentry__complete=False)
    # Raw SQL condition using MySQL-style INTERVAL arithmetic, evaluated
    # against the database clock.
    timed_out_jobs = incomplete_jobs.extra(
        where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
    # Presumably needed because the join through hostqueueentry can yield
    # the same job once per matching entry.
    return timed_out_jobs.distinct()
| |
| |
def _aborting_jobs_queryset():
    """Return a QuerySet of Jobs marked aborted but not yet complete.

    @returns: Django QuerySet
    """
    models = autotest.load('frontend.afe.models')
    aborted_jobs = models.Job.objects.filter(hostqueueentry__aborted=True)
    unfinished_jobs = aborted_jobs.filter(hostqueueentry__complete=False)
    # Presumably needed because the join through hostqueueentry can yield
    # the same job once per matching entry.
    return unfinished_jobs.distinct()
| |
| |
| class _Metrics(object): |
| |
| """Class for sending job_aborter metrics.""" |
| |
| def __init__(self): |
| metrics = autotest.chromite_load('metrics') |
| prefix = 'chromeos/lucifer/job_aborter' |
| self._starting_m = metrics.Counter(prefix + '/start') |
| self._tick_m = metrics.Counter(prefix + '/tick') |
| self._expired_m = metrics.Counter(prefix + '/expired_jobs') |
| |
| def send_starting(self): |
| """Send starting metric.""" |
| self._starting_m.increment() |
| |
| def send_tick(self): |
| """Send tick metric.""" |
| self._tick_m.increment() |
| |
| def send_expired_jobs(self, count): |
| """Send expired_jobs metric.""" |
| self._expired_m.increment_by(count) |
| |
| |
if __name__ == '__main__':
    # Pass everything after the program name on to main() for parsing.
    main(args=sys.argv[1:])