| import heapq |
| import os |
| import logging |
| |
| import common |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib import global_config |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.scheduler import drone_task_queue |
| from autotest_lib.scheduler import drone_utility |
| from autotest_lib.scheduler import drones |
| from autotest_lib.scheduler import scheduler_config |
| from autotest_lib.scheduler import thread_lib |
| |
| try: |
| from chromite.lib import metrics |
| except ImportError: |
| metrics = utils.metrics_mock |
| |
| |
# Name of the sub-directory, below the drone installation directory, that
# holds results on each drone.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Unique placeholder object; see execute_command(), which replaces it with the
# absolute working directory.
WORKING_DIRECTORY = object()


# Known pidfile names.
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
        AUTOSERV_PID_FILE,
        CRASHINFO_PID_FILE,
        PARSER_PID_FILE,
        ARCHIVER_PID_FILE,
)
| |
# Selects between thread_lib.ThreadedTaskQueue and
# drone_task_queue.DroneTaskQueue when DroneManager runs drone calls.
_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION,
        'threaded_drone_manager',
        type=bool,
        default=True)

# Prefix / suffix tested against source paths by copy_to_results_repository().
HOSTS_JOB_SUBDIR = 'hosts/'
PARSE_LOG = '.parse.log'

# When false, copy_to_results_repository() returns without copying anything.
ENABLE_ARCHIVING = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION,
        'enable_archiving',
        type=bool)
| |
| |
class DroneManagerError(Exception):
    """Error raised by the drone manager."""
| |
| |
class CustomEquals(object):
    """Base class deriving ==, != and hash() from a subclass-provided _id()."""

    def _id(self):
        """Return the value used for comparison and hashing.

        Subclasses must override this.
        """
        raise NotImplementedError


    def __eq__(self, other):
        # Only instances of our own type (or a subclass) are comparable;
        # anything else is left to Python's fallback handling.
        if isinstance(other, type(self)):
            return self._id() == other._id()
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        identity = self._id()
        return hash(identity)
| |
| |
class Process(CustomEquals):
    """A process on a drone; identity is the (hostname, pid) pair."""

    def __init__(self, hostname, pid, ppid=None):
        """
        @param hostname: Hostname of the drone the process runs on.
        @param pid: Process id.
        @param ppid: Parent process id, if known. Not part of the identity.
        """
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        identity = (self.hostname, self.pid)
        return identity


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        # default object repr followed by the readable hostname/pid form
        default_repr = super(Process, self).__repr__()
        return default_repr + '<%s>' % self
| |
| |
class PidfileId(CustomEquals):
    """Identifier for a pidfile; identity is its path."""

    def __init__(self, path):
        """
        @param path: Path of the pidfile.
        """
        self.path = path


    def _id(self):
        path = self.path
        return path


    def __str__(self):
        path = self.path
        return str(path)
| |
| |
| class _PidfileInfo(object): |
| age = 0 |
| num_processes = None |
| |
| |
class PidfileContents(object):
    """Parsed contents of a pidfile.

    All three attributes default to None; _parse_pidfile() fills in whichever
    values the pidfile contained.
    """
    # Process that wrote the pidfile, or None if no pid has been read
    process = None
    # exit status read from the pidfile, or None if none has been read
    exit_status = None
    # failed-test count read from the pidfile, or None if none has been read
    num_tests_failed = None

    def is_invalid(self):
        """@returns: False; parsed contents are never invalid."""
        return False


    def is_running(self):
        """
        @returns: True if a process is recorded and no exit status has been
                written yet.
        """
        # Compare against None explicitly: an exit status of 0 is a
        # completed process, not a running one.
        return self.process is not None and self.exit_status is None
| |
| |
class InvalidPidfile(object):
    """Stand-in for pidfile contents that could not be parsed.

    Exposes the same attributes and predicates as PidfileContents, plus the
    error text describing the problem.
    """
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        """
        @param error: Description of why the pidfile is invalid.
        """
        self.error = error


    def is_invalid(self):
        """@returns: True, always."""
        return True


    def is_running(self):
        """@returns: False, always."""
        return False


    def __str__(self):
        message = self.error
        return message
| |
| |
| class _DroneHeapWrapper(object): |
| """Wrapper to compare drones based on used_capacity(). |
| |
| These objects can be used to keep a heap of drones by capacity. |
| """ |
| def __init__(self, drone): |
| self.drone = drone |
| |
| |
| def __cmp__(self, other): |
| assert isinstance(other, _DroneHeapWrapper) |
| return cmp(self.drone.used_capacity(), other.drone.used_capacity()) |
| |
| |
| class DroneManager(object): |
| """ |
| This class acts as an interface from the scheduler to drones, whether it be |
| only a single "drone" for localhost or multiple remote drones. |
| |
| All paths going into and out of this class are relative to the full results |
| directory, except for those returned by absolute_path(). |
| """ |
| |
| |
| # Minimum time to wait before next email |
| # about a drone hitting process limit is sent. |
| NOTIFY_INTERVAL = 60 * 60 * 24 # one day |
| _STATS_KEY = 'drone_manager' |
| |
| |
| |
def __init__(self):
    """Create empty bookkeeping state and the drone refresh task queue."""
    # absolute path of base results dir
    self._results_dir = None
    self._results_drone = None
    # maps hostname to Drone object
    self._drones = {}
    # heapq of _DroneHeapWrappers
    self._drone_queue = []
    # holds Process objects
    self._process_set = set()
    # maps (hostname, pid) to the process info reported by the drone
    self._all_processes = {}
    # maps PidfileId to PidfileContents
    self._pidfiles = {}
    # same as _pidfiles, filled from the second pidfile read
    self._pidfiles_second_read = {}
    # maps PidfileId to _PidfileInfo
    self._registered_pidfile_info = {}
    # maps results dir to dict mapping file path to contents
    self._attached_files = {}
    # used to generate unique temporary paths
    self._temporary_path_counter = 0
    # Task queue used to refresh drones; threaded when so configured.
    if _THREADED_DRONE_MANAGER:
        refresh_queue = thread_lib.ThreadedTaskQueue(
                name='%s.refresh_queue' % self._STATS_KEY)
    else:
        refresh_queue = drone_task_queue.DroneTaskQueue()
    self._refresh_task_queue = refresh_queue
| |
| |
def initialize(self, base_results_dir, drone_hostnames,
               results_repository_hostname):
    """Add the given drones and set up the results repository drone.

    @param base_results_dir: Base results directory; stored and used by
            absolute_path() for results-repository paths.
    @param drone_hostnames: Hostnames of the drones to add.
    @param results_repository_hostname: Hostname of the results repository.

    @raises DroneManagerError: If no drone could be added.
    """
    self._results_dir = base_results_dir

    for drone_hostname in drone_hostnames:
        self._add_drone(drone_hostname)

    # an empty map means every drone failed to initialize
    if not self._drones:
        raise DroneManagerError('No valid drones found')

    self.refresh_drone_configs()

    logging.info('Using results repository on %s',
                 results_repository_hostname)
    # The results drone is deliberately not initialize()d: we don't want to
    # clear out any directories and we don't need to kill any processes.
    results_drone = drones.get_drone(results_repository_hostname)
    self._results_drone = results_drone
    install_dir = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION,
            'results_host_installation_directory', default=None)
    if install_dir:
        results_drone.set_autotest_install_dir(install_dir)
| |
| |
def reinitialize_drones(self):
    """Call 'initialize' on every drone, timing each call separately."""
    metric_name = ('chromeos/autotest/drone_manager/'
                   'reinitialize_drones_duration')
    for drone in self.get_drones():
        timer = metrics.SecondsTimer(metric_name,
                                     fields={'drone': drone.hostname})
        with timer:
            drone.call('initialize', self._results_dir)
| |
| |
def shutdown(self):
    """Call shutdown() on every drone returned by get_drones()."""
    all_drones = self.get_drones()
    for each_drone in all_drones:
        each_drone.shutdown()
| |
| |
def _get_max_pidfile_refreshes(self):
    """
    Read the 'max_pidfile_refreshes' scheduler option (default 2000).

    Normally refresh() is called on every monitor_db.Dispatcher.tick().

    @returns: The number of refresh() calls before we forget a pidfile.
    """
    return global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION,
            'max_pidfile_refreshes',
            type=int,
            default=2000)
| |
| |
def _add_drone(self, hostname):
    """
    Look up the drone for hostname, initialize it and register it.

    An AutoservRunError raised by the initialize call is logged and the
    drone is left out of the usable drones. Nothing happens when
    drones.get_drone() returns a false value.

    @param hostname: Hostname of the drone we are trying to add.
    """
    logging.info('Adding drone %s' % hostname)
    drone = drones.get_drone(hostname)
    if not drone:
        return
    try:
        drone.call('initialize', self.absolute_path(''))
    except error.AutoservRunError as e:
        logging.error('Failed to initialize drone %s with error: %s',
                      hostname, e)
    else:
        self._drones[drone.hostname] = drone
| |
| |
| def _remove_drone(self, hostname): |
| self._drones.pop(hostname, None) |
| |
| |
def refresh_drone_configs(self):
    """
    Reread global config options for all drones.

    For each registered drone this sets enabled, max_processes and
    allowed_users, either from the server database attributes or from the
    per-hostname options in the scheduler config section.
    """
    # server_manager_utils is imported here rather than at module level:
    # test_that imports drone_manager (via autoserv_utils) before it sets
    # up django, and it only sets django up in setup_local_afe. A
    # module-level import would therefore make test_that fail.
    from autotest_lib.site_utils import server_manager_utils
    section = scheduler_config.CONFIG_SECTION
    config = global_config.global_config
    config.parse_config_file()
    for hostname, drone in self._drones.items():
        if not server_manager_utils.use_server_db():
            disabled = config.get_config_value(
                    section, '%s_disabled' % hostname, default='')
            drone.enabled = not bool(disabled)
            drone.max_processes = config.get_config_value(
                    section, '%s_max_processes' % hostname, type=int,
                    default=scheduler_config.config.max_processes_per_drone)

            allowed_users = config.get_config_value(
                    section, '%s_users' % hostname, default=None)
        else:
            server = server_manager_utils.get_servers(hostname=hostname)[0]
            attributes = dict((attr.attribute, attr.value)
                              for attr in server.attributes.all())
            drone.enabled = (int(attributes.get('disabled', 0)) == 0)
            drone.max_processes = int(
                    attributes.get(
                            'max_processes',
                            scheduler_config.config.max_processes_per_drone))
            allowed_users = attributes.get('users', None)

        # a whitespace-separated user list becomes a set; empty means None
        drone.allowed_users = (set(allowed_users.split())
                               if allowed_users else None)
        logging.info('Drone %s.max_processes: %s', hostname,
                     drone.max_processes)
        logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
        logging.info('Drone %s.allowed_users: %s', hostname,
                     drone.allowed_users)

    self._reorder_drone_queue() # max_processes may have changed
    # Clear notification record about reaching max_processes limit.
    self._notify_record = {}
| |
| |
def get_drones(self):
    """Return an iterator over the registered Drone objects."""
    registered = self._drones
    return registered.itervalues()
| |
| |
def cleanup_orphaned_containers(self):
    """Queue a cleanup_orphaned_containers call on every registered drone."""
    for each_drone in self._drones.values():
        logging.info('Queue cleanup_orphaned_containers at %s',
                     each_drone.hostname)
        each_drone.queue_call('cleanup_orphaned_containers')
| |
| |
| def _get_drone_for_process(self, process): |
| return self._drones[process.hostname] |
| |
| |
| def _get_drone_for_pidfile_id(self, pidfile_id): |
| pidfile_contents = self.get_pidfile_contents(pidfile_id) |
| if pidfile_contents.process is None: |
| raise DroneManagerError('Fail to get a drone due to empty pidfile') |
| return self._get_drone_for_process(pidfile_contents.process) |
| |
| |
def get_drone_for_pidfile_id(self, pidfile_id):
    """Public API for luciferlib.

    Thin wrapper around _get_drone_for_pidfile_id().

    @param pidfile_id: PidfileId instance.
    """
    drone = self._get_drone_for_pidfile_id(pidfile_id)
    return drone
| |
| |
| def _drop_old_pidfiles(self): |
| # use items() since the dict is modified in unregister_pidfile() |
| for pidfile_id, info in self._registered_pidfile_info.items(): |
| if info.age > self._get_max_pidfile_refreshes(): |
| logging.warning('dropping leaked pidfile %s', pidfile_id) |
| self.unregister_pidfile(pidfile_id) |
| else: |
| info.age += 1 |
| |
| |
| def _reset(self): |
| self._process_set = set() |
| self._all_processes = {} |
| self._pidfiles = {} |
| self._pidfiles_second_read = {} |
| self._drone_queue = [] |
| |
| |
def _parse_pidfile(self, drone, raw_contents):
    r"""Parse raw pidfile contents.

    @param drone: The drone on which this pidfile was found.
    @param raw_contents: The raw contents of a pidfile, eg:
        "pid\nexit_status\nnum_tests_failed\n".

    @returns: A PidfileContents (left empty when raw_contents is empty), or
            an InvalidPidfile when there are more than three lines or a
            line is not an integer.
    """
    contents = PidfileContents()
    if not raw_contents:
        return contents
    lines = raw_contents.splitlines()
    if len(lines) > 3:
        return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
                              (len(lines), lines))
    try:
        pid = int(lines[0])
        contents.process = Process(drone.hostname, pid)
        # if len(lines) == 2, assume we caught Autoserv between writing
        # exit_status and num_failed_tests, so just ignore it and wait for
        # the next cycle
        if len(lines) == 3:
            contents.exit_status = int(lines[1])
            contents.num_tests_failed = int(lines[2])
    except ValueError as exc:
        return InvalidPidfile('Corrupt pid file: ' + str(exc.args))

    return contents
| |
| |
def _process_pidfiles(self, drone, pidfiles, store_in_dict):
    """Parse each raw pidfile and store the result keyed by PidfileId.

    @param drone: The drone the pidfiles were read from.
    @param pidfiles: Dict mapping pidfile path to raw contents.
    @param store_in_dict: Dict that receives PidfileId -> parsed contents.
    """
    for path, raw_contents in pidfiles.items():
        key = PidfileId(path)
        parsed = self._parse_pidfile(drone, raw_contents)
        store_in_dict[key] = parsed
| |
| |
def _add_process(self, drone, process_info):
    """Record a Process built from process_info in the process set.

    @param drone: The drone the process was reported by.
    @param process_info: Mapping with at least 'pid' and 'ppid' entries.
    """
    pid = int(process_info['pid'])
    ppid = int(process_info['ppid'])
    self._process_set.add(Process(drone.hostname, pid, ppid))
| |
| |
| def _add_autoserv_process(self, drone, process_info): |
| assert process_info['comm'] == 'autoserv' |
| # only root autoserv processes have pgid == pid |
| if process_info['pgid'] != process_info['pid']: |
| return |
| self._add_process(drone, process_info) |
| |
| |
def _enqueue_drone(self, drone):
    """Push drone, wrapped for capacity comparison, onto the drone heap."""
    wrapper = _DroneHeapWrapper(drone)
    heapq.heappush(self._drone_queue, wrapper)
| |
| |
| def _reorder_drone_queue(self): |
| heapq.heapify(self._drone_queue) |
| |
| |
def reorder_drone_queue(self):
    """Reorder drone queue according to modified process counts.

    This public API is exposed for luciferlib to wrap; it delegates to
    _reorder_drone_queue().
    """
    return self._reorder_drone_queue()
| |
| |
def _compute_active_processes(self, drone):
    """Recount drone.active_processes from the pidfiles and report it.

    Sums the declared process counts of every pidfile that has no exit
    status yet and whose process is on this drone, then sets the
    active_processes gauge for the drone.

    @param drone: A Drone object.
    """
    drone.active_processes = 0
    for pidfile_id, contents in self._pidfiles.items():
        if contents.exit_status is not None:
            # already finished
            continue
        process = contents.process
        if not (process and process.hostname == drone.hostname):
            continue
        declared = self._registered_pidfile_info[pidfile_id].num_processes
        if declared is not None:
            drone.active_processes += declared

    gauge = metrics.Gauge('chromeos/autotest/drone/active_processes')
    gauge.set(drone.active_processes,
              fields={'drone_hostname': drone.hostname})
| |
| |
def _check_drone_process_limit(self, drone):
    """
    Report how close |drone| is to its process limit.

    Sets the active_process_percentage metric to
    active_processes / max_processes, or to 100 when max_processes is zero.

    @param drone: A Drone object.
    """
    # NOTE(review): the normal path yields a 0..1 fraction while the
    # zero-limit fallback is 100 - confirm which scale consumers expect.
    try:
        usage = float(drone.active_processes) / drone.max_processes
    except ZeroDivisionError:
        usage = 100
    usage_metric = metrics.Float(
            'chromeos/autotest/drone/active_process_percentage')
    usage_metric.set(usage, fields={'drone_hostname': drone.hostname})
| |
def trigger_refresh(self):
    """Triggers a drone manager refresh.

    Resets the cached refresh state, ages the registered pidfiles, queues a
    'refresh' call on every drone and starts executing those calls without
    waiting for them.

    @raises DroneManagerError: If a drone has un-executed calls.
        Since they will get clobbered when we queue refresh calls.
    """
    self._reset()
    self._drop_old_pidfiles()
    registered_paths = [registered_id.path
                        for registered_id in self._registered_pidfile_info]
    all_drones = list(self.get_drones())
    for drone in all_drones:
        pending_calls = drone.get_calls()
        if pending_calls:
            raise DroneManagerError('Drone %s has un-executed calls: %s '
                                    'which might get corrupted through '
                                    'this invocation' %
                                    (drone,
                                     [str(call) for call in pending_calls]))
        drone.queue_call('refresh', registered_paths)
    logging.info("Invoking drone refresh.")
    timer = metrics.SecondsTimer(
            'chromeos/autotest/drone_manager/trigger_refresh_duration')
    with timer:
        self._refresh_task_queue.execute(all_drones, wait=False)
| |
| |
def sync_refresh(self):
    """Complete the drone refresh started by trigger_refresh.

    Waits for all drone threads then refreshes internal datastructures
    with drone process information.
    """

    # This gives us a dictionary like what follows:
    # {drone: [{'pidfiles': (raw contents of pidfile paths),
    #           'autoserv_processes': (autoserv process info from ps),
    #           'all_processes': (all process info from ps),
    #           'parse_processes': (parse process info from ps),
    #           'pidfiles_second_read': (pidfile contents, again),}]
    #  drone2: ...}
    # The values of each drone are only a list because this adheres to the
    # drone utility interface (each call is executed and its results are
    # placed in a list, but since we never couple the refresh calls with
    # any other call, this list will always contain a single dict).
    with metrics.SecondsTimer(
            'chromeos/autotest/drone_manager/sync_refresh_duration'):
        all_results = self._refresh_task_queue.get_results()
    logging.info("Drones refreshed.")

    # The loop below goes through and parses pidfile contents. Pidfiles
    # are used to track autoserv execution, and contain at most 3 lines:
    # pid, exit code, number of failed tests. Each pidfile is identified
    # by a PidfileId object, which contains a unique pidfile path (unique
    # because it contains the job id) making it hashable.
    # All pidfiles are stored in the drone managers _pidfiles dict as:
    #   {pidfile_id: pidfile_contents(Process(drone, pid),
    #                                 exit_code, num_tests_failed)}
    # In handle agents, each agent knows its pidfile_id, and uses this
    # to retrieve the refreshed contents of its pidfile via the
    # PidfileRunMonitor (through its tick) before making decisions. If
    # the agent notices that its process has exited, it unregisters the
    # pidfile from the drone_managers._registered_pidfile_info dict
    # through its epilog.
    for drone, results_list in all_results.items():
        results = results_list[0]

        for process_info in results['all_processes']:
            if process_info['comm'] == 'autoserv':
                self._add_autoserv_process(drone, process_info)
            # every process is remembered, keyed by (hostname, pid)
            drone_pid = drone.hostname, int(process_info['pid'])
            self._all_processes[drone_pid] = process_info

        for process_info in results['parse_processes']:
            self._add_process(drone, process_info)

        self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
        self._process_pidfiles(drone, results['pidfiles_second_read'],
                               self._pidfiles_second_read)

        self._compute_active_processes(drone)
        # only enabled drones are offered for scheduling
        if drone.enabled:
            self._enqueue_drone(drone)
        self._check_drone_process_limit(drone)
| |
| |
def refresh(self):
    """Refresh all drones: trigger the refresh, then wait for and sync it."""
    timer = metrics.SecondsTimer(
            'chromeos/autotest/drone_manager/refresh_duration')
    with timer:
        self.trigger_refresh()
        self.sync_refresh()
| |
| |
@metrics.SecondsTimerDecorator(
        'chromeos/autotest/drone_manager/execute_actions_duration')
def execute_actions(self):
    """
    Called at the end of a scheduler cycle to execute all queued actions
    on drones.

    Calls queued on the results drone run afterwards; if they raise
    AutoservError, a counter is incremented and that drone's call queue is
    cleared.
    """
    # Invoke calls queued on all drones since the last call to execute
    # and wait for them to return.
    if _THREADED_DRONE_MANAGER:
        task_queue = thread_lib.ThreadedTaskQueue(
                name='%s.execute_queue' % self._STATS_KEY)
    else:
        task_queue = drone_task_queue.DroneTaskQueue()
    task_queue.execute(self._drones.values())

    try:
        self._results_drone.execute_queued_calls()
    except error.AutoservError:
        failure_counter = metrics.Counter(
                'chromeos/autotest/errors/results_repository_failed')
        failure_counter.increment(
                fields={'drone_hostname': self._results_drone.hostname})
        self._results_drone.clear_call_queue()
| |
| |
def get_orphaned_autoserv_processes(self):
    """
    Returns a set of Process objects for orphaned processes only, i.e.
    tracked processes whose parent pid is 1.
    """
    orphans = set()
    for process in self._process_set:
        if process.ppid == 1:
            orphans.add(process)
    return orphans
| |
| |
def kill_process(self, process):
    """
    Queue a kill of the given process on the drone it runs on.
    """
    logging.info('killing %s', process)
    owning_drone = self._get_drone_for_process(process)
    owning_drone.queue_kill_process(process)
| |
| |
| def _ensure_directory_exists(self, path): |
| if not os.path.exists(path): |
| os.makedirs(path) |
| |
| |
def total_running_processes(self):
    """Return the sum of active_processes over all drones."""
    total = 0
    for drone in self.get_drones():
        total = total + drone.active_processes
    return total
| |
| |
def max_runnable_processes(self, username, drone_hostnames_allowed):
    """
    Return the maximum number of processes that can be run (in a single
    execution) given the current load on drones.

    Only queued drones usable by the user and permitted by
    drone_hostnames_allowed are considered; the result is never negative.

    @param username: login of user to run a process. may be None.
    @param drone_hostnames_allowed: list of drones that can be used. May be
                                    None
    """
    # stays 0 when all drones are disabled, inaccessible or overloaded
    most_headroom = 0
    for wrapper in self._drone_queue:
        drone = wrapper.drone
        if not drone.usable_by(username):
            continue
        if (drone_hostnames_allowed is not None and
                drone.hostname not in drone_hostnames_allowed):
            continue
        headroom = drone.max_processes - drone.active_processes
        if headroom > most_headroom:
            most_headroom = headroom
    return most_headroom
| |
| |
| def _least_loaded_drone(self, drones): |
| return min(drones, key=lambda d: d.used_capacity()) |
| |
| |
def pick_drone_to_use(self, num_processes=1):
    """Return a drone to use.

    num_processes is the number of processes the drone is intended
    to run. No user or hostname restriction is applied, so every queued
    drone is a candidate.

    This public API is exposed for luciferlib to wrap.

    Returns a drone instance (see drones.py).
    """
    no_user_restriction = None
    no_hostname_restriction = None
    return self._choose_drone_for_execution(
            num_processes, no_user_restriction, no_hostname_restriction)
| |
| |
| def _choose_drone_for_execution(self, num_processes, username, |
| drone_hostnames_allowed): |
| """Choose a drone to execute command. |
| |
| @param num_processes: Number of processes needed for execution. |
| @param username: Name of the user to execute the command. |
| @param drone_hostnames_allowed: A list of names of drone allowed. |
| |
| @return: A drone object to be used for execution. |
| """ |
| # cycle through drones is order of increasing used capacity until |
| # we find one that can handle these processes |
| checked_drones = [] |
| usable_drones = [] |
| drone_to_use = None |
| while self._drone_queue: |
| drone = heapq.heappop(self._drone_queue).drone |
| checked_drones.append(drone) |
| logging.info('Checking drone %s', drone.hostname) |
| if not drone.usable_by(username): |
| continue |
| |
| drone_allowed = (drone_hostnames_allowed is None |
| or drone.hostname in drone_hostnames_allowed) |
| if not drone_allowed: |
| logging.debug('Drone %s not allowed: ', drone.hostname) |
| continue |
| |
| usable_drones.append(drone) |
| |
| if drone.active_processes + num_processes <= drone.max_processes: |
| drone_to_use = drone |
| break |
| logging.info('Drone %s has %d active + %s requested > %s max', |
| drone.hostname, drone.active_processes, num_processes, |
| drone.max_processes) |
| |
| if not drone_to_use and usable_drones: |
| # Drones are all over loaded, pick the one with least load. |
| drone_summary = ','.join('%s %s/%s' % (drone.hostname, |
| drone.active_processes, |
| drone.max_processes) |
| for drone in usable_drones) |
| logging.error('No drone has capacity to handle %d processes (%s) ' |
| 'for user %s', num_processes, drone_summary, username) |
| drone_to_use = self._least_loaded_drone(usable_drones) |
| |
| # refill _drone_queue |
| for drone in checked_drones: |
| self._enqueue_drone(drone) |
| |
| return drone_to_use |
| |
| |
def _substitute_working_directory_into_command(self, command,
                                               working_directory):
    """Replace every WORKING_DIRECTORY placeholder in command, in place.

    @param command: Command as a mutable argv list.
    @param working_directory: Value substituted for each placeholder.
    """
    placeholder_slots = [index for index, item in enumerate(command)
                         if item is WORKING_DIRECTORY]
    for index in placeholder_slots:
        command[index] = working_directory
| |
| |
def execute_command(self, command, working_directory, pidfile_name,
                    num_processes, log_file=None, paired_with_pidfile=None,
                    username=None, drone_hostnames_allowed=None):
    """
    Execute the given command, taken as an argv list.

    The command is queued on the chosen drone, the drone's process count is
    bumped by num_processes, and the pidfile is registered for monitoring.

    @param command: command to execute as a list. if any item is
            WORKING_DIRECTORY, the absolute path to the working directory
            will be substituted for it.
    @param working_directory: directory in which the pidfile will be written
    @param pidfile_name: name of the pidfile this process will write
    @param num_processes: number of processes to account for from this
            execution
    @param log_file (optional): path (in the results repository) to hold
            command output.
    @param paired_with_pidfile (optional): a PidfileId for an
            already-executed process; the new process will execute on the
            same drone as the previous process.
    @param username (optional): login of the user responsible for this
            process.
    @param drone_hostnames_allowed (optional): hostnames of the drones that
                                               this command is allowed to
                                               execute on

    @returns: The PidfileId of the pidfile the new process will write.

    @raises DroneManagerError: If no drone is available.
    """
    abs_working_directory = self.absolute_path(working_directory)
    # fall back to a fresh temporary path when no log file was requested
    log_file = self.absolute_path(
            log_file if log_file else self.get_temporary_path('execute'))

    self._substitute_working_directory_into_command(command,
                                                    abs_working_directory)

    if not paired_with_pidfile:
        drone = self._choose_drone_for_execution(
                num_processes, username, drone_hostnames_allowed)
    else:
        drone = self._get_drone_for_pidfile_id(paired_with_pidfile)

    if not drone:
        raise DroneManagerError('command failed; no drones available: %s'
                                % command)

    logging.info("command = %s", command)
    logging.info('log file = %s:%s', drone.hostname, log_file)
    self._write_attached_files(working_directory, drone)
    drone.queue_call('execute_command', command, abs_working_directory,
                     log_file, pidfile_name)
    # account for the new processes right away and re-sort the heap
    drone.active_processes += num_processes
    self._reorder_drone_queue()

    pidfile_id = PidfileId(
            os.path.join(abs_working_directory, pidfile_name))
    self.register_pidfile(pidfile_id)
    self._registered_pidfile_info[pidfile_id].num_processes = num_processes
    return pidfile_id
| |
| |
def get_pidfile_id_from(self, execution_tag, pidfile_name):
    """Return the PidfileId of pidfile_name under the given execution tag.

    @param execution_tag: Path resolved through absolute_path().
    @param pidfile_name: Name of the pidfile inside that directory.
    """
    execution_dir = self.absolute_path(execution_tag)
    return PidfileId(os.path.join(execution_dir, pidfile_name))
| |
| |
def register_pidfile(self, pidfile_id):
    """
    Indicate that the DroneManager should look for the given pidfile when
    refreshing. The pidfile's age is reset whether or not it was already
    registered.
    """
    already_registered = pidfile_id in self._registered_pidfile_info
    if not already_registered:
        logging.info('monitoring pidfile %s', pidfile_id)
        self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
    self._reset_pidfile_age(pidfile_id)
| |
| |
| def _reset_pidfile_age(self, pidfile_id): |
| if pidfile_id in self._registered_pidfile_info: |
| self._registered_pidfile_info[pidfile_id].age = 0 |
| |
| |
def unregister_pidfile(self, pidfile_id):
    """Stop monitoring the given pidfile; no-op if it is not registered."""
    if pidfile_id not in self._registered_pidfile_info:
        return
    logging.info('forgetting pidfile %s', pidfile_id)
    del self._registered_pidfile_info[pidfile_id]
| |
| |
def declare_process_count(self, pidfile_id, num_processes):
    """Set the process count recorded for a registered pidfile.

    @raises KeyError: If pidfile_id is not registered.
    """
    info = self._registered_pidfile_info[pidfile_id]
    info.num_processes = num_processes
| |
| |
def get_pidfile_contents(self, pidfile_id, use_second_read=False):
    """
    Retrieve a PidfileContents object for the given pidfile_id. If
    use_second_read is True, use results that were read after the processes
    were checked, instead of before.

    Looking a pidfile up also resets its age. An empty PidfileContents is
    returned when nothing is stored for pidfile_id.
    """
    self._reset_pidfile_age(pidfile_id)
    pidfile_map = (self._pidfiles_second_read if use_second_read
                   else self._pidfiles)
    return pidfile_map.get(pidfile_id, PidfileContents())
| |
| |
def is_process_running(self, process):
    """
    Check if the given process is in the running process list.

    A process missing from the tracked set still counts as running when its
    (hostname, pid) appears among all reported processes; that case is
    logged as an error.
    """
    if process in self._process_set:
        return True

    key = (process.hostname, process.pid)
    if key not in self._all_processes:
        return False

    logging.error('Process %s found, but not an autoserv process. '
                  'Is %s', process, self._all_processes[key])
    return True
| |
| |
def get_temporary_path(self, base_name):
    """
    Get a new temporary path guaranteed to be unique across all drones
    for this scheduler execution.

    Uniqueness comes from a per-manager counter appended to base_name.
    """
    self._temporary_path_counter += 1
    file_name = '%s.%s' % (base_name, self._temporary_path_counter)
    return os.path.join(drone_utility._TEMPORARY_DIRECTORY, file_name)
| |
| |
def absolute_path(self, path, on_results_repository=False):
    """Return path joined onto the appropriate results base directory.

    @param path: Path relative to the results directory.
    @param on_results_repository: If true, resolve against the base results
            dir given to initialize(); otherwise against the results
            directory under the drone installation directory.
    """
    if not on_results_repository:
        base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
                                _DRONE_RESULTS_DIR_SUFFIX)
    else:
        base_dir = self._results_dir
    return os.path.join(base_dir, path)
| |
| |
| def _copy_results_helper(self, process, source_path, destination_path, |
| to_results_repository=False): |
| logging.debug('_copy_results_helper. process: %s, source_path: %s, ' |
| 'destination_path: %s, to_results_repository: %s', |
| process, source_path, destination_path, |
| to_results_repository) |
| full_source = self.absolute_path(source_path) |
| full_destination = self.absolute_path( |
| destination_path, on_results_repository=to_results_repository) |
| source_drone = self._get_drone_for_process(process) |
| if to_results_repository: |
| source_drone.send_file_to(self._results_drone, full_source, |
| full_destination, can_fail=True) |
| else: |
| source_drone.queue_call('copy_file_or_directory', full_source, |
| full_destination) |
| |
| |
def copy_to_results_repository(self, process, source_path,
                               destination_path=None):
    """
    Copy results from the given process at source_path to destination_path
    in the results repository.

    Nothing is copied when ENABLE_ARCHIVING is false.

    This will only copy the results back for Special Agent Tasks (Cleanup,
    Verify, Repair) that reside in the hosts/ subdirectory of results if
    the copy_task_results_back flag has been set to True inside
    global_config.ini

    It will also only copy .parse.log files back to the scheduler if the
    copy_parse_log_back flag in global_config.ini has been set to True.

    @param destination_path: Defaults to source_path when None.
    """
    if not ENABLE_ARCHIVING:
        return
    config = global_config.global_config
    copy_task_results_back = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'copy_task_results_back',
            type=bool)
    copy_parse_log_back = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'copy_parse_log_back',
            type=bool)
    is_special_task = source_path.startswith(HOSTS_JOB_SUBDIR)
    is_parse_log = source_path.endswith(PARSE_LOG)
    if is_special_task and not copy_task_results_back:
        return
    if is_parse_log and not copy_parse_log_back:
        return
    if destination_path is None:
        destination_path = source_path
    self._copy_results_helper(process, source_path, destination_path,
                              to_results_repository=True)
| |
| def _copy_to_results_repository(self, process, source_path, |
| destination_path=None): |
| """ |
| Copy results from the given process at source_path to destination_path |
| in the results repository, without special task handling. |
| """ |
| if destination_path is None: |
| destination_path = source_path |
| self._copy_results_helper(process, source_path, destination_path, |
| to_results_repository=True) |
| |
| |
def copy_results_on_drone(self, process, source_path, destination_path):
    """
    Copy a results directory from one place to another on the drone that
    runs the given process.
    """
    self._copy_results_helper(process, source_path, destination_path,
                              to_results_repository=False)
| |
| |
| def _write_attached_files(self, results_dir, drone): |
| attached_files = self._attached_files.pop(results_dir, {}) |
| for file_path, contents in attached_files.iteritems(): |
| drone.queue_call('write_to_file', self.absolute_path(file_path), |
| contents) |
| |
| |
def attach_file_to_execution(self, results_dir, file_contents,
                             file_path=None):
    """
    When the process for the results directory is executed, the given file
    contents will be placed in a file on the drone. Returns the path at
    which the file will be placed.

    A temporary path is generated when file_path is not given. The same
    path must not be attached twice to one results directory.
    """
    file_path = file_path or self.get_temporary_path('attach')
    pending = self._attached_files.setdefault(results_dir, {})
    assert file_path not in pending
    pending[file_path] = file_contents
    return file_path
| |
| |
def write_lines_to_file(self, file_path, lines, paired_with_process=None):
    """
    Write the given lines (as a list of strings) to a file. If
    paired_with_process is given, the file will be written on the drone
    running the given Process. Otherwise, the file will be written to the
    results repository.

    The write is queued on the target drone, not performed immediately.
    """
    file_contents = '\n'.join(lines) + '\n'
    on_results_repository = not paired_with_process
    if on_results_repository:
        target_drone = self._results_drone
    else:
        target_drone = self._get_drone_for_process(paired_with_process)
    full_path = self.absolute_path(
            file_path, on_results_repository=on_results_repository)
    target_drone.queue_call('write_to_file', full_path, file_contents)
| |
| |
# Module-wide DroneManager, created on the first instance() call.
_the_instance = None

def instance():
    """Return the shared DroneManager, creating it on first use."""
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance
| |
| |
| def _set_instance(instance): # usable for testing |
| global _the_instance |
| _the_instance = instance |