| #!/usr/bin/python |
| #pylint: disable-msg=C0111 |
| |
| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Host scheduler. |
| |
| If run as a standalone service, the host scheduler ensures the following: |
| 1. Hosts will not be assigned to multiple hqes simultaneously. The process |
| of assignment in this case refers to the modification of the host_id |
| column of a row in the afe_host_queue_entries table, to reflect the host |
| id of a leased host matching the dependencies of the job. |
| 2. Hosts that are not being used by active hqes or incomplete special tasks |
| will be released back to the available hosts pool, for acquisition by |
| subsequent hqes. |
| In addition to these guarantees, the host scheduler also confirms that no 2 |
| active hqes/special tasks are assigned the same host, and sets the leased bit |
| for hosts needed by frontend special tasks. The need for the latter is only |
| apparent when viewed in the context of the job-scheduler (monitor_db), which |
| runs special tasks only after their hosts have been leased. |
| |
| ** Support minimum duts requirement for suites (non-inline mode) ** |
| |
| Each suite can specify the minimum number of duts it requires by |
| dropping a 'suite_min_duts' job keyval which defaults to 0. |
| |
| When suites are competing for duts, if any suite has not got minimum duts |
| it requires, the host scheduler will try to meet the requirement first, |
| even if other suites may have higher priority or an earlier timestamp. Once |
| all suites' minimum duts requirements have been fulfilled, the host |
| scheduler will allocate the rest of the duts based on job priority and suite job id. |
| This is to prevent low priority suites from starving when sharing pool with |
| high-priority suites. |
| |
| Note: |
| 1. Prevent potential starvation: |
| We need to carefully choose |suite_min_duts| for both low and high |
| priority suites. If a high priority suite didn't specify it but a low |
| priority one does, the high priority suite can be starved! |
| 2. Restart requirement: |
| Restart host scheduler if you manually released a host by setting |
| leased=0 in db. This is needed because host scheduler maintains internal |
| state of host assignment for suites. |
| 3. Exchanging duts triggers provisioning: |
| TODO(fdeng): There is a chance two suites can exchange duts, |
| if the two suites are for different builds, the exchange |
| will trigger provisioning. This can be optimized by preferring getting |
| hosts with the same build. |
| """ |
| |
| import argparse |
| import collections |
| import datetime |
| import logging |
| import os |
| import signal |
| import sys |
| import time |
| |
| import common |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.frontend import setup_django_environment |
| |
| # This import needs to come earlier to avoid using autotest's version of |
| # httplib2, which is out of date. |
| try: |
| from chromite.lib import metrics |
| from chromite.lib import ts_mon_config |
| except ImportError: |
| metrics = utils.metrics_mock |
| ts_mon_config = utils.metrics_mock |
| |
| from autotest_lib.client.common_lib import global_config |
| from autotest_lib.scheduler import email_manager |
| from autotest_lib.scheduler import query_managers |
| from autotest_lib.scheduler import rdb_lib |
| from autotest_lib.scheduler import rdb_utils |
| from autotest_lib.scheduler import scheduler_lib |
| from autotest_lib.scheduler import scheduler_models |
| from autotest_lib.site_utils import server_manager_utils |
| |
| |
# Database connection manager; populated by initialize().
_db_manager = None
# Flipped to True by handle_signal() or _lifetime_expired() to stop main().
_shutdown = False
# Common prefix of every metric name emitted by this module.
_METRICS_PREFIX = 'chromeos/autotest/host_scheduler'
_tick_pause_sec = global_config.global_config.get_config_value(
        'SCHEDULER', 'tick_pause_sec', default=5, type=int)
# When True, main() refuses to start (see the check at the top of main()).
_monitor_db_host_acquisition = global_config.global_config.get_config_value(
        'SCHEDULER', 'inline_host_acquisition', default=True, type=bool)
| |
class SuiteRecorder(object):
    """Records the host assignment for suites.

    The recorder holds two things:
        * suite_host_num, records how many duts a suite is holding,
          which is a map <suite_job_id -> num_of_hosts>
        * hosts_to_suites, records which host is assigned to which
          suite, it is a map <host_id -> suite_job_id>
    Both data structures are updated when a host is assigned to or released
    by a job, and are kept consistent with each other: a suite's entry in
    suite_host_num is the number of hosts mapped to it in hosts_to_suites.

    The reason to maintain hosts_to_suites is that, when a host is released,
    we need to know which suite it was leased to. Querying the db for the
    latest completed job that has run on a host is slow.  Therefore, we go with
    an alternative: keeping a <host id, suite job id> map
    in memory (for 10K hosts, the map should take less than 1M memory on
    64-bit machine with python 2.7)

    """


    def __init__(self, job_query_manager):
        """Initialize.

        @param job_query_manager: A job query manager object; it must provide
                get_suite_host_assignment() and get_min_duts_of_suites().
        """
        self.job_query_manager = job_query_manager
        self.suite_host_num, self.hosts_to_suites = (
                self.job_query_manager.get_suite_host_assignment())


    def _forget_host(self, host_id):
        """Remove a host from the records and fix up its suite's host count.

        @param host_id: Id of the host to forget.

        @return: A (suite_job_id, hosts_left) tuple for the suite that was
                 holding the host, or None if the host was not recorded.
        """
        if host_id not in self.hosts_to_suites:
            return None
        suite_id = self.hosts_to_suites.pop(host_id)
        # The two maps may disagree; tolerate a missing or already-zero
        # count rather than raising KeyError or storing a negative number.
        hosts_left = max(0, self.suite_host_num.get(suite_id, 0) - 1)
        if hosts_left:
            self.suite_host_num[suite_id] = hosts_left
        else:
            self.suite_host_num.pop(suite_id, None)
        return suite_id, hosts_left


    def record_assignment(self, queue_entry):
        """Record that the hqe has got a host.

        Entries whose job has no parent job (i.e. not part of a suite) are
        ignored. If the host is still recorded against a different suite, that
        stale record is dropped first so the old suite's count is not
        overstated.

        @param queue_entry: A scheduler_models.HostQueueEntry object which has
                            got a host.
        """
        parent_id = queue_entry.job.parent_job_id
        if not parent_id:
            return
        host_id = queue_entry.host_id
        previous_suite = self.hosts_to_suites.get(host_id, None)
        if previous_suite == parent_id:
            logging.error('HQE (id: %d, parent_job_id: %d, host: %s) '
                          'seems already recorded', queue_entry.id,
                          parent_id, queue_entry.host.hostname)
            return
        if previous_suite is not None:
            # The host moved to another suite without a release being
            # recorded; give it back on behalf of the previous suite.
            self._forget_host(host_id)
            logging.warning('Host %s was still recorded for suite %s; '
                            'moving it to suite %s',
                            queue_entry.host.hostname, previous_suite,
                            parent_id)
        self.suite_host_num[parent_id] = (
                self.suite_host_num.get(parent_id, 0) + 1)
        self.hosts_to_suites[host_id] = parent_id
        logging.debug('Suite %d got host %s, currently holding %d hosts',
                      parent_id, queue_entry.host.hostname,
                      self.suite_host_num[parent_id])


    def record_release(self, hosts):
        """Update the record with host releasing event.

        Hosts that are not recorded against any suite are skipped.

        @param hosts: A list of scheduler_models.Host objects.
        """
        for host in hosts:
            released = self._forget_host(host.id)
            if released is None:
                continue
            parent_job_id, count = released
            logging.debug(
                    'Suite %d releases host %s, currently holding %d hosts',
                    parent_job_id, host.hostname, count)


    def get_min_duts(self, suite_job_ids):
        """Figure out min duts to request.

        Given a set ids of suite jobs, figure out minimum duts to request for
        each suite. It is determined by two factors: min_duts specified
        for each suite in its job keyvals, and how many duts a suite is
        currently holding.

        @param suite_job_ids: A set of suite job ids.

        @returns: A dictionary, the key is suite_job_id, the value
                  is the minimum number of duts to request.
        """
        suite_min_duts = self.job_query_manager.get_min_duts_of_suites(
                suite_job_ids)
        for parent_id in suite_job_ids:
            min_duts = suite_min_duts.get(parent_id, 0)
            cur_duts = self.suite_host_num.get(parent_id, 0)
            # Only ask for what the suite is still missing, never less than 0.
            suite_min_duts[parent_id] = max(0, min_duts - cur_duts)
        logging.debug('Minimum duts to get for suites (suite_id: min_duts): %s',
                      suite_min_duts)
        return suite_min_duts
| |
| |
class BaseHostScheduler(object):
    """Core host acquisition logic shared by the host schedulers.

    This class holds the host acquisition helpers the scheduler needs to run
    jobs on hosts. Through its own tick it only releases hosts back to the
    rdb; every other action must be driven by the job scheduler.
    """


    # Pairs an acquired host with the queue entry it was acquired for.
    host_assignment = collections.namedtuple('host_assignment', ['host', 'job'])


    def __init__(self):
        self.host_query_manager = query_managers.AFEHostQueryManager()


    def _release_hosts(self):
        """Release hosts to the RDB.

        Asks the host query manager for the unused healthy hosts and, if
        there are any, clears their leased bit in one call.

        @return a list of hosts that are released.
        """
        idle_hosts = self.host_query_manager.find_unused_healty_hosts()
        idle_hostnames = [idle.hostname for idle in idle_hosts]
        if not idle_hostnames:
            return idle_hosts
        self.host_query_manager.set_leased(False, hostname__in=idle_hostnames)
        return idle_hosts


    @classmethod
    def schedule_host_job(cls, host, queue_entry):
        """Schedule a job on a host.

        Scheduling a job involves:
            1. Setting the active bit on the queue_entry.
            2. Scheduling a special task on behalf of the queue_entry.
        Performing these actions will lead the job scheduler through a chain of
        events, culminating in running the test and collecting results from
        the host.

        @param host: The host against which to schedule the job.
        @param queue_entry: The queue_entry to schedule.

        @raises rdb_utils.RDBException: If the queue_entry already has a
                different host assigned.
        """
        assigned_host_id = queue_entry.host_id
        if assigned_host_id is None:
            queue_entry.set_host(host)
        elif assigned_host_id != host.id:
            conflict = ('The rdb returned host: %s '
                        'but the job:%s was already assigned a host: %s. ')
            raise rdb_utils.RDBException(
                    conflict % (host.hostname, queue_entry.job_id,
                                queue_entry.host.hostname))
        queue_entry.update_field('active', True)

        # TODO: crbug.com/373936. The host scheduler should only be assigning
        # jobs to hosts, but the criterion we use to release hosts depends
        # on it not being used by an active hqe. Since we're activating the
        # hqe here, we also need to schedule its first prejob task. OTOH,
        # we could converge to having the host scheduler manager all special
        # tasks, since their only use today is to verify/cleanup/reset a host.
        logging.info('Scheduling pre job tasks for entry: %s', queue_entry)
        queue_entry.schedule_pre_job_tasks()


    def acquire_hosts(self, host_jobs):
        """Acquire hosts for given jobs.

        Forwards the jobs that need hosts to the rdb. Child classes can
        override this method to pipe more args to rdb.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.

        @return: A generator that yields an rdb_hosts.RDBClientHostWrapper
                 for each host acquired on behalf of a queue_entry,
                 or None if a host wasn't found.
        """
        return rdb_lib.acquire_hosts(host_jobs)


    def find_hosts_for_jobs(self, host_jobs):
        """Find and verify hosts for a list of jobs.

        @param host_jobs: A list of queue entries that either require hosts,
            or require host assignment validation through the rdb.
        @return: A generator of host_assignment tuples (host, job), one for
            each queue entry that actually got a host.
        """
        acquired = self.acquire_hosts(host_jobs)
        for found_host, entry in zip(acquired, host_jobs):
            # Entries for which no host was found are silently skipped.
            if not found_host:
                continue
            yield self.host_assignment(found_host, entry)


    def tick(self):
        """Schedule core host management activities."""
        self._release_hosts()
| |
| |
class HostScheduler(BaseHostScheduler):
    """A scheduler capable of managing host acquisition for new jobs."""


    def __init__(self):
        super(HostScheduler, self).__init__()
        self.job_query_manager = query_managers.AFEJobQueryManager()
        # Keeps track of how many hosts each suite is holding
        # ({suite_job_id: num_hosts}) and of which suite holds each host.
        self._suite_recorder = SuiteRecorder(self.job_query_manager)


    def _record_host_assignment(self, host, queue_entry):
        """Record that |host| is assigned to |queue_entry|.

        Updates the suite host-assignment bookkeeping for the queue entry.

        @param host: A Host object. Not used; kept for interface stability.
        @param queue_entry: A HostQueueEntry object.
        """
        self._suite_recorder.record_assignment(queue_entry)


    @metrics.SecondsTimerDecorator(
            '%s/schedule_jobs_duration' % _METRICS_PREFIX)
    def _schedule_jobs(self):
        """Schedule new jobs against hosts.

        Fetches the pending queue entries, tries to acquire a host for every
        entry that is not hostless, and schedules and records each successful
        assignment. The assignment metrics are reported on every call so that
        the jobs-without-hosts gauge reads 0 when nothing is pending.
        """
        new_jobs_with_hosts = 0
        queue_entries = self.job_query_manager.get_pending_queue_entries(
                only_hostless=False)
        unverified_host_jobs = [job for job in queue_entries
                                if not job.is_hostless()]
        # Do not bother the rdb when there is nothing to acquire hosts for.
        if unverified_host_jobs:
            for acquisition in self.find_hosts_for_jobs(unverified_host_jobs):
                self.schedule_host_job(acquisition.host, acquisition.job)
                self._record_host_assignment(acquisition.host, acquisition.job)
                new_jobs_with_hosts += 1
        metrics.Counter('%s/new_jobs_with_hosts' % _METRICS_PREFIX
                        ).increment_by(new_jobs_with_hosts)

        num_jobs_without_hosts = (len(unverified_host_jobs) -
                                  new_jobs_with_hosts)
        metrics.Gauge('%s/current_jobs_without_hosts' % _METRICS_PREFIX
                      ).set(num_jobs_without_hosts)

        metrics.Counter('%s/tick' % _METRICS_PREFIX).increment()


    @metrics.SecondsTimerDecorator('%s/lease_hosts_duration' % _METRICS_PREFIX)
    def _lease_hosts_of_frontend_tasks(self):
        """Lease hosts of tasks scheduled through the frontend."""
        # We really don't need to get all the special tasks here, just the ones
        # without hqes, but reusing the method used by the scheduler ensures
        # we prioritize the same way.
        lease_hostnames = [
                task.host.hostname for task in
                self.job_query_manager.get_prioritized_special_tasks(
                    only_tasks_with_leased_hosts=False)
                if task.queue_entry_id is None and not task.host.leased]
        # Leasing a leased hosts here shouldn't be a problem:
        # 1. The only way a host can be leased is if it's been assigned to
        #    an active hqe or another similar frontend task, but doing so will
        #    have already precluded it from the list of tasks returned by the
        #    job_query_manager.
        # 2. The unleasing is done based on global conditions. Eg: Even if a
        #    task has already leased a host and we lease it again, the
        #    host scheduler won't release the host till both tasks are complete.
        if lease_hostnames:
            self.host_query_manager.set_leased(
                    True, hostname__in=lease_hostnames)


    def acquire_hosts(self, host_jobs):
        """Override acquire_hosts.

        This method overrides the method in parent class.
        It figures out the set of suites that |host_jobs| belong to,
        gets the min_duts requirement for each suite, and
        pipes min_duts for each suite to rdb.

        @param host_jobs: A list of queue entries that need hosts.

        @return: Whatever rdb_lib.acquire_hosts returns for the given jobs
                 and per-suite minimum dut requirements.
        """
        parent_job_ids = {q.job.parent_job_id
                          for q in host_jobs if q.job.parent_job_id}
        suite_min_duts = self._suite_recorder.get_min_duts(parent_job_ids)
        return rdb_lib.acquire_hosts(host_jobs, suite_min_duts)


    @metrics.SecondsTimerDecorator('%s/tick_time' % _METRICS_PREFIX)
    def tick(self):
        """Run one scheduling cycle.

        In order: lease hosts for frontend tasks, schedule new jobs, release
        unused hosts, update the suite records with the released hosts, and
        send any queued emails.
        """
        logging.info('Calling new tick.')
        logging.info('Leasing hosts for frontend tasks.')
        self._lease_hosts_of_frontend_tasks()
        logging.info('Finding hosts for new jobs.')
        self._schedule_jobs()
        logging.info('Releasing unused hosts.')
        released_hosts = self._release_hosts()
        logging.info('Updating suite assignment with released hosts')
        self._suite_recorder.record_release(released_hosts)
        logging.info('Calling email_manager.')
        email_manager.manager.send_queued_emails()
| |
| |
class DummyHostScheduler(BaseHostScheduler):
    """A no-op host scheduler: it neither acquires nor releases hosts."""


    def __init__(self):
        """Do nothing; the base class initializer is deliberately not run."""
        return


    def tick(self):
        """Do nothing on a scheduler tick."""
        return
| |
| |
def handle_signal(signum, frame):
    """Request a graceful shutdown instead of crashing mid-tick.

    Installed by initialize() as the handler for SIGINT and SIGTERM. It only
    raises the module-level _shutdown flag and logs the request.

    @param signum: Number of the signal received (unused).
    @param frame: Stack frame active when the signal arrived (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info('Shutdown request received.')
| |
| |
def initialize(testing=False):
    """Initialize the host scheduler.

    Sets up the database connection manager, logging, the shutdown signal
    handlers and the scheduler models, in that order.

    @param testing: If True, first initialize the file database used for
                    testing via rdb_testing_utils.
    """
    global _db_manager
    if testing:
        # Don't import testing utilities unless we're in testing mode,
        # as the database imports have side effects.
        from autotest_lib.scheduler import rdb_testing_utils
        db_helper = rdb_testing_utils.FileDatabaseHelper()
        db_helper.initialize_database_for_testing(
                db_file_path=rdb_testing_utils.FileDatabaseHelper.DB_FILE)
    _db_manager = scheduler_lib.ConnectionManager()

    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    scheduler_lib.setup_logging(
            log_dir, None, timestamped_logfile_prefix='host_scheduler')

    logging.info("Setting signal handler")
    # Both signals request the same graceful shutdown.
    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, handle_signal)
    scheduler_models.initialize()
| |
| |
def parse_arguments(argv):
    """Parse the host scheduler's command line arguments.

    @param argv: argument list to parse (without the program name).
    @returns: the argparse namespace with attributes testing, production,
              lifetime_hours and metrics_file.
    """
    production_help = ('Indicate that scheduler is running in production '
                       'environment and it can use database that is not '
                       'hosted in localhost. If it is set to False, scheduler '
                       'will fail if database is not in localhost.')
    lifetime_help = ('If provided, number of hours the scheduler should run '
                     'for. At the expiry of this time, the process will exit '
                     'gracefully.')
    metrics_help = ('If provided, drop metrics to this local file instead of '
                    'reporting to ts_mon')

    arg_parser = argparse.ArgumentParser(description='Host scheduler.')
    arg_parser.add_argument(
            '--testing', default=False, action='store_true',
            help='Start the host scheduler in testing mode.')
    arg_parser.add_argument(
            '--production', default=False, action='store_true',
            help=production_help)
    arg_parser.add_argument(
            '--lifetime-hours', default=None, type=float, help=lifetime_help)
    arg_parser.add_argument(
            '--metrics-file', default=None, type=str, help=metrics_help)
    return arg_parser.parse_args(argv)
| |
| |
def main():
    """Run the host scheduler until shutdown is requested or lifetime expires.

    Exits immediately (status 0) if inline host acquisition is enabled in the
    config. Otherwise parses the command line, checks the production settings
    and the server role, initializes the scheduler and ticks in a loop, with
    each tick taking at least host_scheduler_minimum_tick_sec seconds.

    On the way out, queued emails are sent and the database connection is
    closed; the connection is closed even if sending the emails fails.
    """
    if _monitor_db_host_acquisition:
        logging.info('Please set inline_host_acquisition=False in the shadow '
                     'config before starting the host scheduler.')
        sys.exit(0)
    try:
        options = parse_arguments(sys.argv[1:])
        scheduler_lib.check_production_settings(options)

        # If server database is enabled, check if the server has role
        # `host_scheduler`. If the server does not have host_scheduler role,
        # exception will be raised and host scheduler will not continue to run.
        if server_manager_utils.use_server_db():
            server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                         role='host_scheduler')

        initialize(options.testing)

        with ts_mon_config.SetupTsMonGlobalState(
                'autotest_host_scheduler',
                indirect=True,
                debug_file=options.metrics_file,
        ):
            metrics.Counter('%s/start' % _METRICS_PREFIX).increment()
            process_start_time = time.time()
            host_scheduler = HostScheduler()
            minimum_tick_sec = global_config.global_config.get_config_value(
                    'SCHEDULER', 'host_scheduler_minimum_tick_sec', type=float)
            while not _shutdown:
                if _lifetime_expired(options.lifetime_hours,
                                     process_start_time):
                    break
                start = time.time()
                host_scheduler.tick()
                curr_tick_sec = time.time() - start
                if minimum_tick_sec > curr_tick_sec:
                    # Pad short ticks up to the configured minimum duration.
                    time.sleep(minimum_tick_sec - curr_tick_sec)
                else:
                    time.sleep(0.0001)
            logging.info('Shutdown request recieved. Bye! Bye!')
    except server_manager_utils.ServerActionError:
        # This error is expected when the server is not in primary status
        # for host-scheduler role. Thus do not send email for it.
        raise
    except Exception:
        metrics.Counter('%s/uncaught_exception' % _METRICS_PREFIX).increment()
        raise
    finally:
        # Nested try/finally: a failure while flushing emails must not skip
        # closing the database connection.
        try:
            email_manager.manager.send_queued_emails()
        finally:
            if _db_manager:
                _db_manager.disconnect()
| |
| |
| def _lifetime_expired(lifetime_hours, process_start_time): |
| """Returns True if we've expired the process lifetime, False otherwise. |
| |
| Also sets the global _shutdown so that any background processes also take |
| the cue to exit. |
| """ |
| if lifetime_hours is None: |
| return False |
| if time.time() - process_start_time > lifetime_hours * 3600: |
| logging.info('Process lifetime %0.3f hours exceeded. Shutting down.', |
| lifetime_hours) |
| global _shutdown |
| _shutdown = True |
| return True |
| return False |
| |
| |
# Start the scheduler only when this file is executed as a script.
if __name__ == '__main__':
    main()