| # Lint as: python2, python3 |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| __author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007""" |
| |
| import sys, os, signal, time, six.moves.cPickle, logging |
| |
| from autotest_lib.client.common_lib import error, utils |
| from autotest_lib.client.common_lib.cros import retry |
| from six.moves import zip |
| |
| |
# Entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands.  While it is left as None,
# subcommand.redirect_output() does nothing.
logging_manager_object = None
| |
| |
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() first.  The tasks are then
    waited for in list order and the pickled result (or exception) of each
    one is read back from its result_pickle pipe, which is always closed
    afterwards.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The deadline is shared by all tasks, but every wait is given at
            least one second.  With a falsy value no deadline is computed
            and timeout=None is passed to each task's fork_waitfor().
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns None, or when return_results is True a list with one entry per
            task in tasklist order.  A task that failed without leaving
            anything in its pipe is represented by the
            AutoservSubcommandError raised while waiting for it.
    @raises error.AutoservError: if any task failed and return_results is
            False.
    """
    for task in tasklist:
        task.fork_start()

    endtime = time.time() + timeout if timeout else None

    run_error = False
    results = []
    for task in tasklist:
        remaining_timeout = None
        if endtime is not None:
            # Never hand out less than a second, even past the deadline.
            remaining_timeout = max(endtime - time.time(), 1)

        wait_failure = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as e:
            run_error = True
            wait_failure = e
        else:
            if status != 0:
                run_error = True

        try:
            result = six.moves.cPickle.load(task.result_pickle)
        except EOFError:
            # The pipe is empty: the child ended before it wrote a result.
            # Report the failure seen while waiting instead of aborting the
            # whole run; without such a failure the EOF is unexpected.
            if wait_failure is None:
                raise
            result = wait_failure
        finally:
            task.result_pickle.close()
        results.append(result)

    if return_results:
        return results
    elif run_error:
        message = 'One or more subcommands failed:\n' + ''.join(
                'task: %s returned/raised: %r\n' % (task, result)
                for task, result in zip(tasklist, results))
        raise error.AutoservError(message)
| |
| |
def parallel_simple(function, arglist, subdir_name_constructor=lambda x: str(x),
                    log=True, timeout=None, return_results=False):
    """
    Call "function" once per element of arglist, each call in its own
    subcommand, and run all of those subcommands in parallel.

    Each element is the single argument passed to "function" and, when
    logging is enabled, also determines the name of the result
    sub-directory of its subcommand.

    NOTE: As an optimization, if len(arglist) == 1 no subcommand is created;
    "function" is called directly in the current process.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of arguments to be used one per subcommand
    @param subdir_name_constructor: A function that returns a name for the
            result sub-directory created per subcommand.
            Signature is:
                subdir_name_constructor(arg)
            where arg is the argument passed to function.
    @param log: If True, a sub-directory named by subdir_name_constructor
            is handed to each subcommand; otherwise subcommands get no
            sub-directory.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')

    if len(arglist) != 1:
        # General case: one subcommand per argument, all run together.
        tasks = [subcommand(function,
                            [item],
                            subdir_name_constructor(item) if log else None)
                 for item in arglist]
        return parallel(tasks, timeout, return_results=return_results)

    # Exactly one argument: skip forking and call the function in-process.
    only_arg = arglist[0]
    if not return_results:
        function(only_arg)
        return

    try:
        return [function(only_arg)]
    except Exception as e:
        return [e]
| |
| |
class subcommand(object):
    """
    A function call that is executed in a forked child process.

    The child runs func(*args), pickles the return value (or the exception
    that was raised) and writes it to a pipe.  The parent keeps the read end
    of that pipe in self.result_pickle (set by fork_start()) and collects
    the child's exit status through poll(), wait() or fork_waitfor().
    """
    # The hook lists live on the class, so registered hooks apply to every
    # subcommand.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir=None):
        """
        @param func: The callable to run in the child process.
        @param args: Sequence of positional arguments; the child calls
                func(*args).
        @param subdir: The subdirectory to log results in.  When given it
                is made absolute and created if missing, together with a
                'debug' directory inside it.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.pid = None
        self.returncode = None


    def __str__(self):
        return str('subcommand(func=%s, args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking.  The hook is called with the subcommand instance. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent).
        The hook is called with the subcommand instance. """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Redirect logging into this subcommand's debug directory.

        Does nothing unless the subcommand has a subdir and the module-level
        logging_manager_object has been set.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    def fork_start(self):
        """
        Fork the child process that runs the subcommand.

        In the parent this records the child's pid, opens the read end of
        the result pipe as self.result_pickle and returns.  The child never
        returns from this method: it runs the fork hooks and func(*args),
        writes the pickled result or exception to the pipe, runs the join
        hooks and terminates through os._exit() with status 0 on success
        and 1 otherwise.
        """
        # Flush before forking; anything still buffered would otherwise be
        # copied into the child as well.
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # Pickle data is bytes, so the pipe has to be read in binary
            # mode; text mode breaks unpickling on Python 3.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return.  Everything is
        # wrapped so that os._exit() runs no matter what fails; otherwise
        # the child would unwind into the parent's call stack and carry on
        # as a second copy of the parent.
        exit_code = 1
        try:
            os.close(r)
            signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
            if self.subdir:
                os.chdir(self.subdir)
            self.redirect_output()

            try:
                for hook in self.fork_hooks:
                    hook(self)
                result = self.func(*self.args)
                os.write(w, six.moves.cPickle.dumps(
                        result, six.moves.cPickle.HIGHEST_PROTOCOL))
                exit_code = 0
            except Exception as e:
                logging.exception('function failed')
                exit_code = 1
                os.write(w, six.moves.cPickle.dumps(
                        e, six.moves.cPickle.HIGHEST_PROTOCOL))

            os.close(w)

            for hook in self.join_hooks:
                hook(self)
        except Exception:
            # Setup failure, an exception that could not be pickled, or a
            # failing join hook.  Only log it; exit_code already reflects
            # whether the function itself succeeded.
            logging.exception('subcommand child failed')
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode a wait status into self.returncode: the negated signal
        number if the child was killed by a signal, its exit code if it
        exited.  For a non-zero returncode a failure report is printed,
        including the contents of debug/autoserv.stderr when that file
        exists.

        @param sts: Status value as returned by os.waitpid().

        @raises RuntimeError: if the status is neither "signaled" nor
                "exited".
        @raises error.AutoservSubcommandError: if the returncode is not 0.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print()
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    # "with" closes the log even if printing fails.
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            print(line, end=' ')
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check without blocking whether the child has terminated.

        @returns The returncode, or None if the child is still running.
                os.error from os.waitpid() is deliberately ignored.
        @raises error.AutoservSubcommandError: if the child has terminated
                with a non-zero returncode.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                pass
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child terminates, unless its returncode is already
        known.

        @returns The returncode of the child.
        @raises error.AutoservSubcommandError: if the child terminates with
                a non-zero returncode.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child, optionally giving up after a timeout.

        @param timeout: Number of seconds to wait.  A falsy value means
                wait without a time limit.

        @returns The returncode of the child.
        @raises error.AutoservSubcommandError: if the child ends with a
                non-zero returncode (raised by wait()).
        """
        if not timeout:
            return self.wait()
        else:
            # NOTE(review): retry.timeout presumably returns a
            # (timed_out, result) pair with result None on timeout; only
            # the result is looked at here - confirm against retry.timeout.
            _, result = retry.timeout(self.wait, timeout_sec=timeout)

            if result is None:
                # No returncode was obtained in time: get rid of the child
                # (nuke_pid is expected to kill it), report, then reap it.
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print()
                result = self.wait()

            return result