blob: 10f6f2e8ebbf90743eaa0e7f4b1b383a9d33897d [file] [log] [blame]
""" Parallel execution management """
__author__ = """Copyright Andy Whitcroft 2006"""
import sys, logging, os, pickle, traceback, gc, time
from autotest_lib.client.common_lib import error, utils
def fork_start(tmp, l):
sys.stdout.flush()
sys.stderr.flush()
pid = os.fork()
if pid:
# Parent
return pid
try:
try:
l()
except error.AutotestError:
raise
except Exception, e:
raise error.UnhandledTestError(e)
except Exception, detail:
try:
try:
logging.error('child process failed')
# logging.exception() uses ERROR level, but we want DEBUG for
# the traceback
for line in traceback.format_exc().splitlines():
logging.debug(line)
finally:
# note that exceptions originating in this block won't make it
# to the logs
output_dir = os.path.join(tmp, 'debug')
if not os.path.exists(output_dir):
os.makedirs(output_dir)
ename = os.path.join(output_dir, "error-%d" % os.getpid())
pickle.dump(detail, open(ename, "w"))
sys.stdout.flush()
sys.stderr.flush()
finally:
# clear exception information to allow garbage collection of
# objects referenced by the exception's traceback
sys.exc_clear()
gc.collect()
os._exit(1)
else:
try:
sys.stdout.flush()
sys.stderr.flush()
finally:
os._exit(0)
def _check_for_subprocess_exception(temp_dir, pid):
ename = temp_dir + "/debug/error-%d" % pid
if os.path.exists(ename):
try:
e = pickle.load(file(ename, 'r'))
except ImportError:
with open(ename, 'r') as fp:
file_text = fp.read()
raise error.TestError(
'Subprocess raised an exception that could not be '
'identified. The root cause exception is in the text '
'that follows: ' + file_text)
finally:
# Rename the error-pid file so that they do not affect later child
# processes that use recycled pids.
i = 0
while True:
pename = ename + ('-%d' % i)
i += 1
if not os.path.exists(pename):
break
os.rename(ename, pename)
raise e
def fork_waitfor(tmp, pid):
(pid, status) = os.waitpid(pid, 0)
_check_for_subprocess_exception(tmp, pid)
if status:
raise error.TestError("Test subprocess failed rc=%d" % (status))
def fork_waitfor_timed(tmp, pid, timeout):
"""
Waits for pid until it terminates or timeout expires.
If timeout expires, test subprocess is killed.
"""
timer_expired = True
poll_time = 2
time_passed = 0
while time_passed < timeout:
time.sleep(poll_time)
(child_pid, status) = os.waitpid(pid, os.WNOHANG)
if (child_pid, status) == (0, 0):
time_passed = time_passed + poll_time
else:
timer_expired = False
break
if timer_expired:
logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
utils.nuke_pid(pid)
(child_pid, status) = os.waitpid(pid, 0)
raise error.TestError("Test timeout expired, rc=%d" % (status))
else:
_check_for_subprocess_exception(tmp, pid)
if status:
raise error.TestError("Test subprocess failed rc=%d" % (status))
def fork_nuke_subprocess(tmp, pid):
utils.nuke_pid(pid)
_check_for_subprocess_exception(tmp, pid)