blob: 4d2fbdd958b3b4bdf166c4ba9c40605527072944 [file] [log] [blame]
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for running cbuildbot stages in the background."""
import multiprocessing
import os
import Queue
import sys
import tempfile
import traceback
from chromite.buildbot import cbuildbot_commands as commands
from chromite.buildbot import cbuildbot_results as results_lib
_PRINT_INTERVAL = 1
class BackgroundException(Exception):
pass
class BackgroundSteps(multiprocessing.Process):
"""Run a list of functions in sequence in the background.
These functions may be the 'Run' functions from buildbot stages or just plain
functions. They will be run in the background. Output from these functions
is saved to a temporary file and is printed when the 'WaitForStep' function
is called.
"""
def __init__(self):
multiprocessing.Process.__init__(self)
self._steps = []
self._queue = multiprocessing.Queue()
def AddStep(self, step):
"""Add a step to the list of steps to run in the background."""
output = tempfile.NamedTemporaryFile(delete=False)
self._steps.append((step, output))
def WaitForStep(self):
"""Wait for the next step to complete.
Output from the step is printed as the step runs.
If an exception occurs, return a string containing the traceback.
"""
assert not self.Empty()
step, output = self._steps.pop(0)
pos = 0
more_output = True
while more_output:
# Check whether the process is finished.
try:
error, results = self._queue.get(True, _PRINT_INTERVAL)
more_output = False
except Queue.Empty:
more_output = True
# Print output so far.
output.seek(pos)
for line in output:
sys.stdout.write(line)
pos = output.tell()
# Cleanup temp file.
output.close()
os.unlink(output.name)
# Propagate any results.
for result in results:
results_lib.Results.Record(*result)
# If a traceback occurred, return it.
return error
def Empty(self):
"""Return True if there are any steps left to run."""
return len(self._steps) == 0
def run(self):
"""Run the list of steps."""
# Be nice so that foreground processes get CPU if they need it.
commands.SetNiceness(foreground=False)
stdout_fileno = sys.stdout.fileno()
stderr_fileno = sys.stderr.fileno()
for step, output in self._steps:
# Send all output to a named temporary file.
os.dup2(output.fileno(), stdout_fileno)
os.dup2(output.fileno(), stderr_fileno)
error = None
try:
results_lib.Results.Clear()
step()
except Exception:
traceback.print_exc(file=output)
error = traceback.format_exc()
output.close()
results = results_lib.Results.Get()
self._queue.put((error, results))
def RunParallelSteps(steps):
"""Run a list of functions in parallel.
The output from the functions is saved to a temporary file and printed as if
they were run in sequence.
If exceptions occur in the steps, we join together the tracebacks and print
them after all parallel steps have finished running.
"""
# First, start all the steps.
bg_steps = []
for step in steps:
bg = BackgroundSteps()
bg.AddStep(step)
bg.start()
bg_steps.append(bg)
# Wait for each step to complete.
tracebacks = []
for bg in bg_steps:
while not bg.Empty():
error = bg.WaitForStep()
if error:
tracebacks.append(error)
bg.join()
# Propagate any exceptions.
if tracebacks:
raise BackgroundException('\n' + ''.join(tracebacks))