# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Thread library for drone management.
This library contains a threaded task queue capable of starting, monitoring
and syncing threads across remote and localhost drones asynchronously. It also
contains a wrapper for standard python threads that records exceptions so they
can be re-raised in the thread manager. The api exposed by the threaded task
queue is as follows:
1. worker: The staticmethod executed by all worker threads.
2. execute: Takes a list of drones and invokes a worker thread per drone.
This method assumes that all drones have a queue of pending calls
for execution.
3. wait_on_drones: Waits for all worker threads started by execute to finish
and raises any exceptions as a consolidated DroneTaskQueueException.
4. get_results: Returns the results of all threads as a dictionary keyed
on the drones.
"""

import collections
import logging
import Queue
import threading

import common

from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import drone_task_queue


class ExceptionRememberingThread(threading.Thread):
    """A wrapper around regular python threads that records exceptions."""

    def run(self):
        """Wrapper around the thread's run method."""
        try:
            with autotest_stats.Timer(self.name):
                super(ExceptionRememberingThread, self).run()
        except Exception as self.err:
            logging.error('%s raised an exception that will be re-raised by '
                          'the thread pool manager.', self.getName())
        else:
            self.err = None


class PersistentTimer(object):
    """A class to handle timers across local scopes."""

    def __init__(self, name):
        """Initialize a persistent timer.

        @param name: The name/key to insert timings under.
        """
        self.name = name
        self.timer = None

    def start(self):
        """Create and start a new timer."""
        self.timer = autotest_stats.Timer(self.name)
        self.timer.start()

    def stop(self):
        """Stop a previously started timer."""
        try:
            self.timer.stop()
        except (AssertionError, AttributeError) as e:
            logging.info('Stopping timer %s failed: %s', self.name, e)
        finally:
            self.timer = None


class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue):
    """Threaded implementation of a drone task queue."""

    result = collections.namedtuple('task', ['drone', 'results'])

    def __init__(self, name='thread_queue'):
        """Initialize a threaded task queue.

        @param name: Name used for the persistent timer and as a prefix for
            worker thread names.
        """
        self.results_queue = Queue.Queue()
        self.drone_threads = {}
        self.name = name
        # The persistent timer is used to measure net time spent
        # refreshing all drones across 'execute' and 'get_results'.
        self.timer = PersistentTimer(self.name)

    @staticmethod
    def worker(drone, results_queue):
        """Worker for task execution.

        Execute calls queued against the given drone and place the return value
        in results_queue.

        @param drone: A drone with calls to execute.
        @param results_queue: A queue, into which the worker places
            ThreadedTaskQueue.result from the drone calls.
        """
        logging.info('(Worker.%s) starting.', drone.hostname)
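        # execute_queued_calls runs every call queued against this drone;
        # package the return values with the drone so get_results can key
        # on it.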
        results_queue.put(ThreadedTaskQueue.result(
                drone, drone.execute_queued_calls()))
        logging.info('(Worker.%s) finished.', drone.hostname)

    def wait_on_drones(self):
        """Wait on all threads that are currently refreshing a drone.

        @raises DroneTaskQueueException: Consolidated exception for all
            drone thread exceptions.
        """
        if not self.drone_threads:
            return
        # TODO: Make this process more resilient. We can:
        # 1. Timeout the join.
        # 2. Kick out the exception/timeout drone.
        # 3. Selectively retry exceptions.
        # For now this is consistent with the single-threaded drone manager,
        # which raises all drone_utility, ssh and drone_manager exceptions.
        drone_exceptions = []
        for drone, thread in self.drone_threads.iteritems():
            tname = thread.getName()
            logging.info('(Task Queue) Waiting for %s', tname)
            thread.join()
            if thread.err:
                drone_exceptions.append((drone, thread.err))
        logging.info('(Task Queue) All threads have returned, clearing map.')
        self.drone_threads = {}
        if not drone_exceptions:
            return
        exception_msg = ''
        for drone, err in drone_exceptions:
            exception_msg += ('Drone %s raised Exception %s\n' %
                              (drone.hostname, err))
        raise drone_task_queue.DroneTaskQueueException(exception_msg)

    def get_results(self):
        """Get a results dictionary keyed on the drones.

        This method synchronously waits until all drone threads have returned
        before checking for results. It is meant to be invoked in conjunction
        with the 'execute' method, which creates a thread per drone.

        @return: A dictionary of return values from the drones.
        """
        self.wait_on_drones()
        self.timer.stop()
        results = {}
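        # Drain the results queue; each drone should appear at most once, so a
        # duplicate entry means the queue was not cleared between invocations.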
        while not self.results_queue.empty():
            drone_results = self.results_queue.get()
            if drone_results.drone in results:
                raise drone_task_queue.DroneTaskQueueException(
                        'Task queue has recorded results for drone %s: %s' %
                        (drone_results.drone, results))
            results[drone_results.drone] = drone_results.results
        return results

    def execute(self, drones, wait=True):
        """Invoke a thread per drone, to execute drone_utility in parallel.

        @param drones: A list of drones with calls to execute.
        @param wait: If True, this method will only return when all the drones
            have returned the result of their respective invocations of
            drone_utility. The results_queue and drone_threads will be cleared.
            If False, the caller must clear both the queue and the map before
            the next invocation of 'execute', by calling 'get_results'.

        @return: A dictionary keyed on the drones, containing a list of return
            values from the execution of drone_utility.

        @raises DroneTaskQueueException: If the results queue or drone map
            isn't empty at the time of invocation.
        """
        if not self.results_queue.empty():
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber results queue: %s, it should be cleared '
                    'through get_results.' % self.results_queue)
        if self.drone_threads:
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber thread map: %s, it should be cleared '
                    'through wait_on_drones' % self.drone_threads)
        self.timer.start()
        for drone in drones:
            if not drone.get_calls():
                continue
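            # Spawn one worker per drone with queued calls; any exception the
            # worker hits is recorded on the thread and re-raised later by
            # wait_on_drones.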
            worker_thread = ExceptionRememberingThread(
                    target=ThreadedTaskQueue.worker,
                    args=(drone, self.results_queue))
            # None of these threads are allowed to survive past the tick they
            # were spawned in, and the scheduler won't die mid-tick, so none of
            # them strictly needs to be a daemon. They are marked as daemons
            # anyway so that, if the scheduler does die unexpectedly, they can
            # simply be forsaken.
            self.drone_threads[drone] = worker_thread
            # The name is only used for debugging.
            worker_thread.setName('%s.%s' %
                                  (self.name, drone.hostname.replace('.', '_')))
            worker_thread.daemon = True
            worker_thread.start()
        return self.get_results() if wait else None