# -*- coding: utf-8 -*-
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module containing the sync stages."""

from __future__ import print_function

import collections
import contextlib
import datetime
import itertools
import json
import os
import pprint
import re
import sys
import time
from xml.etree import ElementTree
from xml.dom import minidom

from chromite.cbuildbot import lkgm_manager
from chromite.cbuildbot import manifest_version
from chromite.cbuildbot import patch_series
from chromite.cbuildbot import trybot_patch_pool
from chromite.cbuildbot import validation_pool
from chromite.cbuildbot.stages import generic_stages
from chromite.cbuildbot.stages import build_stages
from chromite.lib import buildbucket_lib
from chromite.lib import build_requests
from chromite.lib import clactions
from chromite.lib import clactions_metrics
from chromite.lib import config_lib
from chromite.lib import constants
from chromite.lib import commandline
from chromite.lib import cq_config
from chromite.lib import cros_build_lib
from chromite.lib import cros_collections
from chromite.lib import cros_logging as logging
from chromite.lib import failures_lib
from chromite.lib import git
from chromite.lib import metrics
from chromite.lib import osutils
from chromite.lib import patch as cros_patch
from chromite.lib import timeout_util
from chromite.lib import tree_status
from chromite.scripts import cros_mark_android_as_stable
from chromite.scripts import cros_mark_chrome_as_stable

PRE_CQ = validation_pool.PRE_CQ

PRECQ_INFLIGHT_TIMEOUT_MSG = (
    'The %s trybot for your change timed out after %s minutes.'
    '\n\n'
    'This problem can happen if your change causes the builder '
    'to hang, or if there is some infrastructure issue. If your '
    'change is not at fault you may mark your change as ready '
    'again. If this problem occurs multiple times please notify '
    'the sheriff and file a bug.')
PRECQ_EXPIRY_MSG = (
    'The pre-cq verification for this change expired after %s minutes. No '
    'action is required on your part.'
    '\n\n'
    'In order to protect the CQ from picking up stale changes, the pre-cq '
    'status for changes are cleared after a generous timeout. This change '
    'will be re-tested by the pre-cq before the CQ picks it up.')
PRECQ_EARLY_CRASH_MSG = (
    'The %s trybot for your change crashed.'
    '\n\n'
    '%s'
    '\n\n'
    'This problem can happen if your change causes the builder '
    'to crash, or if there is some infrastructure issue. If your '
    'change is not at fault you may mark your change as ready '
    'again. If this problem occurs multiple times please notify '
    'the sheriff and file a bug.')

# Default limit for the size of Pre-CQ configs to test for unioned options
# TODO(nxia): make this configurable in the COMMIT-QUEUE.ini
DEFAULT_UNION_PRE_CQ_LIMIT = 15


class ExceedUnionPreCQLimitException(Exception):
  """Exception raised when unioned Pre-CQ config size exceeds the limit."""

  def __init__(self, pre_cq_configs, limit, message=''):
    """Initialize a ExceedUnionPreCQLimitException.

    Args:
      pre_cq_configs: A set of Pre-CQ configs (strings) which exceeds the limit.
      limit: The limit for the size of the Pre-CQ configs.
      message: An error message (optional).
    """
    Exception.__init__(self, message)
    self.pre_cq_configs = pre_cq_configs
    self.limit = limit


class UnknownPreCQConfigRequestedError(Exception):
  """Raised when a config file asked for a config that doesn't exist."""

  def __init__(self, pre_cq_configs):
    super(UnknownPreCQConfigRequestedError, self).__init__(
        'One of the requested pre-cq configs is invalid or nonexistant: %s' %
        pre_cq_configs)
    self.pre_cq_configs = pre_cq_configs


class PatchChangesStage(generic_stages.BuilderStage):
  """Stage that patches a set of Gerrit changes to the buildroot source tree."""

  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, patch_pool, **kwargs):
    """Construct a PatchChangesStage.

    Args:
      builder_run: BuilderRun object.
      buildstore: BuildStore instance to make DB calls with.
      patch_pool: A TrybotPatchPool object containing the different types of
                  patches to apply.
    """
    super(PatchChangesStage, self).__init__(builder_run, buildstore, **kwargs)
    self.patch_pool = patch_pool

  @staticmethod
  def _CheckForDuplicatePatches(_series, changes):
    conflicts = {}
    duplicates = []
    for change in changes:
      if change.id is None:
        logging.warning(
            "Change %s lacks a usable ChangeId; duplicate checking cannot "
            "be done for this change.  If cherry-picking fails, this is a "
            "potential cause.", change)
        continue
      conflicts.setdefault(change.id, []).append(change)

    duplicates = [x for x in conflicts.itervalues() if len(x) > 1]
    if not duplicates:
      return changes

    for conflict in duplicates:
      logging.error(
          "Changes %s conflict with each other- they have same id %s., ".join(
              map(str, conflict)), conflict[0].id)

    cros_build_lib.Die("Duplicate patches were encountered: %s", duplicates)

  def _PatchSeriesFilter(self, series, changes):
    return self._CheckForDuplicatePatches(series, changes)

  def _ApplyPatchSeries(self, series, patch_pool, **kwargs):
    """Applies a patch pool using a patch series."""
    kwargs.setdefault('frozen', False)
    # Honor the given ordering, so that if a gerrit/remote patch
    # conflicts w/ a local patch, the gerrit/remote patch are
    # blamed rather than local (patch ordering is typically
    # local, gerrit, then remote).
    kwargs.setdefault('honor_ordering', True)
    kwargs['changes_filter'] = self._PatchSeriesFilter

    _applied, failed_tot, failed_inflight = series.Apply(
        list(patch_pool), **kwargs)

    failures = failed_tot + failed_inflight
    if failures:
      self.HandleApplyFailures(failures)

  def HandleApplyFailures(self, failures):
    cros_build_lib.Die("Failed applying patches: %s", "\n".join(
        map(str, failures)))

  def PerformStage(self):

    class NoisyPatchSeries(patch_series.PatchSeries):
      """Custom PatchSeries that adds links to buildbot logs for remote trys."""

      def ApplyChange(self, change):
        if isinstance(change, cros_patch.GerritPatch):
          logging.PrintBuildbotLink(str(change), change.url)
        elif isinstance(change, cros_patch.UploadedLocalPatch):
          logging.PrintBuildbotStepText(str(change))

        return patch_series.PatchSeries.ApplyChange(self, change)

    # If we're an external builder, ignore internal patches.
    helper_pool = patch_series.HelperPool.SimpleCreate(
        cros_internal=self._run.config.internal, cros=True)

    # Limit our resolution to non-manifest patches.
    patches = NoisyPatchSeries(
        self._build_root,
        helper_pool=helper_pool,
        deps_filter_fn=lambda p: not trybot_patch_pool.ManifestFilter(p))

    self._ApplyPatchSeries(patches, self.patch_pool)


class BootstrapStage(PatchChangesStage):
  """Stage that patches a chromite repo and re-executes inside it.

  Attributes:
    returncode - the returncode of the cbuildbot re-execution.  Valid after
                 calling stage.Run().
  """
  option_name = 'bootstrap'
  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, patch_pool, **kwargs):
    super(BootstrapStage, self).__init__(builder_run, buildstore,
                                         trybot_patch_pool.TrybotPatchPool(),
                                         **kwargs)

    self.patch_pool = patch_pool
    self.returncode = None
    self.tempdir = None

  def _ApplyManifestPatches(self, patch_pool):
    """Apply a pool of manifest patches to a temp manifest checkout.

    Args:
      patch_pool: The pool to apply.

    Returns:
      The path to the patched manifest checkout.

    Raises:
      Exception, if the new patched manifest cannot be parsed.
    """
    checkout_dir = os.path.join(self.tempdir, 'manfest-checkout')
    git.Clone(checkout_dir, self._run.config.manifest_repo_url)

    patches = patch_series.PatchSeries.WorkOnSingleRepo(
        checkout_dir, tracking_branch=self._run.manifest_branch)
    self._ApplyPatchSeries(patches, patch_pool)

    # Verify that the patched manifest loads properly. Propagate any errors as
    # exceptions.
    manifest = os.path.join(checkout_dir, self._run.config.manifest)
    git.Manifest.Cached(manifest, manifest_include_dir=checkout_dir)
    return checkout_dir

  @staticmethod
  def _FilterArgsForApi(parsed_args, api_minor):
    """Remove arguments that are introduced after an api version."""

    def filter_fn(passed_arg):
      return passed_arg.opt_inst.api_version <= api_minor

    accepted, removed = commandline.FilteringParser.FilterArgs(
        parsed_args, filter_fn)

    if removed:
      logging.warning("The following arguments were removed due to api: '%s'",
                      ' '.join(removed))
    return accepted

  @classmethod
  def FilterArgsForTargetCbuildbot(cls, buildroot, cbuildbot_path, options):
    _, minor = cros_build_lib.GetTargetChromiteApiVersion(buildroot)
    args = [cbuildbot_path]
    args.append(options.build_config_name)
    args.extend(cls._FilterArgsForApi(options.parsed_args, minor))

    # Only pass down --cache-dir if it was specified. By default, we want
    # the cache dir to live in the root of each checkout, so this means that
    # each instance of cbuildbot needs to calculate the default separately.
    if minor >= 2 and options.cache_dir_specified:
      args += ['--cache-dir', options.cache_dir]

    if minor >= constants.REEXEC_API_TSMON_TASK_NUM:
      # Increment the ts-mon task_num so the metrics don't collide.
      args.extend(['--ts-mon-task-num', str(options.ts_mon_task_num + 1)])

    return args

  @classmethod
  def BootstrapPatchesNeeded(cls, builder_run, patch_pool):
    """See if bootstrapping is needed for any of the given patches.

    Does NOT determine if they have already been applied.

    Args:
      builder_run: BuilderRun object for this build.
      patch_pool: All patches to be applied this run.

    Returns:
      boolean True if bootstrapping is needed.
    """
    chromite_pool = patch_pool.Filter(project=constants.CHROMITE_PROJECT)
    if builder_run.config.internal:
      manifest_pool = patch_pool.FilterIntManifest()
    else:
      manifest_pool = patch_pool.FilterExtManifest()

    return bool(chromite_pool or manifest_pool)

  def HandleApplyFailures(self, failures):
    """Handle the case where patches fail to apply."""
    if self._run.config.pre_cq:
      # Let the PreCQSync stage handle this failure. The PreCQSync stage will
      # comment on CLs with the appropriate message when they fail to apply.
      #
      # WARNING: For manifest patches, the Pre-CQ attempts to apply external
      # patches to the internal manifest, and this means we may flag a conflict
      # here even if the patch applies cleanly. TODO(davidjames): Fix this.
      logging.PrintBuildbotStepWarnings()
      logging.error('Failed applying patches: %s\n'.join(map(str, failures)))
    else:
      PatchChangesStage.HandleApplyFailures(self, failures)

  def _PerformStageInTempDir(self):
    # The plan for the builders is to use master branch to bootstrap other
    # branches. Now, if we wanted to test patches for both the bootstrap code
    # (on master) and the branched chromite (say, R20), we need to filter the
    # patches by branch.
    filter_branch = self._run.manifest_branch
    if self._run.options.test_bootstrap:
      filter_branch = 'master'

    # Filter all requested patches for the branch.
    branch_pool = self.patch_pool.FilterBranch(filter_branch)

    # Checkout the new version of chromite, and patch it.
    chromite_dir = os.path.join(self.tempdir, 'chromite')
    reference_repo = os.path.join(constants.CHROMITE_DIR, '.git')
    git.Clone(chromite_dir, constants.CHROMITE_URL, reference=reference_repo)
    git.RunGit(chromite_dir, ['checkout', filter_branch])

    chromite_pool = branch_pool.Filter(project=constants.CHROMITE_PROJECT)
    if chromite_pool:
      patches = patch_series.PatchSeries.WorkOnSingleRepo(
          chromite_dir, filter_branch)
      self._ApplyPatchSeries(patches, chromite_pool)

    # Re-exec into new instance of cbuildbot, with proper command line args.
    cbuildbot_path = constants.PATH_TO_CBUILDBOT
    if not os.path.exists(os.path.join(self.tempdir, cbuildbot_path)):
      cbuildbot_path = 'chromite/cbuildbot/cbuildbot'
    cmd = self.FilterArgsForTargetCbuildbot(self.tempdir, cbuildbot_path,
                                            self._run.options)

    extra_params = ['--sourceroot', self._run.options.sourceroot]
    extra_params.extend(self._run.options.bootstrap_args)
    if self._run.options.test_bootstrap:
      # We don't want re-executed instance to see this.
      cmd = [a for a in cmd if a != '--test-bootstrap']
    else:
      # If we've already done the desired number of bootstraps, disable
      # bootstrapping for the next execution.  Also pass in the patched manifest
      # repository.
      extra_params.append('--nobootstrap')
      if self._run.config.internal:
        manifest_pool = branch_pool.FilterIntManifest()
      else:
        manifest_pool = branch_pool.FilterExtManifest()

      if manifest_pool:
        manifest_dir = self._ApplyManifestPatches(manifest_pool)
        extra_params.extend(['--manifest-repo-url', manifest_dir])

    cmd += extra_params
    result_obj = cros_build_lib.RunCommand(
        cmd, cwd=self.tempdir, kill_timeout=30, error_code_ok=True)
    self.returncode = result_obj.returncode

  def PerformStage(self):
    with osutils.TempDir(base_dir=self._run.options.bootstrap_dir) as tempdir:
      self.tempdir = tempdir
      self._PerformStageInTempDir()
    self.tempdir = None


class SyncStage(generic_stages.BuilderStage):
  """Stage that performs syncing for the builder."""

  option_name = 'sync'
  output_manifest_sha1 = True
  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, **kwargs):
    super(SyncStage, self).__init__(builder_run, buildstore, **kwargs)
    self.repo = None
    self.skip_sync = False

    # TODO(mtennant): Why keep a duplicate copy of this config value
    # at self.internal when it can always be retrieved from config?
    self.internal = self._run.config.internal
    self.buildbucket_client = self.GetBuildbucketClient()

  def _GetManifestVersionsRepoUrl(self, internal=None, test=False):
    if internal is None:
      internal = self._run.config.internal

    site_params = config_lib.GetSiteParams()
    if internal:
      if test:
        return site_params.MANIFEST_VERSIONS_INT_GOB_URL_TEST
      else:
        return site_params.MANIFEST_VERSIONS_INT_GOB_URL
    else:
      if test:
        return site_params.MANIFEST_VERSIONS_GOB_URL_TEST
      else:
        return site_params.MANIFEST_VERSIONS_GOB_URL

  def Initialize(self):
    self._InitializeRepo()

  def _InitializeRepo(self):
    """Set up the RepoRepository object."""
    self.repo = self.GetRepoRepository()

  def GetNextManifest(self):
    """Returns the manifest to use."""
    return self._run.config.manifest

  def ManifestCheckout(self, next_manifest, fetch_all=False):
    """Checks out the repository to the given manifest."""
    self._Print('\n'.join([
        'BUILDROOT: %s' % self.repo.directory,
        'TRACKING BRANCH: %s' % self.repo.branch,
        'NEXT MANIFEST: %s' % next_manifest
    ]))

    if not self.skip_sync:
      self.repo.Sync(next_manifest)

    print(
        self.repo.ExportManifest(mark_revision=self.output_manifest_sha1),
        file=sys.stderr)

    if fetch_all:
      # Perform git fetch all on projects to resolve any git corruption
      # that may occur due to flake.
      # http://crbug/921407
      self.repo.FetchAll()

  def RunPrePatchBuild(self):
    """Run through a pre-patch build to prepare for incremental build.

    This function runs though the InitSDKStage, SetupBoardStage, and
    BuildPackagesStage. It is intended to be called before applying
    any patches under test, to prepare the chroot and sysroot in a state
    corresponding to ToT prior to an incremental build.

    Returns:
      True if all stages were successful, False if any of them failed.
    """
    suffix = ' (pre-Patch)'
    try:
      build_stages.InitSDKStage(
          self._run, self.buildstore, chroot_replace=True, suffix=suffix).Run()
      for builder_run in self._run.GetUngroupedBuilderRuns():
        for board in builder_run.config.boards:
          build_stages.SetupBoardStage(
              builder_run, self.buildstore, board=board, suffix=suffix).Run()
          build_stages.BuildPackagesStage(
              builder_run, self.buildstore, board=board, suffix=suffix).Run()
    except failures_lib.StepFailure:
      return False

    return True

  def WriteChangesToMetadata(self, changes):
    """Write the changes under test into the metadata.

    Args:
      changes: A list of GerritPatch instances.
    """
    changes_list = self._run.attrs.metadata.GetDict().get('changes', [])
    changes_list = changes_list + [c.GetAttributeDict() for c in set(changes)]
    changes_list = sorted(
        changes_list,
        key=
        lambda x: (x[cros_patch.ATTR_GERRIT_NUMBER],
                   x[cros_patch.ATTR_PATCH_NUMBER],
                   x[cros_patch.ATTR_REMOTE])
    )
    self._run.attrs.metadata.UpdateWithDict({'changes': changes_list})
    change_ids = []
    change_gerrit_ids = []
    change_gerrit_numbers = []
    for c in changes_list:
      change_ids.append(c[cros_patch.ATTR_CHANGE_ID])
      gerrit_number = c[cros_patch.ATTR_GERRIT_NUMBER]
      gerrit_id = '/'.join([
          c[cros_patch.ATTR_REMOTE], gerrit_number,
          c[cros_patch.ATTR_PATCH_NUMBER]
      ])
      change_gerrit_ids.append(gerrit_id)
      change_gerrit_numbers.append(gerrit_number)
    tags = {
        'change_ids': change_ids,
        'change_gerrit_ids': change_gerrit_ids,
        'change_gerrit_numbers': change_gerrit_numbers,
    }
    self._run.attrs.metadata.UpdateKeyDictWithDict(constants.METADATA_TAGS,
                                                   tags)

  @failures_lib.SetFailureType(failures_lib.InfrastructureFailure)
  def PerformStage(self):
    self.Initialize()
    with osutils.TempDir() as tempdir:
      # Save off the last manifest.
      fresh_sync = True
      if os.path.exists(self.repo.directory) and not self._run.options.clobber:
        old_filename = os.path.join(tempdir, 'old.xml')
        try:
          old_contents = self.repo.ExportManifest()
        except cros_build_lib.RunCommandError as e:
          logging.warning(str(e))
        else:
          osutils.WriteFile(old_filename, old_contents)
          fresh_sync = False

      # Sync.
      self.ManifestCheckout(self.GetNextManifest())

      # Print the blamelist.
      if fresh_sync:
        logging.PrintBuildbotStepText('(From scratch)')
      elif self._run.options.buildbot:
        lkgm_manager.GenerateBlameList(self.repo, old_filename)

      # Incremental builds request an additional build before patching changes.
      if self._run.config.build_before_patching:
        pre_build_passed = self.RunPrePatchBuild()
        if not pre_build_passed:
          logging.PrintBuildbotStepText('Pre-patch build failed.')


class ManifestVersionedSyncStage(SyncStage):
  """Stage that generates a unique manifest file, and sync's to it."""

  # TODO(mtennant): Make this into a builder run value.
  output_manifest_sha1 = False
  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, **kwargs):
    # Perform the sync at the end of the stage to the given manifest.
    super(ManifestVersionedSyncStage, self).__init__(builder_run, buildstore,
                                                     **kwargs)
    self.repo = None
    self.manifest_manager = None

    # If a builder pushes changes (even with dryrun mode), we need a writable
    # repository. Otherwise, the push will be rejected by the server.
    self.manifest_repo = self._GetManifestVersionsRepoUrl()

    # 1. Our current logic for calculating whether to re-run a build assumes
    #    that if the build is green, then it doesn't need to be re-run. This
    #    isn't true for canary masters, because the canary master ignores the
    #    status of its slaves and is green even if they fail. So set
    #    force=True in this case.
    # 2. If we're running with --debug, we should always run through to
    #    completion, so as to ensure a complete test.
    self._force = self._run.config.master or self._run.options.debug

  def HandleSkip(self):
    """Initializes a manifest manager to the specified version if skipped."""
    super(ManifestVersionedSyncStage, self).HandleSkip()
    if self._run.options.force_version:
      self.Initialize()
      self.ForceVersion(self._run.options.force_version)

  def ForceVersion(self, version):
    """Creates a manifest manager from given version and returns manifest."""
    logging.PrintBuildbotStepText(version)
    return self.manifest_manager.BootstrapFromVersion(version)

  def VersionIncrementType(self):
    """Return which part of the version number should be incremented."""
    if self._run.manifest_branch == 'master':
      return 'build'

    return 'branch'

  def RegisterManifestManager(self, manifest_manager):
    """Save the given manifest manager for later use in this run.

    Args:
      manifest_manager: Expected to be a BuildSpecsManager.
    """
    self._run.attrs.manifest_manager = self.manifest_manager = manifest_manager

  def Initialize(self):
    """Initializes a manager that manages manifests for associated stages."""

    dry_run = self._run.options.debug

    self._InitializeRepo()

    # If chrome_rev is somehow set, fail.
    assert not self._chrome_rev, \
        'chrome_rev is unsupported on release builders.'

    self.RegisterManifestManager(
        manifest_version.BuildSpecsManager(
            source_repo=self.repo,
            manifest_repo=self.manifest_repo,
            manifest=self._run.config.manifest,
            build_names=self._run.GetBuilderIds(),
            incr_type=self.VersionIncrementType(),
            force=self._force,
            branch=self._run.manifest_branch,
            dry_run=dry_run,
            config=self._run.config,
            metadata=self._run.attrs.metadata,
            buildstore=self.buildstore,
            buildbucket_client=self.buildbucket_client))

  def _SetAndroidVersionIfApplicable(self, manifest):
    """If 'android' is in |manifest|, write version to the BuilderRun object.

    Args:
      manifest: Path to the manifest.
    """
    manifest_dom = minidom.parse(manifest)
    elements = manifest_dom.getElementsByTagName(lkgm_manager.ANDROID_ELEMENT)

    if elements:
      android_version = elements[0].getAttribute(
          lkgm_manager.ANDROID_VERSION_ATTR)
      logging.info('Android version was found in the manifest: %s',
                   android_version)
      # Update the metadata dictionary. This is necessary because the
      # metadata dictionary is preserved through re-executions, so
      # UprevAndroidStage can read the version from the dictionary
      # later. This is easier than parsing the manifest again after
      # the re-execution.
      self._run.attrs.metadata.UpdateKeyDictWithDict(
          'version', {'android': android_version})

  def _SetChromeVersionIfApplicable(self, manifest):
    """If 'chrome' is in |manifest|, write the version to the BuilderRun object.

    Args:
      manifest: Path to the manifest.
    """
    manifest_dom = minidom.parse(manifest)
    elements = manifest_dom.getElementsByTagName(lkgm_manager.CHROME_ELEMENT)

    if elements:
      chrome_version = elements[0].getAttribute(
          lkgm_manager.CHROME_VERSION_ATTR)
      logging.info('Chrome version was found in the manifest: %s',
                   chrome_version)
      # Update the metadata dictionary. This is necessary because the
      # metadata dictionary is preserved through re-executions, so
      # SyncChromeStage can read the version from the dictionary
      # later. This is easier than parsing the manifest again after
      # the re-execution.
      self._run.attrs.metadata.UpdateKeyDictWithDict('version',
                                                     {'chrome': chrome_version})

  def GetNextManifest(self):
    """Uses the initialized manifest manager to get the next manifest."""
    assert self.manifest_manager, \
        'Must run GetStageManager before checkout out build.'

    build_id = self._run.attrs.metadata.GetDict().get('build_id')

    to_return = self.manifest_manager.GetNextBuildSpec(build_id=build_id)
    logging.info('Found next version to build: %s', to_return)
    previous_version = self.manifest_manager.GetLatestPassingSpec()
    target_version = self.manifest_manager.current_version

    # Print the Blamelist here.
    url_prefix = 'https://crosland.corp.google.com/log/'
    url = url_prefix + '%s..%s' % (previous_version, target_version)
    logging.PrintBuildbotLink('Blamelist', url)
    # The testManifestVersionedSyncOnePartBranch interacts badly with this
    # function.  It doesn't fully initialize self.manifest_manager which
    # causes target_version to be None.  Since there isn't a clean fix in
    # either direction, just throw this through str().  In the normal case,
    # it's already a string anyways.
    logging.PrintBuildbotStepText(str(target_version))

    return to_return

  @contextlib.contextmanager
  def LocalizeManifest(self, manifest, filter_cros=False):
    """Remove restricted checkouts from the manifest if needed.

    Args:
      manifest: The manifest to localize.
      filter_cros: If set, then only checkouts with a remote of 'cros' or
        'cros-internal' are kept, and the rest are filtered out.
    """
    if filter_cros:
      with osutils.TempDir() as tempdir:
        filtered_manifest = os.path.join(tempdir, 'filtered.xml')
        doc = ElementTree.parse(manifest)
        root = doc.getroot()
        for node in root.findall('project'):
          remote = node.attrib.get('remote')
          if remote and remote not in config_lib.GetSiteParams().GIT_REMOTES:
            root.remove(node)
        doc.write(filtered_manifest)
        yield filtered_manifest
    else:
      yield manifest

  def _GetMasterVersion(self, master_id, timeout=5 * 60):
    """Get the platform version associated with the master_build_id.

    Args:
      master_id: Our master buildbucket id.
      timeout: How long to wait for the platform version to show up
        in the database. This is needed because the slave builders are
        triggered slightly before the platform version is written. Default
        is 5 minutes.
    """

    # TODO(davidjames): Remove the wait loop here once we've updated slave
    # builders to only get triggered after the platform version is written.
    def _PrintRemainingTime(remaining):
      logging.info('%s until timeout...', remaining)

    def _GetPlatformVersion():
      status = self.buildstore.GetBuildStatuses(buildbucket_ids=[master_id])[0]
      return status['platform_version']

    # Retry until non-None version is returned.
    def _ShouldRetry(x):
      return not x

    return timeout_util.WaitForSuccess(
        _ShouldRetry,
        _GetPlatformVersion,
        timeout,
        period=constants.SLEEP_TIMEOUT,
        side_effect_func=_PrintRemainingTime)

  def _VerifyMasterId(self, master_id):
    """Verify that our master id is current and valid.

    Args:
      master_id: Our master buildbucket id.
    """
    if self.buildstore.AreClientsReady() and master_id:
      assert not self._run.options.force_version
      master_build_status = self.buildstore.GetBuildStatuses(
          buildbucket_ids=[master_id])[0]
      latest = self.buildstore.GetBuildHistory(
          master_build_status['build_config'],
          1,
          branch=self._run.options.branch)
      if latest and latest[0]['buildbucket_id'] != master_id:
        raise failures_lib.MasterSlaveVersionMismatchFailure(
            'This slave\'s master (id=%s) has been supplanted by a newer '
            'master (id=%s). Aborting.' % (master_id, latest[0]['id']))

  @failures_lib.SetFailureType(failures_lib.InfrastructureFailure)
  def PerformStage(self):
    self.Initialize()

    self._VerifyMasterId(self._run.options.master_buildbucket_id)
    version = self._run.options.force_version
    if self._run.options.master_buildbucket_id:
      version = self._GetMasterVersion(self._run.options.master_buildbucket_id)

    next_manifest = None
    if version:
      next_manifest = self.ForceVersion(version)
    else:
      self.skip_sync = True
      try:
        next_manifest = self.GetNextManifest()
      except validation_pool.TreeIsClosedException as e:
        logging.warning(str(e))

    if not next_manifest:
      logging.info('Found no work to do.')
      if self._run.attrs.manifest_manager.DidLastBuildFail():
        raise failures_lib.StepFailure('The previous build failed.')
      else:
        raise failures_lib.ExitEarlyException(
            'ManifestVersionedSyncStage finished and exited early.')

    # Log this early on for the release team to grep out before we finish.
    if self.manifest_manager:
      self._Print(
          '\nRELEASETAG: %s\n' % (self.manifest_manager.current_version))

    self._SetAndroidVersionIfApplicable(next_manifest)
    self._SetChromeVersionIfApplicable(next_manifest)
    # To keep local trybots working, remove restricted checkouts from the
    # official manifest we get from manifest-versions.
    with self.LocalizeManifest(
        next_manifest, filter_cros=self._run.options.local) as new_manifest:
      self.ManifestCheckout(new_manifest)


class MasterSlaveLKGMSyncStage(ManifestVersionedSyncStage):
  """Stage that generates a unique manifest file candidate, and sync's to it.

  This stage uses an LKGM manifest manager that handles LKGM
  candidates and their states.
  """
  # If we are using an internal manifest, but need to be able to create an
  # external manifest, we create a second manager for that manifest.
  external_manager = None
  MAX_BUILD_HISTORY_LENGTH = 10
  MilestoneVersion = collections.namedtuple('MilestoneVersion',
                                            ['milestone', 'platform'])
  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, **kwargs):
    super(MasterSlaveLKGMSyncStage, self).__init__(builder_run, buildstore,
                                                   **kwargs)
    # lkgm_manager deals with making sure we're synced to whatever manifest
    # we get back in GetNextManifest so syncing again is redundant.
    self._android_version = None
    self._chrome_version = None

  def _GetInitializedManager(self, internal):
    """Returns an initialized lkgm manager.

    Args:
      internal: Boolean.  True if this is using an internal manifest.

    Returns:
      lkgm_manager.LKGMManager.
    """
    increment = self.VersionIncrementType()
    return lkgm_manager.LKGMManager(
        source_repo=self.repo,
        manifest_repo=self._GetManifestVersionsRepoUrl(internal=internal),
        manifest=self._run.config.manifest,
        build_names=self._run.GetBuilderIds(),
        build_type=self._run.config.build_type,
        incr_type=increment,
        force=self._force,
        branch=self._run.manifest_branch,
        dry_run=self._run.options.debug,
        config=self._run.config,
        metadata=self._run.attrs.metadata,
        buildstore=self.buildstore,
        buildbucket_client=self.buildbucket_client)

  def Initialize(self):
    """Override: Creates an LKGMManager rather than a ManifestManager."""
    self._InitializeRepo()
    self.RegisterManifestManager(self._GetInitializedManager(self.internal))
    if self._run.config.master and self._GetSlaveConfigs():
      assert self.internal, 'Unified masters must use an internal checkout.'
      MasterSlaveLKGMSyncStage.external_manager = \
          self._GetInitializedManager(False)

  def ForceVersion(self, version):
    manifest = super(MasterSlaveLKGMSyncStage, self).ForceVersion(version)
    if MasterSlaveLKGMSyncStage.external_manager:
      MasterSlaveLKGMSyncStage.external_manager.BootstrapFromVersion(version)

    return manifest

  def _VerifyMasterId(self, master_id):
    """Verify that our master id is current and valid."""
    super(MasterSlaveLKGMSyncStage, self)._VerifyMasterId(master_id)
    if not self._run.config.master and not master_id:
      raise failures_lib.StepFailure(
          'Cannot start build without a master_build_id. Did you hit force '
          'build on a slave? Please hit force build on the master instead.')

  def GetNextManifest(self):
    """Gets the next manifest using LKGM logic."""
    assert self.manifest_manager, \
        'Must run Initialize before we can get a manifest.'
    assert isinstance(self.manifest_manager, lkgm_manager.LKGMManager), \
        'Manifest manager instantiated with wrong class.'
    assert self._run.config.master

    build_id = self._run.attrs.metadata.GetDict().get('build_id')
    logging.info(
        'Creating new candidate manifest, including chrome version '
        '%s.', self._chrome_version)
    if self._android_version:
      logging.info('Adding Android version to new candidate manifest %s.',
                   self._android_version)
    manifest = self.manifest_manager.CreateNewCandidate(
        android_version=self._android_version,
        chrome_version=self._chrome_version,
        build_id=build_id)
    if MasterSlaveLKGMSyncStage.external_manager:
      MasterSlaveLKGMSyncStage.external_manager.CreateFromManifest(
          manifest, build_id=build_id)

    return manifest

  def GetLatestAndroidVersion(self):
    """Returns the version of Android to uprev."""
    return cros_mark_android_as_stable.GetLatestBuild(
        constants.ANDROID_BUCKET_URL, self._run.config.android_import_branch,
        cros_mark_android_as_stable.MakeBuildTargetDict(
            self._run.config.android_import_branch))[0]

  def GetLatestChromeVersion(self):
    """Returns the version of Chrome to uprev."""
    return cros_mark_chrome_as_stable.GetLatestRelease(
        constants.CHROMIUM_GOB_URL)

  def GetLastChromeOSVersion(self):
    """Fetching ChromeOS version from the last run.

    Fetching the chromeos version from the last run that published a manifest
    by querying CIDB. Master builds that failed before publishing a manifest
    will be ignored.

    Returns:
      A namedtuple MilestoneVersion,
      e.g. MilestoneVersion(milestone='44', platform='7072.0.0-rc4')
      or None if failed to retrieve milestone and platform versions.
    """
    build_identifier, _ = self._run.GetCIDBHandle()
    build_id = build_identifier.cidb_id

    if not self.buildstore.AreClientsReady():
      return None

    builds = self.buildstore.GetBuildHistory(
        build_config=self._run.config.name,
        num_results=self.MAX_BUILD_HISTORY_LENGTH,
        ignore_build_id=build_id)
    full_versions = [b.get('full_version') for b in builds]
    old_version = next(itertools.ifilter(bool, full_versions), None)
    if old_version:
      pattern = r'^R(\d+)-(\d+.\d+.\d+(-rc\d+)*)'
      m = re.match(pattern, old_version)
      if m:
        milestone = m.group(1)
        platform = m.group(2)
      return self.MilestoneVersion(milestone=milestone, platform=platform)
    return None

  @failures_lib.SetFailureType(failures_lib.InfrastructureFailure)
  def PerformStage(self):
    """Performs the stage."""
    if self._android_rev and self._run.config.master:
      self._android_version = self.GetLatestAndroidVersion()
      logging.info('Latest Android version is: %s', self._android_version)

    if (self._chrome_rev == constants.CHROME_REV_LATEST and
        self._run.config.master):
      # PFQ master needs to determine what version of Chrome to build
      # for all slaves.
      logging.info('I am a master running with CHROME_REV_LATEST, '
                   'therefore getting latest chrome version.')
      self._chrome_version = self.GetLatestChromeVersion()
      logging.info('Latest chrome version is: %s', self._chrome_version)

    ManifestVersionedSyncStage.PerformStage(self)

    # Generate blamelist
    cros_version = self.GetLastChromeOSVersion()
    if cros_version:
      old_filename = self.manifest_manager.GetBuildSpecFilePath(
          cros_version.milestone, cros_version.platform)
      if not os.path.exists(old_filename):
        logging.error(
            'Could not generate blamelist, '
            'manifest file does not exist: %s', old_filename)
      else:
        logging.debug('Generate blamelist against: %s', old_filename)
        lkgm_manager.GenerateBlameList(self.repo, old_filename)


class CommitQueueSyncStage(MasterSlaveLKGMSyncStage):
  """Commit Queue Sync stage that handles syncing and applying patches.

  Similar to the MasterSlaveLKGMsync Stage, this stage handles syncing
  to a manifest, passing around that manifest to other builders.

  What makes this stage different is that the CQ master finds the
  patches on Gerrit which are ready to be committed, apply them, and
  includes the patches in the new manifest. The slaves sync to the
  manifest, and apply the patches written in the manifest.
  """

  # The amount of time we wait before assuming that the Pre-CQ is down and
  # that we should start testing changes that haven't been tested by the Pre-CQ.
  PRE_CQ_TIMEOUT = 2 * 60 * 60
  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, **kwargs):
    super(CommitQueueSyncStage, self).__init__(builder_run, buildstore,
                                               **kwargs)

    # The pool of patches to be picked up by the commit queue.
    # - For the master commit queue, it's initialized in GetNextManifest.
    # - For slave commit queues, it's initialized in _SetPoolFromManifest.
    #
    # In all cases, the pool is saved to disk.
    self.pool = None

  def HandleSkip(self):
    """Handles skip and initializes validation pool from manifest."""
    super(CommitQueueSyncStage, self).HandleSkip()
    filename = self._run.options.validation_pool
    if filename:
      self.pool = validation_pool.ValidationPool.Load(
          filename, builder_run=self._run)
    else:
      self._SetPoolFromManifest(self.manifest_manager.GetLocalManifest())

  def _ChangeFilter(self, _pool, changes, non_manifest_changes):
    # First, look for changes that were tested by the Pre-CQ.
    changes_to_test = []

    _, db = self._run.GetCIDBHandle()
    if db:
      actions_for_changes = db.GetActionsForChanges(changes)
      for change in changes:
        status = clactions.GetCLPreCQStatus(change, actions_for_changes)
        if status == constants.CL_STATUS_PASSED:
          changes_to_test.append(change)
    else:
      logging.warning("DB not available, unable to filter for PreCQ passed.")

    # Allow Commit-Ready=+2 changes to bypass the Pre-CQ, if there are no other
    # changes.
    if not changes_to_test:
      changes_to_test = [x for x in changes if x.HasApproval('COMR', '2')]

    # If we only see changes that weren't verified by Pre-CQ, and some of them
    # are really old changes, try all of the changes. This ensures that the CQ
    # continues to work (albeit slowly) even if the Pre-CQ is down.
    if changes and not changes_to_test:
      oldest = min(x.approval_timestamp for x in changes)
      if time.time() > oldest + self.PRE_CQ_TIMEOUT:
        # It's safest to try all changes here because some of the old changes
        # might depend on newer changes (e.g. via CQ-DEPEND).
        changes_to_test = changes

    return changes_to_test, non_manifest_changes

  def _SetPoolFromManifest(self, manifest):
    """Sets validation pool based on manifest path passed in."""
    # Note that this function is only called after the repo is already
    # sync'd, so AcquirePoolFromManifest does not need to sync.
    self.pool = validation_pool.ValidationPool.AcquirePoolFromManifest(
        manifest=manifest,
        overlays=self._run.config.overlays,
        repo=self.repo,
        build_number=self._run.buildnumber,
        builder_name=self._run.GetBuilderName(),
        buildbucket_id=self._run.options.buildbucket_id,
        is_master=self._run.config.master,
        dryrun=self._run.options.debug,
        builder_run=self._run)

  def GetNextManifest(self):
    """Gets the next manifest using LKGM logic."""
    assert self.manifest_manager, \
        'Must run Initialize before we can get a manifest.'
    assert isinstance(self.manifest_manager, lkgm_manager.LKGMManager), \
        'Manifest manager instantiated with wrong class.'
    assert self._run.config.master

    build_id = self._run.attrs.metadata.GetDict().get('build_id')

    try:
      # In order to acquire a pool, we need an initialized buildroot.
      if not git.FindRepoDir(self.repo.directory):
        self.repo.Initialize()

      query = constants.CQ_READY_QUERY
      if self._run.options.cq_gerrit_override:
        query = (self._run.options.cq_gerrit_override, None)

      self.pool = validation_pool.ValidationPool.AcquirePool(
          overlays=self._run.config.overlays,
          repo=self.repo,
          build_number=self._run.buildnumber,
          builder_name=self._run.GetBuilderName(),
          buildbucket_id=self._run.options.buildbucket_id,
          query=query,
          dryrun=self._run.options.debug,
          check_tree_open=(not self._run.options.debug or
                           self._run.options.mock_tree_status),
          change_filter=self._ChangeFilter,
          builder_run=self._run)
    except validation_pool.TreeIsClosedException as e:
      logging.warning(str(e))
      return None

    build_identifier, _ = self._run.GetCIDBHandle()
    build_id = build_identifier.cidb_id

    logging.info('Creating new candidate manifest.')
    manifest = self.manifest_manager.CreateNewCandidate(
        validation_pool=self.pool, build_id=build_id)
    if MasterSlaveLKGMSyncStage.external_manager:
      MasterSlaveLKGMSyncStage.external_manager.CreateFromManifest(
          manifest, build_id=build_id)

    return manifest

  def ManifestCheckout(self, next_manifest, fetch_all=False):
    """Checks out the repository to the given manifest."""
    # Sync to the provided manifest on slaves. On the master, we're
    # already synced to this manifest, so self.skip_sync is set and
    # this is a no-op.
    super(CommitQueueSyncStage, self).ManifestCheckout(next_manifest, fetch_all)

    if self._run.config.build_before_patching:
      assert not self._run.config.master
      pre_build_passed = self.RunPrePatchBuild()
      logging.PrintBuildbotStepName('CommitQueueSync : Apply Patches')
      if not pre_build_passed:
        logging.PrintBuildbotStepText('Pre-patch build failed.')

    # On slaves, initialize our pool and apply patches. On the master,
    # we've already done that in GetNextManifest, so this is a no-op.
    if not self._run.config.master:
      # Print the list of CHUMP changes since the LKGM, then apply changes and
      # print the list of applied changes.
      self.manifest_manager.GenerateBlameListSinceLKGM()
      self._SetPoolFromManifest(next_manifest)
      try:
        self.pool.ApplyPoolIntoRepo()
      except cros_build_lib.RunCommandError:
        logging.error('Possible git error on ApplyPoolIntoRepo. Retrying after'
                      ' a git cleanup and `fetch --all`.', exc_info=True)
        self.repo.BuildRootGitCleanup(prune_all=True)
        self.repo.FetchAll()
        self.pool.ApplyPoolIntoRepo()

  @failures_lib.SetFailureType(failures_lib.InfrastructureFailure)
  def PerformStage(self):
    """Performs normal stage and prints blamelist at end."""
    if self._run.options.force_version:
      self.HandleSkip()
    else:
      ManifestVersionedSyncStage.PerformStage(self)

    self.WriteChangesToMetadata(self.pool.applied)


class PreCQSyncStage(SyncStage):
  """Sync and apply patches to test if they compile."""

  category = constants.CI_INFRA_STAGE

  def __init__(self, builder_run, buildstore, patches, **kwargs):
    super(PreCQSyncStage, self).__init__(builder_run, buildstore, **kwargs)

    # As a workaround for crbug.com/432706, we scan patches to see if they
    # are already being merged. If they are, we don't test them in the PreCQ.
    self.patches = [p for p in patches if not p.IsBeingMerged()]

    if patches and not self.patches:
      cros_build_lib.Die('No patches that still need testing.')

    # The ValidationPool of patches to test. Initialized in PerformStage, and
    # refreshed after bootstrapping by HandleSkip.
    self.pool = None

  def HandleSkip(self):
    """Handles skip and loads validation pool from disk."""
    super(PreCQSyncStage, self).HandleSkip()
    filename = self._run.options.validation_pool
    if filename:
      self.pool = validation_pool.ValidationPool.Load(
          filename, builder_run=self._run)

  def PerformStage(self):
    super(PreCQSyncStage, self).PerformStage()
    self.pool = validation_pool.ValidationPool.AcquirePreCQPool(
        overlays=self._run.config.overlays,
        build_root=self._build_root,
        build_number=self._run.buildnumber,
        builder_name=self._run.config.name,
        buildbucket_id=self._run.options.buildbucket_id,
        dryrun=self._run.options.debug_forced,
        candidates=self.patches,
        builder_run=self._run)
    self.pool.ApplyPoolIntoRepo()

    if len(self.pool.applied) == 0 and self.patches:
      cros_build_lib.Die('No changes have been applied.')

    changes = self.pool.applied or self.patches
    self.WriteChangesToMetadata(changes)


class PreCQLauncherStage(SyncStage):
  """Scans for CLs and automatically launches Pre-CQ jobs to test them."""

  category = constants.CI_INFRA_STAGE

  # The number of minutes we wait before launching Pre-CQ jobs. This measures
  # the idle time of a given patch series, so, for example, if a user takes
  # 20 minutes to mark a series of 20 patches as ready, we won't launch a
  # tryjob on any of the patches until the user has been idle for 2 minutes.
  LAUNCH_DELAY = 2

  # The number of minutes we allow before considering an in-flight job failed.
  INFLIGHT_TIMEOUT = 240

  # The number of minutes we allow before expiring a pre-cq PASSED or
  # FULLY_VERIFIED status. After this timeout is hit, a CL's status will be
  # reset to None. This prevents very stale CLs from entering the CQ.
  STATUS_EXPIRY_TIMEOUT = 60 * 24 * 7

  # The maximum number of patches we will allow in a given trybot run. This is
  # needed because our trybot infrastructure can only handle so many patches at
  # once.
  MAX_PATCHES_PER_TRYBOT_RUN = 50

  # The maximum derivative of the number of tryjobs we will launch in a given
  # cycle of ProcessChanges. Used to rate-limit the launcher when reopening the
  # tree after building up a large backlog.
  MAX_LAUNCHES_PER_CYCLE_DERIVATIVE = 20

  # Delta time constant for checking buildbucket. Do not check status or
  # cancel builds which were launched >= BUILDBUCKET_DELTA_TIME_HOUR ago.
  BUILDBUCKET_DELTA_TIME_HOUR = 4

  # Delay between launches of sanity-pre-cq builds for the same build config
  PRE_CQ_SANITY_CHECK_PERIOD_HOURS = 5

  # How many days to look back in build history to check for sanity-pre-cq
  PRE_CQ_SANITY_CHECK_LOOK_BACK_HISTORY_DAYS = 1

  def __init__(self, builder_run, buildstore, **kwargs):
    super(PreCQLauncherStage, self).__init__(builder_run, buildstore, **kwargs)
    self.skip_sync = True
    self.last_cycle_launch_count = 0

  def _HasTimedOut(self, start, now, timeout_minutes):
    """Check whether |timeout_minutes| has elapsed between |start| and |now|.

    Args:
      start: datetime.datetime start time.
      now: datetime.datetime current time.
      timeout_minutes: integer number of minutes for timeout.

    Returns:
      True if (now-start) > timeout_minutes.
    """
    diff = datetime.timedelta(minutes=timeout_minutes)
    return (now - start) > diff

  @staticmethod
  def _PrintPatchStatus(patch, status):
    """Print a link to |patch| with |status| info."""
    items = (
        status,
        os.path.basename(patch.project),
        str(patch),
    )
    logging.PrintBuildbotLink(' | '.join(items), patch.url)

  def _GetPreCQConfigsFromOptions(self, change, union_pre_cq_limit=None):
    """Get Pre-CQ configs from CQ config options.

    If union-pre-cq-sub-configs flag is True in the default config file, get
    unioned Pre-CQ configs from the sub configs; else, get Pre-CQ configs from
    the default config file.

    Args:
      change: The instance of cros_patch.GerritPatch to get Pre-CQ configs.
      union_pre_cq_limit: The limit size for unioned Pre-CQ configs if provided.
        Default to None.

    Returns:
      A set of valid Pre-CQ configs (strings) or None.
    """
    try:
      cq_config_parser = cq_config.CQConfigParser(
          self._build_root, change, forgiving=False)
      pre_cq_configs = None
      if cq_config_parser.GetUnionPreCQSubConfigsFlag():
        pre_cq_configs = self._ParsePreCQsFromOption(
            cq_config_parser.GetUnionedPreCQConfigs())
        if (union_pre_cq_limit is not None and pre_cq_configs and
            len(pre_cq_configs) > union_pre_cq_limit):
          raise ExceedUnionPreCQLimitException(pre_cq_configs,
                                               union_pre_cq_limit)

        return pre_cq_configs
      else:
        return self._ParsePreCQsFromOption(cq_config_parser.GetPreCQConfigs())
    except (UnknownPreCQConfigRequestedError,
            cq_config.MalformedCQConfigException):
      logging.exception(
          'Exception encountered when parsing pre-cq options '
          'for change %s. Falling back to default set.', change)
      m = 'chromeos/cbuildbot/pre-cq/bad_pre_cq_options_count'
      metrics.Counter(m).increment()
      return None

  def _ConfiguredVerificationsForChange(self, change):
    """Determine which configs to test |change| with.

    This method returns only the configs that are asked for by the config
    file. It does not include special-case logic for adding additional bots
    based on the type of the repository (see VerificationsForChange for that).

    Args:
      change: GerritPatch instance to get configs-to-test for.

    Returns:
      A set of configs to test.
    """
    configs_to_test = None
    # If a pre-cq config is specified in the commit message, use that.
    # Otherwise, look in appropriate COMMIT-QUEUE.ini. Otherwise, default to
    # constants.PRE_CQ_DEFAULT_CONFIGS
    lines = cros_patch.GetOptionLinesFromCommitMessage(
        change.commit_message, constants.CQ_CONFIG_PRE_CQ_CONFIGS_REGEX)
    if lines is not None:
      try:
        configs_to_test = self._ParsePreCQsFromOption(lines)
      except UnknownPreCQConfigRequestedError:
        logging.exception(
            'Unknown config requested in commit message '
            'for change %s. Falling back to default set.', change)

    configs_from_options = None
    try:
      configs_from_options = self._GetPreCQConfigsFromOptions(
          change, union_pre_cq_limit=DEFAULT_UNION_PRE_CQ_LIMIT)
    except ExceedUnionPreCQLimitException as e:
      pre_cq_configs = list(e.pre_cq_configs)
      pre_cq_configs.sort()
      configs_from_options = pre_cq_configs[:DEFAULT_UNION_PRE_CQ_LIMIT]
      logging.info(
          'Unioned Pre-CQs %s for change %s exceed the limit %d. '
          'Will launch the following Pre-CQ configs: %s', e.pre_cq_configs,
          change.PatchLink(), DEFAULT_UNION_PRE_CQ_LIMIT, configs_from_options)

    configs_to_test = configs_to_test or configs_from_options
    return set(configs_to_test or constants.PRE_CQ_DEFAULT_CONFIGS)

  def VerificationsForChange(self, change):
    """Determine which configs to test |change| with.

    Args:
      change: GerritPatch instance to get configs-to-test for.

    Returns:
      A set of configs to test.
    """
    configs_to_test = self._ConfiguredVerificationsForChange(change)

    # Add the BINHOST_PRE_CQ to any changes that affect an overlay.
    if '/overlays/' in change.project:
      configs_to_test.add(constants.BINHOST_PRE_CQ)

    return configs_to_test

  def _ParsePreCQsFromOption(self, pre_cq_configs):
    """Parse Pre-CQ configs got from option.

    Args:
      pre_cq_configs: A list of Pre-CQ configs got from option, or None.

    Returns:
      A valid Pre-CQ config list, or None.
    """
    if pre_cq_configs:
      configs_to_test = set(pre_cq_configs)

      # Replace 'default' with the default configs.
      if 'default' in configs_to_test:
        configs_to_test.discard('default')
        configs_to_test.update(constants.PRE_CQ_DEFAULT_CONFIGS)

      # Verify that all of the configs are valid.
      if all(c in self._run.site_config for c in configs_to_test):
        return configs_to_test
      else:
        raise UnknownPreCQConfigRequestedError(configs_to_test)

    return None

  def ScreenChangeForPreCQ(self, change):
    """Record which pre-cq tryjobs to test |change| with.

    This method determines which configs to test a given |change| with, and
    writes those as pending tryjobs to the cidb.

    Args:
      change: GerritPatch instance to screen. This change should not yet have
              been screened.
    """
    actions = []
    configs_to_test = self.VerificationsForChange(change)
    for c in configs_to_test:
      actions.append(
          clactions.CLAction.FromGerritPatchAndAction(
              change, constants.CL_ACTION_VALIDATION_PENDING_PRE_CQ, reason=c))
    actions.append(
        clactions.CLAction.FromGerritPatchAndAction(
            change, constants.CL_ACTION_SCREENED_FOR_PRE_CQ))

    build_identifier, db = self._run.GetCIDBHandle()
    build_id = build_identifier.cidb_id
    db.InsertCLActions(build_id, actions)

  def CanSubmitChangeInPreCQ(self, change):
    """Look up whether |change| is configured to be submitted in the pre-CQ.

    Args:
      change: Change to examine.

    Returns:
      Boolean indicating if this change is configured to be submitted in the
      pre-CQ.
    """
    cq_config_parser = cq_config.CQConfigParser(self._build_root, change)
    return cq_config_parser.CanSubmitChangeInPreCQ()

  def GetConfigBuildbucketIdMap(self, output):
    """Convert tryjob json output into a config:buildbucket_id map.

    Config is the config-name of a pre-cq triggered by the pre-cq-launcher.
    buildbucket_id is the request id of the pre-cq build.
    """
    # List of dicts containing 'build_config', 'buildbucket_id', 'url'
    tryjob_output = json.loads(output)
    return {t['build_config']: t['buildbucket_id'] for t in tryjob_output}

  def _LaunchTrybots(self, pool, configs, plan=None, sanity_check_build=False):
    """Launch tryjobs on the configs with patches if provided.

    Args:
      pool: An instance of ValidationPool.validation_pool.
      configs: A list of pre-cq config names to launch.
      plan: A list of patches to test in the pre-cq tryjob, default to None.
      sanity_check_build: Boolean indicating whether to run the tryjobs as
        sanity-check-build.

    Returns:
      A dict mapping from build_config (string) to the buildbucket_id (string)
      of the launched Pre-CQs. An empty dict if any configuration target doesn't
      exist.
    """
    # Verify the configs to test are in the cbuildbot config list.
    for config in configs:
      if config not in self._run.site_config:
        logging.error('No such configuraton target: %s.', config)

        if plan is not None:
          for change in plan:
            logging.error(
                'Skipping trybots on nonexistent config %s for '
                '%s %s', config, str(change), change.url)
            pool.HandleNoConfigTargetFailure(change, config)

        return {}

    cmd = [
        'cros', 'tryjob', '--yes', '--json', '--timeout',
        str(self.INFLIGHT_TIMEOUT * 60)
    ] + configs

    if sanity_check_build:
      cmd += ['--sanity-check-build']

    if plan is not None:
      for patch in plan:
        cmd += ['-g', cros_patch.AddPrefix(patch, patch.gerrit_number)]
        self._PrintPatchStatus(patch, 'testing')

    config_buildbucket_id_map = {}
    if self._run.options.debug:
      logging.debug('Would have launched tryjob with %s', cmd)
    else:
      result = cros_build_lib.RunCommand(
          cmd, cwd=self._build_root, capture_output=True)
      if result and result.output:
        logging.info('output: %s', result.output)
        config_buildbucket_id_map = self.GetConfigBuildbucketIdMap(
            result.output)

    return config_buildbucket_id_map

  def LaunchPreCQs(self, build_id, db, pool, configs, plan):
    """Launch Pre-CQ tryjobs on the configs with patches.

    Args:
      build_id: build_id (string) of the pre-cq-launcher build.
      db: An instance of cidb.CIDBConnection.
      pool: An instance of ValidationPool.validation_pool.
      configs: A list of pre-cq config names to launch.
      plan: The list of patches to test in the pre-cq tryjob.
    """
    logging.info('Launching Pre-CQs for configs %s with changes %s', configs,
                 cros_patch.GetChangesAsString(plan))
    config_buildbucket_id_map = self._LaunchTrybots(pool, configs, plan=plan)

    if not config_buildbucket_id_map:
      return

    # Update cidb clActionTable.
    actions = []
    for config in configs:
      if config in config_buildbucket_id_map:
        for patch in plan:
          actions.append(
              clactions.CLAction.FromGerritPatchAndAction(
                  patch,
                  constants.CL_ACTION_TRYBOT_LAUNCHING,
                  config,
                  buildbucket_id=config_buildbucket_id_map[config]))

    db.InsertCLActions(build_id, actions)

  def LaunchSanityPreCQs(self, build_id, db, pool, configs):
    """Launch Sanity-Pre-CQ tryjobs on the configs.

    Args:
      build_id: build_id (string) of the pre-cq-launcher build.
      db: An instance of cidb.CIDBConnection or None.
      pool: An instance of ValidationPool.validation_pool.
      configs: A set of pre-cq config names to launch.
    """
    logging.info('Launching Sanity-Pre-CQs for configs %s.', configs)
    config_buildbucket_id_map = self._LaunchTrybots(
        pool, configs, sanity_check_build=True)

    if not config_buildbucket_id_map:
      return

    if db:
      launched_build_reqs = []
      for config in configs:
        launched_build_reqs.append(
            build_requests.BuildRequest(
                None, build_id, config, None, config_buildbucket_id_map[config],
                build_requests.REASON_SANITY_PRE_CQ, None))

      if launched_build_reqs:
        db.InsertBuildRequests(launched_build_reqs)

  def GetDisjointTransactionsToTest(self, pool, progress_map):
    """Get the list of disjoint transactions to test.

    Side effect: reject or retry changes that have timed out.

    Args:
      pool: The validation pool.
      progress_map: See return type of clactions.GetPreCQProgressMap.

    Returns:
      A list of (transaction, config) tuples corresponding to different trybots
      that should be launched.
    """
    # Get the set of busy and passed CLs.
    busy, _, verified = clactions.GetPreCQCategories(progress_map)

    screened_changes = set(progress_map)

    # Create a list of disjoint transactions to test.
    manifest = git.ManifestCheckout.Cached(self._build_root)
    logging.info('Creating disjoint transactions.')
    plans, failed = pool.CreateDisjointTransactions(
        manifest,
        screened_changes,
        max_txn_length=self.MAX_PATCHES_PER_TRYBOT_RUN)
    logging.info('Created %s disjoint transactions.', len(plans))

    # Note: |failed| is a list of cros_patch.PatchException instances.
    logging.info('Failed to apply %s CLs. Marked them as failed.', len(failed))
    for f in failed:
      pool.UpdateCLPreCQStatus(f.patch, constants.CL_STATUS_FAILED)

    for plan in plans:
      # If any of the CLs in the plan is not yet screened, wait for them to
      # be screened.
      #
      # If any of the CLs in the plan are currently "busy" being tested,
      # wait until they're done before starting to test this plan.
      #
      # Similarly, if all of the CLs in the plan have already been validated,
      # there's no need to launch a trybot run.
      plan = set(plan)
      if not plan.issubset(screened_changes):
        logging.info(
            'CLs waiting to be screened: %s',
            cros_patch.GetChangesAsString(plan.difference(screened_changes)))
      elif plan.issubset(verified):
        logging.info('CLs already verified: %s',
                     cros_patch.GetChangesAsString(plan))
      elif plan.intersection(busy):
        logging.info('CLs currently being verified: %s',
                     cros_patch.GetChangesAsString(plan.intersection(busy)))
        if plan.difference(busy):
          logging.info('CLs waiting on verification of dependencies: %r',
                       cros_patch.GetChangesAsString(plan.difference(busy)))
      # TODO(akeshet): Consider using a database time rather than gerrit
      # approval time and local clock for launch delay.
      elif any(x.approval_timestamp + self.LAUNCH_DELAY * 60 > time.time()
               for x in plan):
        logging.info('CLs waiting on launch delay: %s',
                     cros_patch.GetChangesAsString(plan))
      else:
        pending_configs = clactions.GetPreCQConfigsToTest(plan, progress_map)
        for config in pending_configs:
          yield (plan, config)

  def _ProcessRequeuedAndSpeculative(self, change, action_history):
    """Detect if |change| was requeued by developer, and mark in cidb.

    Args:
      change: GerritPatch instance to check.
      action_history: List of CLActions.
    """
    action_string = clactions.GetRequeuedOrSpeculative(change, action_history,
                                                       not change.IsMergeable())
    if action_string:
      build_identifier, db = self._run.GetCIDBHandle()
      build_id = build_identifier.cidb_id
      action = clactions.CLAction.FromGerritPatchAndAction(
          change, action_string)
      db.InsertCLActions(build_id, [action])
      logging.info('Record change %s with action %s build_id %s.', change,
                   action_string, build_id)

  def _ProcessExpiry(self, change, status, timestamp, pool, current_time):
    """Enforce expiry of a PASSED or FULLY_VERIFIED status.

    Args:
      change: GerritPatch instance to process.
      status: |change|'s pre-cq status.
      timestamp: datetime.datetime for when |status| was achieved.
      pool: The current validation pool.
      current_time: datetime.datetime for current database time.
    """
    if not timestamp:
      return
    timed_out = self._HasTimedOut(timestamp, current_time,
                                  self.STATUS_EXPIRY_TIMEOUT)
    verified = status in (constants.CL_STATUS_PASSED,
                          constants.CL_STATUS_FULLY_VERIFIED)
    if timed_out and verified:
      msg = PRECQ_EXPIRY_MSG % self.STATUS_EXPIRY_TIMEOUT
      build_identifier, db = self._run.GetCIDBHandle()
      build_id = build_identifier.cidb_id
      if db:
        pool.SendNotification(change, '%(details)s', details=msg)
        action = clactions.CLAction.FromGerritPatchAndAction(
            change, constants.CL_ACTION_PRE_CQ_RESET)
        db.InsertCLActions(build_id, [action])

  def _ProcessTimeouts(self, change, progress_map, pool, current_time):
    """Enforce per-config inflight timeouts.

    Args:
      change: GerritPatch instance to process.
      progress_map: See return type of clactions.GetPreCQProgressMap.
      pool: The current validation pool.
      current_time: datetime.datetime timestamp giving current database time.
    """
    config_progress = progress_map[change]
    for config, progress in config_progress.iteritems():
      # Note: only "INFLIGHT" status has the timeout.
      if (progress.status != constants.CL_PRECQ_CONFIG_STATUS_INFLIGHT or
          not self._HasTimedOut(progress.timestamp, current_time,
                                self.INFLIGHT_TIMEOUT)):
        continue

      msg = PRECQ_INFLIGHT_TIMEOUT_MSG % (config, self.INFLIGHT_TIMEOUT)
      pool.SendNotification(change, '%(details)s', details=msg)
      pool.RemoveReady(change, reason=config)
      pool.UpdateCLPreCQStatus(change, constants.CL_STATUS_FAILED)

  def _CancelPreCQIfNeeded(self, db, old_build_action):
    """Cancel the pre-cq if it's still running.

    Args:
      db: CIDB connection instance.
      old_build_action: Old patch build action.
    """
    buildbucket_id = old_build_action.buildbucket_id
    get_content = self.buildbucket_client.GetBuildRequest(
        buildbucket_id, dryrun=self._run.options.debug)

    status = buildbucket_lib.GetBuildStatus(get_content)
    if status in [
        constants.BUILDBUCKET_BUILDER_STATUS_SCHEDULED,
        constants.BUILDBUCKET_BUILDER_STATUS_STARTED
    ]:
      logging.info(
          'Cancelling old build buildbucket_id: %s, '
          'current status: %s.', buildbucket_id, status)

      cancel_content = self.buildbucket_client.CancelBuildRequest(
          buildbucket_id, dryrun=self._run.options.debug)
      cancel_status = buildbucket_lib.GetBuildStatus(cancel_content)
      if cancel_status:
        logging.info(
            'Cancelled old build buildbucket_id: %s, '
            'current status: %s', buildbucket_id, cancel_status)
        metrics.Counter(constants.MON_BB_CANCEL_PRE_CQ_BUILD_COUNT).increment()

        if self.buildstore.AreClientsReady():
          status_list = self.buildstore.GetBuildStatuses(
              buildbucket_ids=[buildbucket_id])
          old_build = status_list[0] if status_list else None
          if old_build is not None:
            cancel_action = old_build_action._replace(
                action=constants.CL_ACTION_TRYBOT_CANCELLED)
            db.InsertCLActions(old_build['id'], [cancel_action])
      else:
        # If the old pre-cq build already completed, CANCEL response will
        # give 200 returncode with error reasons.
        logging.info('Failed to cancel build buildbucket_id: %s, reason: %s',
                     buildbucket_id,
                     buildbucket_lib.GetErrorReason(cancel_content))

  def _ProcessOldPatchPreCQRuns(self, db, change, action_history):
    """Process Pre-cq runs for change with old patch numbers.

    Args:
      db: CIDB connection instance.
      change: GerritPatch instance to process.
      action_history: List of CLActions.
    """
    min_timestamp = datetime.datetime.now() - datetime.timedelta(
        hours=self.BUILDBUCKET_DELTA_TIME_HOUR)
    old_pre_cq_build_actions = clactions.GetOldPreCQBuildActions(
        change, action_history, min_timestamp)
    for old_build_action in old_pre_cq_build_actions:
      try:
        self._CancelPreCQIfNeeded(db, old_build_action)
      except buildbucket_lib.BuildbucketResponseException as e:
        # Do not raise if it's buildbucket_lib.BuildbucketResponseException.
        logging.error(
            'Failed to cancel the old pre cq run through Buildbucket.'
            ' change: %s buildbucket_id: %s error: %r', change,
            old_build_action.buildbucket_id, e)

  def _ProcessPreCQEarlyCrashes(self, progress_map, pool):
    """Processes Pre-CQ builders crashed in early stages.

    If a Pre-CQ builder crashes in early stages, it does not insert any
    CL actions to CIDB. This function will detect such crashes by querying
    Buildbucket and insert necessary CL actions on behalf of the crashed
    builder.

    Args:
      progress_map: See return type of clactions.GetPreCQProgressMap.
      pool: The current validation pool.
    """
    if not self.buildbucket_client:
      return

    for change, config_map in progress_map.iteritems():
      for config, progress in config_map.iteritems():
        if progress.status != constants.CL_PRECQ_CONFIG_STATUS_LAUNCHED:
          continue
        if progress.buildbucket_id is None:
          continue

        build_info = self.buildbucket_client.GetBuildRequest(
            progress.buildbucket_id, dryrun=False)
        if not build_info:
          continue

        status = buildbucket_lib.GetBuildStatus(build_info)
        if status != constants.BUILDBUCKET_BUILDER_STATUS_COMPLETED:
          continue

        msg = PRECQ_EARLY_CRASH_MSG % (config,
                                       tree_status.ConstructLegolandBuildURL(
                                           progress.buildbucket_id))
        pool.SendNotification(change, '%(details)s', details=msg)
        pool.RemoveReady(change, reason=config)
        pool.UpdateCLPreCQStatus(change, constants.CL_STATUS_FAILED)

  def _GetFailedPreCQConfigs(self, action_history):
    """Get failed Pre-CQ build configs from action history.

    Args:
      action_history: A list of clactions.CLAction.

    Returns:
      A set of failed Pre-CQ build configs.
    """
    site_config = config_lib.GetConfig()
    failed_build_configs = set()
    for action in action_history:
      build_config = action.build_config
      if (build_config not in failed_build_configs and
          site_config.get(build_config) is not None and
          site_config[build_config].build_type == constants.PRE_CQ_TYPE and
          action.action == constants.CL_ACTION_PICKED_UP and
          action.status == constants.BUILDER_STATUS_FAILED):
        failed_build_configs.add(build_config)

    return failed_build_configs

  def _FailureStreakCounterExceedsThreshold(self, build_config, build_history):
    """Check whether the consecutive failure counter exceeds the threshold.

    Args:
      db: CIDBConnection instance.
      build_config: The build config to check.
      build_history: A sorted list of dict containing build information. See
        Return types of cidb.CIDBConnection.GetBuildsHistory.

    Returns:
      A boolean indicating whether the consecutive failure counter of
        build_config exceeds its sanity_check_threshold.
    """
    site_config = config_lib.GetConfig()
    sanity_check_threshold = site_config[build_config].sanity_check_threshold

    if sanity_check_threshold <= 0:
      return False

    streak_counter = 0
    for build in build_history:
      if build['status'] == constants.BUILDER_STATUS_PASSED:
        return False
      elif build['status'] == constants.BUILDER_STATUS_FAILED:
        streak_counter += 1

      if streak_counter >= sanity_check_threshold:
        return True

    return False

  def _GetBuildConfigsToSanityCheck(self, db, build_configs):
    """Get build configs to sanity check.

    Args:
      db: An instance of cidb.CIDBConnection.
      build_configs: A list of build configs (strings) to check whether to
        sanity check.

    Returns:
      A list of build_configs (strings) to sanity check.
    """
    start_date = datetime.datetime.now().date() - datetime.timedelta(
        days=self.PRE_CQ_SANITY_CHECK_LOOK_BACK_HISTORY_DAYS)
    builds_history = db.GetBuildsHistory(
        build_configs,
        db.NUM_RESULTS_NO_LIMIT,
        start_date=start_date,
        final=True)
    build_history_by_build_config = cros_collections.GroupByKey(
        builds_history, 'build_config')

    start_time = datetime.datetime.now() - datetime.timedelta(
        hours=self.PRE_CQ_SANITY_CHECK_PERIOD_HOURS)
    builds_requests = db.GetBuildRequestsForBuildConfigs(
        build_configs, start_time=start_time)
    build_requests_by_build_config = cros_collections.GroupNamedtuplesByKey(
        builds_requests, 'request_build_config')

    sanity_check_build_configs = set()
    for build_config in build_configs:
      build_history = build_history_by_build_config.get(build_config, [])
      build_reqs = build_requests_by_build_config.get(build_config, [])
      if (build_history and
          not build_reqs and self._FailureStreakCounterExceedsThreshold(
              build_config, build_history)):
        sanity_check_build_configs.add(build_config)

    return list(sanity_check_build_configs)

  def _LaunchSanityCheckPreCQsIfNeeded(self, build_id, db, pool,
                                       action_history):
    """Check the Pre-CQs of changes and launch Sanity-Pre-CQs if needed.

    Args:
      build_id: build_id (string) of the pre-cq-launcher build.
      db: An instance of cidb.CIDBConnection.
      pool: An instance of ValidationPool.validation_pool.
      action_history: A list of clactions.CLActions.
    """
    failed_build_configs = self._GetFailedPreCQConfigs(action_history)

    if not failed_build_configs:
      return

    sanity_check_build_configs = self._GetBuildConfigsToSanityCheck(
        db, failed_build_configs)

    if sanity_check_build_configs:
      self.LaunchSanityPreCQs(build_id, db, pool, sanity_check_build_configs)

  def _ProcessVerified(self, change, can_submit, will_submit):
    """Process a change that is fully pre-cq verified.

    Args:
      change: GerritPatch instance to process.
      can_submit: set of changes that can be submitted by the pre-cq.
      will_submit: set of changes that will be submitted by the pre-cq.

    Returns:
      A tuple of (set of changes that should be submitted by pre-cq,
                  set of changes that should be passed by pre-cq)
    """
    # If this change and all its dependencies are pre-cq submittable,
    # and none of them have yet been marked as pre-cq passed, then
    # mark them for submission. Otherwise, mark this change as passed.
    if change in will_submit:
      return set(), set()

    if change in can_submit:
      logging.info('Attempting to determine if %s can be submitted.', change)
      patches = patch_series.PatchSeries(self._build_root)
      try:
        plan = patches.CreateTransaction(change, limit_to=can_submit)
        return plan, set()
      except cros_patch.DependencyError:
        pass

    # Changes that cannot be submitted are marked as passed.
    return set(), set([change])

  def UpdateChangeStatuses(self, changes, status):
    """Update |changes| to |status|.

    Args:
      changes: A set of GerritPatch instances.
      status: One of constants.CL_STATUS_* statuses.
    """
    if changes:
      build_identifier, db = self._run.GetCIDBHandle()
      build_id = build_identifier.cidb_id
      a = clactions.TranslatePreCQStatusToAction(status)
      actions = [
          clactions.CLAction.FromGerritPatchAndAction(c, a) for c in changes
      ]
      db.InsertCLActions(build_id, actions)

  def _LaunchPreCQsIfNeeded(self, pool, changes):
    """Find ready changes and launch Pre-CQs.

    Args:
      pool: An instance of ValidationPool.validation_pool.
      changes: GerritPatch instances.
    """
    build_identifier, db = self._run.GetCIDBHandle()
    build_id = build_identifier.cidb_id

    action_history, _, status_map = (
        self._GetUpdatedActionHistoryAndStatusMaps(db, changes))

    # Filter out failed speculative changes.
    changes = [
        c for c in changes
        if status_map[c] != constants.CL_STATUS_FAILED or c.HasReadyFlag()
    ]

    progress_map = clactions.GetPreCQProgressMap(changes, action_history)

    # Filter out changes that have already failed, and aren't marked trybot
    # ready or commit ready, before launching.
    launchable_progress_map = {
        k: v
        for k, v in progress_map.iteritems()
        if k.HasReadyFlag() or status_map[k] != constants.CL_STATUS_FAILED
    }

    is_tree_open = tree_status.IsTreeOpen(throttled_ok=True)
    launch_count = 0
    cl_launch_count = 0
    launch_count_limit = (
        self.last_cycle_launch_count + self.MAX_LAUNCHES_PER_CYCLE_DERIVATIVE)

    launches = {}
    for plan, config in self.GetDisjointTransactionsToTest(
        pool, launchable_progress_map):
      launches.setdefault(frozenset(plan), []).append(config)

    for plan, configs in launches.iteritems():
      if not is_tree_open:
        logging.info('Tree is closed, not launching configs %r for plan %s.',
                     configs, cros_patch.GetChangesAsString(plan))
      elif launch_count >= launch_count_limit:
        logging.info(
            'Hit or exceeded maximum launch count of %s this cycle, '
            'not launching configs %r for plan %s.', launch_count_limit,
            configs, cros_patch.GetChangesAsString(plan))
      else:
        self.LaunchPreCQs(build_id, db, pool, configs, plan)
        launch_count += len(configs)
        cl_launch_count += len(configs) * len(plan)

    metrics.Counter(constants.MON_PRECQ_LAUNCH_COUNT).increment_by(launch_count)
    metrics.Counter(
        constants.MON_PRECQ_CL_LAUNCH_COUNT).increment_by(cl_launch_count)
    metrics.Counter(constants.MON_PRECQ_TICK_COUNT).increment()

    self.last_cycle_launch_count = launch_count

  def _GetUpdatedActionHistoryAndStatusMaps(self, db, changes):
    """Get updated action_history and Pre-CQ status for changes.

    Args:
      db: cidb.CIDBConnection instance.
      changes: GerritPatch instances to process.

    Returns:
      (The current CLAction list of the changes, the current map from changes to
       (status, timestamp) tuple, the current map from changes to status).
    """
    action_history = db.GetActionsForChanges(changes)

    status_and_timestamp_map = {
        c: clactions.GetCLPreCQStatusAndTime(c, action_history) for c in changes
    }
    status_map = {c: v[0] for c, v in status_and_timestamp_map.items()}

    status_str_map = {c.PatchLink(): s for c, s in status_map.iteritems()}
    logging.info('Processing status_map: %s', pprint.pformat(status_str_map))

    return action_history, status_and_timestamp_map, status_map

  def ProcessChanges(self, pool, changes, _non_manifest_changes):
    """Process a list of changes that were marked as Ready.

    From our list of changes that were marked as Ready, we create a
    list of disjoint transactions and send each one to a separate Pre-CQ
    trybot.

    Non-manifest changes are just submitted here because they don't need to be
    verified by either the Pre-CQ or CQ.
    """
    build_identifier, db = self._run.GetCIDBHandle()
    build_id = build_identifier.cidb_id

    action_history = db.GetActionsForChanges(changes)

    self._LaunchSanityCheckPreCQsIfNeeded(build_id, db, pool, action_history)

    if self.buildbucket_client is not None:
      for change in changes:
        self._ProcessOldPatchPreCQRuns(db, change, action_history)

    for change in changes:
      self._ProcessRequeuedAndSpeculative(change, action_history)

    action_history, status_and_timestamp_map, status_map = (
        self._GetUpdatedActionHistoryAndStatusMaps(db, changes))

    # Filter out failed speculative changes.
    changes = [
        c for c in changes
        if status_map[c] != constants.CL_STATUS_FAILED or c.HasReadyFlag()
    ]

    progress_map = clactions.GetPreCQProgressMap(changes, action_history)
    busy, inflight, verified = clactions.GetPreCQCategories(progress_map)
    logging.info(
        'Changes in busy: %s.\nChanges in inflight: %s.\nChanges in '
        'verified: %s.', cros_patch.GetChangesAsString(busy),
        cros_patch.GetChangesAsString(inflight),
        cros_patch.GetChangesAsString(verified))

    current_db_time = db.GetTime()

    to_process = set(
        c for c in changes if status_map[c] != constants.CL_STATUS_PASSED)

    # Mark verified changes verified.
    to_mark_verified = [
        c for c in verified.intersection(to_process)
        if status_map[c] != constants.CL_STATUS_FULLY_VERIFIED
    ]
    self.UpdateChangeStatuses(to_mark_verified,
                              constants.CL_STATUS_FULLY_VERIFIED)
    # Send notifications to the fully verified changes.
    if to_mark_verified:
      pool.HandlePreCQSuccess(to_mark_verified)

    # Changes that can be submitted, if their dependencies can be too. Only
    # include changes that have not already been marked as passed.
    can_submit = set(
        c for c in (verified.intersection(to_process))
        if c.IsMergeable() and self.CanSubmitChangeInPreCQ(c))

    self.SendChangeCountStats(status_map)

    # Changes that will be submitted.
    will_submit = set()
    # Changes that will be passed.
    will_pass = set()

    for change in inflight:
      if status_map[change] != constants.CL_STATUS_INFLIGHT:
        build_ids = [x.build_id for x in progress_map[change].values()]
        # Change the status to inflight.
        self.UpdateChangeStatuses([change], constants.CL_STATUS_INFLIGHT)
        build_dicts = self.buildstore.GetBuildStatuses(build_ids=build_ids)
        lines = []
        for b in build_dicts:
          url = tree_status.ConstructLegolandBuildURL(b['buildbucket_id'])
          lines.append('(%s) : %s ' % (b['build_config'], url))

        # Send notifications.
        pool.HandleApplySuccess(change, build_log=('\n' + '\n'.join(lines)))

    for change in to_process:
      # Detect if change is ready to be marked as passed, or ready to submit.
      if change in verified and change.IsMergeable():
        to_submit, to_pass = self._ProcessVerified(change, can_submit,
                                                   will_submit)
        will_submit.update(to_submit)
        will_pass.update(to_pass)
        continue

      # Screen unscreened changes to determine which trybots to test them with.
      if not clactions.IsChangeScreened(change, action_history):
        self.ScreenChangeForPreCQ(change)
        continue

      self._ProcessTimeouts(change, progress_map, pool, current_db_time)

    # Process trybots crashed in early stages.
    self._ProcessPreCQEarlyCrashes(progress_map, pool)

    # Mark passed changes as passed
    self.UpdateChangeStatuses(will_pass, constants.CL_STATUS_PASSED)

    # Expire any very stale passed or fully verified changes.
    for c, v in status_and_timestamp_map.items():
      self._ProcessExpiry(c, v[0], v[1], pool, current_db_time)

    # Submit changes that are ready to submit, if we can.
    if tree_status.IsTreeOpen(throttled_ok=True):
      pool.SubmitNonManifestChanges(
          check_tree_open=False, reason=constants.STRATEGY_NONMANIFEST)
      submit_reason = constants.STRATEGY_PRECQ_SUBMIT
      will_submit = {c: submit_reason for c in will_submit}
      submitted, _ = pool.SubmitChanges(will_submit, check_tree_open=False)

      # Record stats about submissions in monarch.
      if db:
        submitted_change_actions = db.GetActionsForChanges(submitted)
        strategies = {m: constants.STRATEGY_PRECQ_SUBMIT for m in submitted}
        clactions_metrics.RecordSubmissionMetrics(
            clactions.CLActionHistory(submitted_change_actions), strategies)

    self._LaunchPreCQsIfNeeded(pool, changes)

    # Tell ValidationPool to keep waiting for more changes until we hit
    # its internal timeout.
    return [], []

  def SendChangeCountStats(self, status_map):
    """Sends metrics of the CL counts to Monarch.

    Args:
      status_map: A map from CLs to statuses.
    """
    # Separately count and log the number of mergable and speculative changes in
    # each of the possible pre-cq statuses (or in status None).
    POSSIBLE_STATUSES = clactions.PRE_CQ_CL_STATUSES | {None}
    status_counts = {}
    for count_bin in itertools.product((True, False), POSSIBLE_STATUSES):
      status_counts[count_bin] = 0
    for c, status in status_map.iteritems():
      count_bin = (c.IsMergeable(), status)
      status_counts[count_bin] += 1
    for (is_mergable, status), count in sorted(status_counts.items()):
      subtype = 'mergeable' if is_mergable else 'speculative'
      metrics.Gauge('chromeos/cbuildbot/pre-cq/cl-count').set(
          count, fields={
              'status': str(status),
              'subtype': subtype
          })

  @failures_lib.SetFailureType(failures_lib.InfrastructureFailure)
  def PerformStage(self):
    # Setup and initialize the repo.
    super(PreCQLauncherStage, self).PerformStage()

    query = constants.PRECQ_READY_QUERY
    if self._run.options.cq_gerrit_override:
      query = (self._run.options.cq_gerrit_override, None)

    # Loop through all of the changes until we hit a timeout.
    validation_pool.ValidationPool.AcquirePool(
        overlays=self._run.config.overlays,
        repo=self.repo,
        build_number=self._run.buildnumber,
        builder_name=self._run.GetBuilderName(),
        buildbucket_id=self._run.options.buildbucket_id,
        query=query,
        dryrun=self._run.options.debug,
        check_tree_open=False,
        change_filter=self.ProcessChanges,
        builder_run=self._run)
