release_stages: Split Paygen -> Signing, Paygen stages.

This creates a SigningStage that must be run before the Paygen stage to
wait for signed images to be uploaded. This means that signing errors
are easily distinguished from Paygen errors on the waterfall.

The signing stage signals to Paygen with the board attribute
'signed_images_ready' as soon as all boards are signed and ready.

We don't start Paygen for any channels until all channels are
signed. And won't run Paygen for any channels if any channels get a
signing failure (which is a regression).

BUG=chromium:588282
TEST=run_tests

Change-Id: I73e180a728d097353117a54009c39b194f157c19
Reviewed-on: https://chromium-review.googlesource.com/332275
Commit-Ready: Don Garrett <dgarrett@chromium.org>
Tested-by: Don Garrett <dgarrett@chromium.org>
Reviewed-by: Matthew Sartori <msartori@chromium.org>
(cherry picked from commit a982dde9d6e249bed8309be9521e2ea099e1f12f)
Reviewed-on: https://chromium-review.googlesource.com/337056
Reviewed-by: Bernie Thompson <bhthompson@chromium.org>
Commit-Queue: Bernie Thompson <bhthompson@chromium.org>
Tested-by: Bernie Thompson <bhthompson@chromium.org>
diff --git a/cbuildbot/builders/simple_builders.py b/cbuildbot/builders/simple_builders.py
index ecbfc35..f1bb747 100644
--- a/cbuildbot/builders/simple_builders.py
+++ b/cbuildbot/builders/simple_builders.py
@@ -192,6 +192,7 @@
 
     stage_list += [
         [release_stages.SignerTestStage, board, archive_stage],
+        [release_stages.SigningStage, board, archive_stage],
         [release_stages.PaygenStage, board, archive_stage],
         [test_stages.ImageTestStage, board],
         [test_stages.UnitTestStage, board],
diff --git a/cbuildbot/stages/release_stages.py b/cbuildbot/stages/release_stages.py
index c382724..f91cac1 100644
--- a/cbuildbot/stages/release_stages.py
+++ b/cbuildbot/stages/release_stages.py
@@ -70,8 +70,8 @@
   """Paygen can't run with a release.conf config for the board."""
 
 
-class PaygenStage(artifact_stages.ArchivingStage):
-  """Stage that generates release payloads.
+class SigningStage(artifact_stages.ArchivingStage):
+  """Stage that waits for image signing.
 
   If this stage is created with a 'channels' argument, it can run
   independently. Otherwise, it's dependent on values queued up by
@@ -86,49 +86,32 @@
   # Timeout for the signing process. 2 hours in seconds.
   SIGNING_TIMEOUT = 2 * 60 * 60
 
-  FINISHED = 'finished'
-
-  def __init__(self, builder_run, board, archive_stage, channels=None,
-               **kwargs):
+  def __init__(self, builder_run, board, archive_stage, **kwargs):
     """Init that accepts the channels argument, if present.
 
     Args:
       builder_run: See builder_run on ArchivingStage.
       board: See board on ArchivingStage.
       archive_stage: See archive_stage on ArchivingStage.
-      channels: Explicit list of channels to generate payloads for.
-                If empty, will instead wait on values from push_image.
-                Channels is normally None in release builds, and normally set
-                for trybot 'payloads' builds.
     """
-    super(PaygenStage, self).__init__(builder_run, board, archive_stage,
-                                      **kwargs)
+    super(SigningStage, self).__init__(builder_run, board, archive_stage,
+                                       **kwargs)
+
+    # Used to remember partial results between retries.
     self.signing_results = {}
-    self.channels = channels
 
   def _HandleStageException(self, exc_info):
     """Override and don't set status to FAIL but FORGIVEN instead."""
-    exc_type, exc_value, _exc_tb = exc_info
+    exc_type, _exc_value, _exc_tb = exc_info
 
-    # If Paygen fails to find anything needed in release.conf, treat it
-    # as a warning, not a failure. This is common during new board bring up.
-    if issubclass(exc_type, PaygenNoPaygenConfigForBoard):
-      return self._HandleExceptionAsWarning(exc_info)
+    # Notify stages blocked on us if we error out.
+    self.board_runattrs.SetParallel('signed_images_ready', None)
 
     # Warn so people look at ArchiveStage for the real error.
     if issubclass(exc_type, MissingInstructionException):
       return self._HandleExceptionAsWarning(exc_info)
 
-    # If the exception is a TestLabFailure that means we couldn't schedule the
-    # test. We don't fail the build for that. We do the CompoundFailure dance,
-    # because that's how we'll get failures from background processes returned
-    # to us.
-    if (issubclass(exc_type, failures_lib.TestLabFailure) or
-        (issubclass(exc_type, failures_lib.CompoundFailure) and
-         exc_value.MatchesFailureType(failures_lib.TestLabFailure))):
-      return self._HandleExceptionAsWarning(exc_info)
-
-    return super(PaygenStage, self)._HandleStageException(exc_info)
+    return super(SigningStage, self)._HandleStageException(exc_info)
 
   def _JsonFromUrl(self, gs_ctx, url):
     """Fetch a GS Url, and parse it as Json.
@@ -168,15 +151,14 @@
     return (signer_json or {}).get('status', {}).get('status', '')
 
   def _CheckForResults(self, gs_ctx, instruction_urls_per_channel,
-                       channel_notifier):
+                       channel_notifier=None):
     """timeout_util.WaitForSuccess func to check a list of signer results.
 
     Args:
       gs_ctx: Google Storage Context.
       instruction_urls_per_channel: Urls of the signer result files
                                     we're expecting.
-      channel_notifier: BackgroundTaskRunner into which we push channels for
-                        processing.
+      channel_notifier: Method to call when a channel is ready or None.
 
     Returns:
       Number of results not yet collected.
@@ -227,8 +209,8 @@
         if self._SigningStatusFromJson(signer_result) != 'passed':
           channel_success = False
 
-      # If we successfully completed the channel, inform paygen.
-      if channel_success:
+      # If we successfully completed the channel, inform someone.
+      if channel_success and channel_notifier:
         channel_notifier(channel)
 
     return results_completed
@@ -256,13 +238,12 @@
 
   def _WaitForSigningResults(self,
                              instruction_urls_per_channel,
-                             channel_notifier):
+                             channel_notifier=None):
     """Do the work of waiting for signer results and logging them.
 
     Args:
       instruction_urls_per_channel: push_image data (see _WaitForPushImage).
-      channel_notifier: BackgroundTaskRunner into which we push channels for
-                        processing.
+      channel_notifier: Method to call with channel name when ready or None.
 
     Raises:
       ValueError: If the signer result isn't valid json.
@@ -309,6 +290,78 @@
     version = self._run.attrs.release_tag
 
     assert version, "We can't generate payloads without a release_tag."
+    logging.info("Waiting for image uploads for: %s, %s", board, version)
+
+    instruction_urls_per_channel = self._WaitForPushImage()
+
+    logging.info("Waiting for image signing for: %s, %s", board, version)
+    logging.info("GS errors are a normal part of the polling for results.")
+    self._WaitForSigningResults(instruction_urls_per_channel)
+
+    # Notify stages blocked on us that images are for the given channel list.
+    channels = instruction_urls_per_channel.keys()
+    self.board_runattrs.SetParallel('signed_images_ready', channels)
+
+
+class PaygenStage(artifact_stages.ArchivingStage):
+  """Stage that generates release payloads.
+
+  If this stage is created with a 'channels' argument, it can run
+  independently. Otherwise, it's dependent on values queued up by
+  the ArchiveStage (push_image).
+  """
+  option_name = 'paygen'
+  config_name = 'paygen'
+
+  def __init__(self, builder_run, board, archive_stage, channels=None,
+               **kwargs):
+    """Init that accepts the channels argument, if present.
+
+    Args:
+      builder_run: See builder_run on ArchivingStage.
+      board: See board on ArchivingStage.
+      archive_stage: See archive_stage on ArchivingStage.
+      channels: Explicit list of channels to generate payloads for.
+                If empty, will instead wait on values from push_image.
+                Channels is normally None in release builds, and normally set
+                for trybot 'payloads' builds.
+    """
+    super(PaygenStage, self).__init__(builder_run, board, archive_stage,
+                                      **kwargs)
+    self.channels = channels
+
+  def _HandleStageException(self, exc_info):
+    """Override and don't set status to FAIL but FORGIVEN instead."""
+    exc_type, exc_value, _exc_tb = exc_info
+
+    # If Paygen fails to find anything needed in release.conf, treat it
+    # as a warning. This is common during new board bring up.
+    if issubclass(exc_type, PaygenNoPaygenConfigForBoard):
+      return self._HandleExceptionAsWarning(exc_info)
+
+    # If the SigningStage failed, we warn that we didn't run, but don't fail
+    # outright. Let SigningStage decide if this should kill the build.
+    if issubclass(exc_type, SignerFailure):
+      return self._HandleExceptionAsWarning(exc_info)
+
+    # If the exception is a TestLabFailure that means we couldn't schedule the
+    # test. We don't fail the build for that. We do the CompoundFailure dance,
+    # because that's how we'll get failures from background processes returned
+    # to us.
+    if (issubclass(exc_type, failures_lib.TestLabFailure) or
+        (issubclass(exc_type, failures_lib.CompoundFailure) and
+         exc_value.MatchesFailureType(failures_lib.TestLabFailure))):
+      return self._HandleExceptionAsWarning(exc_info)
+
+    return super(PaygenStage, self)._HandleStageException(exc_info)
+
+  def PerformStage(self):
+    """Do the work of generating our release payloads."""
+    # Convert to release tools naming for boards.
+    board = self._current_board.replace('_', '-')
+    version = self._run.attrs.release_tag
+
+    assert version, "We can't generate payloads without a release_tag."
     logging.info("Generating payloads for: %s, %s", board, version)
 
     # Test to see if the current board has a Paygen configuration. We do
@@ -321,22 +374,24 @@
           'No release.conf entry was found for board %s. Get a TPM to fix.' %
           board)
 
+
+    # If we did not get an explicit channel list, we wait on the signing
+    # stage to give us a list, and so signal that signing is done.
+    if self.channels is None:
+      # Wait for channels from signing stage.
+      self.channels = self.board_runattrs.GetParallel(
+          'signed_images_ready', timeout=None)
+      if self.channels is None:
+        raise SignerFailure('SigningStage failed, no payloads generated.')
+
     with parallel.BackgroundTaskRunner(self._RunPaygenInProcess) as per_channel:
-      def channel_notifier(channel):
+      logging.info("Using channels: %s", self.channels)
+      # If we have an explicit list of channels, use it.
+      for channel in self.channels:
         per_channel.put((channel, board, version, self._run.debug,
                          self._run.config.paygen_skip_testing,
                          self._run.config.paygen_skip_delta_payloads))
 
-      if self.channels:
-        logging.info("Using explicit channels: %s", self.channels)
-        # If we have an explicit list of channels, use it.
-        for channel in self.channels:
-          channel_notifier(channel)
-      else:
-        instruction_urls_per_channel = self._WaitForPushImage()
-        self._WaitForSigningResults(instruction_urls_per_channel,
-                                    channel_notifier)
-
   def _RunPaygenInProcess(self, channel, board, version, debug,
                           disable_tests, skip_delta_payloads):
     """Helper for PaygenStage that invokes payload generation.
diff --git a/cbuildbot/stages/release_stages_unittest.py b/cbuildbot/stages/release_stages_unittest.py
index 89dbe09..570b136 100644
--- a/cbuildbot/stages/release_stages_unittest.py
+++ b/cbuildbot/stages/release_stages_unittest.py
@@ -25,11 +25,10 @@
 # pylint: disable=protected-access
 
 
-class PaygenStageTest(generic_stages_unittest.AbstractStageTestCase,
-                      cbuildbot_unittest.SimpleBuilderTestCase):
-  """Test the PaygenStageStage."""
+class SigningStageTest(generic_stages_unittest.AbstractStageTestCase,
+                       cbuildbot_unittest.SimpleBuilderTestCase):
+  """Test the SigningStage."""
 
-  BOT_ID = 'x86-mario-release'
   RELEASE_TAG = '0.0.1'
 
   SIGNER_RESULT = """
@@ -47,8 +46,8 @@
 
   def ConstructStage(self):
     archive_stage = artifact_stages.ArchiveStage(self._run, self._current_board)
-    return release_stages.PaygenStage(self._run, self._current_board,
-                                      archive_stage)
+    return release_stages.SigningStage(self._run, self._current_board,
+                                       archive_stage)
 
   def testWaitForPushImageSuccess(self):
     """Test waiting for input from PushImage."""
@@ -86,6 +85,20 @@
       for result in results:
         mock_gs_ctx.Cat.assert_any_call(result)
 
+  def testWaitForSigningResultsSuccessNoNotifier(self):
+    """Test that _WaitForSigningResults works when signing works."""
+    results = ['chan1_uri1.json', 'chan1_uri2.json', 'chan2_uri1.json']
+
+    with patch(release_stages.gs, 'GSContext') as mock_gs_ctx_init:
+      mock_gs_ctx = mock_gs_ctx_init.return_value
+      mock_gs_ctx.Cat.return_value = self.SIGNER_RESULT
+
+      stage = self.ConstructStage()
+      stage._WaitForSigningResults(self.INSNS_URLS_PER_CHANNEL, None)
+
+      for result in results:
+        mock_gs_ctx.Cat.assert_any_call(result)
+
   def testWaitForSigningResultsSuccessNothingSigned(self):
     """Test _WaitForSigningResults when there are no signed images."""
     with patch(release_stages.gs, 'GSContext') as mock_gs_ctx_init:
@@ -265,145 +278,133 @@
       })
       self.assertEqual(notifier.mock_calls, [])
 
-  def generateNotifyCalls(self, channels):
-    def side_effect(_, notifier):
-      for channel in channels:
-        notifier(channel)
-    return side_effect
+  def testPerformStageSuccess(self):
+    """Test that SigningStage works when signing works."""
+    stage = self.ConstructStage()
+
+    self.PatchObject(stage, '_WaitForPushImage',
+                     return_value=self.INSNS_URLS_PER_CHANNEL)
+    self.PatchObject(stage, '_WaitForSigningResults')
+
+    stage.PerformStage()
+
+    # Verify that we send the right notifications.
+    result = stage.board_runattrs.GetParallel('signed_images_ready', timeout=0)
+    self.assertEqual(result, ['chan1', 'chan2'])
+
+
+class PaygenStageTest(generic_stages_unittest.AbstractStageTestCase,
+                      cbuildbot_unittest.SimpleBuilderTestCase):
+  """Test the PaygenStageStage."""
+
+  # We use a variant board to make sure the '_' is translated to '-'.
+  BOT_ID = 'x86-alex_he-release'
+  RELEASE_TAG = '0.0.1'
+
+  def setUp(self):
+    self._Prepare()
+
+    # This method fetches a file from GS, mock it out.
+    self.validateMock = self.PatchObject(
+        paygen_build_lib, 'ValidateBoardConfig')
+
+  def ConstructStage(self):
+    return release_stages.PaygenStage(self._run, self._current_board, None)
 
   def testPerformStageSuccess(self):
     """Test that PaygenStage works when signing works."""
-
     with patch(release_stages.parallel, 'BackgroundTaskRunner') as background:
       queue = background().__enter__()
 
-      # This patch is only required for external builds with no config data.
-      with patch(paygen_build_lib, 'ValidateBoardConfig'):
+      stage = self.ConstructStage()
+      stage.board_runattrs.SetParallel('signed_images_ready',
+                                       ['stable', 'beta'])
 
-        stage = self.ConstructStage()
+      stage.PerformStage()
 
-        with patch(stage, '_WaitForPushImage') as wait_push:
-          with patch(stage, '_WaitForSigningResults') as wait_signing:
-            wait_push.return_value = self.INSNS_URLS_PER_CHANNEL
-            wait_signing.side_effect = self.generateNotifyCalls(('stable',
-                                                                 'beta'))
-            stage.PerformStage()
+      # Verify that we queue up work
+      self.assertEqual(
+          queue.put.call_args_list,
+          [mock.call(('stable', 'x86-alex-he', '0.0.1', False, True, False)),
+           mock.call(('beta', 'x86-alex-he', '0.0.1', False, True, False))])
 
-        # Verify that we queue up work
-        self.assertEqual(
-            queue.put.call_args_list,
-            [mock.call(('stable', 'x86-mario', '0.0.1', False, False, False)),
-             mock.call(('beta', 'x86-mario', '0.0.1', False, False, False))])
-
-  def testPerformStageSuccessVarientBoard(self):
-    """Test that SignerResultsStage works with varient boards.
-
-    Varient boards need some name conversion. Make sure that's okay.
-    """
-    self._current_board = 'x86-alex_he'
-
-    with patch(release_stages.parallel, 'BackgroundTaskRunner') as background:
-      queue = background().__enter__()
-
-      # This patch is only required for external builds with no config data.
-      with patch(paygen_build_lib, 'ValidateBoardConfig'):
-        stage = self.ConstructStage()
-
-        with patch(stage, '_WaitForPushImage') as wait_push:
-          with patch(stage, '_WaitForSigningResults') as wait_signing:
-            wait_push.return_value = self.INSNS_URLS_PER_CHANNEL
-            wait_signing.side_effect = self.generateNotifyCalls(('stable',
-                                                                 'beta'))
-            stage.PerformStage()
-
-        # Verify that we queue up work
-        self.assertEqual(
-            queue.put.call_args_list,
-            [mock.call(('stable', 'x86-alex-he', '0.0.1', False, False, False)),
-             mock.call(('beta', 'x86-alex-he', '0.0.1', False, False, False))])
-
-  def testPerformStageSigningFailed(self):
+  def testPerformStageNoChannels(self):
     """Test that PaygenStage works when signing works."""
     with patch(release_stages.parallel, 'BackgroundTaskRunner') as background:
       queue = background().__enter__()
 
-      # This patch is only required for external builds with no config data.
-      with patch(paygen_build_lib, 'ValidateBoardConfig'):
-        stage = self.ConstructStage()
+      stage = self.ConstructStage()
+      stage.board_runattrs.SetParallel('signed_images_ready', [])
 
-        with patch(stage, '_WaitForPushImage') as wait_push:
-          with patch(stage, '_WaitForSigningResults') as wait_signing:
-            wait_push.return_value = self.INSNS_URLS_PER_CHANNEL
-            wait_signing.side_effect = release_stages.SignerFailure
+      stage.PerformStage()
 
-            self.assertRaises(release_stages.SignerFailure,
-                              stage.PerformStage)
+      # Verify that we queue up work
+      self.assertEqual(queue.put.call_args_list, [])
 
-        # Ensure no work was queued up.
-        self.assertFalse(queue.put.called)
+  def testPerformStageSigningFailure(self):
+    """Test that PaygenStage works when signing works."""
+    with patch(release_stages.parallel, 'BackgroundTaskRunner') as background:
+      queue = background().__enter__()
+
+      stage = self.ConstructStage()
+      stage.board_runattrs.SetParallel('signed_images_ready', None)
+
+      with self.assertRaises(release_stages.SignerFailure):
+        stage.PerformStage()
+
+      # Verify that we queue up work
+      self.assertEqual(queue.put.call_args_list, [])
 
   def testPerformStageBackgroundFail(self):
     """Test that exception from background processes are properly handled."""
     with patch(paygen_build_lib, 'CreatePayloads') as create_payloads:
       create_payloads.side_effect = failures_lib.TestLabFailure
 
-      # This patch is only required for external builds with no config data.
-      with patch(paygen_build_lib, 'ValidateBoardConfig'):
-        stage = release_stages.PaygenStage(
-            self._run, self._current_board,
-            archive_stage=None, channels=['foo', 'bar'])
+      stage = release_stages.PaygenStage(
+          self._run, self._current_board,
+          archive_stage=None, channels=['foo', 'bar'])
 
-        with patch(stage, '_HandleExceptionAsWarning') as warning_handler:
-          warning_handler.return_value = (results_lib.Results.FORGIVEN,
-                                          'description',
-                                          0)
+      with patch(stage, '_HandleExceptionAsWarning') as warning_handler:
+        warning_handler.return_value = (results_lib.Results.FORGIVEN,
+                                        'description',
+                                        0)
 
-          stage.Run()
+        stage.Run()
 
-          # This proves the exception was turned into a warning.
-          self.assertTrue(warning_handler.called)
+        # This proves the exception was turned into a warning.
+        self.assertTrue(warning_handler.called)
 
   def testPerformStageTrybot(self):
     """Test the PerformStage alternate behavior for trybot runs."""
     with patch(release_stages.parallel, 'BackgroundTaskRunner') as background:
       queue = background().__enter__()
 
-      # This patch is only required for external builds with no config data.
-      with patch(paygen_build_lib, 'ValidateBoardConfig'):
-        # The stage is constructed differently for trybots, so don't use
-        # ConstructStage.
-        stage = release_stages.PaygenStage(
-            self._run, self._current_board, archive_stage=None,
-            channels=['foo', 'bar'])
-        with patch(stage, '_WaitForPushImage') as wait_push:
-          with patch(stage, '_WaitForSigningResults') as wait_signing:
-            stage.PerformStage()
+      # The stage is constructed differently for trybots, so don't use
+      # ConstructStage.
+      stage = release_stages.PaygenStage(
+          self._run, self._current_board, archive_stage=None,
+          channels=['foo', 'bar'])
+      stage.PerformStage()
 
-          # Make sure we don't wait on push_image or signing in this case.
-          self.assertEqual(wait_push.mock_calls, [])
-          self.assertEqual(wait_signing.mock_calls, [])
-
-        # Notice that we didn't put anything in _wait_for_channel_signing, but
-        # still got results right away.
-        self.assertEqual(
-            queue.put.call_args_list,
-            [mock.call(('foo', 'x86-mario', '0.0.1', False, False, False)),
-             mock.call(('bar', 'x86-mario', '0.0.1', False, False, False))])
+      # Notice that we didn't put anything in _wait_for_channel_signing, but
+      # still got results right away.
+      self.assertEqual(
+          queue.put.call_args_list,
+          [mock.call(('foo', 'x86-alex-he', '0.0.1', False, True, False)),
+           mock.call(('bar', 'x86-alex-he', '0.0.1', False, True, False))])
 
   def testPerformStageUnknownBoard(self):
     """Test that PaygenStage exits when an unknown board is specified."""
     self._current_board = 'unknown-board-name'
 
+    # Setup a board validation failure.
     badBoardException = paygen_build_lib.BoardNotConfigured(self._current_board)
+    self.validateMock.side_effect = badBoardException
 
-    # This patch is only required for external builds with no config data.
-    with patch(paygen_build_lib, 'ValidateBoardConfig') as validate_boards:
-      validate_boards.side_effect = badBoardException
+    stage = self.ConstructStage()
 
-      stage = self.ConstructStage()
-
-      self.assertRaises(release_stages.PaygenNoPaygenConfigForBoard,
-                        stage.PerformStage)
+    with self.assertRaises(release_stages.PaygenNoPaygenConfigForBoard):
+      stage.PerformStage()
 
   def testRunPaygenInProcess(self):
     """Test that _RunPaygenInProcess works in the simple case."""