blob: d637a2bc43a47395964a3b92821c1fdb53798009 [file] [log] [blame] [edit]
# pylint: disable=missing-docstring
import contextlib
from datetime import datetime
from datetime import timedelta
import logging
import os
import django.core
try:
from django.db import models as dbmodels, connection
except django.core.exceptions.ImproperlyConfigured:
raise ImportError('Django database not yet configured. Import either '
'setup_django_environment or '
'setup_django_lite_environment from '
'autotest_lib.frontend before any imports that '
'depend on django models.')
from xml.sax import saxutils
import common
from autotest_lib.frontend.afe import model_logic, model_attributes
from autotest_lib.frontend.afe import rdb_model_extensions
from autotest_lib.frontend import settings, thread_local
from autotest_lib.client.common_lib import autotest_enum, error, host_protections
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib import control_data, priorities, decorators
from autotest_lib.server import utils as server_utils
# Job options and user preferences.
DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
# User.reboot_after draws its choices from RebootAfter, so its default must
# come from the same enum (it previously borrowed RebootBefore.NEVER).
DEFAULT_REBOOT_AFTER = model_attributes.RebootAfter.NEVER

# Whether static labels / static host attributes are honored. Both are read
# from the SKYLAB section of the global config and default to False.
RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)

RESPECT_STATIC_ATTRIBUTES = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_attributes', type=bool, default=False)
class AclAccessViolation(Exception):
    """\
    Raised when an operation is attempted without proper permissions as
    dictated by ACLs.
    """
class AtomicGroup(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    An atomic group defines a collection of hosts which must only be scheduled
    all at once.  Any host with a label having an atomic group will only be
    scheduled for a job at the same time as other hosts sharing that label.

    Required:
      name: A name for this atomic group, e.g. 'rack23' or 'funky_net'.
      max_number_of_machines: The maximum number of machines that will be
              scheduled at once when scheduling jobs to this atomic group.
              The job.synch_count is considered the minimum.

    Optional:
      description: Arbitrary text description of this group's purpose.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)
    # This magic value is the default to simplify the scheduler logic.
    # It must be "large".  The common use of atomic groups is to want all
    # machines in the group to be used, limits on which subset used are
    # often chosen via dependency labels.
    # TODO(dennisjeffrey): Revisit this so we don't have to assume that
    # "infinity" is around 333 million.
    INFINITE_MACHINES = 333333333
    max_number_of_machines = dbmodels.IntegerField(default=INFINITE_MACHINES)
    invalid = dbmodels.BooleanField(default=False,
                                  editable=settings.FULL_ADMIN)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on an associated atomic group of hosts.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(atomic_group=self, job=job,
                                            is_template=is_template)
        queue_entry.save()


    def clean_object(self):
        # Detach every Label pointing at this group (reverse side of the
        # Label.atomic_group foreign key).
        self.label_set.clear()


    class Meta:
        """Metadata for class AtomicGroup."""
        db_table = 'afe_atomic_groups'


    def __unicode__(self):
        return unicode(self.name)
class Label(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    A label that can be attached to hosts and listed as a test dependency.

    Required:
      name: label name

    Optional:
      kernel_config: URL/path to kernel config for jobs run on this label.
      platform: If True, this is a platform label (defaults to False).
      only_if_needed: If True, a Host with this label can only be used if that
              label is requested by the job/test (either as the meta_host or
              in the job_dependencies).
      atomic_group: The atomic group associated with this label.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    kernel_config = dbmodels.CharField(max_length=255, blank=True)
    platform = dbmodels.BooleanField(default=False)
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)
    only_if_needed = dbmodels.BooleanField(default=False)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)


    def clean_object(self):
        # Drop every host and test association for this label.
        self.host_set.clear()
        self.test_set.clear()


    def enqueue_job(self, job, is_template=False):
        """Queue |job| to run on any one host carrying this label.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        entry = HostQueueEntry.create(meta_host=self, job=job,
                                      is_template=is_template)
        entry.save()


    class Meta:
        """Metadata for class Label."""
        db_table = 'afe_labels'


    def __unicode__(self):
        return unicode(self.name)


    def is_replaced_by_static(self):
        """Report whether a static label supersedes this label.

        'Static' means it can only be modified by skylab inventory tools.
        The answer is always False while RESPECT_STATIC_LABELS is disabled
        (no query is issued in that case).
        """
        if not RESPECT_STATIC_LABELS:
            return False
        replacements = ReplacedLabel.objects.filter(label__id=self.id)
        return len(replacements) > 0
class StaticLabel(model_logic.ModelWithInvalid, dbmodels.Model):
    """\
    A label that can only be modified by skylab inventory tools.

    Required:
      name: label name

    Optional:
      kernel_config: URL/path to kernel config for jobs run on this label.
      platform: If True, this is a platform label (defaults to False).
      only_if_needed: Deprecated. This is always False.
      atomic_group: Deprecated. This is always NULL.
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    kernel_config = dbmodels.CharField(max_length=255, blank=True)
    platform = dbmodels.BooleanField(default=False)
    invalid = dbmodels.BooleanField(default=False,
                                    editable=settings.FULL_ADMIN)
    only_if_needed = dbmodels.BooleanField(default=False)

    name_field = 'name'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    atomic_group = dbmodels.ForeignKey(AtomicGroup, null=True, blank=True)


    def clean_object(self):
        # Drop host and test associations for this static label.
        # NOTE(review): no visible model declares a test relation to
        # StaticLabel (Test.dependency_labels targets Label) — confirm that
        # test_set exists for this model.
        self.host_set.clear()
        self.test_set.clear()


    class Meta:
        """Metadata for class StaticLabel."""
        db_table = 'afe_static_labels'


    def __unicode__(self):
        return unicode(self.name)
class ReplacedLabel(dbmodels.Model, model_logic.ModelExtensions):
    """Marker row: its Label should be replaced with a static label."""
    label = dbmodels.ForeignKey(Label)
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class ReplacedLabel."""
        db_table = 'afe_replaced_labels'


    def __unicode__(self):
        # Render as the replaced label itself.
        return unicode(self.label)
class Shard(dbmodels.Model, model_logic.ModelExtensions):
    """\
    A shard, identified by its hostname.

    hostname: the shard's hostname (unique).
    labels: labels assigned to this shard; hosts carrying any of these labels
            are assigned to the shard by Host.assign_to_shard.
    """
    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'

    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_shards_labels')

    class Meta:
        """Metadata for class Shard."""
        db_table = 'afe_shards'
class Drone(dbmodels.Model, model_logic.ModelExtensions):
    """
    A scheduler drone.

    hostname: the drone's hostname (unique).

    Only superusers may save or delete drones.
    """
    hostname = dbmodels.CharField(max_length=255, unique=True)

    name_field = 'hostname'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        acting_user = User.current_user()
        if not acting_user.is_superuser():
            raise Exception('Only superusers may edit drones')
        super(Drone, self).save(*args, **kwargs)


    def delete(self):
        acting_user = User.current_user()
        if not acting_user.is_superuser():
            raise Exception('Only superusers may delete drones')
        super(Drone, self).delete()


    class Meta:
        """Metadata for class Drone."""
        db_table = 'afe_drones'


    def __unicode__(self):
        return unicode(self.hostname)
class DroneSet(dbmodels.Model, model_logic.ModelExtensions):
    """
    A set of scheduler drones

    These will be used by the scheduler to decide what drones a job is allowed
    to run on.

    name: the drone set's name
    drones: the drones that are part of the set

    Only superusers may save or delete drone sets.
    """
    # Both settings come from the SCHEDULER section of the global config.
    DRONE_SETS_ENABLED = global_config.global_config.get_config_value(
            'SCHEDULER', 'drone_sets_enabled', type=bool, default=False)
    DEFAULT_DRONE_SET_NAME = global_config.global_config.get_config_value(
            'SCHEDULER', 'default_drone_set_name', default=None)

    name = dbmodels.CharField(max_length=255, unique=True)
    drones = dbmodels.ManyToManyField(Drone, db_table='afe_drone_sets_drones')

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may edit drone sets')
        super(DroneSet, self).save(*args, **kwargs)


    def delete(self):
        if not User.current_user().is_superuser():
            raise Exception('Only superusers may delete drone sets')
        super(DroneSet, self).delete()


    @classmethod
    def drone_sets_enabled(cls):
        """Returns whether drone sets are enabled.

        @param cls: Implicit class object.
        """
        return cls.DRONE_SETS_ENABLED


    @classmethod
    def default_drone_set_name(cls):
        """Returns the default drone set name.

        @param cls: Implicit class object.
        """
        return cls.DEFAULT_DRONE_SET_NAME


    @classmethod
    def get_default(cls):
        """Gets the default drone set, compatible with Job.add_object.

        @param cls: Implicit class object.

        @returns: The DroneSet object named DEFAULT_DRONE_SET_NAME, looked up
                  with smart_get (not the name itself).
        """
        return cls.smart_get(cls.DEFAULT_DRONE_SET_NAME)


    @classmethod
    def resolve_name(cls, drone_set_name):
        """
        Returns the name of one of these, if not None, in order of preference:
        1) the drone set given,
        2) the current user's default drone set, or
        3) the global default drone set

        or returns None if drone sets are disabled

        @param cls: Implicit class object.
        @param drone_set_name: A drone set name.
        """
        if not cls.drone_sets_enabled():
            return None

        user = User.current_user()
        user_drone_set_name = user.drone_set and user.drone_set.name

        return drone_set_name or user_drone_set_name or cls.get_default().name


    def get_drone_hostnames(self):
        """
        Gets the hostnames of all drones in this drone set, as a set.
        """
        return set(self.drones.all().values_list('hostname', flat=True))


    class Meta:
        """Metadata for class DroneSet."""
        db_table = 'afe_drone_sets'


    def __unicode__(self):
        return unicode(self.name)
class User(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    login :user login name

    Optional:
    access_level: 0=User (default), 1=Admin, 100=Root
    """
    ACCESS_ROOT = 100
    ACCESS_ADMIN = 1
    ACCESS_USER = 0

    AUTOTEST_SYSTEM = 'autotest_system'

    login = dbmodels.CharField(max_length=255, unique=True)
    access_level = dbmodels.IntegerField(default=ACCESS_USER, blank=True)

    # user preferences
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)
    show_experimental = dbmodels.BooleanField(default=False)

    name_field = 'login'
    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        user = thread_local.get_user()
        # Non-superusers may only modify their own record.
        if user and not user.is_superuser() and user.login != self.login:
            raise AclAccessViolation("You cannot modify user " + self.login)
        super(User, self).save(*args, **kwargs)
        if first_time:
            # Every new user joins the 'Everyone' ACL group.
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.users.add(self)


    def is_superuser(self):
        """Returns whether the user has superuser access."""
        return self.access_level >= self.ACCESS_ROOT


    @classmethod
    def current_user(cls):
        """Returns the current user.

        This is the user stored in thread-local state. When none is set, it
        falls back to the AUTOTEST_SYSTEM user (created on demand), which is
        forced to ACCESS_ROOT.

        @param cls: Implicit class object.
        """
        user = thread_local.get_user()
        if user is None:
            user, _ = cls.objects.get_or_create(login=cls.AUTOTEST_SYSTEM)
            # Only write when the access level actually needs changing;
            # saving unconditionally cost a DB write on every call.
            if user.access_level != cls.ACCESS_ROOT:
                user.access_level = cls.ACCESS_ROOT
                user.save()
        return user


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Check for a record with matching id and login. If one exists,
        return it. If one does not exist there is a possibility that
        the following cases have happened:
        1. Same id, different login
            We received: "1 chromeos-test"
            And we have: "1 debug-user"
        In this case we need to delete "1 debug-user" and insert
        "1 chromeos-test".

        2. Same login, different id:
            We received: "1 chromeos-test"
            And we have: "2 chromeos-test"
        In this case we need to delete "2 chromeos-test" and insert
        "1 chromeos-test".

        As long as this method deletes bad records and raises the
        DoesNotExist exception the caller will handle creating the
        new record.

        @raises: DoesNotExist, if a record with the matching login and id
                does not exist.
        """

        # Both the id and login should be unique but there are cases when
        # we might already have a user with the same login/id because
        # current_user will proactively create a user record if it doesn't
        # exist. Since we want to avoid conflict between the main and
        # shard, just delete any existing user records that don't match
        # what we're about to deserialize from the main.
        try:
            return cls.objects.get(login=data['login'], id=data['id'])
        except cls.DoesNotExist:
            cls.delete_matching_record(login=data['login'])
            cls.delete_matching_record(id=data['id'])
            raise


    class Meta:
        """Metadata for class User."""
        db_table = 'afe_users'


    def __unicode__(self):
        return unicode(self.login)
class Host(model_logic.ModelWithInvalid, rdb_model_extensions.AbstractHostModel,
           model_logic.ModelWithAttributes):
    """\
    Required:
    hostname

    optional:
    locked: if true, host is locked and will not be queued

    Internal:
    From AbstractHostModel:
        status: string describing status of host
        invalid: true if the host has been deleted
        protection: indicates what can be done to this host during repair
        lock_time: DateTime at which the host was locked
        dirty: true if the host has been used without being rebooted
    Local:
        locked_by: user that locked the host, or null if the host is unlocked
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['aclgroup_set',
                                         'hostattribute_set',
                                         'labels',
                                         'shard'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['invalid'])


    def custom_deserialize_relation(self, link, data):
        # Only the shard relation is deserialized through this hook.
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)


    # Note: Only specify foreign keys here, specify all native host columns in
    # rdb_model_extensions instead.
    Protection = host_protections.Protection
    labels = dbmodels.ManyToManyField(Label, blank=True,
                                      db_table='afe_hosts_labels')
    static_labels = dbmodels.ManyToManyField(
            StaticLabel, blank=True, db_table='afe_static_hosts_labels')
    locked_by = dbmodels.ForeignKey(User, null=True, blank=True, editable=False)
    name_field = 'hostname'
    objects = model_logic.ModelWithInvalidManager()
    valid_objects = model_logic.ValidObjectsManager()
    leased_objects = model_logic.LeasedHostManager()

    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    def __init__(self, *args, **kwargs):
        super(Host, self).__init__(*args, **kwargs)
        # Track 'status' so on_attribute_changed fires when it changes.
        self._record_attributes(['status'])


    @classmethod
    def classify_labels(cls, label_names):
        """Split labels to static & non-static.

        @param label_names: a list of labels (string).

        @returns: a list of StaticLabel objects & a list of
                  (non-static) Label objects.
        """
        if not label_names:
            return [], []

        labels = Label.objects.filter(name__in=label_names)

        if not RESPECT_STATIC_LABELS:
            return [], labels

        return cls.classify_label_objects(labels)


    @classmethod
    def classify_label_objects(cls, label_objects):
        """Split Label objects to static & non-static.

        @param label_objects: an iterable of Label objects.

        @returns: a tuple (static_labels, non_static_labels). When
                  RESPECT_STATIC_LABELS is off this is ([], label_objects).
                  Otherwise static_labels holds the StaticLabel objects named
                  like the replaced labels, and non_static_labels is a list of
                  the labels that are not replaced.
        """
        if not RESPECT_STATIC_LABELS:
            return [], label_objects

        replaced_labels = ReplacedLabel.objects.filter(label__in=label_objects)
        # Read the raw foreign-key value so each ReplacedLabel does not need
        # an extra query to load its Label; a set gives O(1) membership.
        replaced_ids = set(l.label_id for l in replaced_labels)
        non_static_labels = [
                l for l in label_objects if l.id not in replaced_ids]
        static_label_names = [
                l.name for l in label_objects if l.id in replaced_ids]
        static_labels = StaticLabel.objects.filter(name__in=static_label_names)
        return static_labels, non_static_labels


    @classmethod
    def get_hosts_with_labels(cls, label_names, initial_query):
        """Get hosts by label filters.

        @param label_names: label (string) lists for fetching hosts.
        @param initial_query: a model_logic.QuerySet of Host object, e.g.

                Host.objects.all(), Host.valid_objects.all().

            This initial_query cannot be a sliced QuerySet, e.g.

                Host.objects.all().filter(query_limit=10)

        @returns: initial_query narrowed to hosts carrying every label, or an
                  empty set if any requested label does not exist.
        """
        if not label_names:
            return initial_query

        static_labels, non_static_labels = cls.classify_labels(label_names)
        # Label names are unique, so each distinct requested name resolves to
        # at most one label. Compare against the distinct names so that
        # duplicate entries in label_names do not look like missing labels.
        if (len(static_labels) + len(non_static_labels)
                != len(set(label_names))):
            # Some labels don't exist in afe db, which means no hosts
            # should be matched.
            return set()

        for l in static_labels:
            initial_query = initial_query.filter(static_labels=l)

        for l in non_static_labels:
            initial_query = initial_query.filter(labels=l)

        return initial_query


    @classmethod
    def get_hosts_with_label_ids(cls, label_ids, initial_query):
        """Get hosts by label_id filters.

        @param label_ids: label id (int) lists for fetching hosts.
        @param initial_query: a list of Host object, e.g.
            [<Host: 100.107.151.253>, <Host: 100.107.151.251>, ...]
        """
        labels = Label.objects.filter(id__in=label_ids)
        label_names = [l.name for l in labels]
        return cls.get_hosts_with_labels(label_names, initial_query)


    @staticmethod
    def create_one_time_host(hostname):
        """Creates a one-time host.

        A new host is created as invalid. Reusing an existing host is only
        allowed when that host is invalid.

        @param hostname: The name for the host.

        @raises model_logic.ValidationError if a valid host with this
                hostname already exists.
        """
        query = Host.objects.filter(hostname=hostname)
        if query.count() == 0:
            host = Host(hostname=hostname, invalid=True)
            host.do_validate()
        else:
            host = query[0]
            if not host.invalid:
                raise model_logic.ValidationError({
                    'hostname' : '%s already exists in the autotest DB.  '
                        'Select it rather than entering it as a one time '
                        'host.' % hostname
                    })
        host.protection = host_protections.Protection.DO_NOT_REPAIR
        host.locked = False
        host.save()
        host.clean_object()
        return host


    @classmethod
    def _assign_to_shard_nothing_helper(cls):
        """Does nothing.

        This method is called in the middle of assign_to_shard, and does
        nothing. It exists to allow integration tests to simulate a race
        condition."""


    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns hosts to a shard.

        For all labels that have been assigned to a shard, all hosts that
        have at least one of the shard's labels are assigned to the shard.
        Hosts that are assigned to the shard but aren't already present on the
        shard are returned.

        Any hosts that are in |known_ids| but that do not belong to the shard
        are incorrect ids, which are also returned so that the shard can remove
        them locally.

        Board to shard mapping is many-to-one. Many different boards can be
        hosted in a shard. However, DUTs of a single board cannot be distributed
        into more than one shard.

        @param shard: The shard object to assign labels/hosts for.
        @param known_ids: List of all host-ids the shard already knows.
                          This is used to figure out which hosts should be sent
                          to the shard. If shard_ids were used instead, hosts
                          would only be transferred once, even if the client
                          failed persisting them.
                          The number of hosts usually lies in O(100), so the
                          overhead is acceptable.

        @returns a tuple of (host objects that should be sent to the shard,
                             incorrect host ids that should not belong to the
                             shard)
        """
        # Disclaimer: concurrent heartbeats should theoretically not occur in
        # the current setup. As they may be introduced in the near future,
        # this comment will be left here.

        # Sending stuff twice is acceptable, but forgetting something isn't.
        # Detecting duplicates on the client is easy, but here it's harder. The
        # following options were considered:
        # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
        #   than select returned, as concurrently more hosts might have been
        #   inserted
        # - UPDATE and then SELECT WHERE shard=shard: select always returns all
        #   hosts for the shard, this is overhead
        # - SELECT and then UPDATE only selected without requerying afterwards:
        #   returns the old state of the records.
        new_hosts = []

        possible_new_host_ids = set(Host.objects.filter(
            labels__in=shard.labels.all(),
            leased=False
            ).exclude(
            id__in=known_ids,
            ).values_list('pk', flat=True))

        # No-op in production, used to simulate race condition in tests.
        cls._assign_to_shard_nothing_helper()

        if possible_new_host_ids:
            Host.objects.filter(
                pk__in=possible_new_host_ids,
                labels__in=shard.labels.all(),
                leased=False
                ).update(shard=shard)
            new_hosts = list(Host.objects.filter(
                pk__in=possible_new_host_ids,
                shard=shard
                ).all())

        invalid_host_ids = list(Host.objects.filter(
            id__in=known_ids
            ).exclude(
            shard=shard
            ).values_list('pk', flat=True))

        return new_hosts, invalid_host_ids


    def resurrect_object(self, old_object):
        super(Host, self).resurrect_object(old_object)
        # invalid hosts can be in use by the scheduler (as one-time hosts), so
        # don't change the status
        self.status = old_object.status


    def clean_object(self):
        # Drop ACL group memberships and all (static and non-static) labels.
        self.aclgroup_set.clear()
        self.labels.clear()
        self.static_labels.clear()


    def save(self, *args, **kwargs):
        # extra spaces in the hostname can be a sneaky source of errors
        self.hostname = self.hostname.strip()
        # is this a new object being saved for the first time?
        first_time = (self.id is None)
        if not first_time:
            AclGroup.check_for_acl_violation_hosts([self])
        # Keep locked_by/lock_time consistent with the locked flag: record who
        # locked the host (and when, if not already set) on lock, and clear
        # both on unlock.
        if self.locked and not self.locked_by:
            self.locked_by = User.current_user()
            if not self.lock_time:
                self.lock_time = datetime.now()
            self.dirty = True
        elif not self.locked and self.locked_by:
            self.locked_by = None
            self.lock_time = None
        super(Host, self).save(*args, **kwargs)
        if first_time:
            everyone = AclGroup.objects.get(name='Everyone')
            everyone.hosts.add(self)
            # remove attributes that may have lingered from an old host and
            # should not be associated with a new host
            for host_attribute in self.hostattribute_set.all():
                self.delete_attribute(host_attribute.attribute)
        self._check_for_updated_attributes()


    def delete(self):
        AclGroup.check_for_acl_violation_hosts([self])
        logging.info('Preconditions for deleting host %s...', self.hostname)
        for queue_entry in self.hostqueueentry_set.all():
            logging.info('  Deleting and aborting hqe %s...', queue_entry)
            queue_entry.deleted = True
            queue_entry.abort()
            logging.info('  ... done with hqe %s.', queue_entry)
        for host_attribute in self.hostattribute_set.all():
            logging.info('  Deleting attribute %s...', host_attribute)
            self.delete_attribute(host_attribute.attribute)
            logging.info('  ... done with attribute %s.', host_attribute)
        logging.info('... preconditions done for host %s.', self.hostname)
        logging.info('Deleting host %s...', self.hostname)
        super(Host, self).delete()
        logging.info('... done.')


    def on_attribute_changed(self, attribute, old_value):
        # Only 'status' is recorded in __init__, so it is the only attribute
        # expected here.
        assert attribute == 'status'
        logging.info('%s -> %s', self.hostname, self.status)


    def enqueue_job(self, job, is_template=False):
        """Enqueue a job on this host.

        @param job: A job to enqueue.
        @param is_template: Whether the status should be "Template".
        """
        queue_entry = HostQueueEntry.create(host=self, job=job,
                                            is_template=is_template)
        # allow recovery of dead hosts from the frontend
        if not self.active_queue_entry() and self.is_dead():
            self.status = Host.Status.READY
            self.save()
        queue_entry.save()

        block = IneligibleHostQueue(job=job, host=self)
        block.save()


    def platform(self):
        """The platform of the host (first platform label), or None."""
        # TODO(showard): slighly hacky?
        platforms = self.labels.filter(platform=True)
        if len(platforms) == 0:
            return None
        return platforms[0]
    platform.short_description = 'Platform'


    @classmethod
    def check_no_platform(cls, hosts):
        """Verify the specified hosts have no associated platforms.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @raises model_logic.ValidationError if any hosts already have a
            platform.
        """
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        Host.objects.populate_relationships(hosts, StaticLabel,
                                            'staticlabel_list')
        errors = []
        for host in hosts:
            platforms = [label.name for label in host.label_list
                         if label.platform]
            if RESPECT_STATIC_LABELS:
                platforms += [label.name for label in host.staticlabel_list
                              if label.platform]

            if platforms:
                # do a join, just in case this host has multiple platforms,
                # we'll be able to see it
                errors.append('Host %s already has a platform: %s' % (
                              host.hostname, ', '.join(platforms)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    @classmethod
    def check_board_labels_allowed(cls, hosts, new_labels=None):
        """Verify the specified hosts have valid board labels and the given
        new board labels can be added.

        A host may end up with at most one 'board:' label in total.

        @param cls: Implicit class object.
        @param hosts: The hosts to verify.
        @param new_labels: A list of label names to be added to the hosts.
                           None (the default) means no new labels.

        @raises model_logic.ValidationError if any host has invalid board labels
                or the given board labels cannot be added to the hosts.
        """
        if new_labels is None:
            new_labels = []
        Host.objects.populate_relationships(hosts, Label, 'label_list')
        Host.objects.populate_relationships(hosts, StaticLabel,
                                            'staticlabel_list')
        # The boards being added are the same for every host; compute once.
        new_boards = [name for name in new_labels
                      if name.startswith('board:')]
        errors = []
        for host in hosts:
            boards = [label.name for label in host.label_list
                      if label.name.startswith('board:')]
            if RESPECT_STATIC_LABELS:
                boards += [label.name for label in host.staticlabel_list
                           if label.name.startswith('board:')]

            if len(boards) + len(new_boards) > 1:
                # do a join, just in case this host has multiple boards,
                # we'll be able to see it
                errors.append('Host %s already has board labels: %s' % (
                              host.hostname, ', '.join(boards)))
        if errors:
            raise model_logic.ValidationError({'labels': '; '.join(errors)})


    def is_dead(self):
        """Returns whether the host is dead (has status repair failed)."""
        return self.status == Host.Status.REPAIR_FAILED


    def active_queue_entry(self):
        """Returns the active queue entry for this host, or None if none."""
        active = list(self.hostqueueentry_set.filter(active=True))
        if not active:
            return None
        assert len(active) == 1, ('More than one active entry for '
                                  'host ' + self.hostname)
        return active[0]


    def _get_attribute_model_and_args(self, attribute):
        return HostAttribute, dict(host=self, attribute=attribute)


    def _get_static_attribute_model_and_args(self, attribute):
        return StaticHostAttribute, dict(host=self, attribute=attribute)


    def _is_replaced_by_static_attribute(self, attribute):
        """Return whether |attribute| has a static counterpart for this host.

        Always False while RESPECT_STATIC_ATTRIBUTES is disabled.
        """
        if not RESPECT_STATIC_ATTRIBUTES:
            return False
        model, args = self._get_static_attribute_model_and_args(attribute)
        try:
            model.objects.get(**args)
        except model.DoesNotExist:
            return False
        return True


    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. See ModelExtensions for details.
        @returns: The attribute model of Host.
        """
        return HostAttribute


    class Meta:
        """Metadata for the Host class."""
        db_table = 'afe_hosts'


    def __unicode__(self):
        return unicode(self.hostname)
class HostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Arbitrary keyvals associated with hosts."""

    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the HostAttribute class."""
        db_table = 'afe_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for an existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use host_id and attribute together as
        #              a primary key in the db.
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on main and shards.

        @param data: A dictionary of data to deserialize. Its 'id' key, if
                     present, is removed in place.

        @returns: A HostAttribute object.
        """
        if data:
            # Tolerate payloads that carry no id instead of raising KeyError.
            data.pop('id', None)
        return super(HostAttribute, cls).deserialize(data)
class StaticHostAttribute(dbmodels.Model, model_logic.ModelExtensions):
    """Static arbitrary keyvals associated with hosts."""

    SERIALIZATION_LINKS_TO_KEEP = set(['host'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])
    host = dbmodels.ForeignKey(Host)
    attribute = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for the StaticHostAttribute class."""
        db_table = 'afe_static_host_attributes'


    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use host_id and attribute to search for an existing record.

        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        return cls.objects.get(host_id=data['host_id'],
                               attribute=data['attribute'])


    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on main and shards.

        @param data: A dictionary of data to deserialize. Its 'id' key, if
                     present, is removed in place.

        @returns: A StaticHostAttribute object.
        """
        if data:
            # Tolerate payloads that carry no id instead of raising KeyError.
            data.pop('id', None)
        return super(StaticHostAttribute, cls).deserialize(data)
class Test(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    author: author name
    description: description of the test
    name: test name
    time: short, medium, long
    test_class: This describes the class the test belongs in.
    test_category: This describes the category for your tests
    test_type: Client or Server
    path: path to pass to run_test()
    sync_count:  is a number >=1 (1 being the default). If it's 1, then it's an
                 async job. If it's >1 it's sync job for that number of machines
                 i.e. if sync_count = 2 it is a sync job that requires two
                 machines.
    Optional:
    dependencies: What the test requires to run. Comma delimited list
    dependency_labels: many-to-many relationship with labels corresponding to
                       test dependencies.
    experimental: If this is set to True production servers will ignore the test
    run_verify: Whether or not the scheduler should run the verify stage
    run_reset: Whether or not the scheduler should run the reset stage
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    """
    TestTime = autotest_enum.AutotestEnum('SHORT', 'MEDIUM', 'LONG',
                                          start_value=1)

    name = dbmodels.CharField(max_length=255, unique=True)
    author = dbmodels.CharField(max_length=255)
    test_class = dbmodels.CharField(max_length=255)
    test_category = dbmodels.CharField(max_length=255)
    dependencies = dbmodels.CharField(max_length=255, blank=True)
    description = dbmodels.TextField(blank=True)
    experimental = dbmodels.BooleanField(default=True)
    run_verify = dbmodels.BooleanField(default=False)
    test_time = dbmodels.SmallIntegerField(choices=TestTime.choices(),
                                           default=TestTime.MEDIUM)
    test_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices())
    sync_count = dbmodels.IntegerField(default=1)
    path = dbmodels.CharField(max_length=255, unique=True)
    test_retry = dbmodels.IntegerField(blank=True, default=0)
    run_reset = dbmodels.BooleanField(default=True)

    dependency_labels = (
        dbmodels.ManyToManyField(Label, blank=True,
                                 db_table='afe_autotests_dependency_labels'))
    name_field = 'name'
    objects = model_logic.ExtendedManager()


    def admin_description(self):
        """Returns a string representing the admin description.

        The description is XML-escaped and wrapped in a whitespace-preserving
        span.
        """
        escaped_description = saxutils.escape(self.description)
        return '<span style="white-space:pre">%s</span>' % escaped_description
    admin_description.allow_tags = True
    admin_description.short_description = 'Description'


    class Meta:
        """Metadata for class Test."""
        db_table = 'afe_autotests'


    def __unicode__(self):
        return unicode(self.name)
class TestParameter(dbmodels.Model):
    """
    A parameter declared by a Test; (test, name) pairs are unique.
    """
    test = dbmodels.ForeignKey(Test)
    name = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class TestParameter."""
        db_table = 'afe_test_parameters'
        unique_together = ('test', 'name')

    def __unicode__(self):
        # Rendered as "<parameter name> (<test name>)".
        parts = (self.name, self.test.name)
        return u'%s (%s)' % parts
class Profiler(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: profiler name

    Optional:
    description: arbitrary text description
    """
    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.TextField(blank=True)

    name_field = 'name'
    objects = model_logic.ExtendedManager()


    class Meta:
        """Metadata for class Profiler."""
        db_table = 'afe_profilers'


    def __unicode__(self):
        return unicode(self.name)
class AclGroup(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Required:
    name: name of ACL group

    Optional:
    description: arbitrary description of group
    """

    SERIALIZATION_LINKS_TO_FOLLOW = set(['users'])

    name = dbmodels.CharField(max_length=255, unique=True)
    description = dbmodels.CharField(max_length=255, blank=True)
    users = dbmodels.ManyToManyField(User, blank=False,
                                     db_table='afe_acl_groups_users')
    hosts = dbmodels.ManyToManyField(Host, blank=True,
                                     db_table='afe_acl_groups_hosts')

    name_field = 'name'
    objects = model_logic.ExtendedManager()

    @staticmethod
    def check_for_acl_violation_hosts(hosts):
        """Verify the current user has access to the specified hosts.

        Superusers always pass. Only real, valid Host objects are checked;
        anything else (e.g. a metahost label) and invalid hosts are skipped.

        @param hosts: The hosts to verify against.
        @raises AclAccessViolation if the current user doesn't have access
                to a host.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        accessible_host_ids = set(
                host.id for host in Host.objects.filter(aclgroup__users=user))
        for host in hosts:
            # Check if the user has access to this host,
            # but only if it is not a metahost or a one-time-host.
            no_access = (isinstance(host, Host)
                         and not host.invalid
                         and int(host.id) not in accessible_host_ids)
            if no_access:
                raise AclAccessViolation("%s does not have access to %s" %
                                         (str(user), str(host)))

    @staticmethod
    def check_abort_permissions(queue_entries):
        """Look for queue entries that aren't abortable by the current user.

        An entry is not abortable if:
           * the job isn't owned by this user, and
           * the machine isn't ACL-accessible, or
           * the machine is in the "Everyone" ACL

        @param queue_entries: A queryset of the queue entries to check.
        @raises AclAccessViolation if a queue entry is not abortable by the
                current user.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        not_owned = queue_entries.exclude(job__owner=user.login)
        # The two id sets are computed with separate queries instead of one
        # combined filter: filtering across M2M relations was unreliable in
        # Django 0.96. Only ids are fetched, since that is all we compare.
        # TODO: Use Django filters, now that we're using 1.0.
        accessible_ids = set(
                not_owned.filter(host__aclgroup__users__login=user.login)
                .values_list('id', flat=True))
        public_ids = set(
                not_owned.filter(host__aclgroup__name='Everyone')
                .values_list('id', flat=True))
        cannot_abort = [entry for entry in not_owned.select_related()
                        if entry.id not in accessible_ids
                        or entry.id in public_ids]
        if not cannot_abort:
            return
        entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,
                                              entry.host_or_metahost_name())
                                for entry in cannot_abort)
        raise AclAccessViolation('You cannot abort the following job entries: '
                                 + entry_names)

    def check_for_acl_violation_acl_group(self):
        """Verifies the current user has access to this ACL group.

        Superusers always pass. Nobody else may modify the 'Everyone' group,
        and other groups may only be modified by their members.

        @raises AclAccessViolation if the current user doesn't have access to
                this ACL group.
        """
        user = User.current_user()
        if user.is_superuser():
            return
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot modify 'Everyone'!")
        if user not in self.users.all():
            raise AclAccessViolation("You do not have access to %s"
                                     % self.name)

    @staticmethod
    def on_host_membership_change():
        """Invoked when host membership changes.

        Keeps the 'Everyone' group consistent: valid hosts in no ACL group are
        added to it, and hosts that belong to another group are removed.
        """
        everyone = AclGroup.objects.get(name='Everyone')

        # find hosts that aren't in any ACL group and add them to Everyone
        # TODO(showard): this is a bit of a hack, since the fact that this query
        # works is kind of a coincidence of Django internals.  This trick
        # doesn't work in general (on all foreign key relationships).  I'll
        # replace it with a better technique when the need arises.
        orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)
        everyone.hosts.add(*orphaned_hosts.distinct())

        # find hosts in both Everyone and another ACL group, and remove them
        # from Everyone
        hosts_in_everyone = Host.valid_objects.filter(aclgroup__name='Everyone')
        acled_hosts = set(host for host in hosts_in_everyone
                          # Has an ACL group other than Everyone
                          if host.aclgroup_set.count() > 1)
        everyone.hosts.remove(*acled_hosts)

    def delete(self):
        """Delete this group, then rebalance the 'Everyone' group.

        @raises AclAccessViolation if this is 'Everyone' or the current user
                may not modify this group.
        """
        if self.name == 'Everyone':
            raise AclAccessViolation("You cannot delete 'Everyone'!")
        self.check_for_acl_violation_acl_group()
        super(AclGroup, self).delete()
        self.on_host_membership_change()

    def add_current_user_if_empty(self):
        """Adds the current user if the set of users is empty."""
        if not self.users.count():
            self.users.add(User.current_user())

    def perform_after_save(self, change):
        """Called after a save.

        @param change: Whether there was a change (True when an existing row
                was updated, False when the group was just created).
        """
        if not change:
            # A newly created group always includes its creator.
            self.users.add(User.current_user())
        self.add_current_user_if_empty()
        self.on_host_membership_change()

    def save(self, *args, **kwargs):
        """Save the group, enforcing ACLs on the stored (pre-edit) version."""
        change = bool(self.id)
        if change:
            # Check the original object for an ACL violation
            AclGroup.objects.get(id=self.id).check_for_acl_violation_acl_group()
        super(AclGroup, self).save(*args, **kwargs)
        self.perform_after_save(change)

    class Meta:
        """Metadata for class AclGroup."""
        db_table = 'afe_acl_groups'

    def __unicode__(self):
        return unicode(self.name)
class ParameterizedJob(dbmodels.Model):
    """
    Auxiliary configuration for a parameterized job.

    This class is obsolete, and ought to be dead.  Due to a series of
    unfortunate events, it can't be deleted:
      * In `class Job` we're required to keep a reference to this class
        for the sake of the scheduler unit tests.
      * The existence of the reference in `Job` means that certain
        methods here will get called from the `get_jobs` RPC.
    So, the definitions below seem to be the minimum stub we can support
    unless/until we change the database schema.
    """

    @classmethod
    def smart_get(cls, id_or_name, *args, **kwargs):
        """For compatibility with Job.add_object.

        @param cls: Implicit class object.
        @param id_or_name: The ID or name to get; it is looked up by primary
                key only.
        @param args: Non-keyword arguments (ignored).
        @param kwargs: Keyword arguments (ignored).
        """
        return cls.objects.get(pk=id_or_name)

    def job(self):
        """Returns the job if it exists, or else None."""
        jobs = self.job_set.all()
        # At most one Job is expected to reference a ParameterizedJob.
        assert jobs.count() <= 1
        return jobs[0] if jobs else None

    class Meta:
        """Metadata for class ParameterizedJob."""
        db_table = 'afe_parameterized_jobs'

    def __unicode__(self):
        # This stub declares no `test` field, so identify it by primary key.
        return u'%s (parameterized) - %s' % (self.id, self.job())
class JobManager(model_logic.ExtendedManager):
    """Custom manager to provide efficient status counts querying."""

    def get_status_counts(self, job_ids):
        """Returns a dict mapping the given job IDs to their status count dicts.

        Each status count dict maps a full status string (see
        HostQueueEntry.compute_full_status) to the number of host queue
        entries of that job in that status. Every requested id is present in
        the result, possibly with an empty dict.

        @param job_ids: A list of job IDs.
        """
        if not job_ids:
            return {}
        # Materialize once: the ids are used both as query parameters and
        # as the keys of the result.
        job_ids = list(job_ids)
        # Bind the ids as query parameters instead of splicing them into the
        # SQL text, so non-integer input cannot alter the statement.
        placeholders = ','.join(['%s'] * len(job_ids))
        cursor = connection.cursor()
        cursor.execute("""
            SELECT job_id, status, aborted, complete, COUNT(*)
            FROM afe_host_queue_entries
            WHERE job_id IN (%s)
            GROUP BY job_id, status, aborted, complete
            """ % placeholders, job_ids)
        all_job_counts = {job_id: {} for job_id in job_ids}
        for job_id, status, aborted, complete, count in cursor.fetchall():
            job_dict = all_job_counts[job_id]
            full_status = HostQueueEntry.compute_full_status(status, aborted,
                                                             complete)
            job_dict[full_status] = job_dict.get(full_status, 0) + count
        return all_job_counts
class Job(dbmodels.Model, model_logic.ModelExtensions):
    """\
    owner: username of job owner
    name: job name (does not have to be unique)
    priority: Integer priority value.  Higher is more important.
    control_file: contents of control file
    control_type: Client or Server
    created_on: date of job creation
    synch_count: how many hosts should be used per autoserv execution
    run_verify: Whether or not to run the verify phase
    run_reset: Whether or not to run the reset phase
    timeout: DEPRECATED - hours from queuing time until job times out
    timeout_mins: minutes from job queuing time until the job times out
    max_runtime_hrs: DEPRECATED - hours from job starting time until job
                     times out
    max_runtime_mins: minutes from job starting time until job times out
    email_list: list of people to email on completion delimited by any of:
                white space, ',', ':', ';'
    dependency_labels: many-to-many relationship with labels corresponding to
                       job dependencies
    reboot_before: Never, If dirty, or Always
    reboot_after: Never, If all tests passed, or Always
    parse_failed_repair: if True, a failed repair launched by this job will
                         have its results parsed as part of the job.
    drone_set: The set of drones to run this job on
    parent_job: Parent job (optional)
    test_retry: Number of times to retry test if the test did not complete
                successfully. (optional, default: 0)
    require_ssp: Require server-side packaging unless require_ssp is set to
                 False. (optional, default: None)
    """

    # TODO: Investigate, if jobkeyval_set is really needed.
    # dynamic_suite will write them into an attached file for the drone, but
    # it doesn't seem like they are actually used. If they aren't used, remove
    # jobkeyval_set here.
    SERIALIZATION_LINKS_TO_FOLLOW = set(['dependency_labels',
                                         'hostqueueentry_set',
                                         'jobkeyval_set',
                                         'shard'])

    # Optional WHERE-clause fragments spliced into the shard queries below
    # via the %(exclude_known_jobs)s / %(exclude_old_jobs)s placeholders.
    EXCLUDE_KNOWN_JOBS_CLAUSE = '''
AND NOT (afe_host_queue_entries.aborted = 0
AND afe_jobs.id IN (%(known_ids)s))
'''

    EXCLUDE_OLD_JOBS_CLAUSE = 'AND (afe_jobs.created_on > "%(cutoff)s")'

    SQL_SHARD_JOBS = '''
SELECT DISTINCT(afe_jobs.id) FROM afe_jobs
INNER JOIN afe_host_queue_entries
ON (afe_jobs.id = afe_host_queue_entries.job_id)
LEFT OUTER JOIN afe_jobs_dependency_labels
ON (afe_jobs.id = afe_jobs_dependency_labels.job_id)
JOIN afe_shards_labels
ON (afe_shards_labels.label_id = afe_jobs_dependency_labels.label_id
OR afe_shards_labels.label_id = afe_host_queue_entries.meta_host)
WHERE (afe_shards_labels.shard_id = %(shard_id)s
AND afe_host_queue_entries.complete != 1
AND afe_host_queue_entries.active != 1
%(exclude_known_jobs)s
%(exclude_old_jobs)s)
'''

    # Jobs can be created with assigned hosts and have no dependency
    # labels nor meta_host.
    # We are looking for:
    #     - a job whose hqe's meta_host is null
    #     - a job whose hqe has a host
    #     - one of the host's labels matches the shard's label.
    # Non-aborted known jobs, completed jobs, active jobs, jobs
    # without hqe are excluded as we do with SQL_SHARD_JOBS.
    SQL_SHARD_JOBS_WITH_HOSTS = '''
SELECT DISTINCT(afe_jobs.id) FROM afe_jobs
INNER JOIN afe_host_queue_entries
ON (afe_jobs.id = afe_host_queue_entries.job_id)
LEFT OUTER JOIN %(host_label_table)s
ON (afe_host_queue_entries.host_id = %(host_label_table)s.host_id)
WHERE (%(host_label_table)s.%(host_label_column)s IN %(label_ids)s
AND afe_host_queue_entries.complete != 1
AND afe_host_queue_entries.active != 1
AND afe_host_queue_entries.meta_host IS NULL
AND afe_host_queue_entries.host_id IS NOT NULL
%(exclude_known_jobs)s
%(exclude_old_jobs)s)
'''

    # Even if we had filters about complete, active and aborted
    # bits in the above two SQLs, there is a chance that
    # the result may still contain a job with an hqe with 'complete=1'
    # or 'active=1'.
    # This happens when a job has two (or more) hqes and at least
    # one hqe has different bits than others.
    # We use a second sql to ensure we exclude all un-desired jobs.
    SQL_JOBS_TO_EXCLUDE = '''
SELECT afe_jobs.id FROM afe_jobs
INNER JOIN afe_host_queue_entries
ON (afe_jobs.id = afe_host_queue_entries.job_id)
WHERE (afe_jobs.id in (%(candidates)s)
AND (afe_host_queue_entries.complete=1
OR afe_host_queue_entries.active=1))
'''

    def _deserialize_relation(self, link, data):
        # Child records are re-pointed at this job before the generic
        # deserialization runs.
        if link in ['hostqueueentry_set', 'jobkeyval_set']:
            for obj in data:
                obj['job_id'] = self.id

        super(Job, self)._deserialize_relation(link, data)

    def custom_deserialize_relation(self, link, data):
        assert link == 'shard', 'Link %s should not be deserialized' % link
        self.shard = Shard.deserialize(data)

    def sanity_check_update_from_shard(self, shard, updated_serialized):
        # If the job got aborted on the main after the client fetched it
        # no shard_id will be set. The shard might still push updates though,
        # as the job might complete before the abort bit syncs to the shard.
        # Alternative considered: The main scheduler could be changed to not
        # set aborted jobs to completed that are sharded out. But that would
        # require database queries and seemed more complicated to implement.
        # This seems safe to do, as there won't be updates pushed from the
        # wrong shards: shards should be powered off and wiped when they are
        # removed from the main.
        if self.shard_id and self.shard_id != shard.id:
            raise error.IgnorableUnallowedRecordsSentToMain(
                    'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                    'from shard %s.' % (self.id, self.shard_id, updated_serialized,
                                        shard.id))

    RebootBefore = model_attributes.RebootBefore
    RebootAfter = model_attributes.RebootAfter
    # TIMEOUT is deprecated.
    DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_default', default=24)
    DEFAULT_TIMEOUT_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_timeout_mins_default', default=24*60)
    # MAX_RUNTIME_HRS is deprecated. Will be removed after switch to mins is
    # completed.
    DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
    DEFAULT_MAX_RUNTIME_MINS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'job_max_runtime_mins_default', default=72*60)
    DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
        'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool, default=False)
    FETCH_READONLY_JOBS = global_config.global_config.get_config_value(
        'AUTOTEST_WEB','readonly_heartbeat', type=bool, default=False)
    SKIP_JOBS_CREATED_BEFORE = global_config.global_config.get_config_value(
        'SHARD', 'skip_jobs_created_before', type=int, default=0)

    owner = dbmodels.CharField(max_length=255)
    name = dbmodels.CharField(max_length=255)
    priority = dbmodels.SmallIntegerField(default=priorities.Priority.DEFAULT)
    control_file = dbmodels.TextField(null=True, blank=True)
    control_type = dbmodels.SmallIntegerField(
        choices=control_data.CONTROL_TYPE.choices(),
        blank=True, # to allow 0
        default=control_data.CONTROL_TYPE.CLIENT)
    created_on = dbmodels.DateTimeField()
    synch_count = dbmodels.IntegerField(blank=True, default=0)
    timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)
    run_verify = dbmodels.BooleanField(default=False)
    email_list = dbmodels.CharField(max_length=250, blank=True)
    dependency_labels = (
            dbmodels.ManyToManyField(Label, blank=True,
                                     db_table='afe_jobs_dependency_labels'))
    reboot_before = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootBefore.choices(), blank=True,
        default=DEFAULT_REBOOT_BEFORE)
    reboot_after = dbmodels.SmallIntegerField(
        choices=model_attributes.RebootAfter.choices(), blank=True,
        default=DEFAULT_REBOOT_AFTER)
    parse_failed_repair = dbmodels.BooleanField(
        default=DEFAULT_PARSE_FAILED_REPAIR)
    # max_runtime_hrs is deprecated. Will be removed after switch to mins is
    # completed.
    max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
    max_runtime_mins = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_MINS)
    drone_set = dbmodels.ForeignKey(DroneSet, null=True, blank=True)

    # TODO(jrbarnette)  We have to keep `parameterized_job` around or it
    # breaks the scheduler_models unit tests (and fixing the unit tests
    # will break the scheduler, so don't do that).
    #
    # The ultimate fix is to delete the column from the database table
    # at which point, you _must_ delete this.  Until you're ready to do
    # that, DON'T MUCK WITH IT.
    parameterized_job = dbmodels.ForeignKey(ParameterizedJob, null=True,
                                            blank=True)

    parent_job = dbmodels.ForeignKey('self', blank=True, null=True)

    test_retry = dbmodels.IntegerField(blank=True, default=0)

    run_reset = dbmodels.BooleanField(default=True)

    timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)

    # If this is None on the main, a shard should be found.
    # If this is None on a shard, it should be synced back to the main
    shard = dbmodels.ForeignKey(Shard, blank=True, null=True)

    # If this is None, server-side packaging will be used for server side test.
    require_ssp = dbmodels.NullBooleanField(default=None, blank=True, null=True)

    # custom manager
    objects = JobManager()

    @decorators.cached_property
    def labels(self):
        """All the labels of this job"""
        # We need to convert dependency_labels to a list, because all() gives us
        # back an iterator, and storing/caching an iterator means we'd only be
        # able to read from it once.
        return list(self.dependency_labels.all())

    def is_server_job(self):
        """Returns whether this job is of type server."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER

    @classmethod
    def create(cls, owner, options, hosts):
        """Creates a job.

        The job is created by taking some information (the listed args) and
        filling in the rest of the necessary information.

        @param cls: Implicit class object.
        @param owner: The owner for the job.
        @param options: An options dict. Missing reboot_before/reboot_after
                are filled in (in place) from the current user's preferences,
                and timeout_mins is derived from timeout when only the latter
                is given.
        @param hosts: The hosts to use.
        @raises AclAccessViolation if the current user may not use a host.
        """
        AclGroup.check_for_acl_violation_hosts(hosts)

        control_file = options.get('control_file')

        user = User.current_user()
        if options.get('reboot_before') is None:
            options['reboot_before'] = user.get_reboot_before_display()
        if options.get('reboot_after') is None:
            options['reboot_after'] = user.get_reboot_after_display()

        drone_set = DroneSet.resolve_name(options.get('drone_set'))

        if options.get('timeout_mins') is None and options.get('timeout'):
            options['timeout_mins'] = options['timeout'] * 60

        job = cls.add_object(
            owner=owner,
            name=options['name'],
            priority=options['priority'],
            control_file=control_file,
            control_type=options['control_type'],
            synch_count=options.get('synch_count'),
            # timeout needs to be deleted in the future.
            timeout=options.get('timeout'),
            timeout_mins=options.get('timeout_mins'),
            max_runtime_mins=options.get('max_runtime_mins'),
            run_verify=options.get('run_verify'),
            email_list=options.get('email_list'),
            reboot_before=options.get('reboot_before'),
            reboot_after=options.get('reboot_after'),
            parse_failed_repair=options.get('parse_failed_repair'),
            created_on=datetime.now(),
            drone_set=drone_set,
            parent_job=options.get('parent_job_id'),
            test_retry=options.get('test_retry'),
            run_reset=options.get('run_reset'),
            require_ssp=options.get('require_ssp'))

        job.dependency_labels = options['dependencies']

        if options.get('keyvals'):
            for key, value in options['keyvals'].iteritems():
                # None (or NULL) is not acceptable by DB, so change it to an
                # empty string in case.
                JobKeyval.objects.create(job=job, key=key,
                                         value='' if value is None else value)

        return job

    @classmethod
    def assign_to_shard(cls, shard, known_ids):
        """Assigns unassigned jobs to a shard.

        For all labels that have been assigned to this shard, all jobs that
        have this label are assigned to this shard.

        @param shard: The shard to assign jobs to.
        @param known_ids: List of all ids of incomplete jobs the shard already
                          knows about.

        @returns The job objects that should be sent to the shard.
        """
        # Only the candidate lookup may go to the readonly database; the
        # assignment itself is a write.
        with cls._readonly_job_query_context():
            job_ids = cls._get_new_jobs_for_shard(shard, known_ids)
        if not job_ids:
            return []
        cls._assign_jobs_to_shard(job_ids, shard)
        return cls._jobs_with_ids(job_ids)

    @classmethod
    @contextlib.contextmanager
    def _readonly_job_query_context(cls):
        """Temporarily route Job.objects queries to the 'readonly' database.

        Only takes effect when FETCH_READONLY_JOBS is set; the previous
        database alias is always restored on exit.
        """
        #TODO: Get rid of this kludge if/when we update Django to >=1.7
        #correct usage would be .raw(..., using='readonly')
        old_db = Job.objects._db
        try:
            if cls.FETCH_READONLY_JOBS:
                Job.objects._db = 'readonly'
            yield
        finally:
            Job.objects._db = old_db

    @classmethod
    def _assign_jobs_to_shard(cls, job_ids, shard):
        """Set the shard of every job in job_ids in one UPDATE."""
        Job.objects.filter(pk__in=job_ids).update(shard=shard)

    @classmethod
    def _jobs_with_ids(cls, job_ids):
        """Return the Job objects whose ids are in job_ids, as a list."""
        return list(Job.objects.filter(pk__in=job_ids).all())

    @classmethod
    def _get_new_jobs_for_shard(cls, shard, known_ids):
        """Return the set of ids of unfinished jobs the shard should receive."""
        job_ids = cls._get_jobs_without_hosts(shard, known_ids)
        job_ids |= cls._get_jobs_with_hosts(shard, known_ids)
        if job_ids:
            job_ids -= cls._filter_finished_jobs(job_ids)
        return job_ids

    @classmethod
    def _filter_finished_jobs(cls, job_ids):
        """Return the ids among job_ids having any complete or active hqe."""
        query = Job.objects.raw(
                cls.SQL_JOBS_TO_EXCLUDE %
                {'candidates': ','.join([str(i) for i in job_ids])})
        return {j.id for j in query}

    @classmethod
    def _get_jobs_without_hosts(cls, shard, known_ids):
        """Return ids of jobs matched to the shard via labels/meta_host."""
        raw_sql = cls.SQL_SHARD_JOBS % {
            'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids),
            'exclude_old_jobs': cls._exclude_old_jobs_clause(),
            'shard_id': shard.id
        }
        return {j.id for j in Job.objects.raw(raw_sql)}

    @classmethod
    def _get_jobs_with_hosts(cls, shard, known_ids):
        """Return ids of host-assigned jobs whose host has a shard label.

        Static and non-static shard labels live in different tables, so each
        kind is queried separately and the results are merged.
        """
        job_ids = set([])
        static_labels, non_static_labels = Host.classify_label_objects(
                shard.labels.all())
        if static_labels:
            label_ids = [str(l.id) for l in static_labels]
            query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % {
                'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids),
                'exclude_old_jobs': cls._exclude_old_jobs_clause(),
                'host_label_table': 'afe_static_hosts_labels',
                'host_label_column': 'staticlabel_id',
                'label_ids': '(%s)' % ','.join(label_ids)})
            job_ids |= {j.id for j in query}
        if non_static_labels:
            label_ids = [str(l.id) for l in non_static_labels]
            query = Job.objects.raw(cls.SQL_SHARD_JOBS_WITH_HOSTS % {
                'exclude_known_jobs': cls._exclude_known_jobs_clause(known_ids),
                'exclude_old_jobs': cls._exclude_old_jobs_clause(),
                'host_label_table': 'afe_hosts_labels',
                'host_label_column': 'label_id',
                'label_ids': '(%s)' % ','.join(label_ids)})
            job_ids |= {j.id for j in query}
        return job_ids

    @classmethod
    def _exclude_known_jobs_clause(cls, known_ids):
        """Return the SQL fragment excluding known jobs, or '' if none."""
        if not known_ids:
            return ''
        return (cls.EXCLUDE_KNOWN_JOBS_CLAUSE %
                {'known_ids': ','.join([str(i) for i in known_ids])})

    @classmethod
    def _exclude_old_jobs_clause(cls):
        """Filter queried jobs to be created within a few hours in the past.

        With this clause, any jobs older than a configurable number of hours are
        skipped in the jobs query.
        The job creation window affects the overall query performance. Longer
        creation windows require a range query over more Job table rows using
        the created_on column index. c.f. http://crbug.com/966872#c35
        """
        if cls.SKIP_JOBS_CREATED_BEFORE <= 0:
            return ''
        cutoff = datetime.now() - timedelta(hours=cls.SKIP_JOBS_CREATED_BEFORE)
        return (cls.EXCLUDE_OLD_JOBS_CLAUSE %
                {'cutoff': cutoff.strftime('%Y-%m-%d %H:%M:%S')})

    def queue(self, hosts, is_template=False):
        """Enqueue a job on the given hosts.

        @param hosts: The hosts to use. If empty, a single hostless entry is
                created and saved instead.
        @param is_template: Whether the status should be "Template".
        """
        if not hosts:
            # hostless job
            entry = HostQueueEntry.create(job=self, is_template=is_template)
            entry.save()
            return

        for host in hosts:
            host.enqueue_job(self, is_template=is_template)

    def user(self):
        """Gets the user of this job, or None if it doesn't exist."""
        try:
            return User.objects.get(login=self.owner)
        except User.DoesNotExist:
            # The lookup is on the User manager, so the miss is reported as
            # User.DoesNotExist, not Job.DoesNotExist.
            return None

    def abort(self):
        """Aborts this job."""
        for queue_entry in self.hostqueueentry_set.all():
            queue_entry.abort()

    def tag(self):
        """Returns a string tag for this job."""
        return server_utils.get_job_tag(self.id, self.owner)

    def keyval_dict(self):
        """Returns all keyvals for this job as a dictionary."""
        return dict((keyval.key, keyval.value)
                    for keyval in self.jobkeyval_set.all())

    @classmethod
    def get_attribute_model(cls):
        """Return the attribute model.

        Override method in parent class. This class is called when
        deserializing the one-to-many relationship between Job and JobKeyval.
        On deserialization, we will try to clear any existing job keyvals
        associated with a job to avoid any inconsistency.
        Though Job doesn't implement ModelWithAttribute, we still treat
        it as an attribute model for this purpose.

        @returns: The attribute model of Job.
        """
        return JobKeyval

    class Meta:
        """Metadata for class Job."""
        db_table = 'afe_jobs'

    def __unicode__(self):
        return u'%s (%s-%s)' % (self.name, self.id, self.owner)
class JobHandoff(dbmodels.Model, model_logic.ModelExtensions):
    """Jobs that have been handed off to lucifer."""
    # One row per job: the Job is this row's primary key, and the row is
    # deleted together with its Job (on_delete=CASCADE).
    job = dbmodels.OneToOneField(Job, on_delete=dbmodels.CASCADE,
                                 primary_key=True)
    # Filled in automatically when the row is first created.
    created = dbmodels.DateTimeField(auto_now_add=True)
    completed = dbmodels.BooleanField(default=False)
    drone = dbmodels.CharField(
        max_length=128, null=True,
        help_text='''
The hostname of the drone the job is running on and whose job_aborter
should be responsible for aborting the job if the job process dies.
NULL means any drone's job_aborter has free reign to abort the job.
''')

    class Meta:
        """Metadata for class JobHandoff."""
        db_table = 'afe_job_handoffs'
class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
    """Keyvals associated with jobs"""

    SERIALIZATION_LINKS_TO_KEEP = set(['job'])
    SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['value'])

    job = dbmodels.ForeignKey(Job)
    key = dbmodels.CharField(max_length=90)
    value = dbmodels.CharField(max_length=300)

    objects = model_logic.ExtendedManager()

    @classmethod
    def get_record(cls, data):
        """Check the database for an identical record.

        Use job_id and key to search for a existing record.

        @param data: A dictionary containing at least 'job_id' and 'key'.
        @raises: DoesNotExist, if no record found
        @raises: MultipleObjectsReturned if multiple records found.
        """
        # TODO(fdeng): We should use job_id and key together as
        #              a primary key in the db.
        return cls.objects.get(job_id=data['job_id'], key=data['key'])

    @classmethod
    def deserialize(cls, data):
        """Override deserialize in parent class.

        Do not deserialize id as id is not kept consistent on main and shards.

        @param data: A dictionary of data to deserialize. Its 'id' entry, if
                present, is removed in place.

        @returns: A JobKeyval object.
        """
        if data:
            # Tolerate records serialized without an id.
            data.pop('id', None)
        return super(JobKeyval, cls).deserialize(data)

    class Meta:
        """Metadata for class JobKeyval."""
        db_table = 'afe_job_keyvals'
class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an ineligible host queue.

    Each row pairs a Job with a Host.
    NOTE(review): presumably marks a host as not eligible to run (or re-run)
    the job; confirm against the scheduler code that reads this table.
    """
    job = dbmodels.ForeignKey(Job)
    host = dbmodels.ForeignKey(Host)

    objects = model_logic.ExtendedManager()

    class Meta:
        """Metadata for class IneligibleHostQueue."""
        db_table = 'afe_ineligible_host_queues'
class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
"""Represents a host queue entry."""
SERIALIZATION_LINKS_TO_FOLLOW = set(['meta_host'])
SERIALIZATION_LINKS_TO_KEEP = set(['host'])
SERIALIZATION_LOCAL_LINKS_TO_UPDATE = set(['aborted'])
def custom_deserialize_relation(self, link, data):
assert link == 'meta_host'
self.meta_host = Label.deserialize(data)
def sanity_check_update_from_shard(self, shard, updated_serialized,
job_ids_sent):
if self.job_id not in job_ids_sent:
raise error.IgnorableUnallowedRecordsSentToMain(
'Sent HostQueueEntry without corresponding '
'job entry: %s' % updated_serialized)
Status = host_queue_entry_states.Status
ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES
PRE_JOB_STATUSES = host_queue_entry_states.PRE_JOB_STATUSES
IDLE_PRE_JOB_STATUSES = host_queue_entry_states.IDLE_PRE_JOB_STATUSES
job = dbmodels.ForeignKey(Job)
host = dbmodels.ForeignKey(Host, blank=True, null=True)
status = dbmodels.CharField(max_length=255)
meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,
db_column='meta_host')
active = dbmodels.BooleanField(default=False)
complete = dbmodels.BooleanField(default=False)
deleted = dbmodels.BooleanField(default=False)
execution_subdir = dbmodels.CharField(max_length=255, blank=True,
default='')
# If atomic_group is set, this is a virtual HostQueueEntry that will
# be expanded into many actual hosts within the group at schedule time.
atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
aborted = dbmodels.BooleanField(default=False)
started_on = dbmodels.DateTimeField(null=True, blank=True)
finished_on = dbmodels.DateTimeField(null=True, blank=True)
objects = model_logic.ExtendedManager()
def __init__(self, *args, **kwargs):
super(HostQueueEntry, self).__init__(*args, **kwargs)
self._record_attributes(['status'])
@classmethod
def create(cls, job, host=None, meta_host=None,
is_template=False):
"""Creates a new host queue entry.
@param cls: Implicit class object.
@param job: The associated job.
@param host: The associated host.
@param meta_host: The associated meta host.
@param is_template: Whether the status should be "Template".
"""
if is_template:
status = cls.Status.TEMPLATE
else:
status = cls.Status.QUEUED
return cls(job=job, host=host, meta_host=meta_host, status=status)
def save(self, *args, **kwargs):
self._set_active_and_complete()
super(HostQueueEntry, self).save(*args, **kwargs)
self._check_for_updated_attributes()
def execution_path(self):
"""
Path to this entry's results (relative to the base results directory).
"""
return server_utils.get_hqe_exec_path(self.job.tag(),
self.execution_subdir)
def host_or_metahost_name(self):
"""Returns the first non-None name found in priority order.
The priority order checked is: (1) host name; (2) meta host name
"""
if self.host:
return self.host.hostname
else:
assert self.meta_host
return self.meta_host.name
def _set_active_and_complete(self):
if self.status in self.ACTIVE_STATUSES:
self.active, self.complete = True, False
elif self.status in self.COMPLETE_STATUSES:
self.active, self.complete = False, True
else:
self.active, self.complete = False, False
def on_attribute_changed(self, attribute, old_value):
assert attribute == 'status'
logging.info('%s/%d (%d) -> %s', self.host, self.job.id, self.id,
self.status)
def is_meta_host_entry(self):
'True if this is a entry has a meta_host instead of a host.'
return self.host is None and self.meta_host is not None
# This code is shared between rpc_interface and models.HostQueueEntry.
# Sadly due to circular imports between the 2 (crbug.com/230100) making it
# a class method was the best way to refactor it. Attempting to put it in
# rpc_utils or a new utils module failed as that would require us to import
# models.py but to call it from here we would have to import the utils.py
# thus creating a cycle.
@classmethod
def abort_host_queue_entries(cls, host_queue_entries):
"""Aborts a collection of host_queue_entries.
Abort these host queue entry and all host queue entries of jobs created
by them.
@param host_queue_entries: List of host queue entries we want to abort.
"""
# This isn't completely immune to race conditions since it's not atomic,
# but it should be safe given the scheduler's behavior.
# TODO(milleral): crbug.com/230100
# The |abort_host_queue_entries| rpc does nearly exactly this,
# however, trying to re-use the code generates some horrible
# circular import error. I'd be nice to refactor things around
# sometime so the code could be reused.
# Fixpoint algorithm to find the whole tree of HQEs to abort to
# minimize the total number of database queries:
children = set()
new_children = set(host_queue_entries)
while new_children:
children.update(new_children)
new_child_ids = [hqe.job_id for hqe in new_children]
new_children = HostQueueEntry.objects.filter(
job__parent_job__in=new_child_ids,
complete=False, aborted=False).all()
# To handle circular parental relationships
new_children = set(new_children) - children
# Associate a user with the host queue entries that we're about
# to abort so that we can look up who to blame for the aborts.
child_ids = [hqe.id for hqe in children]
# Get a list of hqe ids that already exists, so we can exclude them when
# we do bulk_create later to avoid IntegrityError.
existing_hqe_ids = set(AbortedHostQueueEntry.objects.
filter(queue_entry_id__in=child_ids).
values_list('queue_entry_id', flat=True))
now = datetime.now()
user = User.current_user()
aborted_hqes = [AbortedHostQueueEntry(queue_entry=hqe,
aborted_by=user, aborted_on=now) for hqe in children
if hqe.id not in existing_hqe_ids]
AbortedHostQueueEntry.objects.bulk_create(aborted_hqes)
# Bulk update all of the HQEs to set the abort bit.
HostQueueEntry.objects.filter(id__in=child_ids).update(aborted=True)
def abort(self):
""" Aborts this host queue entry.
Abort this host queue entry and all host queue entries of jobs created by
this one.
"""
if not self.complete and not self.aborted:
HostQueueEntry.abort_host_queue_entries([self])
@classmethod
def compute_full_status(cls, status, aborted, complete):
"""Returns a modified status msg if the host queue entry was aborted.
@param cls: Implicit class object.
@param status: The original status message.
@param aborted: Whether the host queue entry was aborted.
@param complete: Whether the host queue entry was completed.
"""
if aborted and not complete:
return 'Aborted (%s)' % status
return status
def full_status(self):
    """Return this entry's status as a display string.

    The string is produced by compute_full_status from the entry's status,
    aborted and complete fields.
    """
    current = self.status
    was_aborted = self.aborted
    is_done = self.complete
    return self.compute_full_status(current, was_aborted, is_done)
def _postprocess_object_dict(self, object_dict):
    """Add this entry's display status to `object_dict`.

    @param object_dict: Dict to augment; its 'full_status' key is set in place
            to the value of full_status().
    """
    display_status = self.full_status()
    object_dict['full_status'] = display_status
class Meta:
    """Django model options for HostQueueEntry."""
    # Table name used for HostQueueEntry rows.
    db_table = 'afe_host_queue_entries'
def __unicode__(self):
    """Return a short description: u'<hostname>/<job id> (<entry id>)'.

    The hostname is None when no host is assigned. Ids are formatted with %s
    rather than %d so that an entry without an id yet (id of None) still
    renders instead of raising TypeError; for integer ids the output is the
    same as with %d.
    """
    hostname = self.host.hostname if self.host else None
    return u"%s/%s (%s)" % (hostname, self.job.id, self.id)
class HostQueueEntryStartTimes(dbmodels.Model):
    """An auxiliary table to HostQueueEntry to index by start time.

    Each row pairs a timestamp with a HostQueueEntry id.
    NOTE(review): presumably this lets time-based queries be translated into
    HostQueueEntry id ranges; confirm against the code that populates it.
    """
    # Timestamp for this row (presumably when it was inserted - TODO confirm).
    insert_time = dbmodels.DateTimeField()
    # Presumably the highest HostQueueEntry id as of insert_time - TODO confirm.
    highest_hqe_id = dbmodels.IntegerField()
    class Meta:
        """Metadata for class HostQueueEntryStartTimes."""
        db_table = 'afe_host_queue_entry_start_times'
class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
    """Represents an aborted host queue entry.

    Records which user aborted a HostQueueEntry and when. The entry itself
    is the primary key, so there is at most one such record per entry.
    """
    # The aborted entry (one-to-one, and the primary key of this table).
    queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
    # The user held responsible for the abort.
    aborted_by = dbmodels.ForeignKey(User)
    # When the abort was recorded; save() overwrites this on every call.
    aborted_on = dbmodels.DateTimeField()
    objects = model_logic.ExtendedManager()
    def save(self, *args, **kwargs):
        """Save the record, stamping aborted_on with datetime.now().

        Any aborted_on value set by the caller is replaced, including when an
        existing record is saved again. Inserts done via bulk_create do not go
        through this method, so such callers must set aborted_on themselves.
        """
        self.aborted_on = datetime.now()
        super(AbortedHostQueueEntry, self).save(*args, **kwargs)
    class Meta:
        """Metadata for class AbortedHostQueueEntry."""
        db_table = 'afe_aborted_host_queue_entries'
class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
    """\
    Tasks to run on hosts at the next time they are in the Ready state. Use this
    for high-priority tasks, such as forced repair or forced reinstall.

    host: host to run this task on
    task: special task to run (one of SpecialTask.Task)
    requested_by: user who requested the task; when queue_entry is set, save()
                  replaces it with the owner of that entry's job
    time_requested: date and time the request for this task was made
    is_active: task is currently running
    is_complete: task has finished running
    is_aborted: task was aborted
    time_started: date and time the task started
    queue_entry: Host queue entry waiting on this task (or None, if task was not
                 started in preparation of a job)
    success: whether the task succeeded (set together with is_complete by
             finish())
    time_finished: date and time the task finished
    """
    Task = autotest_enum.AutotestEnum('Verify', 'Cleanup', 'Repair', 'Reset',
                                      'Provision', string_values=True)

    host = dbmodels.ForeignKey(Host, blank=False, null=False)
    task = dbmodels.CharField(max_length=64, choices=Task.choices(),
                              blank=False, null=False)
    requested_by = dbmodels.ForeignKey(User)
    time_requested = dbmodels.DateTimeField(auto_now_add=True, blank=False,
                                            null=False)
    is_active = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_complete = dbmodels.BooleanField(default=False, blank=False, null=False)
    is_aborted = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_started = dbmodels.DateTimeField(null=True, blank=True)
    queue_entry = dbmodels.ForeignKey(HostQueueEntry, blank=True, null=True)
    success = dbmodels.BooleanField(default=False, blank=False, null=False)
    time_finished = dbmodels.DateTimeField(null=True, blank=True)

    objects = model_logic.ExtendedManager()


    def save(self, *args, **kwargs):
        """Save the task, attributing it to the job owner when tied to a job.

        Positional and keyword arguments are forwarded unchanged to Django's
        Model.save(), matching the other save() overrides in this module.
        """
        if self.queue_entry:
            # Tasks run on behalf of a job are attributed to that job's owner,
            # overriding whatever requested_by was previously set to.
            self.requested_by = User.objects.get(
                    login=self.queue_entry.job.owner)
        super(SpecialTask, self).save(*args, **kwargs)


    def execution_path(self):
        """Returns the execution path for a special task."""
        return server_utils.get_special_task_exec_path(
                self.host.hostname, self.id, self.task, self.time_requested)


    # property to emulate HostQueueEntry.status
    @property
    def status(self):
        """Returns a host queue entry status appropriate for a special task."""
        return server_utils.get_special_task_status(
                self.is_complete, self.success, self.is_active)


    # property to emulate HostQueueEntry.started_on
    @property
    def started_on(self):
        """Returns the time at which this special task started."""
        return self.time_started


    @classmethod
    def schedule_special_task(cls, host, task):
        """Schedules a special task on a host if not already scheduled.

        @param cls: Implicit class object.
        @param host: The host to use.
        @param task: The task to schedule.

        @returns The first existing task of this type for the host that is
                neither active nor complete, if any; otherwise a newly created
                and saved task requested by the current user.
        """
        # NOTE(review): is_aborted is not filtered here, so an aborted but not
        # yet completed task counts as already scheduled - confirm intended.
        existing_tasks = SpecialTask.objects.filter(host__id=host.id, task=task,
                                                    is_active=False,
                                                    is_complete=False)
        if existing_tasks:
            return existing_tasks[0]

        special_task = SpecialTask(host=host, task=task,
                                   requested_by=User.current_user())
        special_task.save()
        return special_task


    def abort(self):
        """Abort this special task.

        Only is_aborted is set (then the task is saved); is_active and
        is_complete are left unchanged.
        """
        self.is_aborted = True
        self.save()


    def activate(self):
        """
        Sets a task as active and sets the time started to the current time.
        """
        logging.info('Starting: %s', self)
        self.is_active = True
        self.time_started = datetime.now()
        self.save()


    def finish(self, success):
        """Sets a task as completed.

        time_finished is only recorded when the task has a time_started.

        @param success: Whether or not the task was successful.
        """
        logging.info('Finished: %s', self)
        self.is_active = False
        self.is_complete = True
        self.success = success
        if self.time_started:
            self.time_finished = datetime.now()
        self.save()


    class Meta:
        """Metadata for class SpecialTask."""
        db_table = 'afe_special_tasks'


    def __unicode__(self):
        result = u'Special Task %s (host %s, task %s, time %s)' % (
            self.id, self.host, self.task, self.time_requested)
        if self.is_complete:
            result += u' (completed)'
        elif self.is_active:
            result += u' (active)'
        return result
class StableVersion(dbmodels.Model, model_logic.ModelExtensions):
    """Maps a board to its stable version; writes are disabled by default.

    save() and delete() raise RuntimeError unless the
    OVERRIDE_STABLE_VERSION_BAN environment variable is set to a non-empty
    value.
    """

    board = dbmodels.CharField(max_length=255, unique=True)
    version = dbmodels.CharField(max_length=255)

    class Meta:
        """Metadata for class StableVersion."""
        db_table = 'afe_stable_versions'

    def save(self, *args, **kwargs):
        """Save this row, but only when the override is enabled.

        @raises RuntimeError: if OVERRIDE_STABLE_VERSION_BAN is unset or empty.
        """
        if os.getenv("OVERRIDE_STABLE_VERSION_BAN"):
            super(StableVersion, self).save(*args, **kwargs)
        else:
            raise RuntimeError("the ability to save StableVersions has been intentionally removed")

    def delete(self, *args, **kwargs):
        """Delete this row, but only when the override is enabled.

        Positional and keyword arguments are forwarded to Django's
        Model.delete().

        @raises RuntimeError: if OVERRIDE_STABLE_VERSION_BAN is unset or empty.
        """
        if os.getenv("OVERRIDE_STABLE_VERSION_BAN"):
            super(StableVersion, self).delete(*args, **kwargs)
        else:
            raise RuntimeError("the ability to delete StableVersions has been intentionally removed")