Mark SIGKILL exits as potentially flaky, and propagate them up.

Currently, when a child process exits due to SIGKILL, we don't propagate
the flakiness of that failure up to the caller. Do this.

BUG=chromium:308196
TEST=New unit test.

Change-Id: I49b2f1d0f0e0e0dc9641001f7dfa2bcc079389ac
Reviewed-on: https://chromium-review.googlesource.com/175515
Reviewed-by: Mike Frysinger <vapier@chromium.org>
Commit-Queue: David James <davidjames@chromium.org>
Tested-by: David James <davidjames@chromium.org>
diff --git a/buildbot/cbuildbot_stages_unittest.py b/buildbot/cbuildbot_stages_unittest.py
index be0487a..40531df 100755
--- a/buildbot/cbuildbot_stages_unittest.py
+++ b/buildbot/cbuildbot_stages_unittest.py
@@ -1392,6 +1392,7 @@
       except parallel.BackgroundFailure as ex:
         error = ex
     self.assertTrue(error)
+    self.assertFalse(error.possibly_flaky)
     expectedResults = [
         ('Pass', results_lib.Results.SUCCESS),
         ('Fail', FailStage.FAIL_EXCEPTION),
@@ -1401,6 +1402,26 @@
     ]
     self._verifyRunResults(expectedResults)
 
+  def testFlakyParallelStages(self):
+    """Verify that stages that die with kill -9 are treated as flaky."""
+    stage_objs = [stage(self.options, self.build_config) for stage in
+                  (PassStage, SuicideStage)]
+    error = None
+    with cros_test_lib.OutputCapturer():
+      with cros_test_lib.LoggingCapturer(parallel.__name__):
+        with mock.patch.multiple(parallel._BackgroundTask, PRINT_INTERVAL=0.01):
+          try:
+            cbuildbot.SimpleBuilder._RunParallelStages(stage_objs)
+          except parallel.BackgroundFailure as ex:
+            error = ex
+        self.assertTrue(error)
+        self.assertTrue(error.possibly_flaky)
+        expectedResults = [
+            ('Pass', results_lib.Results.SUCCESS),
+            ('Suicide', error),
+        ]
+        self._verifyRunResults(expectedResults)
+
   def testStagesReportSuccess(self):
     """Tests Stage reporting."""
 
diff --git a/lib/parallel.py b/lib/parallel.py
index ed13df4..a705642 100644
--- a/lib/parallel.py
+++ b/lib/parallel.py
@@ -155,12 +155,14 @@
       with open(self._output.name, 'r') as output:
         pos = 0
         running, exited_cleanly, msg, error = (True, False, None, None)
+        possibly_flaky = False
         while running:
           # Check whether the process is still alive.
           running = self.is_alive()
 
           try:
-            error, results = self._queue.get(True, self.PRINT_INTERVAL)
+            error, results, possibly_flaky = \
+                self._queue.get(True, self.PRINT_INTERVAL)
             running = False
             exited_cleanly = True
           except Queue.Empty:
@@ -174,8 +176,12 @@
               msg = '%r hung for %r seconds' % (self, self.EXIT_TIMEOUT)
               self._KillChildren([self])
             elif not exited_cleanly:
+              # Treat SIGKILL signals as potentially flaky.
+              if self.exitcode == -signal.SIGKILL:
+                possibly_flaky = True
               msg = ('%r exited unexpectedly with code %s' %
-                     (self, self.EXIT_TIMEOUT))
+                     (self, self.exitcode))
+
           # Read output from process.
           output.seek(pos)
           buf = output.read(_BUFSIZE)
@@ -187,6 +193,9 @@
                    (self, self.SILENT_TIMEOUT))
             self._KillChildren([self])
 
+            # Timeouts are possibly flaky.
+            possibly_flaky = True
+
             # Read remaining output from the process.
             output.seek(pos)
             buf = output.read(_BUFSIZE)
@@ -218,7 +227,7 @@
       self.Cleanup(silent=True)
 
     # If a traceback occurred, return it.
-    return error
+    return error, possibly_flaky
 
   def start(self):
     """Invoke multiprocessing.Process.start after flushing output/err."""
@@ -238,13 +247,14 @@
       self._semaphore.acquire()
 
     error = 'Unexpected exception in %r' % self
+    possibly_flaky = False
     pid = os.getpid()
     try:
-      error = self._Run()
+      error, possibly_flaky = self._Run()
     finally:
       if not self._killing.is_set() and os.getpid() == pid:
         results = results_lib.Results.Get()
-        self._queue.put((error, results))
+        self._queue.put((error, results, possibly_flaky))
         if self._semaphore is not None:
           self._semaphore.release()
 
@@ -260,6 +270,7 @@
 
     sys.stdout.flush()
     sys.stderr.flush()
+    possibly_flaky = False
     # Send all output to a named temporary file.
     with open(self._output.name, 'w', 0) as output:
       # Back up sys.std{err,out}. These aren't used, but we keep a copy so
@@ -287,6 +298,7 @@
         self._task(*self._task_args, **self._task_kwargs)
       except results_lib.StepFailure as ex:
         error = str(ex)
+        possibly_flaky = ex.possibly_flaky
       except BaseException as ex:
         error = traceback.format_exc()
         if self._killing.is_set():
@@ -295,7 +307,7 @@
         sys.stdout.flush()
         sys.stderr.flush()
 
-    return error
+    return error, possibly_flaky
 
   @classmethod
   def _KillChildren(cls, bg_tasks, log_level=logging.WARNING):
@@ -361,6 +373,7 @@
     """
 
     semaphore = None
+    possibly_flaky = False
     if max_parallel is not None:
       semaphore = multiprocessing.Semaphore(max_parallel)
 
@@ -376,10 +389,12 @@
     finally:
       # Wait for each step to complete.
       tracebacks = []
+      flaky_tasks = []
       while bg_tasks:
         task = bg_tasks.popleft()
-        error = task.Wait()
+        error, possibly_flaky = task.Wait()
         if error is not None:
+          flaky_tasks.append(possibly_flaky)
           tracebacks.append(error)
           if halt_on_error:
             break
@@ -390,7 +405,8 @@
 
       # Propagate any exceptions.
       if tracebacks:
-        raise BackgroundFailure('\n' + ''.join(tracebacks))
+        possibly_flaky = flaky_tasks and all(flaky_tasks)
+        raise BackgroundFailure('\n' + ''.join(tracebacks), possibly_flaky)
 
   @staticmethod
   def TaskRunner(queue, task, onexit=None, task_args=None, task_kwargs=None):