blob: 234c81a83b82e5bf1bc4f866e9c3c2eb6192baa2 [file] [log] [blame]
# Lint as: python2, python3
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
import sys, os, signal, time, six.moves.cPickle, logging
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import retry
from six.moves import zip
# entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands
logging_manager_object = None
def parallel(tasklist, timeout=None, return_results=False):
"""
Run a set of predefined subcommands in parallel.
@param tasklist: A list of subcommand instances to execute.
@param timeout: Number of seconds after which the commands should timeout.
@param return_results: If True instead of an AutoServError being raised
on any error a list of the results|exceptions from the tasks is
returned. [default: False]
"""
run_error = False
for task in tasklist:
task.fork_start()
remaining_timeout = None
if timeout:
endtime = time.time() + timeout
results = []
for task in tasklist:
if timeout:
remaining_timeout = max(endtime - time.time(), 1)
try:
status = task.fork_waitfor(timeout=remaining_timeout)
except error.AutoservSubcommandError:
run_error = True
else:
if status != 0:
run_error = True
results.append(six.moves.cPickle.load(task.result_pickle))
task.result_pickle.close()
if return_results:
return results
elif run_error:
message = 'One or more subcommands failed:\n'
for task, result in zip(tasklist, results):
message += 'task: %s returned/raised: %r\n' % (task, result)
raise error.AutoservError(message)
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
log=True, timeout=None, return_results=False):
"""
Each element in the arglist used to create a subcommand object,
where that arg is used both as a subdir name, and a single argument
to pass to "function".
We create a subcommand object for each element in the list,
then execute those subcommand objects in parallel.
NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used.
@param function: A callable to run in parallel once per arg in arglist.
@param arglist: A list of arguments to be used one per subcommand
@param subdir_name_constructor: A function that returns a name for the
result sub-directory created per subcommand.
Signature is:
subdir_name_constructor(arg)
where arg is the argument passed to function.
@param log: If True, output will be written to output in a subdirectory
named after each subcommand's arg.
@param timeout: Number of seconds after which the commands should timeout.
@param return_results: If True instead of an AutoServError being raised
on any error a list of the results|exceptions from the function
called on each arg is returned. [default: False]
@returns None or a list of results/exceptions.
"""
if not arglist:
logging.warning('parallel_simple was called with an empty arglist, '
'did you forget to pass in a list of machines?')
# Bypass the multithreading if only one machine.
if len(arglist) == 1:
arg = arglist[0]
if return_results:
try:
result = function(arg)
except Exception as e:
return [e]
return [result]
else:
function(arg)
return
subcommands = []
for arg in arglist:
args = [arg]
subdir = subdir_name_constructor(arg) if log else None
subcommands.append(subcommand(function, args, subdir))
return parallel(subcommands, timeout, return_results=return_results)
class subcommand(object):
fork_hooks, join_hooks = [], []
def __init__(self, func, args, subdir = None):
# func(args) - the subcommand to run
# subdir - the subdirectory to log results in
if subdir:
self.subdir = os.path.abspath(subdir)
if not os.path.exists(self.subdir):
os.mkdir(self.subdir)
self.debug = os.path.join(self.subdir, 'debug')
if not os.path.exists(self.debug):
os.mkdir(self.debug)
else:
self.subdir = None
self.debug = None
self.func = func
self.args = args
self.pid = None
self.returncode = None
def __str__(self):
return str('subcommand(func=%s, args=%s, subdir=%s)' %
(self.func, self.args, self.subdir))
@classmethod
def register_fork_hook(cls, hook):
""" Register a function to be called from the child process after
forking. """
cls.fork_hooks.append(hook)
@classmethod
def register_join_hook(cls, hook):
""" Register a function to be called when from the child process
just before the child process terminates (joins to the parent). """
cls.join_hooks.append(hook)
def redirect_output(self):
if self.subdir and logging_manager_object:
tag = os.path.basename(self.subdir)
logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)
def fork_start(self):
sys.stdout.flush()
sys.stderr.flush()
r, w = os.pipe()
self.returncode = None
self.pid = os.fork()
if self.pid: # I am the parent
os.close(w)
self.result_pickle = os.fdopen(r, 'r')
return
else:
os.close(r)
# We are the child from this point on. Never return.
signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
if self.subdir:
os.chdir(self.subdir)
self.redirect_output()
try:
for hook in self.fork_hooks:
hook(self)
result = self.func(*self.args)
os.write(w, six.moves.cPickle.dumps(result, six.moves.cPickle.HIGHEST_PROTOCOL))
exit_code = 0
except Exception as e:
logging.exception('function failed')
exit_code = 1
os.write(w, six.moves.cPickle.dumps(e, six.moves.cPickle.HIGHEST_PROTOCOL))
os.close(w)
try:
for hook in self.join_hooks:
hook(self)
finally:
sys.stdout.flush()
sys.stderr.flush()
os._exit(exit_code)
def _handle_exitstatus(self, sts):
"""
This is partially borrowed from subprocess.Popen.
"""
if os.WIFSIGNALED(sts):
self.returncode = -os.WTERMSIG(sts)
elif os.WIFEXITED(sts):
self.returncode = os.WEXITSTATUS(sts)
else:
# Should never happen
raise RuntimeError("Unknown child exit status!")
if self.returncode != 0:
print("subcommand failed pid %d" % self.pid)
print("%s" % (self.func,))
print("rc=%d" % self.returncode)
print()
if self.debug:
stderr_file = os.path.join(self.debug, 'autoserv.stderr')
if os.path.exists(stderr_file):
for line in open(stderr_file).readlines():
print(line, end=' ')
print("\n--------------------------------------------\n")
raise error.AutoservSubcommandError(self.func, self.returncode)
def poll(self):
"""
This is borrowed from subprocess.Popen.
"""
if self.returncode is None:
try:
pid, sts = os.waitpid(self.pid, os.WNOHANG)
if pid == self.pid:
self._handle_exitstatus(sts)
except os.error:
pass
return self.returncode
def wait(self):
"""
This is borrowed from subprocess.Popen.
"""
if self.returncode is None:
pid, sts = os.waitpid(self.pid, 0)
self._handle_exitstatus(sts)
return self.returncode
def fork_waitfor(self, timeout=None):
if not timeout:
return self.wait()
else:
_, result = retry.timeout(self.wait, timeout_sec=timeout)
if result is None:
utils.nuke_pid(self.pid)
print("subcommand failed pid %d" % self.pid)
print("%s" % (self.func,))
print("timeout after %ds" % timeout)
print()
result = self.wait()
return result