Mark SIGKILL exits as potentially flaky, and propagate them up.
Currently, we don't propagate flakiness up in the case where an
error was caused by a SIGKILL exit. Do this.
BUG=chromium:308196
TEST=New unit test.
Change-Id: I49b2f1d0f0e0e0dc9641001f7dfa2bcc079389ac
Reviewed-on: https://chromium-review.googlesource.com/175515
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Commit-Queue: David James <davidjames@chromium.org>
Tested-by: David James <davidjames@chromium.org>
diff --git a/buildbot/cbuildbot_stages_unittest.py b/buildbot/cbuildbot_stages_unittest.py
index be0487a..40531df 100755
--- a/buildbot/cbuildbot_stages_unittest.py
+++ b/buildbot/cbuildbot_stages_unittest.py
@@ -1392,6 +1392,7 @@
except parallel.BackgroundFailure as ex:
error = ex
self.assertTrue(error)
+ self.assertFalse(error.possibly_flaky)
expectedResults = [
('Pass', results_lib.Results.SUCCESS),
('Fail', FailStage.FAIL_EXCEPTION),
@@ -1401,6 +1402,26 @@
]
self._verifyRunResults(expectedResults)
+ def testFlakyParallelStages(self):
+ """Verify that stages that die with kill -9 are treated as flaky."""
+ stage_objs = [stage(self.options, self.build_config) for stage in
+ (PassStage, SuicideStage)]
+ error = None
+ with cros_test_lib.OutputCapturer():
+ with cros_test_lib.LoggingCapturer(parallel.__name__):
+ with mock.patch.multiple(parallel._BackgroundTask, PRINT_INTERVAL=0.01):
+ try:
+ cbuildbot.SimpleBuilder._RunParallelStages(stage_objs)
+ except parallel.BackgroundFailure as ex:
+ error = ex
+ self.assertTrue(error)
+ self.assertTrue(error.possibly_flaky)
+ expectedResults = [
+ ('Pass', results_lib.Results.SUCCESS),
+ ('Suicide', error),
+ ]
+ self._verifyRunResults(expectedResults)
+
def testStagesReportSuccess(self):
"""Tests Stage reporting."""
diff --git a/lib/parallel.py b/lib/parallel.py
index ed13df4..a705642 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -155,12 +155,14 @@
with open(self._output.name, 'r') as output:
pos = 0
running, exited_cleanly, msg, error = (True, False, None, None)
+ possibly_flaky = False
while running:
# Check whether the process is still alive.
running = self.is_alive()
try:
- error, results = self._queue.get(True, self.PRINT_INTERVAL)
+ error, results, possibly_flaky = \
+ self._queue.get(True, self.PRINT_INTERVAL)
running = False
exited_cleanly = True
except Queue.Empty:
@@ -174,8 +176,12 @@
msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT)
self._KillChildren([self])
elif not exited_cleanly:
+ # Treat SIGKILL signals as potentially flaky.
+ if self.exitcode == -signal.SIGKILL:
+ possibly_flaky = True
msg = ('%r exited unexpectedly with code %s' %
- (self, self.EXIT_TIMEOUT))
+ (self, self.exitcode))
+
# Read output from process.
output.seek(pos)
buf = output.read(_BUFSIZE)
@@ -187,6 +193,9 @@
(self, self.SILENT_TIMEOUT))
self._KillChildren([self])
+ # Timeouts are possibly flaky.
+ possibly_flaky = True
+
# Read remaining output from the process.
output.seek(pos)
buf = output.read(_BUFSIZE)
@@ -218,7 +227,7 @@
self.Cleanup(silent=True)
# If a traceback occurred, return it.
- return error
+ return error, possibly_flaky
def start(self):
"""Invoke multiprocessing.Process.start after flushing output/err."""
@@ -238,13 +247,14 @@
self._semaphore.acquire()
error = 'Unexpected exception in %r' % self
+ possibly_flaky = False
pid = os.getpid()
try:
- error = self._Run()
+ error, possibly_flaky = self._Run()
finally:
if not self._killing.is_set() and os.getpid() == pid:
results = results_lib.Results.Get()
- self._queue.put((error, results))
+ self._queue.put((error, results, possibly_flaky))
if self._semaphore is not None:
self._semaphore.release()
@@ -260,6 +270,7 @@
sys.stdout.flush()
sys.stderr.flush()
+ possibly_flaky = False
# Send all output to a named temporary file.
with open(self._output.name, 'w', 0) as output:
# Back up sys.std{err,out}. These aren't used, but we keep a copy so
@@ -287,6 +298,7 @@
self._task(*self._task_args, **self._task_kwargs)
except results_lib.StepFailure as ex:
error = str(ex)
+ possibly_flaky = ex.possibly_flaky
except BaseException as ex:
error = traceback.format_exc()
if self._killing.is_set():
@@ -295,7 +307,7 @@
sys.stdout.flush()
sys.stderr.flush()
- return error
+ return error, possibly_flaky
@classmethod
def _KillChildren(cls, bg_tasks, log_level=logging.WARNING):
@@ -361,6 +373,7 @@
"""
semaphore = None
+ possibly_flaky = False
if max_parallel is not None:
semaphore = multiprocessing.Semaphore(max_parallel)
@@ -376,10 +389,12 @@
finally:
# Wait for each step to complete.
tracebacks = []
+ flaky_tasks = []
while bg_tasks:
task = bg_tasks.popleft()
- error = task.Wait()
+ error, possibly_flaky = task.Wait()
if error is not None:
+ flaky_tasks.append(possibly_flaky)
tracebacks.append(error)
if halt_on_error:
break
@@ -390,7 +405,8 @@
# Propagate any exceptions.
if tracebacks:
- raise BackgroundFailure('\n' + ''.join(tracebacks))
+ possibly_flaky = flaky_tasks and all(flaky_tasks)
+ raise BackgroundFailure('\n' + ''.join(tracebacks), possibly_flaky)
@staticmethod
def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):