# Source blob: b22230bb5b37326b61e63fc7e8c9bfecd89c8dc0
"""Autotest client module for the rdb."""
import logging
from autotest_lib.client.common_lib import global_config, utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import metahost_scheduler, scheduler_config
from autotest_lib.scheduler import rdb, scheduler_models
from autotest_lib.site_utils.graphite import stats
from autotest_lib.server.cros import provision
# Hook for site-specific metahost schedulers.  The lambda (returning an empty
# tuple) is handed to import_site_function as its last argument; presumably it
# is the fallback used when the site module is absent -- confirm against
# utils.import_site_function.
get_site_metahost_schedulers = utils.import_site_function(
        __file__,
        'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers',
        lambda: ())
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.

    NOTE(review): this class was restored from an extract that had lost its
    indentation, its docstring delimiters and a number of tokens and lines.
    Every spot that had to be reconstructed (rather than merely re-indented)
    carries its own NOTE(review) comment; confirm those against upstream.
    """

    # NOTE(review): _timer is not referenced anywhere else in the visible
    # source.  Presumably it decorated some of the methods below and the
    # decorator lines were lost -- confirm before relying on these stats.
    _timer = stats.Timer('host_scheduler')


    def __init__(self, db):
        """Store the database handle and select the metahost schedulers.

        @param db: Database handle. This class only calls db.execute(query).
        """
        self._db = db
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        # NOTE(review): the end of this call was missing from the source.
        # default='' is assumed so an unset option selects no site scheduler.
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        # NOTE(review): the head of this logging call was lost; info level is
        # assumed.
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Fetch the hosts that can currently be scheduled.

        @return: A dict mapping host id to scheduler_models.Host for every
                unlocked host whose status is NULL or 'Ready' and that has no
                matching row in the active_hqe join.
        """
        # avoid any host with a currently active queue entry against it
        # NOTE(review): 'afe_hosts.id' and the 'active_hqe.active)' tail of
        # the join condition were missing from the source and are
        # reconstructed from the comment above and the WHERE clause.
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        # NOTE(review): the key expression was stripped; host.id is assumed
        # because every consumer indexes _hosts_available by host id.
        return dict((host.id, host) for host in hosts)


    def _get_sql_id_list(self, id_list):
        """Render ids as a comma separated string for a SQL IN (...) list.

        @param id_list: An iterable of ids.

        @return: The str() of each id joined by ','.
        """
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run a two-column query restricted to id_list and group the rows.

        @param query: SQL containing one %s placeholder for the id list.
        @param id_list: Ids to substitute into the query.
        @param flip: If True, group by the second column instead of the first.

        @return: A dict mapping an id to the set of ids paired with it, or an
                empty dict (without querying) when id_list is empty.
        """
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    def _process_many2many_dict(self, rows, flip=False):
        """Group (left, right) rows into a dict of sets.

        @param rows: An iterable of rows whose first two items are ids.
        @param flip: If True, swap the two columns before grouping.

        @return: A dict mapping each integer left id to the set of integer
                right ids it was paired with.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    def _get_job_acl_groups(self, job_ids):
        """Map each job to the ACL groups of the job's owner.

        @param job_ids: The ids of the jobs to look up.

        @return: A dict mapping job id to a set of aclgroup ids.
        """
        # NOTE(review): 'afe_jobs.id', 'afe_users.id' and the WHERE clause
        # were missing from the source.  They are reconstructed from the join
        # structure and from the %s placeholder _get_many2many_dict requires.
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_ineligible_hosts(self, job_ids):
        """Map each job to the hosts recorded as ineligible for it.

        @param job_ids: The ids of the jobs to look up.

        @return: A dict mapping job id to a set of host ids.
        """
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_dependencies(self, job_ids):
        """Map each job to its dependency labels.

        @param job_ids: The ids of the jobs to look up.

        @return: A dict mapping job id to a set of label ids.
        """
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_host_acls(self, host_ids):
        """Map each host to the ACL groups it belongs to.

        @param host_ids: The ids of the hosts to look up.

        @return: A dict mapping host id to a set of aclgroup ids.
        """
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    def _get_label_hosts(self, host_ids):
        """Build the label<->host mappings for the given hosts.

        @param host_ids: The ids of the hosts to look up.

        @return: A tuple (labels_to_hosts, hosts_to_labels) of dicts mapping
                an id to a set of ids; two empty dicts if host_ids is empty.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        # The rows are walked twice below; materialize them so the second
        # pass still sees every row if execute() hands back a one-shot
        # iterator.
        rows = list(self._db.execute(query))
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        frequently round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label id's.  ie. {job_id: [label_id]}
        @return: A dict mapping an integer label id to a scheduler model label
            object.  ie. {label_id: label_object}
        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        # NOTE(review): the key expression was stripped here and below;
        # label.id is assumed since the dict is documented as keyed by id.
        id_to_label.update((label.id, label) for label in host_labels)
        # and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            # NOTE(review): the loop body was missing from the source;
            # collecting every dependency id is the reconstruction.
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this.
        if job_label_set:
            job_string_label_list = ','.join(str(x) for x in job_label_set)
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update((label.id, label) for label in job_labels)
        return id_to_label


    def recovery_on_startup(self):
        """Give each metahost scheduler a chance to recover on startup."""
        # NOTE(review): the loop body was missing from the source; delegating
        # to the same-named method of each metahost scheduler is assumed.
        for scheduler in self._metahost_schedulers:
            scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild the cached host, job, ACL and label lookup tables.

        @param pending_queue_entries: The queue entries about to be
                scheduled; only their job_id attribute is read here.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
        self._labels = self._get_labels(self._job_dependencies)


    def tick(self):
        """Forward the scheduler tick to each metahost scheduler."""
        # NOTE(review): the loop body was missing from the source; delegating
        # to the same-named method of each metahost scheduler is assumed.
        for scheduler in self._metahost_schedulers:
            scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return a new set of the cached host ids carrying label_id.

        @param label_id: The id of the label to look up.

        @return: A fresh set; empty if the label is not cached.
        """
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        """Drop host_id from the cached set of hosts for label_id.

        @param host_id: The id of the host to remove.
        @param label_id: The id of the label to remove it from.
        """
        # NOTE(review): the body was missing from the source; this is the
        # direct inverse of hosts_in_label and raises KeyError when either id
        # is not cached -- confirm that strictness is intended.
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Remove a host from the available pool and return it.

        @param host_id: The id of the host to take.

        @return: The Host object stored under host_id.
        @raises KeyError: if host_id is not in the available pool.
        """
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return a new set of host ids ineligible for the entry's job.

        @param queue_entry: The HostQueueEntry whose job_id is looked up.

        @return: A fresh set; empty if nothing is recorded for the job.
        """
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """Check that the job and the host share at least one ACL group.

        @param host_id: The id of the host being considered.
        @param queue_entry: The HostQueueEntry whose job_id is looked up.

        @return: True if the two ACL sets intersect, False otherwise.
        """
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """Check that the host has every label the job depends on.

        @param job_dependencies: A set of label ids the job requires.
        @param host_labels: A set of label ids the host has.

        @return: True if no dependency is missing from host_labels.
        """
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """Check that no only_if_needed label on the host blocks the entry.

        @param job_dependencies: A set of label ids the job requires.
        @param host_labels: The label ids the host has.
        @param queue_entry: The HostQueueEntry being considered.

        @return: False if the host has an only_if_needed label that is
                neither the entry's metahost nor a job dependency; True
                otherwise, and always True for non-metahost entries.
        """
        # NOTE(review): the end of the signature and both 'continue'
        # statements were missing from the source.  They are reconstructed
        # from the call site and from the comments kept below.
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        # NOTE(review): the right-hand side of the comparison was missing
        # from the source; queue_entry.atomic_group_id is assumed.
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  If more than one atomic group is
        found in the set of labels an error is logged (nothing is raised) and
        an arbitrary one of them is returned.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
            extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
            or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (label_id for label_id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
            and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
            tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
            supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def _get_unprovisionable_deps(self, job_id):
        """Return the job's dependency label ids that cannot be provisioned.

        @param job_id: The id of the job whose cached dependencies to filter.

        @return: A new set of the label ids for which
                provision.can_provision(label name) is false.
        """
        job_dependencies = self._job_dependencies.get(job_id, set())
        return set(dep for dep in job_dependencies
                   if not provision.can_provision(self._labels[dep].name))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """Decide whether the host may run the job behind queue_entry.

        @param host_id: The id of the host being considered.
        @param queue_entry: The HostQueueEntry being considered.

        @return: A truthy value if the host is invalid (see below) or passes
                the ACL, dependency, only_if_needed and atomic group checks.
        """
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        # Remove provisionable labels from the set of job_dependencies that we
        # need to satisfy
        job_dependencies = self._get_unprovisionable_deps(queue_entry.job_id)
        host_labels = self._host_labels.get(host_id, set())

        # NOTE(review): the head of the third call was missing from the
        # source; the argument list matches _check_only_if_needed_labels.
        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Report whether an available host is flagged invalid.

        @param host_id: The id of the host to check.

        @return: The host's invalid attribute, or None (falsy) when the host
                is not in the available pool.
        """
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def is_host_usable(self, host_id):
        """Report whether a host can still be picked for a metahost.

        @param host_id: The id of the host to check.

        @return: True if the host is in the available pool and not invalid.
        """
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def get_job_info(self, queue_entry):
        """
        Extract job information from a queue_entry/host-scheduler.

        Unfortunately the information needed to choose hosts for a job
        are split across several tables and not restricted to just the
        hqe. At the very least we require the deps and acls of the job, but
        we also need to know if the job has a host assigned to it. This method
        consolidates this information into a light-weight dictionary that the
        host_scheduler and the rdb can pass back and forth.

        @param queue_entry: the queue_entry of the job we would like
            information about.

        @return: A dictionary containing 1. A set of deps 2. A set of acls
            3. The host id of the host assigned to the hqe, or None.
        """
        job_id = queue_entry.job_id
        job_acls = self._job_acls.get(job_id, set())
        # The sets are copied so callers cannot mutate the cached tables.
        return {'deps': self._get_unprovisionable_deps(job_id),
                'acls': set(job_acls),
                'host_id': queue_entry.host_id}


    def schedule_entry(self, queue_entry):
        """
        Schedule a hqe against a host.

        A hqe can either have a host assigned to it or not. In either case
        however, actually scheduling the hqe on the host involves validating
        the assignment by checking acls and labels. If the hqe doesn't have a
        host we need to find a host before we can perform this validation.

        If we successfully validate the host->hqe pairing, return the host. The
        scheduler will not begin scheduling special tasks for the hqe until it
        acquires a valid host.

        @param queue_entry: The queue_entry that requires a host.
        @return: The host assigned to the hqe, if any.
        """
        host_id = queue_entry.host_id
        job_id = queue_entry.job_id
        job_info = self.get_job_info(queue_entry)
        host = None

        if host_id:
            host = self._hosts_available.get(host_id, None)

            # TODO(beeps): Remove the need for 2 rdb calls. Ideally we should
            # just do one call to validate the assignment, however, since we're
            # currently still using the host_scheduler, we'd need to pass it
            # as an argument to validate_host_assignment, which is less clean
            # than just splitting this work into 2 calls.
            host_info = rdb.get_host_info(self, host_id)

            # If the host is either unavailable or in-eligible for this job,
            # defer scheduling this queue_entry till the next tick.
            if (host is None or not
                    rdb.validate_host_assignment(job_info, host_info)):
                return None
        else:
            # NOTE(review): the 'else:' line was missing from the source.
            # It is restored because the docstring says a host is only
            # looked up when the hqe does not already have one.
            host = rdb.get_host(self, job_info)
            if host is None:
                return None

        # TODO(beeps): Make it so we don't need to set the hqe active status
        # to remove a host from the active pool.
        # A host will remain in the available pool for as long as its status
        # is Ready and it is not referenced by an active hqe. The state of
        # the host is not under our control, as it will only change to
        # resetting etc whenever the prejob task starts. However, the hqe
        # is theoretically active from the moment we assign a healthy host
        # to it. Setting the host on an inactive hqe will not remove it
        # from the available pool, leading to unnecessary scheduling
        # overhead.
        # Without this, we will process each hqe twice because it is still
        # judged as 'new', and perform the host<->hqe assignment twice,
        # because the host assigned to the hqe is still 'available', as
        # the first prejob task only runs at the end of the next tick's
        # handle_agents call. Note that the status is still 'Queued', and
        # will remain 'Queued' till an agent changes it.
        queue_entry.update_field('active', True)

        # The available_hosts dictionary determines our scheduling decisions
        # for subsequent jobs processed in this tick.
        # NOTE(review): the statement under this comment and the final
        # argument of the debug call were missing from the source; removing
        # the host from the pool and logging host.id are assumed.
        self._hosts_available.pop(host.id)
        logging.debug('Scheduling job: %s, Host %s', job_id, host.id)
        return host


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            # NOTE(review): the abort call, the logging call head and most of
            # the format arguments were missing from the source.  They are
            # reconstructed from the comment above and the format string.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        # NOTE(review): the argument of this call was missing from the
        # source; atomic_group.id is assumed.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                # NOTE(review): statement reconstructed from the comment.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                # NOTE(review): 'continue' reconstructed from the comment.
                continue

            eligible_hosts_in_group = [self._hosts_available[host_id]
                                       for host_id in
                                       eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            # NOTE(review): the sort statement was missing from the source;
            # the comparator name is assumed -- confirm that
            # scheduler_models.Host provides cmp_for_sort.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            # NOTE(review): the loop body was missing from the source and is
            # reconstructed from the comment above.
            host_list = []
            for host in eligible_hosts_in_group:
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
# Base class for HostScheduler.  BaseHostScheduler is handed to
# import_site_class as its last argument; presumably it is what is returned
# when no site-specific class exists -- confirm against
# utils.import_site_class.
site_host_scheduler = utils.import_site_class(
        __file__,
        'autotest_lib.scheduler.site_host_scheduler',
        'site_host_scheduler',
        BaseHostScheduler)
class HostScheduler(site_host_scheduler):