blob: 1cb816960afd95a5a6cf2f717280ac5c7109ae95 [file] [log] [blame] [edit]
"""Autotest AFE Cleanup used by the scheduler"""
import contextlib
import logging
import random
import time
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import scheduler_config
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections
try:
from chromite.lib import metrics
except ImportError:
metrics = utils.metrics_mock
_METRICS_PREFIX = 'chromeos/autotest/scheduler/cleanup'
class PeriodicCleanup(object):
"""Base class to schedule periodical cleanup work.
"""
def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
self._db = db
self.clean_interval_minutes = clean_interval_minutes
self._last_clean_time = time.time()
self._run_at_initialize = run_at_initialize
def initialize(self):
"""Method called by scheduler at the startup.
"""
if self._run_at_initialize:
self._cleanup()
def run_cleanup_maybe(self):
"""Test if cleanup method should be called.
"""
should_cleanup = (self._last_clean_time +
self.clean_interval_minutes * 60
< time.time())
if should_cleanup:
self._cleanup()
self._last_clean_time = time.time()
def _cleanup(self):
"""Abrstract cleanup method."""
raise NotImplementedError
class UserCleanup(PeriodicCleanup):
"""User cleanup that is controlled by the global config variable
clean_interval_minutes in the SCHEDULER section.
"""
def __init__(self, db, clean_interval_minutes):
super(UserCleanup, self).__init__(db, clean_interval_minutes)
self._last_reverify_time = time.time()
@metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/user/durations')
def _cleanup(self):
logging.info('Running periodic cleanup')
self._abort_timed_out_jobs()
self._abort_jobs_past_max_runtime()
self._clear_inactive_blocks()
self._check_for_db_inconsistencies()
self._reverify_dead_hosts()
self._django_session_cleanup()
def _abort_timed_out_jobs(self):
logging.info(
'Aborting all jobs that have timed out and are not complete')
query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
jobs = query.distinct()
if not jobs:
return
with _cleanup_warning_banner('timed out jobs', len(jobs)):
for job in jobs:
logging.warning('Aborting job %d due to job timeout', job.id)
job.abort()
_report_detected_errors('jobs_timed_out', len(jobs))
def _abort_jobs_past_max_runtime(self):
"""
Abort executions that have started and are past the job's max runtime.
"""
logging.info('Aborting all jobs that have passed maximum runtime')
rows = self._db.execute("""
SELECT hqe.id FROM afe_host_queue_entries AS hqe
WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS
(select * from afe_jobs where hqe.job_id=afe_jobs.id and
hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW())
""")
query = models.HostQueueEntry.objects.filter(
id__in=[row[0] for row in rows])
hqes = query.distinct()
if not hqes:
return
with _cleanup_warning_banner('hqes past max runtime', len(hqes)):
for queue_entry in hqes:
logging.warning('Aborting entry %s due to max runtime',
queue_entry)
queue_entry.abort()
_report_detected_errors('hqes_past_max_runtime', len(hqes))
def _check_for_db_inconsistencies(self):
logging.info('Cleaning db inconsistencies')
self._check_all_invalid_related_objects()
def _check_invalid_related_objects_one_way(self, invalid_model,
relation_field, valid_model):
if 'invalid' not in invalid_model.get_field_dict():
return
invalid_objects = list(invalid_model.objects.filter(invalid=True))
invalid_model.objects.populate_relationships(
invalid_objects, valid_model, 'related_objects')
if not invalid_objects:
return
num_objects_with_invalid_relations = 0
errors = []
for invalid_object in invalid_objects:
if invalid_object.related_objects:
related_objects = invalid_object.related_objects
related_list = ', '.join(str(x) for x in related_objects)
num_objects_with_invalid_relations += 1
errors.append('Invalid %s is related to: %s' %
(invalid_object, related_list))
related_manager = getattr(invalid_object, relation_field)
related_manager.clear()
# Only log warnings after we're sure we've seen at least one invalid
# model with some valid relations to avoid empty banners from getting
# printed.
if errors:
invalid_model_name = invalid_model.__name__
valid_model_name = valid_model.__name__
banner = 'invalid %s related to valid %s' % (invalid_model_name,
valid_model_name)
with _cleanup_warning_banner(banner, len(errors)):
for error in errors:
logging.warning(error)
_report_detected_errors(
'invalid_related_objects',
num_objects_with_invalid_relations,
fields={'invalid_model': invalid_model_name,
'valid_model': valid_model_name})
_report_detected_errors(
'invalid_related_objects_relations',
len(errors),
fields={'invalid_model': invalid_model_name,
'valid_model': valid_model_name})
def _check_invalid_related_objects(self, first_model, first_field,
second_model, second_field):
self._check_invalid_related_objects_one_way(
first_model,
first_field,
second_model,
)
self._check_invalid_related_objects_one_way(
second_model,
second_field,
first_model,
)
def _check_all_invalid_related_objects(self):
model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
(models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
(models.AclGroup, 'users', models.User, 'aclgroup_set'),
(models.Test, 'dependency_labels', models.Label,
'test_set'))
for first_model, first_field, second_model, second_field in model_pairs:
self._check_invalid_related_objects(
first_model,
first_field,
second_model,
second_field,
)
def _clear_inactive_blocks(self):
logging.info('Clear out blocks for all completed jobs.')
# this would be simpler using NOT IN (subquery), but MySQL
# treats all IN subqueries as dependent, so this optimizes much
# better
self._db.execute("""
DELETE ihq FROM afe_ineligible_host_queues ihq
WHERE NOT EXISTS
(SELECT job_id FROM afe_host_queue_entries hqe
WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""")
def _should_reverify_hosts_now(self):
reverify_period_sec = (scheduler_config.config.reverify_period_minutes
* 60)
if reverify_period_sec == 0:
return False
return (self._last_reverify_time + reverify_period_sec) <= time.time()
def _choose_subset_of_hosts_to_reverify(self, hosts):
"""Given hosts needing verification, return a subset to reverify."""
max_at_once = scheduler_config.config.reverify_max_hosts_at_once
if (max_at_once > 0 and len(hosts) > max_at_once):
return random.sample(hosts, max_at_once)
return sorted(hosts)
def _reverify_dead_hosts(self):
if not self._should_reverify_hosts_now():
return
self._last_reverify_time = time.time()
logging.info('Checking for dead hosts to reverify')
hosts = models.Host.objects.filter(
status=models.Host.Status.REPAIR_FAILED,
locked=False,
invalid=False)
hosts = hosts.exclude(
protection=host_protections.Protection.DO_NOT_VERIFY)
if not hosts:
return
hosts = list(hosts)
total_hosts = len(hosts)
hosts = self._choose_subset_of_hosts_to_reverify(hosts)
logging.info('Reverifying dead hosts (%d of %d)', len(hosts),
total_hosts)
with _cleanup_warning_banner('reverify dead hosts', len(hosts)):
for host in hosts:
logging.warning(host.hostname)
_report_detected_errors('dead_hosts_triggered_reverify', len(hosts))
_report_detected_errors('dead_hosts_require_reverify', total_hosts)
for host in hosts:
models.SpecialTask.schedule_special_task(
host=host, task=models.SpecialTask.Task.VERIFY)
def _django_session_cleanup(self):
"""Clean up django_session since django doesn't for us.
http://www.djangoproject.com/documentation/0.96/sessions/
"""
logging.info('Deleting old sessions from django_session')
sql = 'TRUNCATE TABLE django_session'
self._db.execute(sql)
class TwentyFourHourUpkeep(PeriodicCleanup):
"""Cleanup that runs at the startup of monitor_db and every subsequent
twenty four hours.
"""
def __init__(self, db, drone_manager, run_at_initialize=True):
"""Initialize TwentyFourHourUpkeep.
@param db: Database connection object.
@param drone_manager: DroneManager to access drones.
@param run_at_initialize: True to run cleanup when scheduler starts.
Default is set to True.
"""
self.drone_manager = drone_manager
clean_interval_minutes = 24 * 60 # 24 hours
super(TwentyFourHourUpkeep, self).__init__(
db, clean_interval_minutes, run_at_initialize=run_at_initialize)
@metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/daily/durations')
def _cleanup(self):
logging.info('Running 24 hour clean up')
self._check_for_uncleanable_db_inconsistencies()
self._cleanup_orphaned_containers()
def _check_for_uncleanable_db_inconsistencies(self):
logging.info('Checking for uncleanable DB inconsistencies')
self._check_for_active_and_complete_queue_entries()
self._check_for_multiple_platform_hosts()
self._check_for_no_platform_hosts()
def _check_for_active_and_complete_queue_entries(self):
query = models.HostQueueEntry.objects.filter(active=True, complete=True)
num_bad_hqes = query.count()
if num_bad_hqes == 0:
return
num_aborted = 0
logging.warning('%d queue entries found with active=complete=1',
num_bad_hqes)
with _cleanup_warning_banner('active and complete hqes', num_bad_hqes):
for entry in query:
if entry.status == 'Aborted':
entry.active = False
entry.save()
recovery_path = 'was also aborted, set active to False'
num_aborted += 1
else:
recovery_path = 'can not recover'
logging.warning('%s (recovery: %s)', entry.get_object_dict(),
recovery_path)
_report_detected_errors('hqes_active_and_complete', num_bad_hqes)
_report_detected_errors('hqes_aborted_set_to_inactive', num_aborted)
def _check_for_multiple_platform_hosts(self):
rows = self._db.execute("""
SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
GROUP_CONCAT(afe_labels.name)
FROM afe_hosts
INNER JOIN afe_hosts_labels ON
afe_hosts.id = afe_hosts_labels.host_id
INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
WHERE afe_labels.platform
GROUP BY afe_hosts.id
HAVING platform_count > 1
ORDER BY hostname""")
if rows:
logging.warning('Cleanup found hosts with multiple platforms')
with _cleanup_warning_banner('hosts with multiple platforms',
len(rows)):
for row in rows:
logging.warning(' '.join(str(item) for item in row))
_report_detected_errors('hosts_with_multiple_platforms', len(rows))
def _check_for_no_platform_hosts(self):
rows = self._db.execute("""
SELECT hostname
FROM afe_hosts
LEFT JOIN afe_hosts_labels
ON afe_hosts.id = afe_hosts_labels.host_id
AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
WHERE platform)
WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
if rows:
with _cleanup_warning_banner('hosts with no platform', len(rows)):
for row in rows:
logging.warning(row[0])
_report_detected_errors('hosts_with_no_platform', len(rows))
def _cleanup_orphaned_containers(self):
"""Cleanup orphaned containers in each drone.
The function queues a lxc_cleanup call in each drone without waiting for
the script to finish, as the cleanup procedure could take minutes and the
script output is logged.
"""
ssp_enabled = global_config.global_config.get_config_value(
'AUTOSERV', 'enable_ssp_container')
if not ssp_enabled:
logging.info(
'Server-side packaging is not enabled, no need to clean '
'up orphaned containers.')
return
self.drone_manager.cleanup_orphaned_containers()
def _report_detected_errors(metric_name, count, fields={}):
"""Reports a counter metric for recovered errors
@param metric_name: Name of the metric to report about.
@param count: How many "errors" were fixed this cycle.
@param fields: Optional fields to include with the metric.
"""
m = '%s/errors_recovered/%s' % (_METRICS_PREFIX, metric_name)
metrics.Counter(m).increment_by(count, fields=fields)
def _report_detected_errors(metric_name, gauge, fields={}):
"""Reports a gauge metric for errors detected
@param metric_name: Name of the metric to report about.
@param gauge: Outstanding number of unrecoverable errors of this type.
@param fields: Optional fields to include with the metric.
"""
m = '%s/errors_detected/%s' % (_METRICS_PREFIX, metric_name)
metrics.Gauge(m).set(gauge, fields=fields)
@contextlib.contextmanager
def _cleanup_warning_banner(banner, error_count=None):
"""Put a clear context in the logs around list of errors
@param: banner: The identifying header to print for context.
@param: error_count: If not None, the number of errors detected.
"""
if error_count is not None:
banner += ' (total: %d)' % error_count
logging.warning('#### START: %s ####', banner)
try:
yield
finally:
logging.warning('#### END: %s ####', banner)