# blob: b779018c7435b1f29568176b673a44940efb7091 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Logic to handle uprevving packages."""
from __future__ import print_function
import collections
import enum
import filecmp
import functools
import os
import re
import sys
from typing import Iterable, List, Optional, Tuple, Union
from chromite.cbuildbot import manifest_version
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib import git
from chromite.lib import osutils
from chromite.lib import parallel
from chromite.lib import portage_util
from chromite.lib.chroot_lib import Chroot
# Refuse to load on interpreters older than 3.6.
assert sys.version_info >= (3, 6), 'This module requires Python 3.6+'

# Uncompiled pattern for a four-part dotted chrome version, e.g. "78.0.3876.1".
CHROME_VERSION_REGEX = r'\d+\.\d+\.\d+\.\d+'

# The directory relative to the source root housing the chrome packages.
_CHROME_OVERLAY_DIR = 'src/third_party/chromiumos-overlay'

# Lightweight record with the fields path, ref and revision.
GitRef = collections.namedtuple('GitRef', 'path ref revision')
class Error(Exception):
  """Base error class for the module.

  Every exception defined in this module derives from it, so callers can
  catch this one type to handle any of them.
  """


class NoUnstableEbuildError(Error):
  """When no unstable ebuild can be found."""


class EbuildUprevError(Error):
  """An error occurred while uprevving packages."""


class EbuildManifestError(Error):
  """Error when running ebuild manifest."""
# NOTE(review): this class appears damaged by extraction. Indentation is gone
# and the assignment inside __init__ has no right-hand side. is_unstable is
# read as a plain attribute elsewhere in this file (`if ebuild.is_unstable:`),
# so decorators may have been dropped too -- confirm before editing.
class ChromeEBuild(portage_util.EBuild):
"""Thin sub-class of EBuild that adds a few small helpers."""
# Captures the "<a>.<b>.<c>.<d>" or "9999" token that follows a "-" in the
# string it is matched against.
chrome_version_re = re.compile(r'.*-(%s|9999).*' % CHROME_VERSION_REGEX)
# Class-level default; stays '' when the regex does not match.
chrome_version = ''
def __init__(self, path):
portage_util.EBuild.__init__(self, path)
re_match = self.chrome_version_re.match(self.ebuild_path_no_revision)
if re_match:
# NOTE(review): the value assigned here is missing; presumably the group
# captured by chrome_version_re -- TODO confirm.
self.chrome_version =
def __str__(self):
return self.ebuild_path
# Negation of the inherited is_stable attribute.
def is_unstable(self):
return not self.is_stable
# "<package>-<version>" string built from the inherited attributes.
def atom(self):
return '%s-%s' % (self.package, self.version)
def get_chrome_version_from_refs(refs):
  """Get the chrome version to use from the list of provided tags.

  Args:
    refs (list[GitRef]): The tags to parse for the best chrome version.

  Returns:
    str: The chrome version to use.
  """
  assert refs

  # Each tag is a chrome version string, e.g. "78.0.3876.1", so extract the
  # tag name from the ref, e.g. "refs/tags/78.0.3876.1".
  versions = [ref.ref.split('/')[-1] for ref in refs]
  return best_chrome_version(versions)
def best_chrome_version(versions):
  """Return the highest chrome version among |versions|.

  Versions are compared numerically, component by component, so "10.0.0.0"
  ranks above "9.0.0.0" even though it sorts lower as a string.

  Args:
    versions: Iterable of dotted version strings, e.g. "78.0.3876.1". A
      one-shot iterator such as a generator expression is accepted.

  Returns:
    str: The highest version, rebuilt from its integer components.

  Raises:
    AssertionError: If |versions| is empty.
    ValueError: If a version component is not an integer.
  """
  # Materialize first: a generator object is always truthy, which would turn
  # the emptiness check below into a no-op for callers that pass one.
  versions = list(versions)
  assert versions

  # Convert each version from a string like "78.0.3876.1" to a list of ints
  # to compare them, find the most recent (max), and then reconstruct the
  # version string.
  newest = max([int(part) for part in v.split('.')] for v in versions)
  return '.'.join(str(part) for part in newest)
def best_chrome_ebuild(ebuilds):
  """Determine the best/newest chrome ebuild from a collection of ebuilds.

  Args:
    ebuilds: Non-empty iterable of ebuild objects exposing chrome_version
      and current_revision.

  Returns:
    The ebuild with the highest chrome version. When several share that
    version, the one with the highest current_revision wins; if revisions
    are also equal, the earliest one in |ebuilds| is kept.
  """
  # Materialize once: the collection is walked twice below, which would
  # silently come up empty the second time for a one-shot iterator.
  ebuilds = list(ebuilds)
  assert ebuilds

  version = best_chrome_version([ebuild.chrome_version for ebuild in ebuilds])
  candidates = [
      ebuild for ebuild in ebuilds if ebuild.chrome_version == version
  ]

  # Compare revisions to break a tie. With a single candidate the loop body
  # never runs and that candidate is returned as is.
  best = candidates[0]
  for candidate in candidates[1:]:
    if best.current_revision < candidate.current_revision:
      best = candidate

  return best
# Builds ChromeEBuild objects for every ebuild path listed under package_dir
# and returns (best unstable ebuild, list of stable ebuilds).
# NOTE(review): lines were lost from this block in extraction. The docstring
# is never terminated, indentation is gone, and no visible statement adds
# anything to stable_ebuilds or unstable_ebuilds. Restore the missing
# statements from the upstream file before changing the logic.
def find_chrome_ebuilds(package_dir):
"""Return a tuple of chrome's unstable ebuild and stable ebuilds.
package_dir: The path to where the package ebuild is stored.
Tuple [unstable_ebuild, stable_ebuilds].
Exception: if no unstable ebuild exists for Chrome.
stable_ebuilds = []
unstable_ebuilds = []
for ebuild_path in portage_util.EBuild.List(package_dir):
ebuild = ChromeEBuild(ebuild_path)
# An empty chrome_version means the path did not match the version regex.
if not ebuild.chrome_version:
logging.warning('Poorly formatted ebuild found at %s', ebuild_path)
# NOTE(review): the bodies of this branch (and presumably its else) are
# missing -- TODO confirm which list each ebuild is appended to.
if ebuild.is_unstable:
# Apply some sanity checks.
if not unstable_ebuilds:
raise NoUnstableEbuildError('Missing 9999 ebuild for %s' % package_dir)
# A missing stable ebuild is only logged, not treated as an error.
if not stable_ebuilds:
logging.warning('Missing stable ebuild for %s', package_dir)
return best_chrome_ebuild(unstable_ebuilds), stable_ebuilds
# NOTE(review): only the docstring of this enum is visible and it is never
# terminated; the member definitions were lost in extraction. Other code in
# this file refers to Outcome.NEWER_VERSION_EXISTS, SAME_VERSION_EXISTS,
# REVISION_BUMP, VERSION_BUMP and NEW_EBUILD_CREATED; their values cannot be
# determined from here.
class Outcome(enum.Enum):
"""An enum representing the possible outcomes of a package uprev attempt.
NEWER_VERSION_EXISTS: An ebuild with a higher version than the requested
version already exists, so no change occurred.
SAME_VERSION_EXISTS: An ebuild with the same version as the requested
version already exists and the stable & unstable ebuilds are identical,
so no change occurred.
REVISION_BUMP: An ebuild with the same version as the requested version
already exists but the contents of the stable & unstable ebuilds differ,
so the stable ebuild was updated and the revision number was increased.
VERSION_BUMP: The requested uprev version was greater than that of any
stable ebuild that exists, so a new stable ebuild was created at the
requested version.
NEW_EBUILD_CREATED: No stable ebuild for this package existed yet, so a new
stable ebuild was created at the requested version.
# NOTE(review): this class is damaged by extraction. Indentation is gone, the
# class docstring is never terminated, and the tuple in __bool__ is cut off
# after its opening parenthesis.
class UprevResult(object):
"""The result of a package uprev attempt.
This object is truthy if files were altered by the uprev and falsey if no
files were changed.
outcome: An instance of Outcome documenting what change took place.
changed_files: A list of the paths of the files that were altered by this
uprev attempt.
outcome: Outcome
changed_files: List[str]
# Stores outcome and a fresh list copy of changed_files; None (or any falsy
# value) becomes an empty list.
def __init__(self,
outcome: Outcome,
changed_files: Optional[Iterable[str]] = None):
self.outcome = outcome
# A bare str is iterable, so list() below would split it into characters;
# reject it explicitly instead.
if isinstance(changed_files, str):
raise TypeError('changed_files must be a list of str, not a bare str.')
self.changed_files = list(changed_files or [])
def __bool__(self):
"""Returns True only if this Result indicates that a file was modified."""
# NOTE(review): the members of this tuple are missing -- TODO restore the
# set of Outcome values that count as "modified".
return self.outcome in (
# NOTE(review): this class is heavily damaged by extraction. Indentation is
# gone, several docstrings are never terminated, and a number of statements
# are cut off mid-call or missing entirely; each spot is flagged below. Do not
# treat the visible statements as the complete logic.
class UprevChromeManager(object):
"""Class to handle uprevving chrome and its related packages."""
# NOTE(review): the signature is cut off; `chroot` is used in the body but is
# not among the visible parameters. `_CHROME_OVERLAY_PATH` is not defined in
# the visible part of this file (only `_CHROME_OVERLAY_DIR` is) -- TODO
# confirm where it comes from.
def __init__(self, version, build_targets=None, overlay_dir=None,
self._version = version
self._build_targets = build_targets or []
self._new_ebuild_files = []
self._removed_ebuild_files = []
self._overlay_dir = str(overlay_dir or _CHROME_OVERLAY_PATH)
self._chroot = chroot
# Concatenation of the added and removed ebuild path lists.
def modified_ebuilds(self):
return self._new_ebuild_files + self._removed_ebuild_files
def uprev(self, package: str) -> UprevResult:
"""Uprev a chrome package."""
package_dir = os.path.join(self._overlay_dir, package)
package_name = os.path.basename(package)
# Find the unstable (9999) ebuild and any existing stable ebuilds.
unstable_ebuild, stable_ebuilds = find_chrome_ebuilds(package_dir)
# Find the best stable candidate to uprev -- the one that will be replaced.
should_uprev, candidate = self._find_chrome_uprev_candidate(stable_ebuilds)
if not should_uprev and candidate:
return UprevResult(Outcome.NEWER_VERSION_EXISTS)
# NOTE(review): this call is cut off after package_name.
result = self._mark_as_stable(candidate, unstable_ebuild, package_name,
# If result is falsey then no files changed, and we don't need to do any
# clean-up.
if not result:
return result
# NOTE(review): the body of this branch is missing; nothing visible here
# updates _new_ebuild_files or _removed_ebuild_files.
if candidate and not candidate.IsSticky():
return result
def _find_chrome_uprev_candidate(
self, stable_ebuilds: List[ChromeEBuild]
) -> Tuple[bool, Optional[ChromeEBuild]]:
"""Find the ebuild to replace.
stable_ebuilds: All stable ebuilds that were found.
A (okay_to_uprev, best_stable_candidate) tuple.
okay_to_uprev: A bool indicating that an uprev should proceed. False if
a newer stable ebuild than the requested version exists.
best_stable_candidate: The highest version stable ebuild that exists, or
None if no stable ebuilds exist.
candidates = []
# This is an artifact from the old process.
chrome_branch_re = re.compile(r'%s.*_rc.*' % CHROME_VERSION_REGEX)
# NOTE(review): the loop body is missing; presumably it filters
# stable_ebuilds with chrome_branch_re into candidates -- TODO confirm.
for ebuild in stable_ebuilds:
if not candidates:
return (True, None)
candidate = best_chrome_ebuild(candidates)
# A candidate is only a valid uprev candidate if its chrome version
# is no better than the target version. We can uprev equal versions
# (i.e. a revision bump), but not older. E.g.:
# Case 1 - Uprev: self._version =, Candidate =
# Case 2 - Uprev: self._version =, Candidate =
# Case 3 - Skip: self._version =, Candidate =
# NOTE(review): the example version numbers in the three cases above were
# stripped in extraction.
best_version = best_chrome_version(
[self._version, candidate.chrome_version])
if self._version == best_version:
# Cases 1 and 2.
return (True, candidate)
logging.warning('A chrome ebuild candidate with a higher version than the '
'requested uprev version was found.')
logging.debug('Requested uprev version: %s', self._version)
logging.debug('Candidate version found: %s', candidate.chrome_version)
return (False, candidate)
def _mark_as_stable(self, stable_candidate, unstable_ebuild, package_name,
package_dir) -> UprevResult:
"""Uprevs the chrome ebuild specified by chrome_rev.
This is the main function that uprevs the chrome_rev from a stable candidate
to its new version.
stable_candidate: ebuild that corresponds to the stable ebuild we are
revving from. If None, builds the a new ebuild given the version
and logic for chrome_rev type with revision set to 1.
unstable_ebuild: ebuild corresponding to the unstable ebuild for chrome.
package_name: package name.
package_dir: Path to the chromeos-chrome package dir.
Full portage version atom (including rc's, etc) that was revved.
def _is_new_ebuild_redundant(uprevved_ebuild, stable_ebuild) -> bool:
"""Returns True if the new ebuild is redundant.
This is True if there if the current stable ebuild is the exact same copy
of the new one.
# NOTE(review): the condition reads the enclosing stable_candidate rather
# than the stable_ebuild parameter. The only visible call passes
# stable_candidate, so both name the same object there. The filecmp.cmp
# call below is cut off.
if (stable_ebuild and
stable_candidate.chrome_version == uprevved_ebuild.chrome_version):
return filecmp.cmp(
return False
# Case where we have the last stable candidate with same version just rev.
# NOTE(review): the first format argument, the `else:` line and the call
# that writes the new ebuild are missing around the next few statements.
if stable_candidate and stable_candidate.chrome_version == self._version:
new_ebuild_path = '%s-r%d.ebuild' % (
stable_candidate.current_revision + 1)
rev_bump = True
pf = '%s-%s_rc-r1' % (package_name, self._version)
new_ebuild_path = os.path.join(package_dir, '%s.ebuild' % pf)
rev_bump = False
new_ebuild_path, {})
new_ebuild = ChromeEBuild(new_ebuild_path)
# Determine whether this is ebuild is redundant.
if _is_new_ebuild_redundant(new_ebuild, stable_candidate):
# NOTE(review): msg is assigned but no visible statement uses it.
msg = 'Previous ebuild with same version found and ebuild is redundant.'
return UprevResult(Outcome.SAME_VERSION_EXISTS)
if rev_bump:
return UprevResult(Outcome.REVISION_BUMP, [new_ebuild.ebuild_path])
elif stable_candidate:
# If a stable ebuild already existed and rev_bump is False, then a stable
# ebuild with a new major version has been generated.
return UprevResult(Outcome.VERSION_BUMP, [new_ebuild.ebuild_path])
# If no stable ebuild existed, then we've created the first stable ebuild
# for this package.
return UprevResult(Outcome.NEW_EBUILD_CREATED, [new_ebuild.ebuild_path])
# Delegates to the module-level clean_stale_packages for a single package,
# using this manager's build targets and chroot.
def _clean_stale_package(self, package):
clean_stale_packages([package], self._build_targets, chroot=self._chroot)
# NOTE(review): this class is heavily damaged by extraction. Indentation is
# gone, docstrings are never terminated, and several calls are cut off or
# missing; each spot is flagged below. Do not treat the visible statements as
# the complete logic.
class UprevOverlayManager(object):
"""Class to handle the uprev process for a set of overlays.
This handles the standard uprev process that covers most packages. There are
also specialized uprev processes for a few specific packages not handled by
this class, e.g. chrome and android.
TODO (saklein): The manifest object for this class is used deep in the
portage_util uprev process. Look into whether it's possible to redo it so
the manifest isn't required.
# NOTE(review): the signature is cut off; `output_dir` is used in the body but
# is not among the visible parameters.
def __init__(self, overlays, manifest, build_targets=None, chroot=None,
"""Init function.
overlays (list[str]): The overlays to search for ebuilds.
manifest (git.ManifestCheckout): The manifest object.
build_targets (list[build_target_lib.BuildTarget]|None): The build
targets to clean in |chroot|, if desired. No effect unless |chroot| is
chroot (chroot_lib.Chroot|None): The chroot to clean, if desired.
output_dir (str|None): The path to optionally dump result files.
self.overlays = overlays
self.manifest = manifest
self.build_targets = build_targets or []
self.chroot = chroot
self.output_dir = output_dir
# Result holders start as None and are replaced by uprev().
self._revved_packages = None
self._new_package_atoms = None
self._new_ebuild_files = None
self._removed_ebuild_files = None
self._overlay_ebuilds = None
# We cleaned up self referential ebuilds by this version, but don't enforce
# the check on older ones to avoid breaking factory/firmware branches.
root_version = manifest_version.VersionInfo.from_repo(constants.SOURCE_ROOT)
no_self_repos_version = manifest_version.VersionInfo('13099.0.0')
self._reject_self_repo = root_version >= no_self_repos_version
# Added plus removed ebuild paths, or [] when _new_ebuild_files is still None.
def modified_ebuilds(self):
if self._new_ebuild_files is not None:
return self._new_ebuild_files + self._removed_ebuild_files
return []
def uprev(self, package_list=None, force=False):
"""Uprev ebuilds.
Uprev ebuilds for the packages in package_list. If package_list is not
specified, uprevs all ebuilds for overlays in self.overlays.
package_list (list[str]): A list of packages to uprev.
force: Boolean indicating whether or not to consider blacklisted ebuilds.
# Use all found packages if an explicit package_list is not given.
use_all = not bool(package_list)
# NOTE(review): the start of the next call is missing; presumably
# self._populate_overlay_ebuilds( -- TODO confirm.
use_all=use_all, package_list=package_list, force=force)
with parallel.Manager() as manager:
# Contains the list of packages we actually revved.
self._revved_packages = manager.list()
# The new package atoms for cleanup.
self._new_package_atoms = manager.list()
# The list of added ebuild files.
self._new_ebuild_files = manager.list()
# The list of removed ebuild files.
self._removed_ebuild_files = manager.list()
# One single-argument task per overlay.
inputs = [[overlay] for overlay in self.overlays]
parallel.RunTasksInProcessPool(self._uprev_overlay, inputs)
# Convert the manager lists to plain lists.
self._revved_packages = list(self._revved_packages)
self._new_package_atoms = list(self._new_package_atoms)
self._new_ebuild_files = list(self._new_ebuild_files)
self._removed_ebuild_files = list(self._removed_ebuild_files)
if self.output_dir and os.path.exists(self.output_dir):
# Write out dumps of the results. This is largely meant for sanity
# checking results.
# NOTE(review): the content argument of each WriteFile call is missing.
osutils.WriteFile(os.path.join(self.output_dir, 'revved_packages'),
osutils.WriteFile(os.path.join(self.output_dir, 'new_package_atoms'),
osutils.WriteFile(os.path.join(self.output_dir, 'new_ebuild_files'),
osutils.WriteFile(os.path.join(self.output_dir, 'removed_ebuild_files'),
def _uprev_overlay(self, overlay):
"""Execute uprevs for an overlay.
overlay: The overlay to uprev.
# NOTE(review): the early-exit statements after the two guards below are
# missing.
if not os.path.isdir(overlay):
logging.warning('Skipping %s, which is not a directory.', overlay)
ebuilds = self._overlay_ebuilds.get(overlay, [])
if not ebuilds:
# One (overlay, ebuild) task per ebuild.
inputs = [[overlay, ebuild] for ebuild in ebuilds]
parallel.RunTasksInProcessPool(self._uprev_ebuild, inputs)
def _uprev_ebuild(self, overlay, ebuild):
"""Work on a single ebuild.
overlay: The overlay the ebuild belongs to.
ebuild: The ebuild to work on.
# NOTE(review): the rest of this debug call, the `try:` line, the remaining
# RevWorkOnEBuild arguments and the statements inside the handlers are
# cut off or missing.
logging.debug('Working on %s, info %s', ebuild.package,
result = ebuild.RevWorkOnEBuild(
os.path.join(constants.SOURCE_ROOT, 'src'), self.manifest,
except (portage_util.InvalidUprevSourceError,
portage_util.EbuildVersionError) as e:
logging.error('An error occurred while uprevving %s: %s',
ebuild.package, e)
except OSError:
'Cannot rev %s\n'
'Note you will have to go into %s '
'and reset the git repo yourself.', ebuild.package, overlay)
if result:
# result unpacks into the new package name plus the ebuild paths to add
# and to remove.
new_package, ebuild_path_to_add, ebuild_path_to_remove = result
# NOTE(review): the bodies of the next two branches are missing.
if ebuild_path_to_add:
if ebuild_path_to_remove:
self._new_package_atoms.append('=%s' % new_package)
# NOTE(review): the parameter list is cut off; the docstring names use_all,
# package_list and force.
def _populate_overlay_ebuilds(self,
"""Populates the overlay to ebuilds mapping.
Populate self._overlay_ebuilds for all overlays in self.overlays unless
otherwise specified by package_list.
use_all: Whether to include all ebuilds in the specified directories.
If true, then we gather all packages in the directories regardless
of whether they are in our set of packages.
package_list (list[str]): A set of the packages we want to gather. If
use_all is True, this argument is ignored, and should be None.
force: Boolean indicating whether or not to consider blacklisted ebuilds.
# See for origins of this.
root_version = manifest_version.VersionInfo.from_repo(constants.SOURCE_ROOT)
subdir_removal = manifest_version.VersionInfo('10363.0.0')
require_subdir_support = root_version < subdir_removal
if not package_list:
package_list = []
overlay_ebuilds = {}
inputs = [[overlay, use_all, package_list, force, require_subdir_support]
for overlay in self.overlays]
# NOTE(review): the remaining arguments of this call are missing.
result = parallel.RunTasksInProcessPool(portage_util.GetOverlayEBuilds,
# Results are matched back to overlays by position.
for idx, ebuilds in enumerate(result):
overlay_ebuilds[self.overlays[idx]] = ebuilds
self._overlay_ebuilds = overlay_ebuilds
def _clean_stale_packages(self):
"""Cleans up stale package info from a previous build."""
# NOTE(review): the final argument(s) of this call are missing.
clean_stale_packages(self._new_package_atoms, self.build_targets,
# NOTE(review): this function is heavily damaged by extraction. Indentation
# is gone, the first statement after the docstring is garbled, and several
# `else:`/return lines and call arguments are missing; each spot is flagged
# below.
def clean_stale_packages(new_package_atoms, build_targets, chroot=None):
"""Cleans up stale package info from a previous build."""
# NOTE(review): the call between the colon and the message string was lost;
# presumably a logging call -- TODO confirm.
if new_package_atoms:'Cleaning up stale packages %s.', new_package_atoms)
# Fall back to a default-constructed Chroot when none is supplied.
chroot = chroot or Chroot()
# NOTE(review): presumably returns after this warning -- the statement is
# missing.
if cros_build_lib.IsOutsideChroot() and not chroot.exists():
logging.warning('Unable to clean packages. No chroot to enter.')
# First unmerge all the packages for a board, then eclean it.
# We need these two steps to run in order (unmerge/eclean),
# but we can let all the boards run in parallel.
def _do_clean_stale_packages(board):
# NOTE(review): the board-specific runcmd value and the `else:` line are
# missing here.
if board:
suffix = '-' + board
runcmd =
suffix = ''
runcmd = cros_build_lib.sudo_run
if cros_build_lib.IsOutsideChroot():
# Setup runcmd with the chroot arguments once.
runcmd = functools.partial(
runcmd, enter_chroot=True, chroot_args=chroot.get_enter_args())
emerge, eclean = 'emerge' + suffix, 'eclean' + suffix
# Only proceed when FindMissingBinaries reports nothing missing.
if not osutils.FindMissingBinaries([emerge, eclean]):
if new_package_atoms:
# If nothing was found to be unmerged, emerge will exit(1).
# NOTE(review): trailing arguments and the closing parenthesis of this
# call are missing.
result = runcmd(
[emerge, '-q', '--unmerge'] + new_package_atoms,
extra_env={'CLEAN_DELAY': '0'},
if result.returncode not in (0, 1):
raise cros_build_lib.RunCommandError('unexpected error', result)
# NOTE(review): trailing arguments of this call are missing.
runcmd([eclean, '-d', 'packages'],
tasks = []
# NOTE(review): the loop body that fills tasks is missing.
for build_target in build_targets:
parallel.RunTasksInProcessPool(_do_clean_stale_packages, tasks)
# Pairs a new package version with the files modified to produce it.
UprevVersionedPackageModifications = collections.namedtuple(
    'UprevVersionedPackageModifications',
    ['new_version', 'files'],
)
# NOTE(review): this class is damaged by extraction. Indentation is gone and
# the add_result docstring is never terminated. As shown, add_result builds
# `result` but never stores it, so self.modified would stay empty --
# presumably an append to self.modified was lost; TODO confirm.
class UprevVersionedPackageResult(object):
"""Data object for uprev_versioned_package."""
def __init__(self):
# Collected UprevVersionedPackageModifications entries.
self.modified = []
def add_result(self, new_version, modified_files):
"""Adds version/ebuilds tuple to result.
new_version: New version number of package.
modified_files: List of files modified for the given version.
result = UprevVersionedPackageModifications(new_version, modified_files)
# Returning self lets calls be chained.
return self
# True when at least one modification has been recorded.
def uprevved(self):
return bool(self.modified)
# NOTE(review): this function is heavily damaged by extraction. Indentation
# is gone, the docstring is never terminated, and several `else:`/`try:`
# lines and calls are cut off or missing; each spot is flagged below.
def uprev_ebuild_from_pin(package_path, version_no_rev, chroot):
"""Changes the package ebuild's version to match the version pin file.
package_path: The path of the package relative to the src root. This path
should contain a stable and an unstable ebuild with the same name
as the package.
version_no_rev: The version string to uprev to (excluding revision). The
ebuild's version will be directly set to this number.
chroot (chroot_lib.Chroot): specify a chroot to enter.
UprevVersionedPackageResult: The result.
package = os.path.basename(package_path)
package_src_path = os.path.join(constants.SOURCE_ROOT, package_path)
ebuild_paths = list(portage_util.EBuild.List(package_src_path))
stable_ebuild = None
unstable_ebuild = None
for path in ebuild_paths:
ebuild = portage_util.EBuild(path)
# NOTE(review): an `else:` line is presumably missing before the second
# assignment below -- TODO confirm.
if ebuild.is_stable:
stable_ebuild = ebuild
unstable_ebuild = ebuild
# Exactly one stable and one unstable ebuild are required.
if stable_ebuild is None:
raise EbuildUprevError('No stable ebuild found for %s' % package)
if unstable_ebuild is None:
raise EbuildUprevError('No unstable ebuild found for %s' % package)
if len(ebuild_paths) > 2:
raise EbuildUprevError('Found too many ebuilds for %s: '
'expected one stable and one unstable' % package)
# If the new version is the same as the old version, bump the revision number,
# otherwise reset it to 1
# NOTE(review): an `else:` line is presumably missing before the second
# `version =` assignment.
if version_no_rev == stable_ebuild.version_no_rev:
version = '%s-r%d' % (version_no_rev, stable_ebuild.current_revision + 1)
version = version_no_rev + '-r1'
new_ebuild_path = os.path.join(package_path,
'%s-%s.ebuild' % (package, version))
# NOTE(review): this join is cut off, and the start of the call that ends in
# `new_ebuild_src_path, {})` is missing.
new_ebuild_src_path = os.path.join(constants.SOURCE_ROOT,
manifest_src_path = os.path.join(package_src_path, 'Manifest')
new_ebuild_src_path, {})
# UpdateEbuildManifest runs inside the chroot and therefore needs a
# chroot-relative path.
# NOTE(review): this join is cut off and the `try:` line is missing.
new_ebuild_chroot_path = os.path.join(constants.CHROOT_SOURCE_ROOT,
portage_util.UpdateEbuildManifest(new_ebuild_chroot_path, chroot=chroot)
except cros_build_lib.RunCommandError as e:
raise EbuildManifestError(
'Unable to update manifest for %s: %s' % (package, e.stderr))
# NOTE(review): nothing visible records the new version or files on result
# before it is returned.
result = UprevVersionedPackageResult()
return result
# NOTE(review): this function is heavily damaged by extraction. Indentation
# is gone, the docstring is never terminated, and several `else:`/`try:`
# lines and calls are cut off or missing; each spot is flagged below. The
# docstring also documents `srcroot` while the parameter is named `src_root`.
def uprev_workon_ebuild_to_version(
package_path: Union[str, 'pathlib.Path'],
target_version: str,
chroot: Optional['chromite.lib.chroot_lib.Chroot'] = None,
src_root: str = constants.SOURCE_ROOT,
chroot_src_root: str = constants.CHROOT_SOURCE_ROOT) -> UprevResult:
"""Uprev a cros-workon ebuild to a specified version.
package_path: The path of the package relative to the src root. This path
should contain an unstable 9999 ebuild that inherits from cros-workon.
target_version: The version to use for the stable ebuild to be generated.
Should not contain a revision number.
chroot: The path to the chroot to enter, if not the default.
srcroot: Path to the root of the source checkout. Only override for testing.
chroot_src_root: Path to the root of the source checkout when inside the
chroot. Only override for testing.
package_path = str(package_path)
package = os.path.basename(package_path)
package_src_path = os.path.join(src_root, package_path)
ebuild_paths = list(portage_util.EBuild.List(package_src_path))
stable_ebuild = None
unstable_ebuild = None
for path in ebuild_paths:
ebuild = portage_util.EBuild(path)
# NOTE(review): an `else:` line is presumably missing before the second
# assignment below -- TODO confirm.
if ebuild.is_stable:
stable_ebuild = ebuild
unstable_ebuild = ebuild
# The first outcome assigned wins; later `outcome or ...` lines only fill it
# in when it is still None.
outcome = None
# Unlike the unstable ebuild, a missing stable ebuild is not an error here.
if stable_ebuild is None:
outcome = outcome or Outcome.NEW_EBUILD_CREATED
if unstable_ebuild is None:
raise EbuildUprevError(f'No unstable ebuild found for {package}')
if len(ebuild_paths) > 2:
raise EbuildUprevError(f'Found too many ebuilds for {package}: '
'expected one stable and one unstable')
if not unstable_ebuild.is_workon:
raise EbuildUprevError('A workon ebuild was expected '
f'but {unstable_ebuild.ebuild_path} is not workon.')
# If the new version is the same as the old version, bump the revision number,
# otherwise reset it to 1
# NOTE(review): an `else:` line is presumably missing before the second
# `output_version =` assignment.
if stable_ebuild and target_version == stable_ebuild.version_no_rev:
output_version = f'{target_version}-r{stable_ebuild.current_revision + 1}'
outcome = outcome or Outcome.REVISION_BUMP
output_version = f'{target_version}-r1'
outcome = outcome or Outcome.VERSION_BUMP
# NOTE(review): this join is cut off; the file name argument is missing.
new_ebuild_path = os.path.join(package_path,
new_ebuild_src_path = os.path.join(src_root, new_ebuild_path)
manifest_src_path = os.path.join(package_src_path, 'Manifest')
# Go through the normal uprev process for a cros-workon ebuild, by calculating
# and writing out the commit & tree IDs for the projects and subtrees
# specified in the unstable ebuild.
# NOTE(review): the two calls below use constants.SOURCE_ROOT rather than the
# src_root parameter -- confirm whether that is intended.
manifest = git.ManifestCheckout.Cached(constants.SOURCE_ROOT)
info = unstable_ebuild.GetSourceInfo(
os.path.join(constants.SOURCE_ROOT, 'src'), manifest)
commit_ids = [unstable_ebuild.GetCommitId(x) for x in info.srcdirs]
if not commit_ids:
raise EbuildUprevError('No commit_ids found for %s' % info.srcdirs)
tree_ids = [unstable_ebuild.GetTreeId(x) for x in info.subtrees]
# Drop falsy tree ids before checking that any remain.
tree_ids = [tree_id for tree_id in tree_ids if tree_id]
if not tree_ids:
raise EbuildUprevError('No tree_ids found for %s' % info.subtrees)
# NOTE(review): the contents of `variables` and the start of the call that
# ends in `new_ebuild_src_path, variables)` are missing.
variables = dict(
new_ebuild_src_path, variables)
# If the newly generated stable ebuild is identical to the previous one,
# early return without incrementing the revision number.
# NOTE(review): the name of the comparison call on the continuation line is
# missing, as is whatever cleanup precedes the return.
if stable_ebuild and target_version == stable_ebuild.version_no_rev and \
new_ebuild_src_path, stable_ebuild.ebuild_path, shallow=False):
return UprevResult(outcome=Outcome.SAME_VERSION_EXISTS)
# NOTE(review): the body of this branch and the `try:` line are missing.
if stable_ebuild is not None:
# UpdateEbuildManifest runs inside the chroot and therefore needs a
# chroot-relative path.
new_ebuild_chroot_path = os.path.join(chroot_src_root, new_ebuild_path)
portage_util.UpdateEbuildManifest(new_ebuild_chroot_path, chroot=chroot)
except cros_build_lib.RunCommandError as e:
raise EbuildManifestError(
f'Unable to update manifest for {package}: {e.stderr}')
changed_files = [new_ebuild_src_path]
# NOTE(review): the bodies of the next two branches are missing.
if os.path.exists(manifest_src_path):
result = UprevResult(outcome=outcome, changed_files=changed_files)
if stable_ebuild is not None:
return result