#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Scheduler library classes."""

import collections
import logging

import common

from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models


class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""

    # A WHERE-clause fragment that matches only inactive, hostless jobs.
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority.
        2. With hosts and metahosts: This will only happen if we don't
           activate the hqe after assigning a host to it in
           schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
           through the frontend the owner of the job will have chosen
           a host for it.
        4. Without hosts but with metahosts: This is the common case of
           a new test that needs a DUT. We assign a host and set it to
           active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs, which
           will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case over the other decides which jobs acquire two
        scarce resources first: process slots on the drone, and machines.
        - When a user schedules a job through the afe they choose a specific
          host for it, whereas jobs with a metahost can utilize any host that
          satisfies the metahost criterion. This means that if we had
          scheduled 4 before 3, there is a good chance that a job which
          could've used another host would instead occupy the host chosen
          for a metahost-less job. Given the availability of machines in
          pool:suites, this almost guarantees starvation for jobs scheduled
          through the frontend.
        - Scheduling 4 before 3 also has its pros, however: a suite has the
          concept of a timeout, whereas users can wait. If we hit the
          process count on the drone, a suite can time out waiting on the
          test, but a user job generally has a much longer timeout, and
          relatively harmless consequences.
        The current ordering was chosen because it is more likely that we
        will run out of machines in pool:suites than processes on the drone.

        @returns A list of HQEs ordered according to sort_order.
        """
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # Note the space before 'AND': these fragments are concatenated, so
        # without it the clause would read 'status="Queued"AND NOT aborted'.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted')
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where=query, order_by=sort_order))
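
    # Illustrative usage sketch (hypothetical caller, not part of this
    # module): with only_hostless=True the composed WHERE clause reads
    #
    #   NOT complete AND NOT active AND status="Queued" AND NOT aborted
    #   AND (host_id IS NULL AND meta_host IS NULL)
    #
    # and a scheduler tick would fetch entries with:
    #
    #   entries = AFEJobQueryManager().get_pending_queue_entries(
    #           only_hostless=True)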


    def get_prioritized_special_tasks(
            self, only_tasks_with_leased_hosts=False):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify, then reset, then provision.

        @param only_tasks_with_leased_hosts: If true, this method only returns
            tasks with leased hosts.

        @return: list of afe.models.SpecialTasks sorted according to priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # Exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry.
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])
        if only_tasks_with_leased_hosts:
            queued_tasks = queued_tasks.filter(host__leased=True)

        # Reorder tasks by priority.
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
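
    # A minimal sketch of the sort above: sorted() orders tasks by the index
    # of their task type in task_priority_order, so hypothetical queued tasks
    # of types [VERIFY, REPAIR, CLEANUP] come back as
    # [REPAIR, CLEANUP, VERIFY]. list.index raises ValueError for a type not
    # in the list, so every schedulable SpecialTask type must appear there.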


    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and standalone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # standalone special task can share a host with an active hqe; an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
        host_counts = collections.Counter(
                hqe_hosts + special_task_hosts).most_common()
        multiple_hosts = [count[0] for count in host_counts if count[1] > 1]
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                'id', 'job_id', 'host_id'))
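
    # Worked example of the Counter step: with hqe_hosts = [2, 3] and
    # special_task_hosts = [3] (made-up host ids),
    # collections.Counter([2, 3, 3]).most_common() returns [(3, 2), (2, 1)],
    # so multiple_hosts becomes [3] and only entries on host 3 are reported.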


_timer = stats.Timer('scheduler.host_query_manager')


class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""

    def __init__(self):
        """Create an AFEHostQueryManager.

        The manager acquires its own connection, via
        scheduler_lib.ConnectionManager, to the database containing the
        afe_hosts table.
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
        """Fold (left_id, right_id) rows into a dict of id -> set of ids.

        @param rows: An iterable of (left_id, right_id) pairs.
        @param flip: If True, map right ids to sets of left ids instead.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
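
    # Worked example for _process_many2many_dict: database rows arrive as
    # (left, right) pairs, e.g. rows = [(1, 10), (1, 11), (2, 10)] gives
    #   _process_many2many_dict(rows)            -> {1: {10, 11}, 2: {10}}
    #   _process_many2many_dict(rows, flip=True) -> {10: {1, 2}, 11: {1}}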


    def _get_sql_id_list(self, id_list):
        """Render a list of ids as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        """Run an id-list query and fold the rows into a many-to-many dict.

        @param query: A SQL query with a single '%s' placeholder for the
            comma-separated id list.
        @param id_list: Ids to substitute into the query; if empty, no
            query is issued and {} is returned.
        @param flip: Passed through to _process_many2many_dict.
        """
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)
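
    # Example of how the two helpers compose: for id_list = [5, 6], a query
    # ending in 'WHERE job_id IN (%s)' expands to 'WHERE job_id IN (5,6)'
    # before being handed to self._db.execute(). The empty-list short
    # circuit avoids ever issuing malformed 'IN ()' SQL.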


    @_timer.decorate
    def _get_ready_hosts(self):
        # We don't lose anything by re-doing these checks
        # even though we release hosts on the same conditions.
        # In the future we might have multiple clients that
        # release_hosts and/or lock them independent of the
        # scheduler tick.
        hosts = scheduler_models.Host.fetch(
                where="NOT afe_hosts.leased "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @_timer.decorate
    def _get_job_acl_groups(self, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_job_ineligible_hosts(self, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_job_dependencies(self, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @_timer.decorate
    def _get_host_acls(self, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return self._get_many2many_dict(query, host_ids)


    @_timer.decorate
    def _get_label_hosts(self, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % self._get_sql_id_list(host_ids)
        rows = self._db.execute(query)
        labels_to_hosts = self._process_many2many_dict(rows)
        hosts_to_labels = self._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels
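
    # Sketch of _get_label_hosts output: the same rows feed both directions
    # of the mapping, e.g. (label_id, host_id) rows [(7, 1), (7, 2), (8, 1)]
    # (made-up ids) yield
    #   labels_to_hosts = {7: {1, 2}, 8: {1}}
    #   hosts_to_labels = {1: {7, 8}, 2: {7}}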


    @classmethod
    def find_unused_healthy_hosts(cls):
        """Get hosts that are currently unused and in the READY state.

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')

        # Avoid any host with a new special task against it. There are two
        # cases when an inactive but incomplete special task will not use
        # the host this tick: 1. When the host is locked 2. When an active
        # hqe already has special tasks for the same host. In both these
        # cases this host will not be in the ready hosts list anyway. In
        # all other cases, an incomplete special task will grab the host
        # before a new job does, by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks AS new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

        return scheduler_models.Host.fetch(
                joins='%s %s' % (hqe_join, special_task_join),
                where="active_hqe.host_id IS NULL "
                      "AND new_tasks.host_id IS NULL "
                      "AND NOT afe_hosts.leased "
                      "AND NOT afe_hosts.locked "
                      "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")


    @_timer.decorate
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts that match kwargs.

        @param leased_value: The True/False value of the leased column for
            the matching hosts.
        @param kwargs: The args to use in finding matching hosts.
        """
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)
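
    # Usage sketch: kwargs go straight into Django's filter(), so a caller
    # could release a batch of hosts by id with, e.g.
    #   host_query_manager.set_leased(False, id__in=[1, 2, 3])
    # (manager instance and ids here are hypothetical.)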


    @_timer.decorate
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        frequently round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list
            of integer label ids, i.e. {job_id: [label_id]}.
        @return: A dict mapping an integer label id to a scheduler model
            label object, i.e. {label_id: label_object}.
        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at...
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # ...and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare (if not impossible) chance that no job has any labels,
        # we can skip this.
        if job_label_set:
            job_string_label_list = ','.join([str(x) for x in job_label_set])
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    @_timer.decorate
    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = (
                self._get_label_hosts(host_ids))
        self._labels = self._get_labels(self._job_dependencies)
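
    # After refresh() the manager holds these caches (shapes shown for a
    # hypothetical single pending entry of job 1, which depends on label 3
    # and could run on ready host 2):
    #   self._hosts_available  -> {2: <Host>}
    #   self._job_acls         -> {1: set of acl group ids}
    #   self._ineligible_hosts -> {1: set of host ids}
    #   self._job_dependencies -> {1: {3}}
    #   self._host_acls        -> {2: set of acl group ids}
    #   self._label_hosts      -> {3: {2}}; self._host_labels -> {2: {3}}
    #   self._labels           -> {3: <Label>}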