| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| |
| from chromite.lib import metrics |
| |
| |
| DRONE_ACCESSIBILITY_METRIC = metrics.Boolean( |
| 'chromeos/autotest/scheduler/drone_accessibility') |
| |
| class DroneTaskQueueException(Exception): |
| """Generic task queue exception.""" |
| pass |
| |
| |
| class DroneTaskQueue(object): |
| """A manager to run queued tasks in drones and gather results from them.""" |
| |
| def __init__(self): |
| self.results = dict() |
| |
| |
| def get_results(self): |
| """Get a results dictionary keyed on drones. |
| |
| @return: A dictionary of return values from drones. |
| """ |
| results_copy = self.results.copy() |
| self.results.clear() |
| return results_copy |
| |
| |
| def execute(self, drones, wait=True): |
| """Invoke methods via SSH to a drone. |
| |
| @param drones: A list of drones with calls to execute. |
| @param wait: If True, this method will only return when all the drones |
| have returned the result of their respective invocations of |
| drone_utility. The `results` map will be cleared. |
| If False, the caller must clear the map before the next invocation |
| of `execute`, by calling `get_results`. |
| |
| @return: A dictionary keyed on the drones, containing a list of return |
| values from the execution of drone_utility. |
| |
| @raises DroneTaskQueueException: If the results map isn't empty at the |
| time of invocation. |
| """ |
| if self.results: |
| raise DroneTaskQueueException( |
| 'Cannot clobber results map: %s, it should be cleared ' |
| 'through get_results.' % self.results) |
| for drone in drones: |
| if not drone.get_calls(): |
| logging.debug("Drone %s has no work, skipping. crbug.com/853861" |
| , drone) |
| continue |
| metric_fields = { |
| 'drone_hostname': drone.hostname, |
| 'call_count': len(drone.get_calls()) |
| } |
| drone_reachable = True |
| try: |
| drone_results = drone.execute_queued_calls() |
| logging.debug("Drone %s scheduled. crbug.com/853861", drone) |
| except IOError: |
| drone_reachable = False |
| logging.error( |
| "Drone %s is not reachable by the scheduler.", drone) |
| continue |
| finally: |
| DRONE_ACCESSIBILITY_METRIC.set( |
| drone_reachable, fields=metric_fields) |
| if drone in self.results: |
| raise DroneTaskQueueException( |
| 'Task queue has recorded results for drone %s: %s' % |
| (drone, self.results)) |
| self.results[drone] = drone_results |
| return self.get_results() if wait else None |