| """ Parallel execution management """ |
| |
| __author__ = """Copyright Andy Whitcroft 2006""" |
| |
| import sys, logging, os, pickle, traceback, gc, time |
| from autotest_lib.client.common_lib import error, utils |
| |
def fork_start(tmp, l):
    """
    Fork a child process that runs the callable l.

    The parent returns the child's pid immediately after the fork. The child
    does not return from this function on the paths handled here: it runs l()
    and then leaves through os._exit(), with status 0 if l() completed and
    status 1 if l() raised an Exception.

    When l() raises, the child logs 'child process failed' at ERROR level and
    the traceback at DEBUG level, then pickles the exception to
    <tmp>/debug/error-<child pid>. Exceptions that are not
    error.AutotestError instances are wrapped in error.UnhandledTestError
    before being pickled.

    Args:
        tmp: directory under which the 'debug' directory for the pickled
            exception is created (if it does not exist yet).
        l: zero-argument callable executed in the child process.

    Returns:
        The pid of the forked child (in the parent process only).
    """
    # Flush before forking so buffered output is not emitted twice.
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid:
        # Parent
        return pid

    try:
        try:
            l()
        except error.AutotestError:
            raise
        except Exception as e:
            raise error.UnhandledTestError(e)
    except Exception as detail:
        try:
            try:
                logging.error('child process failed')
                # logging.exception() uses ERROR level, but we want DEBUG for
                # the traceback
                for line in traceback.format_exc().splitlines():
                    logging.debug(line)
            finally:
                # note that exceptions originating in this block won't make it
                # to the logs
                output_dir = os.path.join(tmp, 'debug')
                if not os.path.exists(output_dir):
                    os.makedirs(output_dir)
                ename = os.path.join(output_dir, "error-%d" % os.getpid())
                # os._exit() below skips normal interpreter cleanup, so close
                # (and thereby flush) the file explicitly. Binary mode is what
                # pickle expects; on POSIX, the only place os.fork() exists,
                # it is byte-identical to text mode under Python 2.
                with open(ename, "wb") as error_file:
                    pickle.dump(detail, error_file)

            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            try:
                # clear exception information to allow garbage collection of
                # objects referenced by the exception's traceback
                sys.exc_clear()
                gc.collect()
            finally:
                # Exit even if the cleanup above raised: the child must never
                # fall back into the code the parent was running.
                os._exit(1)
    else:
        try:
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(0)
| |
| |
| def _check_for_subprocess_exception(temp_dir, pid): |
| ename = temp_dir + "/debug/error-%d" % pid |
| if os.path.exists(ename): |
| try: |
| e = pickle.load(file(ename, 'r')) |
| except ImportError: |
| with open(ename, 'r') as fp: |
| file_text = fp.read() |
| raise error.TestError( |
| 'Subprocess raised an exception that could not be ' |
| 'identified. The root cause exception is in the text ' |
| 'that follows: ' + file_text) |
| finally: |
| # Rename the error-pid file so that they do not affect later child |
| # processes that use recycled pids. |
| i = 0 |
| while True: |
| pename = ename + ('-%d' % i) |
| i += 1 |
| if not os.path.exists(pename): |
| break |
| os.rename(ename, pename) |
| raise e |
| |
| |
def fork_waitfor(tmp, pid):
    """
    Wait for a forked child to terminate and surface its failure, if any.

    Calls os.waitpid() on pid without WNOHANG, then hands the reaped pid to
    _check_for_subprocess_exception(), which raises any exception found in
    the child's error file under tmp. If nothing was raised there and the
    wait status is non-zero, error.TestError is raised with that status.
    """
    reaped_pid, wait_status = os.waitpid(pid, 0)

    # An exception pickled by the child takes precedence over the bare status.
    _check_for_subprocess_exception(tmp, reaped_pid)

    if not wait_status:
        return
    raise error.TestError("Test subprocess failed rc=%d" % (wait_status))
| |
def fork_waitfor_timed(tmp, pid, timeout):
    """
    Waits for pid until it terminates or timeout expires.
    If timeout expires, test subprocess is killed.

    The child is polled with os.waitpid(..., os.WNOHANG) first and then every
    2 seconds at most; the last sleep is shortened so the total wait does not
    exceed timeout.

    Args:
        tmp: directory passed on to _check_for_subprocess_exception() to
            locate the child's error file.
        pid: pid of the child process to wait for.
        timeout: maximum time to wait, in seconds.

    Raises:
        error.TestError: if the timeout expired (the child is nuked and
            reaped first), or if the child ended with a non-zero wait status.
        Exception: whatever _check_for_subprocess_exception() raises for a
            child that terminated in time.
    """
    poll_time = 2
    time_passed = 0
    timer_expired = True
    while True:
        # Poll before sleeping so a child that has already finished is
        # reported right away instead of after a full poll interval.
        (child_pid, status) = os.waitpid(pid, os.WNOHANG)
        if (child_pid, status) != (0, 0):
            timer_expired = False
            break
        if time_passed >= timeout:
            break
        # Never sleep past the deadline.
        nap = min(poll_time, timeout - time_passed)
        time.sleep(nap)
        time_passed = time_passed + nap

    if timer_expired:
        logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
        utils.nuke_pid(pid)
        # Reap the killed child so its wait status can be reported.
        (child_pid, status) = os.waitpid(pid, 0)
        raise error.TestError("Test timeout expired, rc=%d" % (status))
    else:
        _check_for_subprocess_exception(tmp, pid)

    if status:
        raise error.TestError("Test subprocess failed rc=%d" % (status))
| |
def fork_nuke_subprocess(tmp, pid):
    """
    Kill the child process pid, then raise any exception it left behind.

    The kill is delegated to utils.nuke_pid(); afterwards
    _check_for_subprocess_exception() is consulted for an error file of
    that pid under tmp.
    """
    utils.nuke_pid(pid)
    _check_for_subprocess_exception(tmp, pid)