blob: fb4f8866fac731fd19d6d6800c9183ceb5b54b0e [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module containing the sync stages."""
from __future__ import print_function
import collections
import ConfigParser
import contextlib
import datetime
import itertools
import json
import os
import re
import shutil
import sys
import time
from xml.etree import ElementTree
from xml.dom import minidom
from chromite.cbuildbot import buildbucket_lib
from chromite.cbuildbot import chroot_lib
from chromite.lib import config_lib
from chromite.lib import constants
from chromite.lib import failures_lib
from chromite.cbuildbot import lkgm_manager
from chromite.cbuildbot import manifest_version
from chromite.cbuildbot import repository
from chromite.cbuildbot import tree_status
from chromite.cbuildbot import triage_lib
from chromite.cbuildbot import trybot_patch_pool
from chromite.cbuildbot import validation_pool
from chromite.cbuildbot.stages import generic_stages
from chromite.cbuildbot.stages import build_stages
from chromite.lib import clactions
from chromite.lib import commandline
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import git
from chromite.lib import graphite
from chromite.lib import metrics
from chromite.lib import osutils
from chromite.lib import patch as cros_patch
from chromite.lib import timeout_util
from chromite.scripts import cros_mark_android_as_stable
from chromite.scripts import cros_mark_chrome_as_stable
# Site-wide configuration object, fetched once when this module is imported.
site_config = config_lib.GetConfig()
# Local alias for validation_pool.PRE_CQ.
PRE_CQ = validation_pool.PRE_CQ
# NOTE(review): the string fragments below read like three user-facing
# message templates (trybot launch timeout, trybot run timeout, pre-CQ status
# expiry), but no assignment target or opening parenthesis is visible for any
# of them in this copy -- confirm against upstream before editing.
'We were not able to launch a %s trybot for your change within '
'%s minutes.\n\n'
'This problem can happen if the trybot waterfall is very '
'busy, or if there is an infrastructure issue. Please '
'notify the sheriff and mark your change as ready again. If '
'this problem occurs multiple times in a row, please file a '
'The %s trybot for your change timed out after %s minutes.'
'This problem can happen if your change causes the builder '
'to hang, or if there is some infrastructure issue. If your '
'change is not at fault you may mark your change as ready '
'again. If this problem occurs multiple times please notify '
'the sheriff and file a bug.')
'The pre-cq verification for this change expired after %s minutes. No '
'action is required on your part.'
'In order to protect the CQ from picking up stale changes, the pre-cq '
'status for changes are cleared after a generous timeout. This change '
'will be re-tested by the pre-cq before the CQ picks it up.')
class PatchChangesStage(generic_stages.BuilderStage):
"""Stage that patches a set of Gerrit changes to the buildroot source tree."""
def __init__(self, builder_run, patch_pool, **kwargs):
  """Construct a PatchChangesStage.

  Args:
    builder_run: BuilderRun object.
    patch_pool: A TrybotPatchPool object containing the different types of
      patches to apply.
    **kwargs: Forwarded unchanged to the parent stage constructor.
  """
  super(PatchChangesStage, self).__init__(builder_run, **kwargs)
  # Kept on the instance so PerformStage can apply it later.
  self.patch_pool = patch_pool
# Collects the incoming changes into `conflicts` (presumably keyed by change
# id, judging by the "they have same id" message), returns `changes` untouched
# when no bucket holds more than one change, and otherwise calls
# cros_build_lib.Die with the list of duplicates.
# NOTE(review): several expressions look truncated in this copy
# (`if is None:`, `setdefault(, [])`, the two bare message strings with no
# enclosing call, and the leading `.join`). The function is also invoked as
# self._CheckForDuplicatePatches(series, changes) yet takes no `self`, which
# suggests a missing @staticmethod -- confirm against upstream.
def _CheckForDuplicatePatches(_series, changes):
conflicts = {}
duplicates = []
for change in changes:
if is None:
"Change %s lacks a usable ChangeId; duplicate checking cannot "
"be done for this change. If cherry-picking fails, this is a "
"potential cause.", change)
conflicts.setdefault(, []).append(change)
duplicates = [x for x in conflicts.itervalues() if len(x) > 1]
if not duplicates:
return changes
for conflict in duplicates:
"Changes %s conflict with each other- they have same id %s., "
.join(map(str, conflict)), conflict[0].id)
cros_build_lib.Die("Duplicate patches were encountered: %s", duplicates)
def _PatchSeriesFilter(self, series, changes):
return self._CheckForDuplicatePatches(series, changes)
def _ApplyPatchSeries(self, series, patch_pool, **kwargs):
"""Applies a patch pool using a patch series."""
kwargs.setdefault('frozen', False)
# Honor the given ordering, so that if a gerrit/remote patch
# conflicts w/ a local patch, the gerrit/remote patch are
# blamed rather than local (patch ordering is typically
# local, gerrit, then remote).
kwargs.setdefault('honor_ordering', True)
kwargs['changes_filter'] = self._PatchSeriesFilter
_applied, failed_tot, failed_inflight = series.Apply(
list(patch_pool), **kwargs)
failures = failed_tot + failed_inflight
if failures:
def HandleApplyFailures(self, failures):
  """Report patches that failed to apply through cros_build_lib.Die.

  Args:
    failures: Iterable of failure objects; each one is rendered with str()
      and the results are joined one per line.
  """
  failure_report = "\n".join(str(failure) for failure in failures)
  cros_build_lib.Die("Failed applying patches: %s", failure_report)
# Applies self.patch_pool through a local PatchSeries subclass that prints a
# buildbot link for each Gerrit patch before delegating to the stock
# PatchSeries.ApplyChange.
# NOTE(review): in this copy the `elif` branch for UploadedLocalPatch has no
# body, `helper_pool` is created but never used, and the NoisyPatchSeries(...)
# call shows only `deps_filter_fn` -- these look like dropped lines; confirm
# against upstream.
def PerformStage(self):
class NoisyPatchSeries(validation_pool.PatchSeries):
"""Custom PatchSeries that adds links to buildbot logs for remote trys."""
def ApplyChange(self, change):
if isinstance(change, cros_patch.GerritPatch):
logging.PrintBuildbotLink(str(change), change.url)
elif isinstance(change, cros_patch.UploadedLocalPatch):
return validation_pool.PatchSeries.ApplyChange(self, change)
# If we're an external builder, ignore internal patches.
helper_pool = validation_pool.HelperPool.SimpleCreate(
cros_internal=self._run.config.internal, cros=True)
# Limit our resolution to non-manifest patches.
patch_series = NoisyPatchSeries(
deps_filter_fn=lambda p: not trybot_patch_pool.ManifestFilter(p))
self._ApplyPatchSeries(patch_series, self.patch_pool)
class BootstrapStage(PatchChangesStage):
"""Stage that patches a chromite repo and re-executes inside it.
returncode - the returncode of the cbuildbot re-execution. Valid after
calling stage.Run().
option_name = 'bootstrap'
def __init__(self, builder_run, patch_pool, **kwargs):
  """Construct a BootstrapStage.

  Args:
    builder_run: BuilderRun object.
    patch_pool: Pool of patches this stage works from.
    **kwargs: Forwarded unchanged to the parent stage constructor.
  """
  # The parent constructor receives a pool built with no arguments; the
  # caller's pool is stored on the instance afterwards.
  placeholder_pool = trybot_patch_pool.TrybotPatchPool()
  super(BootstrapStage, self).__init__(
      builder_run, placeholder_pool, **kwargs)
  self.returncode = None
  self.tempdir = None
  self.config_repo = self._run.options.config_repo
  self.patch_pool = patch_pool
# Applies `patch_pool` to a manifest checkout located under self.tempdir,
# loads the patched manifest through git.Manifest.Cached so that load errors
# propagate, and returns the checkout directory.
# NOTE(review): the docstring below has no closing quotes or section headers
# in this copy, and nothing visible populates `checkout_dir` before the
# patches are applied -- confirm against upstream.
def _ApplyManifestPatches(self, patch_pool):
"""Apply a pool of manifest patches to a temp manifest checkout.
patch_pool: The pool to apply.
The path to the patched manifest checkout.
Exception, if the new patched manifest cannot be parsed.
checkout_dir = os.path.join(self.tempdir, 'manfest-checkout')
patch_series = validation_pool.PatchSeries.WorkOnSingleRepo(
checkout_dir, tracking_branch=self._run.manifest_branch)
self._ApplyPatchSeries(patch_series, patch_pool)
# Verify that the patched manifest loads properly. Propagate any errors as
# exceptions.
manifest = os.path.join(checkout_dir, self._run.config.manifest)
git.Manifest.Cached(manifest, manifest_include_dir=checkout_dir)
return checkout_dir
@staticmethod
def _FilterArgsForApi(parsed_args, api_minor):
  """Remove arguments that are introduced after an api version.

  Declared static because callers invoke it as
  cls._FilterArgsForApi(parsed_args, minor) and it takes no self/cls.

  Args:
    parsed_args: Parsed arguments; each must expose opt_inst.api_version.
    api_minor: Highest minor API version the target accepts.

  Returns:
    The accepted arguments as returned by FilteringParser.FilterArgs.
  """
  def filter_fn(passed_arg):
    return passed_arg.opt_inst.api_version <= api_minor

  accepted, removed = commandline.FilteringParser.FilterArgs(
      parsed_args, filter_fn)
  if removed:
    # Let the logger do the interpolation instead of formatting eagerly.
    logging.warning("The following arguments were removed due to api: '%s'",
                    ' '.join(removed))
  return accepted
@classmethod
def FilterArgsForTargetCbuildbot(cls, buildroot, cbuildbot_path, options):
  """Build the argument list for running the target checkout's cbuildbot.

  Args:
    buildroot: Checkout whose chromite API version is queried.
    cbuildbot_path: Program path placed first in the returned list.
    options: Parsed options; parsed_args, cache_dir_specified and cache_dir
      are read from it.

  Returns:
    A list starting with cbuildbot_path followed by the arguments the target
    API version accepts.
  """
  _, minor = cros_build_lib.GetTargetChromiteApiVersion(buildroot)
  args = [cbuildbot_path]
  args.extend(cls._FilterArgsForApi(options.parsed_args, minor))

  # Only pass down --cache-dir if it was specified. By default, we want
  # the cache dir to live in the root of each checkout, so this means that
  # each instance of cbuildbot needs to calculate the default separately.
  if minor >= 2 and options.cache_dir_specified:
    args += ['--cache-dir', options.cache_dir]

  return args
@classmethod
def BootstrapPatchesNeeded(cls, builder_run, patch_pool):
  """See if bootstrapping is needed for any of the given patches.

  Does NOT determine if they have already been applied.

  Args:
    builder_run: BuilderRun object for this build.
    patch_pool: All patches to be applied this run.

  Returns:
    boolean True if bootstrapping is needed.
  """
  chromite_pool = patch_pool.Filter(project=constants.CHROMITE_PROJECT)
  # Pick the manifest filter matching the builder; without the else branch
  # the internal result would always be overwritten by the external one.
  if builder_run.config.internal:
    manifest_pool = patch_pool.FilterIntManifest()
  else:
    manifest_pool = patch_pool.FilterExtManifest()

  return bool(chromite_pool or manifest_pool)
def HandleApplyFailures(self, failures):
  """Handle the case where patches fail to apply.

  Pre-CQ builders only log the failures; every other builder defers to
  PatchChangesStage.HandleApplyFailures.

  Args:
    failures: Iterable of failure objects; each is rendered with str().
  """
  if self._run.config.pre_cq:
    # Let the PreCQSync stage handle this failure. The PreCQSync stage will
    # comment on CLs with the appropriate message when they fail to apply.
    #
    # WARNING: For manifest patches, the Pre-CQ attempts to apply external
    # patches to the internal manifest, and this means we may flag a conflict
    # here even if the patch applies cleanly. TODO(davidjames): Fix this.
    #
    # The failures are joined with newlines and passed as the format
    # argument; the template itself must not be used as the join separator.
    logging.error('Failed applying patches: %s',
                  '\n'.join(map(str, failures)))
  else:
    PatchChangesStage.HandleApplyFailures(self, failures)
# Checks out chromite (and, when self.config_repo is set, the site config
# repo) under self.tempdir on the filter branch, applies the matching patches
# from self.patch_pool, then runs cbuildbot from the temp dir with the
# filtered command line and stores its exit status in self.returncode.
# NOTE(review): several calls are cut off in this copy (both
# repository.CloneGitRepo(...) calls, the os.path.join(...) for
# site_config_reference_repo, and the FilterArgsForTargetCbuildbot(...) call),
# and the manifest_pool selection shows no `else:` -- confirm against
# upstream.
def _PerformStageInTempDir(self):
# The plan for the builders is to use master branch to bootstrap other
# branches. Now, if we wanted to test patches for both the bootstrap code
# (on master) and the branched chromite (say, R20), we need to filter the
# patches by branch.
filter_branch = self._run.manifest_branch
if self._run.options.test_bootstrap:
filter_branch = 'master'
# Filter all requested patches for the branch.
branch_pool = self.patch_pool.FilterBranch(filter_branch)
# Checkout the new version of chromite, and patch it.
chromite_dir = os.path.join(self.tempdir, 'chromite')
reference_repo = os.path.join(constants.CHROMITE_DIR, '.git')
repository.CloneGitRepo(chromite_dir, constants.CHROMITE_URL,
git.RunGit(chromite_dir, ['checkout', filter_branch])
chromite_pool = branch_pool.Filter(project=constants.CHROMITE_PROJECT)
if chromite_pool:
patch_series = validation_pool.PatchSeries.WorkOnSingleRepo(
chromite_dir, filter_branch)
self._ApplyPatchSeries(patch_series, chromite_pool)
# Checkout the new version of site config (no patching logic, yet).
if self.config_repo:
site_config_dir = os.path.join(chromite_dir, 'config')
site_config_reference_repo = os.path.join(constants.SITE_CONFIG_DIR,
repository.CloneGitRepo(site_config_dir, self.config_repo,
git.RunGit(site_config_dir, ['checkout', filter_branch])
site_config_pool = branch_pool.FilterGitRemoteUrl(self.config_repo)
if site_config_pool:
site_patch_series = validation_pool.PatchSeries.WorkOnSingleRepo(
site_config_dir, filter_branch)
self._ApplyPatchSeries(site_patch_series, site_config_pool)
# Re-exec into new instance of cbuildbot, with proper command line args.
cbuildbot_path = constants.PATH_TO_CBUILDBOT
if not os.path.exists(os.path.join(self.tempdir, cbuildbot_path)):
cbuildbot_path = 'chromite/cbuildbot/cbuildbot'
cmd = self.FilterArgsForTargetCbuildbot(self.tempdir, cbuildbot_path,
extra_params = ['--sourceroot', self._run.options.sourceroot]
if self._run.options.test_bootstrap:
# We don't want re-executed instance to see this.
cmd = [a for a in cmd if a != '--test-bootstrap']
# If we've already done the desired number of bootstraps, disable
# bootstrapping for the next execution. Also pass in the patched manifest
# repository.
if self._run.config.internal:
manifest_pool = branch_pool.FilterIntManifest()
manifest_pool = branch_pool.FilterExtManifest()
if manifest_pool:
manifest_dir = self._ApplyManifestPatches(manifest_pool)
extra_params.extend(['--manifest-repo-url', manifest_dir])
cmd += extra_params
result_obj = cros_build_lib.RunCommand(
cmd, cwd=self.tempdir, kill_timeout=30, error_code_ok=True)
self.returncode = result_obj.returncode
def PerformStage(self):
  """Run the bootstrap work inside a scratch directory.

  A temporary directory is created under options.bootstrap_dir and exposed
  as self.tempdir while _PerformStageInTempDir runs; self.tempdir is reset
  to None afterwards, even when the work raises.
  """
  with osutils.TempDir(base_dir=self._run.options.bootstrap_dir) as tempdir:
    self.tempdir = tempdir
    try:
      self._PerformStageInTempDir()
    finally:
      # The directory is removed on exit, so never leave a stale path behind.
      self.tempdir = None
class SyncStage(generic_stages.BuilderStage):
"""Stage that performs syncing for the builder."""
option_name = 'sync'
output_manifest_sha1 = True
# Initializes the sync state (repo, skip_sync, internal) and creates a
# Buildbucket client when a service account is found for
# constants.CHROMEOS_SERVICE_ACCOUNT. Raises NoBuildbucketClientException when
# the config uses the Buildbucket scheduler, the run is in production, and no
# client could be created.
# NOTE(review): the BuildbucketClient(...) call and the argument of the final
# `%` format are cut off in this copy -- confirm against upstream.
def __init__(self, builder_run, **kwargs):
super(SyncStage, self).__init__(builder_run, **kwargs)
self.repo = None
self.skip_sync = False
# TODO(mtennant): Why keep a duplicate copy of this config value
# at self.internal when it can always be retrieved from config?
self.internal = self._run.config.internal
self.buildbucket_client = None
if buildbucket_lib.GetServiceAccount(constants.CHROMEOS_SERVICE_ACCOUNT):
self.buildbucket_client = buildbucket_lib.BuildbucketClient(
if (config_lib.UseBuildbucketScheduler(self._run.config) and
self._run.InProduction() and
self.buildbucket_client is None):
# If it's CQ-master build, running on a buildbot and in production
# mode, buildbucket_client cannot be None in order to schedule
# slave builds.
raise buildbucket_lib.NoBuildbucketClientException(
'Buildbucket_client is None. '
'Please check if the buildbot has a valid service account file. '
'Please find the service account json file at %s.' %
def _GetManifestVersionsRepoUrl(self, internal=None, test=False):
  """Pick the manifest-versions repository URL from the site parameters.

  Args:
    internal: Whether to use the internal repository. None means "use the
      value of this run's config.internal".
    test: Whether to return the *_TEST variant of the URL.

  Returns:
    One of the four MANIFEST_VERSIONS_*GOB_URL* site parameters.
  """
  if internal is None:
    internal = self._run.config.internal

  params = site_config.params
  if internal:
    return (params.MANIFEST_VERSIONS_INT_GOB_URL_TEST if test
            else params.MANIFEST_VERSIONS_INT_GOB_URL)
  return (params.MANIFEST_VERSIONS_GOB_URL_TEST if test
          else params.MANIFEST_VERSIONS_GOB_URL)
# NOTE(review): no body is visible for this method in this copy; subclasses
# below override Initialize() -- confirm the base implementation upstream.
def Initialize(self):
# Seeds the build root's `.repo` directory from options.repo_cache when the
# build root is not yet a repo root, then stores self.GetRepoRepository() on
# self.repo.
# NOTE(review): the `if (` condition and the shutil.copytree(...) call are
# not closed in this copy, and a log-message string is fused onto the end of
# a comment line -- confirm against upstream.
def _InitializeRepo(self):
"""Set up the RepoRepository object."""
# If we have no repository at all, but we have a warm cache path, copy in
# the warm cache. This is done so builders can try to avoid doing a sync
# from scratch on a new builder (especially GCE instances).
if (not repository.IsARepoRoot(self._build_root) and
self._run.options.repo_cache and
# If the warm cache is invalid, the wrong branch, or from the wrong
# manifest, Repository will repair it.'Using warm cache "%s" to populate buildroot "%s"',
shutil.copytree(os.path.join(self._run.options.repo_cache, '.repo'),
os.path.join(self._build_root, '.repo'),
self.repo = self.GetRepoRepository()
def GetNextManifest(self):
  """Returns the manifest to use: the one named by this run's config."""
  run_config = self._run.config
  return run_config.manifest
# Prints a BUILDROOT / TRACKING BRANCH / NEXT MANIFEST summary and, unless
# self.skip_sync is set, goes on to sync.
# NOTE(review): the operand of `'BUILDROOT: %s' %` and the body of
# `if not self.skip_sync:` are not visible in this copy -- confirm against
# upstream.
def ManifestCheckout(self, next_manifest):
"""Checks out the repository to the given manifest."""
self._Print('\n'.join(['BUILDROOT: %s' %,
'TRACKING BRANCH: %s' % self.repo.branch,
'NEXT MANIFEST: %s' % next_manifest]))
if not self.skip_sync:
# Runs the pre-patch stages (each labelled with the ' (pre-Patch)' suffix)
# once for the run and then per board of every ungrouped builder run; returns
# False when a failures_lib.StepFailure is caught, True otherwise.
# NOTE(review): the stage constructor names in front of each argument list
# and the `try:` matching the `except` are not visible in this copy, and the
# docstring is unterminated -- confirm against upstream.
def RunPrePatchBuild(self):
"""Run through a pre-patch build to prepare for incremental build.
This function runs though the InitSDKStage, SetupBoardStage, and
BuildPackagesStage. It is intended to be called before applying
any patches under test, to prepare the chroot and sysroot in a state
corresponding to ToT prior to an incremental build.
True if all stages were successful, False if any of them failed.
suffix = ' (pre-Patch)'
self._run, chroot_replace=True, suffix=suffix).Run()
for builder_run in self._run.GetUngroupedBuilderRuns():
for board in builder_run.config.boards:
builder_run, board=board, suffix=suffix).Run()
builder_run, board=board, suffix=suffix).Run()
except failures_lib.StepFailure:
return False
return True
# Appends the attribute dict of every distinct change (deduplicated through
# set()) to the 'changes' list already in the run metadata, sorts the result,
# and writes it back under the 'changes' key.
# NOTE(review): the sort key tuple is cut off after ATTR_GERRIT_NUMBER and the
# docstring is unterminated in this copy -- confirm against upstream.
def WriteChangesToMetadata(self, changes):
"""Write the changes under test into the metadata.
changes: A list of GerritPatch instances.
changes_list = self._run.attrs.metadata.GetDict().get('changes', [])
changes_list = changes_list + [c.GetAttributeDict() for c in set(changes)]
changes_list = sorted(changes_list,
key=lambda x: (x[cros_patch.ATTR_GERRIT_NUMBER],
self._run.attrs.metadata.UpdateWithDict({'changes': changes_list})
# Looks the bucket up in buildbucket_lib.WATERFALL_BUCKET_MAP and returns it;
# raises NoBuildbucketBucketFoundException when the lookup yields None.
# NOTE(review): the argument of `.get(` is missing in this copy (presumably
# build_config.active_waterfall, which the error message reports), and the
# docstring is unterminated -- confirm against upstream.
def _GetBuildbucketBucket(self, build_name, build_config):
"""Get the corresponding Buildbucket bucket.
build_name: name of the build to put to Buildbucket.
build_config: config of the build to put to Buildbucket.
NoBuildbucketBucketFoundException when no Buildbucket bucket found.
bucket = buildbucket_lib.WATERFALL_BUCKET_MAP.get(
if bucket is None:
raise buildbucket_lib.NoBuildbucketBucketFoundException(
'No Buildbucket bucket found for builder %s waterfall: %s' %
(build_name, build_config.active_waterfall))
return bucket
# Builds a JSON request body (bucket, builder name, cbb_* properties, tags),
# submits it through self.buildbucket_client.PutBuildRequest, and returns the
# (buildbucket_id, created_ts) pair extracted from the response.
# NOTE(review): the dict/json.dumps literals are never closed, the docstring
# is unterminated, and a log-message string is fused onto the `created_ts`
# assignment in this copy -- confirm against upstream.
def PostSlaveBuildToBuildbucket(self, build_name, build_config,
master_build_id, buildset_tag, dryrun):
"""Send a Put slave build request to Buildbucket.
build_name: Salve build name to put to Buildbucket.
build_config: Slave build config to put to Buildbucket.
master_build_id: Master build id of the slave build.
buildset_tag: The buildset tag for strong consistent tag queries.
More context:
dryrun: Whether a dryrun.
body = json.dumps({
'bucket': self._GetBuildbucketBucket(build_name, build_config),
'parameters_json': json.dumps({
'builder_name': build_name,
'properties': {
'cbb_config': build_name,
'cbb_branch': self._run.manifest_branch,
'cbb_master_build_id': master_build_id,
'tags':['buildset:%s' % buildset_tag,
'build_type:%s' % build_config.build_type,
'cbb_config:%s' % build_name,
'cbb_master_build_id:%s' % master_build_id]
content = self.buildbucket_client.PutBuildRequest(
body, self._run.options.test_tryjob, dryrun)
buildbucket_id = buildbucket_lib.GetBuildId(content)
created_ts = buildbucket_lib.GetBuildCreated_ts(content)'Build_name %s buildbucket_id %s created_timestamp %s',
build_name, buildbucket_id, created_ts)
return (buildbucket_id, created_ts)
# Posts one Buildbucket request per slave config returned by
# self._GetSlaveConfigMap(important_only). Successful posts are collected as
# (name, buildbucket_id, created_ts); posts that raise
# BuildbucketResponseException are collected as (name, None, current_ts) with
# a microsecond timestamp.
# NOTE(review): in this copy the early-exit branches have log strings fused
# onto the `if` lines and no visible return, the `try:` matching the `except`
# is missing, the buildset tag has an empty middle argument, and the two calls
# that record 'scheduled_slaves' / 'unscheduled_slaves' have lost their call
# heads -- confirm against upstream.
def ScheduleSlaveBuildsViaBuildbucket(self, important_only, dryrun):
"""Schedule slave builds by sending PUT requests to Buildbucket.
important_only: Whether only schedule important slave builds.
dryrun: Whether a dryrun.
if self.buildbucket_client is None:'No buildbucket_client. Skip scheduling slaves.')
build_id, _ = self._run.GetCIDBHandle()
if build_id is None:'No build id. Skip scheduling slaves.')
buildset_tag = 'cbuildbot/%s/%s/%s' % (
self._run.manifest_branch,, build_id)
scheduled_slave_builds = []
unscheduled_slave_builds = []
# Get all active slave build configs.
slave_config_map = self._GetSlaveConfigMap(important_only)
for slave_name, slave_config in slave_config_map.iteritems():
buildbucket_id, created_ts = self.PostSlaveBuildToBuildbucket(
slave_name, slave_config, build_id, buildset_tag, dryrun)
scheduled_slave_builds.append((slave_name, buildbucket_id, created_ts))
except buildbucket_lib.BuildbucketResponseException as e:
# Use 16-digit ts to be consistent with the created_ts from Buildbucket
current_ts = int(round(time.time() * 1000000))
unscheduled_slave_builds.append((slave_name, None, current_ts))
if important_only or slave_config.important:
logging.warning('Failed to schedule %s current timestamp %s: %s'
% (slave_name, current_ts, e))
'scheduled_slaves', scheduled_slave_builds)
'unscheduled_slaves', unscheduled_slave_builds)
# Saves the previous manifest to a temp file when one can be exported (and
# --clobber is not set), performs the sync, then either marks the step as
# "(From scratch)" or, on buildbot runs, generates a blamelist against the
# saved manifest. When config.build_before_patching is set, a pre-patch build
# is run and a failure is reported in the step text.
# NOTE(review): the os.path.exists(...) argument, the `try:` matching the
# `except`, the body of that `except`, and the statements under "# Sync." are
# not visible in this copy -- confirm against upstream.
def PerformStage(self):
with osutils.TempDir() as tempdir:
# Save off the last manifest.
fresh_sync = True
if os.path.exists( and not self._run.options.clobber:
old_filename = os.path.join(tempdir, 'old.xml')
old_contents = self.repo.ExportManifest()
except cros_build_lib.RunCommandError as e:
osutils.WriteFile(old_filename, old_contents)
fresh_sync = False
# Sync.
# Print the blamelist.
if fresh_sync:
logging.PrintBuildbotStepText('(From scratch)')
elif self._run.options.buildbot:
lkgm_manager.GenerateBlameList(self.repo, old_filename)
# Incremental builds request an additional build before patching changes.
if self._run.config.build_before_patching:
pre_build_passed = self.RunPrePatchBuild()
if not pre_build_passed:
logging.PrintBuildbotStepText('Pre-patch build failed.')
class LKGMSyncStage(SyncStage):
"""Stage that syncs to the last known good manifest blessed by builders."""
output_manifest_sha1 = False
def GetNextManifest(self):
  """Override: Gets the LKGM.

  Refreshes the manifest-versions checkout under the build root and returns
  the path of the configured LKGM manifest inside that checkout.

  Returns:
    Path to the LKGM manifest file.
  """
  # TODO(sosa): Should really use an initialized manager here.
  # Internal and external builders keep manifest-versions in different
  # sub-directories; without the else branch the internal path would always
  # be overwritten by the external one.
  if self.internal:
    mv_dir = site_config.params.INTERNAL_MANIFEST_VERSIONS_PATH
  else:
    mv_dir = site_config.params.EXTERNAL_MANIFEST_VERSIONS_PATH

  manifest_path = os.path.join(self._build_root, mv_dir)
  manifest_repo = self._GetManifestVersionsRepoUrl()
  manifest_version.RefreshManifestCheckout(manifest_path, manifest_repo)
  return os.path.join(manifest_path, self._run.config.lkgm_manifest)
class ManifestVersionedSyncStage(SyncStage):
"""Stage that generates a unique manifest file, and sync's to it."""
# TODO(mtennant): Make this into a builder run value.
output_manifest_sha1 = False
def __init__(self, builder_run, **kwargs):
  """Construct the stage with no manifest manager registered yet.

  Args:
    builder_run: BuilderRun object.
    **kwargs: Forwarded unchanged to the parent stage constructor.
  """
  # Perform the sync at the end of the stage to the given manifest.
  super(ManifestVersionedSyncStage, self).__init__(builder_run, **kwargs)
  self.manifest_manager = None
  self.repo = None

  # If a builder pushes changes (even with dryrun mode), we need a writable
  # repository. Otherwise, the push will be rejected by the server.
  self.manifest_repo = self._GetManifestVersionsRepoUrl()

  # 1. Our current logic for calculating whether to re-run a build assumes
  #    that if the build is green, then it doesn't need to be re-run. This
  #    isn't true for canary masters, because the canary master ignores the
  #    status of its slaves and is green even if they fail. So set
  #    force=True in this case.
  # 2. If we're running with --debug, we should always run through to
  #    completion, so as to ensure a complete test.
  run = self._run
  self._force = run.config.master or run.options.debug
# Runs the parent HandleSkip, then checks options.force_version.
# NOTE(review): the body of the `if` is not visible in this copy; per the
# docstring it presumably initializes the manager and forces the version --
# confirm against upstream.
def HandleSkip(self):
"""Initializes a manifest manager to the specified version if skipped."""
super(ManifestVersionedSyncStage, self).HandleSkip()
if self._run.options.force_version:
def ForceVersion(self, version):
  """Creates a manifest manager from given version and returns manifest.

  Args:
    version: Version handed to the manifest manager's BootstrapFromVersion.

  Returns:
    Whatever self.manifest_manager.BootstrapFromVersion(version) returns.
  """
  manager = self.manifest_manager
  return manager.BootstrapFromVersion(version)
def VersionIncrementType(self):
  """Return which part of the version number should be incremented.

  Returns:
    'build' when this run's manifest branch is 'master', else 'branch'.
  """
  on_master = self._run.manifest_branch == 'master'
  return 'build' if on_master else 'branch'
def RegisterManifestManager(self, manifest_manager):
  """Save the given manifest manager for later use in this run.

  The manager is stored both on the run attributes and on this stage.

  Args:
    manifest_manager: Expected to be a BuildSpecsManager.
  """
  self._run.attrs.manifest_manager = self.manifest_manager = manifest_manager
# Reads the debug option into `dry_run` and asserts that no chrome_rev is set.
# NOTE(review): `dry_run` is never used and no manager is created in the
# visible body, although the docstring says one is initialized -- the rest of
# the method looks dropped in this copy; confirm against upstream.
def Initialize(self):
"""Initializes a manager that manages manifests for associated stages."""
dry_run = self._run.options.debug
# If chrome_rev is somehow set, fail.
assert not self._chrome_rev, \
'chrome_rev is unsupported on release builders.'
# Parses the manifest with minidom and, when an element tagged
# lkgm_manager.ANDROID_ELEMENT exists, reads an attribute of the first one as
# the Android version and records it under the 'version' metadata key.
# NOTE(review): the getAttribute(...) argument, the logging call head and the
# metadata update call head are missing in this copy, and the docstring is
# unterminated -- confirm against upstream.
def _SetAndroidVersionIfApplicable(self, manifest):
"""If 'android' is in |manifest|, write version to the BuilderRun object.
manifest: Path to the manifest.
manifest_dom = minidom.parse(manifest)
elements = manifest_dom.getElementsByTagName(lkgm_manager.ANDROID_ELEMENT)
if elements:
android_version = elements[0].getAttribute(
'Android version was found in the manifest: %s', android_version)
# Update the metadata dictionary. This is necessary because the
# metadata dictionary is preserved through re-executions, so
# UprevAndroidStage can read the version from the dictionary
# later. This is easier than parsing the manifest again after
# the re-execution.
'version', {'android': android_version})
# Parses the manifest with minidom and, when an element tagged
# lkgm_manager.CHROME_ELEMENT exists, reads an attribute of the first one as
# the Chrome version and records it under the 'version' metadata key.
# NOTE(review): the getAttribute(...) argument, the logging call head and the
# metadata update call head are missing in this copy, and the docstring is
# unterminated -- confirm against upstream.
def _SetChromeVersionIfApplicable(self, manifest):
"""If 'chrome' is in |manifest|, write the version to the BuilderRun object.
manifest: Path to the manifest.
manifest_dom = minidom.parse(manifest)
elements = manifest_dom.getElementsByTagName(lkgm_manager.CHROME_ELEMENT)
if elements:
chrome_version = elements[0].getAttribute(
'Chrome version was found in the manifest: %s', chrome_version)
# Update the metadata dictionary. This is necessary because the
# metadata dictionary is preserved through re-executions, so
# SyncChromeStage can read the version from the dictionary
# later. This is easier than parsing the manifest again after
# the re-execution.
'version', {'chrome': chrome_version})
# Asks the manifest manager for the next build spec (passing this run's
# build_id from the metadata), prints a 'Blamelist' buildbot link built from
# the latest passing spec and the manager's current version, and returns the
# build spec.
# NOTE(review): `url_prefix` is an empty string in this copy, and the comment
# block about passing target_version through str() is not followed by any
# statement -- both look like dropped content; confirm against upstream.
def GetNextManifest(self):
"""Uses the initialized manifest manager to get the next manifest."""
assert self.manifest_manager, \
'Must run GetStageManager before checkout out build.'
build_id = self._run.attrs.metadata.GetDict().get('build_id')
to_return = self.manifest_manager.GetNextBuildSpec(build_id=build_id)
previous_version = self.manifest_manager.GetLatestPassingSpec()
target_version = self.manifest_manager.current_version
# Print the Blamelist here.
url_prefix = ''
url = url_prefix + '%s..%s' % (previous_version, target_version)
logging.PrintBuildbotLink('Blamelist', url)
# The testManifestVersionedSyncOnePartBranch interacts badly with this
# function. It doesn't fully initialize self.manifest_manager which
# causes target_version to be None. Since there isn't a clean fix in
# either direction, just throw this through str(). In the normal case,
# it's already a string anyways.
return to_return
# Generator that yields the manifest path to use: a filtered copy inside a
# temp dir when filter_cros is set, otherwise the manifest that was passed in.
# It is consumed with `with ... as new_manifest` elsewhere in this file, so it
# is presumably wrapped by contextlib.contextmanager.
# NOTE(review): in this copy the `if remote ...` test has no body, nothing
# writes `filtered_manifest`, no `else:` separates the two yields, and the
# docstring is unterminated -- confirm against upstream.
def LocalizeManifest(self, manifest, filter_cros=False):
"""Remove restricted checkouts from the manifest if needed.
manifest: The manifest to localize.
filter_cros: If set, then only checkouts with a remote of 'cros' or
'cros-internal' are kept, and the rest are filtered out.
if filter_cros:
with osutils.TempDir() as tempdir:
filtered_manifest = os.path.join(tempdir, 'filtered.xml')
doc = ElementTree.parse(manifest)
root = doc.getroot()
for node in root.findall('project'):
remote = node.attrib.get('remote')
if remote and remote not in site_config.params.GIT_REMOTES:
yield filtered_manifest
yield manifest
# Polls the CIDB build status of `master_id` through
# timeout_util.WaitForSuccess, retrying while the 'platform_version' field is
# falsy, and returns the result.
# NOTE(review): the WaitForSuccess(...) argument list is cut off, a log string
# is fused onto the `_PrintRemainingTime` definition line, and the docstring
# is unterminated in this copy -- confirm against upstream.
def _GetMasterVersion(self, master_id, timeout=5 * 60):
"""Get the platform version associated with the master_build_id.
master_id: Our master build id.
timeout: How long to wait for the platform version to show up
in the database. This is needed because the slave builders are
triggered slightly before the platform version is written. Default
is 5 minutes.
# TODO(davidjames): Remove the wait loop here once we've updated slave
# builders to only get triggered after the platform version is written.
def _PrintRemainingTime(remaining):'%s until timeout...', remaining)
def _GetPlatformVersion():
return db.GetBuildStatus(master_id)['platform_version']
# Retry until non-None version is returned.
def _ShouldRetry(x):
return not x
_, db = self._run.GetCIDBHandle()
return timeout_util.WaitForSuccess(_ShouldRetry,
# When a CIDB handle and a master id are both available, looks up the most
# recent build in the history of the master's build_config and raises
# MasterSlaveVersionMismatchFailure if its id differs from master_id.
# NOTE(review): the GetBuildHistory(...) argument list is cut off and the
# docstring is unterminated in this copy -- confirm against upstream.
def _VerifyMasterId(self, master_id):
"""Verify that our master id is current and valid.
master_id: Our master build id.
_, db = self._run.GetCIDBHandle()
if db and master_id:
assert not self._run.options.force_version
master_build_status = db.GetBuildStatus(master_id)
latest = db.GetBuildHistory(
master_build_status['build_config'], 1,
if latest and latest[0]['id'] != master_id:
raise failures_lib.MasterSlaveVersionMismatchFailure(
'This slave\'s master (id=%s) has been supplanted by a newer '
'master (id=%s). Aborting.' % (master_id, latest[0]['id']))
# Determines the version to build (options.force_version, or the master's
# platform version when options.master_build_id is set), obtains the matching
# manifest via ForceVersion or GetNextManifest, exits early when there is no
# manifest, prints the release tag, and localizes the manifest before use.
# NOTE(review): this copy is missing the `else:`/`try:` around
# GetNextManifest(), the body of the `except`, the RELEASETAG format operand,
# and the bodies of the `with` block and the final `if` -- confirm against
# upstream.
def PerformStage(self):
version = self._run.options.force_version
if self._run.options.master_build_id:
version = self._GetMasterVersion(self._run.options.master_build_id)
next_manifest = None
if version:
next_manifest = self.ForceVersion(version)
self.skip_sync = True
next_manifest = self.GetNextManifest()
except validation_pool.TreeIsClosedException as e:
if not next_manifest:'Found no work to do.')
if self._run.attrs.manifest_manager.DidLastBuildFail():
raise failures_lib.StepFailure('The previous build failed.')
raise failures_lib.ExitEarlyException(
'ManifestVersionedSyncStage finished and exited early.')
# Log this early on for the release team to grep out before we finish.
if self.manifest_manager:
self._Print('\nRELEASETAG: %s\n' % (
# To keep local trybots working, remove restricted checkouts from the
# official manifest we get from manifest-versions.
with self.LocalizeManifest(
next_manifest, filter_cros=self._run.options.local) as new_manifest:
# Set the status inflight at the end of the ManifestVersionedSync
# stage. This guarantees that all syncing has completed.
if self.manifest_manager:
class MasterSlaveLKGMSyncStage(ManifestVersionedSyncStage):
"""Stage that generates a unique manifest file candidate, and sync's to it.
This stage uses an LKGM manifest manager that handles LKGM
candidates and their states.
# If we are using an internal manifest, but need to be able to create an
# external manifest, we create a second manager for that manifest.
external_manager = None
MilestoneVersion = collections.namedtuple(
'MilestoneVersion', ['milestone', 'platform'])
def __init__(self, builder_run, **kwargs):
  """Construct the stage with no Android or Chrome version chosen yet.

  Args:
    builder_run: BuilderRun object.
    **kwargs: Forwarded unchanged to the parent stage constructor.
  """
  super(MasterSlaveLKGMSyncStage, self).__init__(builder_run, **kwargs)
  # lkgm_manager deals with making sure we're synced to whatever manifest
  # we get back in GetNextManifest so syncing again is redundant.
  self._android_version = self._chrome_version = None
# Computes the version increment type and returns a new
# lkgm_manager.LKGMManager.
# NOTE(review): the LKGMManager(...) argument list is missing and the
# docstring is unterminated in this copy -- confirm against upstream.
def _GetInitializedManager(self, internal):
"""Returns an initialized lkgm manager.
internal: Boolean. True if this is using an internal manifest.
increment = self.VersionIncrementType()
return lkgm_manager.LKGMManager(
# For masters that have slave configs, asserts an internal checkout and sets
# the class-level MasterSlaveLKGMSyncStage.external_manager.
# NOTE(review): the right-hand side of the external_manager assignment is
# missing, and no statement creating this stage's own manager is visible in
# this copy -- confirm against upstream.
def Initialize(self):
"""Override: Creates an LKGMManager rather than a ManifestManager."""
if self._run.config.master and self._GetSlaveConfigs():
assert self.internal, 'Unified masters must use an internal checkout.'
MasterSlaveLKGMSyncStage.external_manager = \
# Delegates to the parent ForceVersion and returns its manifest.
# NOTE(review): the body of the `if external_manager` branch is not visible
# in this copy -- confirm against upstream.
def ForceVersion(self, version):
manifest = super(MasterSlaveLKGMSyncStage, self).ForceVersion(version)
if MasterSlaveLKGMSyncStage.external_manager:
return manifest
def _VerifyMasterId(self, master_id):
  """Verify that our master id is current and valid.

  Runs the parent check first, then rejects slave builds that were started
  without a master build id.

  Args:
    master_id: Our master build id.

  Raises:
    failures_lib.StepFailure: This run is not a master and master_id is
      falsy.
  """
  super(MasterSlaveLKGMSyncStage, self)._VerifyMasterId(master_id)
  if not (self._run.config.master or master_id):
    raise failures_lib.StepFailure(
        'Cannot start build without a master_build_id. Did you hit force '
        'build on a slave? Please hit force build on the master instead.')
# Master-only: asserts that an LKGMManager is registered, asks it to create a
# new candidate manifest, and returns that manifest.
# NOTE(review): in this copy log strings are fused onto the `build_id`
# assignment and the `if self._android_version:` line, the
# CreateNewCandidate(...) arguments are cut off, and the call made on the
# external manager has lost its head -- confirm against upstream.
def GetNextManifest(self):
"""Gets the next manifest using LKGM logic."""
assert self.manifest_manager, \
'Must run Initialize before we can get a manifest.'
assert isinstance(self.manifest_manager, lkgm_manager.LKGMManager), \
'Manifest manager instantiated with wrong class.'
assert self._run.config.master
build_id = self._run.attrs.metadata.GetDict().get('build_id')'Creating new candidate manifest, including chrome version '
'%s.', self._chrome_version)
if self._android_version:'Adding Android version to new candidate manifest %s.',
manifest = self.manifest_manager.CreateNewCandidate(
if MasterSlaveLKGMSyncStage.external_manager:
manifest, build_id=build_id)
return manifest
# Delegates to cros_mark_android_as_stable.GetLatestBuild.
# NOTE(review): the call's argument list is missing in this copy -- confirm
# against upstream.
def GetLatestAndroidVersion(self):
"""Returns the version of Android to uprev."""
return cros_mark_android_as_stable.GetLatestBuild(
# Delegates to cros_mark_chrome_as_stable.GetLatestRelease.
# NOTE(review): the call's argument list is missing in this copy -- confirm
# against upstream.
def GetLatestChromeVersion(self):
"""Returns the version of Chrome to uprev."""
return cros_mark_chrome_as_stable.GetLatestRelease(
# Takes the first non-empty 'full_version' from the CIDB build history and
# parses it as "R<milestone>-<platform>" into a MilestoneVersion; returns None
# when there is no DB handle, no version, or no regex match.
# NOTE(review): the GetBuildHistory(...) arguments and the right-hand sides of
# `milestone =` / `platform =` are missing in this copy, and the docstring is
# unterminated. Separately, the dots in the version pattern are unescaped, so
# they match any character, not just '.' -- confirm whether that is intended.
def GetLastChromeOSVersion(self):
"""Fetching ChromeOS version from the last run.
Fetching the chromeos version from the last run that published a manifest
by querying CIDB. Master builds that failed before publishing a manifest
will be ignored.
A namedtuple MilestoneVersion,
e.g. MilestoneVersion(milestone='44', platform='7072.0.0-rc4')
or None if failed to retrieve milestone and platform versions.
build_id, db = self._run.GetCIDBHandle()
if db is None:
return None
builds = db.GetBuildHistory(,
full_versions = [b.get('full_version') for b in builds]
old_version = next(itertools.ifilter(bool, full_versions), None)
if old_version:
pattern = r'^R(\d+)-(\d+.\d+.\d+(-rc\d+)*)'
m = re.match(pattern, old_version)
if m:
milestone =
platform =
return self.MilestoneVersion(
milestone=milestone, platform=platform)
return None
# On masters, resolves the latest Android version (when _android_rev is set)
# and the latest Chrome version (when _chrome_rev is CHROME_REV_LATEST), then
# generates a blamelist against the build spec of the last published ChromeOS
# version, logging an error when that spec file does not exist.
# NOTE(review): in this copy log strings are fused onto three statements, the
# Chrome `if (` condition is not closed, no `else:` separates the error log
# from GenerateBlameList, and no call to the parent PerformStage is visible --
# confirm against upstream.
def PerformStage(self):
"""Performs the stage."""
if self._android_rev and self._run.config.master:
self._android_version = self.GetLatestAndroidVersion()'Latest Android version is: %s', self._android_version)
if (self._chrome_rev == constants.CHROME_REV_LATEST and
# PFQ master needs to determine what version of Chrome to build
# for all slaves.'I am a master running with CHROME_REV_LATEST, '
'therefore getting latest chrome version.')
self._chrome_version = self.GetLatestChromeVersion()'Latest chrome version is: %s', self._chrome_version)
# Generate blamelist
cros_version = self.GetLastChromeOSVersion()
if cros_version:
old_filename = self.manifest_manager.GetBuildSpecFilePath(
cros_version.milestone, cros_version.platform)
if not os.path.exists(old_filename):
logging.error('Could not generate blamelist, '
'manifest file does not exist: %s', old_filename)
logging.debug('Generate blamelist against: %s', old_filename)
lkgm_manager.GenerateBlameList(self.repo, old_filename)
class CommitQueueSyncStage(MasterSlaveLKGMSyncStage):
"""Commit Queue Sync stage that handles syncing and applying patches.
Similar to the MasterSlaveLKGMsync Stage, this stage handles syncing
to a manifest, passing around that manifest to other builders.
What makes this stage different is that the CQ master finds the
patches on Gerrit which are ready to be committed, apply them, and
includes the patches in the new manifest. The slaves sync to the
manifest, and apply the patches written in the manifest.
# The amount of time we wait before assuming that the Pre-CQ is down and
# that we should start testing changes that haven't been tested by the Pre-CQ.
PRE_CQ_TIMEOUT = 2 * 60 * 60
def __init__(self, builder_run, **kwargs):
  """Set up the stage with no validation pool loaded yet.

  Args:
    builder_run: Forwarded unchanged to the parent stage constructor.
    **kwargs: Extra keyword arguments forwarded to the parent constructor.
  """
  super(CommitQueueSyncStage, self).__init__(builder_run, **kwargs)

  # Patches the commit queue will pick up. Nothing is loaded here:
  # - the master commit queue fills this in from GetNextManifest;
  # - slave commit queues fill it in from _SetPoolFromManifest.
  # Whichever path is taken, the pool is also saved to disk.
  self.pool = None
def HandleSkip(self):
  """Run the parent skip handling, then reload a saved validation pool.

  The pool is only reloaded when the validation_pool run option names a
  file; otherwise self.pool is left as it was.
  """
  super(CommitQueueSyncStage, self).HandleSkip()

  pool_path = self._run.options.validation_pool
  if not pool_path:
    return

  self.pool = validation_pool.ValidationPool.Load(
      pool_path, builder_run=self._run)
# Change filter handed to the validation pool: picks which ready changes this
# run should test. Returns a (changes_to_test, non_manifest_changes) tuple;
# |non_manifest_changes| is passed through untouched and |_pool| is unused.
# NOTE(review): indentation is missing in this copy, and the body of the
# CL_STATUS_PASSED check as well as the branch that owns the "DB not
# available" warning appear to have been dropped - restore from upstream
# before relying on this block.
def _ChangeFilter(self, _pool, changes, non_manifest_changes):
# First, look for changes that were tested by the Pre-CQ.
changes_to_test = []
_, db = self._run.GetCIDBHandle()
if db:
# One query fetches the action history for every candidate change.
actions_for_changes = db.GetActionsForChanges(changes)
for change in changes:
status = clactions.GetCLPreCQStatus(change, actions_for_changes)
if status == constants.CL_STATUS_PASSED:
logging.warning("DB not available, unable to filter for PreCQ passed.")
# Allow Commit-Ready=+2 changes to bypass the Pre-CQ, if there are no other
# changes.
if not changes_to_test:
changes_to_test = [x for x in changes if x.HasApproval('COMR', '2')]
# If we only see changes that weren't verified by Pre-CQ, and some of them
# are really old changes, try all of the changes. This ensures that the CQ
# continues to work (albeit slowly) even if the Pre-CQ is down.
if changes and not changes_to_test:
# The oldest approval decides whether PRE_CQ_TIMEOUT (seconds) has passed.
oldest = min(x.approval_timestamp for x in changes)
if time.time() > oldest + self.PRE_CQ_TIMEOUT:
# It's safest to try all changes here because some of the old changes
# might depend on newer changes (e.g. via CQ-DEPEND).
changes_to_test = changes
return changes_to_test, non_manifest_changes
# Stores in self.pool the result of ValidationPool.AcquirePoolFromManifest
# for |manifest|, using this run's overlays, repo, build number, builder
# name, master flag and debug option.
# NOTE(review): the call below is cut off in this copy - its remaining
# arguments and closing parenthesis are missing; restore from upstream.
def _SetPoolFromManifest(self, manifest):
"""Sets validation pool based on manifest path passed in."""
# Note that this function is only called after the repo is already
# sync'd, so AcquirePoolFromManifest does not need to sync.
self.pool = validation_pool.ValidationPool.AcquirePoolFromManifest(
manifest, self._run.config.overlays, self.repo,
self._run.buildnumber, self._run.GetBuilderName(),
self._run.config.master, self._run.options.debug,
def _GetLGKMVersionFromManifest(self, manifest):
  """Read the LKGM version recorded in a manifest file, if there is one.

  Args:
    manifest: Path (or file object) of the manifest XML to parse.

  Returns:
    The value of the LKGM version attribute on the first LKGM element, or
    None when the manifest has no such element.
  """
  manifest_dom = minidom.parse(manifest)
  elements = manifest_dom.getElementsByTagName(lkgm_manager.LKGM_ELEMENT)
  if elements:
    # Only the first LKGM element is consulted.
    lkgm_version = elements[0].getAttribute(lkgm_manager.LKGM_VERSION_ATTR)
    logging.info(
        'LKGM version was found in the manifest: %s', lkgm_version)
    return lkgm_version
  return None
def GetNextManifest(self):
"""Gets the next manifest using LKGM logic."""
assert self.manifest_manager, \
'Must run Initialize before we can get a manifest.'
assert isinstance(self.manifest_manager, lkgm_manager.LKGMManager), \
'Manifest manager instantiated with wrong class.'
assert self._run.config.master
build_id = self._run.attrs.metadata.GetDict().get('build_id')
# In order to acquire a pool, we need an initialized buildroot.
if not git.FindRepoDir(
query = constants.CQ_READY_QUERY
if self._run.options.cq_gerrit_override:
query = (self._run.options.cq_gerrit_override, None)
self.pool = validation_pool.ValidationPool.AcquirePool(
self._run.config.overlays, self.repo,
self._run.buildnumber, self._run.GetBuilderName(),
check_tree_open=(not self._run.options.debug or
change_filter=self._ChangeFilter, builder_run=self._run)
except validation_pool.TreeIsClosedException as e:
return None
# We must extend the builder deadline before publishing a new manifest to
# ensure that slaves have enough time to complete the builds about to
# start.
build_id, db = self._run.GetCIDBHandle()
if db:
db.ExtendDeadline(build_id, self._run.config.build_timeout)'Creating new candidate manifest.')
manifest = self.manifest_manager.CreateNewCandidate(
validation_pool=self.pool, build_id=build_id)
if MasterSlaveLKGMSyncStage.external_manager:
manifest, build_id=build_id)
return manifest
def ManifestCheckout(self, next_manifest):
"""Checks out the repository to the given manifest."""
lkgm_version = self._GetLGKMVersionFromManifest(next_manifest)
chroot_manager = chroot_lib.ChrootManager(self._build_root)
# Make sure the chroot version is valid.
# Clear the chroot version as we are in the middle of building it.
# Sync to the provided manifest on slaves. On the master, we're
# already synced to this manifest, so self.skip_sync is set and
# this is a no-op.
super(CommitQueueSyncStage, self).ManifestCheckout(next_manifest)
if self._run.config.build_before_patching:
assert not self._run.config.master
pre_build_passed = self.RunPrePatchBuild()
logging.PrintBuildbotStepName('CommitQueueSync : Apply Patches')
if not pre_build_passed:
logging.PrintBuildbotStepText('Pre-patch build failed.')
# On slaves, initialize our pool and apply patches. On the master,
# we've already done that in GetNextManifest, so this is a no-op.
if not self._run.config.master:
# Print the list of CHUMP changes since the LKGM, then apply changes and
# print the list of applied changes.
def PerformStage(self):
"""Performs normal stage and prints blamelist at end."""
if self._run.options.force_version:
# If this builder is a cq-master but not force_version build,
# schedule all slave builders via Buildbucket. If it's a debug mode run,
# PutSlaveBuildToBuildbucket would be a dryrun.
if (config_lib.UseBuildbucketScheduler(self._run.config) and
not self._run.options.force_version and
class PreCQSyncStage(SyncStage):
"""Sync and apply patches to test if they compile."""
def __init__(self, builder_run, patches, **kwargs):
  """Record the patches that still need a Pre-CQ run.

  Args:
    builder_run: Forwarded unchanged to the parent stage constructor.
    patches: Candidate patches; those already being merged are dropped.
    **kwargs: Extra keyword arguments forwarded to the parent constructor.
  """
  super(PreCQSyncStage, self).__init__(builder_run, **kwargs)

  # Patches that are already in the middle of being merged are not tested
  # in the PreCQ, so filter them out up front.
  still_pending = []
  for candidate in patches:
    if not candidate.IsBeingMerged():
      still_pending.append(candidate)
  self.patches = still_pending

  # Being handed patches but keeping none of them is fatal.
  if patches and not still_pending:
    cros_build_lib.Die('No patches that still need testing.')

  # The ValidationPool under test: set by PerformStage, and reloaded by
  # HandleSkip after bootstrapping.
  self.pool = None
def HandleSkip(self):
  """Run the parent skip handling, then reload the pool saved on disk.

  Nothing is loaded unless the validation_pool run option names a file.
  """
  super(PreCQSyncStage, self).HandleSkip()

  saved_pool = self._run.options.validation_pool
  if not saved_pool:
    return

  self.pool = validation_pool.ValidationPool.Load(
      saved_pool, builder_run=self._run)
def PerformStage(self):
super(PreCQSyncStage, self).PerformStage()
self.pool = validation_pool.ValidationPool.AcquirePreCQPool(
self._run.config.overlays, self._build_root,
dryrun=self._run.options.debug_forced, candidates=self.patches,
if len(self.pool.applied) == 0 and self.patches:
cros_build_lib.Die('No changes have been applied.')
changes = self.pool.applied or self.patches
class PreCQLauncherStage(SyncStage):
"""Scans for CLs and automatically launches Pre-CQ jobs to test them."""
# The number of minutes we wait before launching Pre-CQ jobs. This measures
# the idle time of a given patch series, so, for example, if a user takes
# 20 minutes to mark a series of 20 patches as ready, we won't launch a
# tryjob on any of the patches until the user has been idle for 2 minutes.
# The number of minutes we allow before considering a launch attempt failed.
# The number of minutes we allow before considering an in-flight job failed.
# The number of minutes we allow before expiring a pre-cq PASSED or
# FULLY_VERIFIED status. After this timeout is hit, a CL's status will be
# reset to None. This prevents very stale CLs from entering the CQ.
# The maximum number of patches we will allow in a given trybot run. This is
# needed because our trybot infrastructure can only handle so many patches at
# once.
# The maximum derivative of the number of tryjobs we will launch in a given
# cycle of ProcessChanges. Used to rate-limit the launcher when reopening the
# tree after building up a large backlog.
# Delta time constant for checking buildbucket. Do not check status or
# cancel builds which were launched >= BUILDBUCKET_DELTA_TIME_HOUR ago.
def __init__(self, builder_run, **kwargs):
  """Initialise the launcher stage.

  Args:
    builder_run: Forwarded unchanged to the parent stage constructor.
    **kwargs: Extra keyword arguments forwarded to the parent constructor.
  """
  super(PreCQLauncherStage, self).__init__(builder_run, **kwargs)

  # Launch counter carried over from one ProcessChanges cycle to the next;
  # it starts at zero.
  self.last_cycle_launch_count = 0
  self.skip_sync = True
def _HasTimedOut(self, start, now, timeout_minutes):
"""Check whether |timeout_minutes| has elapsed between |start| and |now|.
start: datetime.datetime start time.
now: datetime.datetime current time.
timeout_minutes: integer number of minutes for timeout.
True if (now-start) > timeout_minutes.
diff = datetime.timedelta(minutes=timeout_minutes)
return (now - start) > diff
def _PrintPatchStatus(patch, status):
"""Print a link to |patch| with |status| info."""
items = (
logging.PrintBuildbotLink(' | '.join(items), patch.url)
def _ConfiguredVerificationsForChange(self, change):
"""Determine which configs to test |change| with.
This method returns only the configs that are asked for by the config
file. It does not include special-case logic for adding additional bots
based on the type of the repository (see VerificationsForChange for that).
change: GerritPatch instance to get configs-to-test for.
A set of configs to test.
configs_to_test = None
# If a pre-cq config is specified in the commit message, use that.
# Otherwise, look in appropriate COMMIT-QUEUE.ini. Otherwise, default to
lines = cros_patch.GetOptionLinesFromCommitMessage(
change.commit_message, constants.PRE_CQ_CONFIGS_OPTION_REGEX)
if lines is not None:
configs_to_test = self._ParsePreCQOption(' '.join(lines))
configs_to_test = configs_to_test or self._ParsePreCQOption(
triage_lib.GetOptionForChange(self._build_root, change, 'GENERAL',
return set(configs_to_test or constants.PRE_CQ_DEFAULT_CONFIGS)
# Starts from the set returned by _ConfiguredVerificationsForChange(change)
# and returns it as the set of configs to test |change| with.
# NOTE(review): in this copy the docstring is never closed and the body of
# the '/overlays/' project check (the step the comment says adds
# BINHOST_PRE_CQ) is missing - restore from upstream before use.
def VerificationsForChange(self, change):
"""Determine which configs to test |change| with.
change: GerritPatch instance to get configs-to-test for.
A set of configs to test.
configs_to_test = set(self._ConfiguredVerificationsForChange(change))
# Add the BINHOST_PRE_CQ to any changes that affect an overlay.
if '/overlays/' in change.project:
return configs_to_test
# Splits |pre_cq_option| on whitespace into a set of config names and returns
# that set only when every name is present in self._run.site_config;
# otherwise (including an empty or whitespace-only option) returns None.
# NOTE(review): the statements that replace 'default' with the default
# configs are missing from this copy - restore from upstream before use.
def _ParsePreCQOption(self, pre_cq_option):
"""Gets a valid config list, or None, from |pre_cq_option|."""
if pre_cq_option and pre_cq_option.split():
configs_to_test = set(pre_cq_option.split())
# Replace 'default' with the default configs.
if 'default' in configs_to_test:
# Verify that all of the configs are valid.
if all(c in self._run.site_config for c in configs_to_test):
return configs_to_test
return None
# Looks up the configs for |change| via VerificationsForChange, builds a list
# of CL actions and inserts them into cidb with this run's build id.
# NOTE(review): in this copy the docstring is never closed and the
# statements that append to |actions| are truncated (only a trailing
# argument fragment remains) - restore from upstream before use.
def ScreenChangeForPreCQ(self, change):
"""Record which pre-cq tryjobs to test |change| with.
This method determines which configs to test a given |change| with, and
writes those as pending tryjobs to the cidb.
change: GerritPatch instance to screen. This change should not yet have
been screened.
actions = []
configs_to_test = self.VerificationsForChange(change)
for c in configs_to_test:
change, constants.CL_ACTION_SCREENED_FOR_PRE_CQ))
build_id, db = self._run.GetCIDBHandle()
db.InsertCLActions(build_id, actions)
def CanSubmitChangeInPreCQ(self, change):
  """Look up whether |change| is configured to be submitted in the pre-CQ.

  This looks up the "submit-in-pre-cq" setting inside the project in
  COMMIT-QUEUE.ini and checks whether it is set to "yes". The option is read
  from the GENERAL section, e.g.:

    submit-in-pre-cq: yes

  Args:
    change: Change to examine.

  Returns:
    Boolean indicating if this change is configured to be submitted
    in the pre-CQ.
  """
  result = None
  try:
    result = triage_lib.GetOptionForChange(
        self._build_root, change, 'GENERAL', 'submit-in-pre-cq')
  except ConfigParser.Error:
    # A malformed config file is logged, and |result| stays None so the
    # change is reported as not submittable in the pre-CQ.
    logging.error('%s has malformed config file', change, exc_info=True)

  # The comparison with 'yes' is case-insensitive; a missing option is False.
  return bool(result and result.lower() == 'yes')
def GetConfigBuildbucketIdMap(self, output):
  """Get a config:buildbucket_id map.

  Config is the config-name of a pre-cq triggered by the pre-cq-launcher.
  buildbucket_id is the request id of the pre-cq build.

  Args:
    output: Multi-line text to scan. A line adds an entry only when it
      carries both a '[config:NAME]' and a '[buildbucket_id:ID]' tag.

  Returns:
    A dict mapping each config name to its buildbucket id (both strings).
    If a config appears on several lines, the last one wins.
  """
  config_buildbucket_id_map = {}
  for line in output.splitlines():
    config = None
    buildbucket_id = None

    match_config = re.search(r'\[config:(\S*)\]', line)
    if match_config:
      config = match_config.group(1)

    match_id = re.search(r'\[buildbucket_id:(\S*)\]', line)
    if match_id:
      buildbucket_id = match_id.group(1)

    # Lines with only one of the two tags are ignored.
    if config is not None and buildbucket_id is not None:
      config_buildbucket_id_map[config] = buildbucket_id

  return config_buildbucket_id_map
def LaunchTrybot(self, pool, plan, configs):
"""Launch a Pre-CQ run with the provided list of CLs.
pool: ValidationPool corresponding to |plan|.
plan: The list of patches to test in the pre-cq tryjob.
configs: A list of pre-cq config names to launch.
# Verify the configs to test are in the cbuildbot config list.
for config in configs:
if config not in self._run.site_config:
for change in plan:
logging.error('No such configuraton target: %s. '
'Skipping trybots for %s %s',
config, str(change), change.url)
pool.HandleNoConfigTargetFailure(change, config)
cmd = ['cbuildbot', '--remote',
'--timeout', str(self.INFLIGHT_TIMEOUT * 60)] + configs
for patch in plan:
cmd += ['-g', cros_patch.AddPrefix(patch, patch.gerrit_number)]
self._PrintPatchStatus(patch, 'testing')
use_buildbucket = False
config_buildbucket_id_map = {}
if buildbucket_lib.GetServiceAccount(constants.CHROMEOS_SERVICE_ACCOUNT):
# use buildbucket to launch trybots.
cmd += ['--use-buildbucket']
use_buildbucket = True
if self._run.options.debug:
logging.debug('Would have launched tryjob with %s', cmd)
result = cros_build_lib.RunCommand(
cmd, cwd=self._build_root, capture_output=True)
if result and result.output:'cbuildbot output: %s' % result.output)
if use_buildbucket:
config_buildbucket_id_map = self.GetConfigBuildbucketIdMap(
actions = []
build_id, db = self._run.GetCIDBHandle()
for patch in plan:
for config in configs:
patch, constants.CL_ACTION_TRYBOT_LAUNCHING, config,
db.InsertCLActions(build_id, actions)
def GetDisjointTransactionsToTest(self, pool, progress_map):
"""Get the list of disjoint transactions to test.
Side effect: reject or retry changes that have timed out.
pool: The validation pool.
progress_map: See return type of clactions.GetPreCQProgressMap.
A list of (transaction, config) tuples corresponding to different trybots
that should be launched.
# Get the set of busy and passed CLs.
busy, _, verified = clactions.GetPreCQCategories(progress_map)
screened_changes = set(progress_map)
# Create a list of disjoint transactions to test.
manifest = git.ManifestCheckout.Cached(self._build_root)'Creating disjoint transactions.')
plans = pool.CreateDisjointTransactions(
manifest, screened_changes,
max_txn_length=self.MAX_PATCHES_PER_TRYBOT_RUN)'Created %s disjoint transactions.', len(plans))
for plan in plans:
# If any of the CLs in the plan is not yet screened, wait for them to
# be screened.
# If any of the CLs in the plan are currently "busy" being tested,
# wait until they're done before starting to test this plan.
# Similarly, if all of the CLs in the plan have already been validated,
# there's no need to launch a trybot run.
plan = set(plan)
if not plan.issubset(screened_changes):'CLs waiting to be screened: %s',
elif plan.issubset(verified):'CLs already verified: %s',
elif plan.intersection(busy):'CLs currently being verified: %s',
if plan.difference(busy):'CLs waiting on verification of dependencies: %r',
# TODO(akeshet): Consider using a database time rather than gerrit
# approval time and local clock for launch delay.
elif any(x.approval_timestamp + self.LAUNCH_DELAY * 60 > time.time()
for x in plan):'CLs waiting on launch delay: %s',
pending_configs = clactions.GetPreCQConfigsToTest(plan, progress_map)
for config in pending_configs:
yield (plan, config)
def _ProcessRequeuedAndSpeculative(self, change, action_history):
  """Detect if |change| was requeued by developer, and mark in cidb.

  Nothing is written when clactions.GetRequeuedOrSpeculative reports no
  action for |change|.

  Args:
    change: GerritPatch instance to check.
    action_history: List of CLActions.
  """
  # The third argument tells the helper that the change is not mergeable.
  action_string = clactions.GetRequeuedOrSpeculative(
      change, action_history, not change.IsMergeable())
  if action_string:
    build_id, db = self._run.GetCIDBHandle()
    action = clactions.CLAction.FromGerritPatchAndAction(
        change, action_string)
    db.InsertCLActions(build_id, [action])
def _ProcessExpiry(self, change, status, timestamp, pool, current_time):
"""Enforce expiry of a PASSED or FULLY_VERIFIED status.
change: GerritPatch instance to process.
status: |change|'s pre-cq status.
timestamp: datetime.datetime for when |status| was achieved.
pool: The current validation pool.
current_time: datetime.datetime for current database time.
if not timestamp:
timed_out = self._HasTimedOut(timestamp, current_time,
verified = status in (constants.CL_STATUS_PASSED,
if timed_out and verified:
build_id, db = self._run.GetCIDBHandle()
if db:
pool.SendNotification(change, '%(details)s', details=msg)
action = clactions.CLAction.FromGerritPatchAndAction(
change, constants.CL_ACTION_PRE_CQ_RESET)
db.InsertCLActions(build_id, [action])
# For each config in progress_map[change] whose status is one of the timeout
# statuses, picks LAUNCH_TIMEOUT (status LAUNCHED) or INFLIGHT_TIMEOUT
# (otherwise), and if _HasTimedOut says that many minutes have passed since
# the config's timestamp, notifies on the change, removes its ready state and
# marks its pre-CQ status as failed.
# NOTE(review): in this copy the docstring is never closed, the
# |timeout_statuses| tuple is cut off after its first member, and the
# statement under "if not config_status in timeout_statuses:" (presumably a
# skip of that config) is missing - restore from upstream before use.
def _ProcessTimeouts(self, change, progress_map, pool, current_time):
"""Enforce per-config launch and inflight timeouts.
change: GerritPatch instance to process.
progress_map: As returned by clactions.GetCLPreCQProgress a dict mapping
each change in |changes| to a dict mapping config names
to (status, timestamp) tuples for the configs under test.
pool: The current validation pool.
current_time: datetime.datetime timestamp giving current database time.
timeout_statuses = (constants.CL_PRECQ_CONFIG_STATUS_LAUNCHED,
config_progress = progress_map[change]
for config, (config_status, timestamp, _) in config_progress.iteritems():
if not config_status in timeout_statuses:
launched = config_status == constants.CL_PRECQ_CONFIG_STATUS_LAUNCHED
timeout = self.LAUNCH_TIMEOUT if launched else self.INFLIGHT_TIMEOUT
msg = (PRECQ_LAUNCH_TIMEOUT_MSG if launched
else PRECQ_INFLIGHT_TIMEOUT_MSG) % (config, timeout)
if self._HasTimedOut(timestamp, current_time, timeout):
pool.SendNotification(change, '%(details)s', details=msg)
pool.RemoveReady(change, reason=config)
pool.UpdateCLPreCQStatus(change, constants.CL_STATUS_FAILED)
def _CancelPreCQIfNeeded(self, db, old_build_action, testjob=False):
"""Cancel the pre-cq if it's still running.
db: CIDB connection instance.
old_build_action: Old patch build action.
testjob: Whether to use the test instance of the buildbucket server.
buildbucket_id = old_build_action.buildbucket_id
get_content = self.buildbucket_client.GetBuildRequest(
buildbucket_id, testjob, dryrun=self._run.options.debug)
status = buildbucket_lib.GetBuildStatus(get_content)
constants.BUILDBUCKET_BUILDER_STATUS_STARTED]:'Cancelling old build %s %s', buildbucket_id, status)
cancel_content = self.buildbucket_client.CancelBuildRequest(
buildbucket_id, testjob, dryrun=self._run.options.debug)
cancel_status = buildbucket_lib.GetBuildStatus(cancel_content)
if cancel_status:'Cancelled buildbucket_id: %s status: %s \ncontent: %s',
buildbucket_id, cancel_status, cancel_content)
if db:
cancel_action = old_build_action._replace(
db.InsertCLActions(cancel_action.build_id, [cancel_action])
# If the old pre-cq build already completed, CANCEL response will
# give 200 returncode with error reasons.
logging.debug('Failed to cancel buildbucket_id: %s reason: %s',
def _ProcessOldPatchPreCQRuns(self, db, change, action_history):
"""Process Pre-cq runs for change with old patch numbers.
db: CIDB connection instance.
change: GerritPatch instance to process.
action_history: List of CLActions.
min_timestamp = - datetime.timedelta(
old_pre_cq_build_actions = clactions.GetOldPreCQBuildActions(
change, action_history, min_timestamp)
for old_build_action in old_pre_cq_build_actions:
db, old_build_action, testjob=self._run.options.test_tryjob)
except Exception as e:
# Log errors; do not raise exceptions.
logging.error('_CancelPreCQIfNeeded failed. '
'change: %s old_build_action: %s error: %r',
change, old_build_action, e)
def _ProcessVerified(self, change, can_submit, will_submit):
"""Process a change that is fully pre-cq verified.
change: GerritPatch instance to process.
can_submit: set of changes that can be submitted by the pre-cq.
will_submit: set of changes that will be submitted by the pre-cq.
A tuple of (set of changes that should be submitted by pre-cq,
set of changes that should be passed by pre-cq)
# If this change and all its dependencies are pre-cq submittable,
# and none of them have yet been marked as pre-cq passed, then
# mark them for submission. Otherwise, mark this change as passed.
if change in will_submit:
return set(), set()
if change in can_submit:'Attempting to determine if %s can be submitted.', change)
patch_series = validation_pool.PatchSeries(self._build_root)
plan = patch_series.CreateTransaction(change, limit_to=can_submit)
return plan, set()
except cros_patch.DependencyError:
# Changes that cannot be submitted are marked as passed.
return set(), set([change])
def UpdateChangeStatuses(self, changes, status):
  """Update |changes| to |status|.

  One CL action per change is inserted into cidb under this run's build id.
  Nothing is looked up or written when |changes| is empty.

  Args:
    changes: A set of GerritPatch instances.
    status: One of constants.CL_STATUS_* statuses.
  """
  if changes:
    build_id, db = self._run.GetCIDBHandle()
    # The status is translated once and shared by every change.
    a = clactions.TranslatePreCQStatusToAction(status)
    actions = [clactions.CLAction.FromGerritPatchAndAction(c, a)
               for c in changes]
    db.InsertCLActions(build_id, actions)
def ProcessChanges(self, pool, changes, _non_manifest_changes):
"""Process a list of changes that were marked as Ready.
From our list of changes that were marked as Ready, we create a
list of disjoint transactions and send each one to a separate Pre-CQ
Non-manifest changes are just submitted here because they don't need to be
verified by either the Pre-CQ or CQ.
_, db = self._run.GetCIDBHandle()
action_history = db.GetActionsForChanges(changes)
if self.buildbucket_client is not None:
for change in changes:
self._ProcessOldPatchPreCQRuns(db, change, action_history)
for change in changes:
self._ProcessRequeuedAndSpeculative(change, action_history)
status_and_timestamp_map = {
c: clactions.GetCLPreCQStatusAndTime(c, action_history)
for c in changes}
status_map = {c: v[0] for c, v in status_and_timestamp_map.items()}
# Filter out failed speculative changes.
changes = [c for c in changes if status_map[c] != constants.CL_STATUS_FAILED
or c.HasReadyFlag()]
progress_map = clactions.GetPreCQProgressMap(changes, action_history)
_, inflight, verified = clactions.GetPreCQCategories(progress_map)
current_db_time = db.GetTime()
to_process = set(c for c in changes
if status_map[c] != constants.CL_STATUS_PASSED)
# Mark verified changes verified.
to_mark_verified = [c for c in verified.intersection(to_process) if
status_map[c] != constants.CL_STATUS_FULLY_VERIFIED]
# Send notifications to the fully verified changes.
if to_mark_verified:
# Changes that can be submitted, if their dependencies can be too. Only
# include changes that have not already been marked as passed.
can_submit = set(c for c in (verified.intersection(to_process)) if
c.IsMergeable() and self.CanSubmitChangeInPreCQ(c))
# Changes that will be submitted.
will_submit = set()
# Changes that will be passed.
will_pass = set()
for change in inflight:
if status_map[change] != constants.CL_STATUS_INFLIGHT:
build_ids = [x for _, _, x in progress_map[change].values()]
# Change the status to inflight.
self.UpdateChangeStatuses([change], constants.CL_STATUS_INFLIGHT)
build_dicts = db.GetBuildStatuses(build_ids)
lines = []
for b in build_dicts:
waterfall_url = constants.WATERFALL_TO_DASHBOARD[b['waterfall']]
url = tree_status.ConstructDashboardURL(
waterfall_url, b['builder_name'], b['build_number'])
lines.append('(%s) : %s' % (b['build_config'], url))
# Send notifications.
pool.HandleApplySuccess(change, build_log=('\n' + '\n'.join(lines)))
for change in to_process:
# Detect if change is ready to be marked as passed, or ready to submit.
if change in verified and change.IsMergeable():
to_submit, to_pass = self._ProcessVerified(change, can_submit,
# Screen unscreened changes to determine which trybots to test them with.
if not clactions.IsChangeScreened(change, action_history):
self._ProcessTimeouts(change, progress_map, pool, current_db_time)
# Filter out changes that have already failed, and aren't marked trybot
# ready or commit ready, before launching.
launchable_progress_map = {
k: v for k, v in progress_map.iteritems()
if k.HasReadyFlag() or status_map[k] != constants.CL_STATUS_FAILED}
is_tree_open = tree_status.IsTreeOpen(throttled_ok=True)
launch_count = 0
cl_launch_count = 0
launch_count_limit = (self.last_cycle_launch_count +
launches = {}
for plan, config in self.GetDisjointTransactionsToTest(
pool, launchable_progress_map):
launches.setdefault(frozenset(plan), []).append(config)
for plan, configs in launches.iteritems():
if not is_tree_open:'Tree is closed, not launching configs %r for plan %s.',
configs, cros_patch.GetChangesAsString(plan))
elif launch_count >= launch_count_limit:'Hit or exceeded maximum launch count of %s this cycle, '
'not launching configs %r for plan %s.',
launch_count_limit, configs,
self.LaunchTrybot(pool, plan, configs)
launch_count += len(configs)
cl_launch_count += len(configs) * len(plan)
self.last_cycle_launch_count = launch_count
# Mark passed changes as passed
self.UpdateChangeStatuses(will_pass, constants.CL_STATUS_PASSED)
# Expire any very stale passed or fully verified changes.
for c, v in status_and_timestamp_map.items():
self._ProcessExpiry(c, v[0], v[1], pool, current_db_time)
# Submit changes that are ready to submit, if we can.
if tree_status.IsTreeOpen(throttled_ok=True):
submit_reason = constants.STRATEGY_PRECQ_SUBMIT
will_submit = {c:submit_reason for c in will_submit}
submitted, _ = pool.SubmitChanges(will_submit, check_tree_open=False)
# Record stats about submissions in monarch.
if db:
submitted_change_actions = db.GetActionsForChanges(submitted)
strategies = {m: constants.STRATEGY_PRECQ_SUBMIT for m in submitted}
clactions.CLActionHistory(submitted_change_actions), strategies)
# Tell ValidationPool to keep waiting for more changes until we hit
# its internal timeout.
return [], []
def SendChangeCountStats(self, status_map):
"""Sends metrics of the CL counts to Monarch and statsd.
status_map: A map from CLs to statuses.
# Separately count and log the number of mergable and speculative changes in
# each of the possible pre-cq statuses (or in status None).
status_counts = {}
for count_bin in itertools.product((True, False), POSSIBLE_STATUSES):
status_counts[count_bin] = 0
for c, status in status_map.iteritems():
count_bin = (c.IsMergeable(), status)
status_counts[count_bin] += 1
for (is_mergable, status), count in sorted(status_counts.items()):
subtype = 'mergeable' if is_mergable else 'speculative'
name = '.'.join(['pre-cq-status', status if status else 'None'])'Sending stat (name, subtype, count): (%s, %s, %s)',
name, subtype, count)
graphite.StatsFactory.GetInstance().Gauge(name).send(subtype, count)
count, {'status': str(status), 'subtype': subtype})
def PerformStage(self):
# Setup and initialize the repo.
super(PreCQLauncherStage, self).PerformStage()
query = constants.PRECQ_READY_QUERY
if self._run.options.cq_gerrit_override:
query = (self._run.options.cq_gerrit_override, None)
# Loop through all of the changes until we hit a timeout.
self._run.config.overlays, self.repo,
check_tree_open=False, change_filter=self.ProcessChanges,