| # Copyright 2012 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Routines and classes for working with Portage overlays and ebuilds.""" |
| |
| import collections |
| import errno |
| import functools |
| import glob |
| import itertools |
| import json |
| import logging |
| import os |
| from pathlib import Path |
| import re |
| import shutil |
| from typing import ( |
| Any, |
| Dict, |
| Iterable, |
| Iterator, |
| List, |
| NamedTuple, |
| Optional, |
| Set, |
| Tuple, |
| Union, |
| ) |
| |
| from chromite.lib import build_query |
| from chromite.lib import build_target_lib |
| from chromite.lib import chroot_lib |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import failures_lib |
| from chromite.lib import git |
| from chromite.lib import osutils |
| from chromite.lib import parallel |
| from chromite.lib import path_util |
| from chromite.lib.parser import package_info |
| from chromite.utils import key_value_store |
| from chromite.utils import pms |
| from chromite.utils.parser import pms_dependency |
| from chromite.utils.parser import portage_md5_cache |
| |
| |
| # Type used for buildroot arguments. Typically this comes from constants such as |
| # constants.SOURCE_ROOT, but can also be passed as a string (e.g. |
| # cbuildbot/stages/generic_stages.py). Note this must be hashable for use with |
| # functools.lru_cache (cannot be os.PathLike). |
| # TODO(build): Replace BuildrootType with just `Path`. |
| BuildrootType = Union[Path, str] |
| |
| |
| # The parsed output of running `ebuild <ebuild path> info`. |
| RepositoryInfoTuple = collections.namedtuple( |
| "RepositoryInfoTuple", ("srcdir", "project") |
| ) |
| |
| _PRIVATE_PREFIX = "%(buildroot)s/src/private-overlays" |
| |
| # This regex matches a category name. |
| _category_re = re.compile(r"^(?P<category>\w[\w\+\.\-]*)$", re.VERBOSE) |
| |
| # This regex matches blank lines, commented lines, and the EAPI line. |
| _blank_or_eapi_re = re.compile(r"^\s*(?:#|EAPI=|$)") |
| |
| # This regex is used to extract test names from IUSE_TESTS |
| _autotest_re = re.compile(r"\+tests_(\w+)", re.VERBOSE) |
| |
| # Where portage stores metadata about installed packages. |
| VDB_PATH = "var/db/pkg" |
| |
| WORKON_EBUILD_VERSION = "9999" |
| WORKON_EBUILD_SUFFIX = "-%s.ebuild" % WORKON_EBUILD_VERSION |
| |
| # A structure to hold computed values of CROS_WORKON_*. |
| CrosWorkonVars = collections.namedtuple( |
| "CrosWorkonVars", |
| ( |
| "localname", |
| "project", |
| "subdir", |
| "always_live", |
| "commit", |
| "subtrees", |
| ), |
| ) |
| |
| # EBuild source information computed from CrosWorkonVars. |
| SourceInfo = collections.namedtuple( |
| "SourceInfo", |
| ( |
| # List of project names. |
| "projects", |
| # List of source git directory paths. They are guaranteed to exist, |
| # be a directory. |
| "srcdirs", |
| # Obsolete concept of subdirs, present for old branch support. |
| "subdirs", |
| # List of source paths under subdirs. Their existence is ensured by |
| # cros-workon.eclass. They can be directories or regular files. |
| "subtrees", |
| ), |
| ) |
| |
| |
| class Error(Exception): |
| """Base exception class for portage_util.""" |
| |
| |
| class EbuildVersionError(Error): |
| """Error for when an invalid version is generated for an ebuild.""" |
| |
| |
| class InvalidUprevSourceError(Error): |
| """Error for when an uprev source is invalid.""" |
| |
| |
| class MissingOverlayError(Error): |
| """This exception indicates that a needed overlay is missing.""" |
| |
| |
| class NoVisiblePackageError(Error): |
| """Error when there is no package matching an atom.""" |
| |
| |
| class SourceDirectoryDoesNotExistError(Error, FileNotFoundError): |
| """Error when at least one of an ebuild's sources does not exist.""" |
| |
| |
| class MissingCacheEntry(Error, FileNotFoundError): |
| """No on-disk cache entry could be found for a package.""" |
| |
| |
| @functools.lru_cache(maxsize=None) |
| def _GetKnownOverlays(buildroot: BuildrootType) -> Dict[str, Dict]: |
| """Return the list of overlays for a buildroot irrespective of board. |
| |
| Find all the overlays for a buildroot and cache the result since finding the |
| overlays is the same no matter the board given for _ListOverlays. |
| |
| Args: |
| buildroot: Source root to find overlays. |
| |
| Returns: |
| List of overlays. |
| """ |
| paths = ( |
| "projects", |
| "src/overlays", |
| "src/private-overlays", |
| "src/third_party", |
| ) |
| overlays: Dict[str, Dict] = {} |
| for path in paths: |
| path = os.path.join(buildroot, path, "*") |
| for overlay in sorted(glob.glob(path)): |
| name = GetOverlayName(overlay) |
| if name is None: |
| continue |
| |
| # Sanity check the sets of repos. |
| if name in overlays: |
| raise RuntimeError( |
| 'multiple repos with same name "%s": %s and %s' |
| % (name, overlays[name]["path"], overlay) |
| ) |
| |
| try: |
| masters = key_value_store.LoadFile( |
| os.path.join(overlay, "metadata", "layout.conf") |
| )["masters"].split() |
| except (KeyError, IOError): |
| masters = [] |
| overlays[name] = { |
| "masters": masters, |
| "path": overlay, |
| } |
| return overlays |
| |
| |
| @functools.lru_cache(maxsize=None) |
| def _ListOverlays( |
| board: Optional[str] = None, |
| buildroot: BuildrootType = constants.SOURCE_ROOT, |
| ) -> List: |
| """Return the list of overlays to use for a given buildbot. |
| |
| Always returns all overlays in parent -> child order, and does not |
| perform any filtering. |
| |
| Args: |
| board: Board to look at. |
| buildroot: Source root to find overlays. |
| """ |
| # Load all the known overlays so we can extract the details below. |
| overlays = _GetKnownOverlays(buildroot) |
| |
| # Easy enough -- dump them all. |
| if board is None: |
| return sorted([x["path"] for x in overlays.values()]) |
| |
| # Build up the list of repos we need. |
| ret = [] |
| seen = set() |
| |
| def _AddRepo(repo: str, optional: bool = False) -> bool: |
| """Recursively add |repo|'s masters from |overlays| to |ret|. |
| |
| Args: |
| repo: The repo name to look up. |
| optional: If |repo| does not exist, return False, else raise an |
| MissingOverlayError. |
| |
| Returns: |
| True if |repo| was found. |
| """ |
| if repo not in overlays: |
| if optional: |
| return False |
| else: |
| raise MissingOverlayError("%s was not found" % repo) |
| |
| for master in overlays[repo]["masters"] + [repo]: |
| if master not in seen: |
| seen.add(master) |
| _AddRepo(master) |
| ret.append(overlays[master]["path"]) |
| if not master.endswith("-private"): |
| _AddRepo("%s-private" % master, True) |
| return True |
| |
| # Legacy: load the global configs. In the future, this should be found |
| # via the overlay's masters. |
| _AddRepo("chromeos", optional=True) |
| path = os.path.join( |
| buildroot, "src", "private-overlays", "chromeos-*-overlay" |
| ) |
| ret += sorted(glob.glob(path)) |
| |
| # Locate the board repo by name. |
| # Load the public & private versions if available. |
| found_pub = _AddRepo(board, optional=True) |
| found_priv = _AddRepo("%s-private" % board, optional=True) |
| |
| # If neither public nor private board was found, die. |
| if not found_pub and not found_priv: |
| raise MissingOverlayError("board overlay not found: %s" % board) |
| |
| return ret |
| |
| |
| @functools.lru_cache(maxsize=None) |
| def FindOverlays( |
| overlay_type: str, |
| board: Optional[str] = None, |
| buildroot: BuildrootType = constants.SOURCE_ROOT, |
| ) -> List: |
| """Return the list of overlays to use for a given buildbot. |
| |
| The returned list of overlays will be in parent -> child order. |
| |
| Args: |
| overlay_type: A string describing which overlays you want. |
| 'private': Just the private overlays. |
| 'public': Just the public overlays. |
| 'both': Both the public and private overlays. |
| board: Board to look at. |
| buildroot: Source root to find overlays. |
| """ |
| overlays = _ListOverlays(board=board, buildroot=buildroot) |
| private_prefix = _PRIVATE_PREFIX % dict(buildroot=buildroot) |
| if overlay_type == constants.PRIVATE_OVERLAYS: |
| return [x for x in overlays if x.startswith(private_prefix)] |
| elif overlay_type == constants.PUBLIC_OVERLAYS: |
| return [x for x in overlays if not x.startswith(private_prefix)] |
| elif overlay_type == constants.BOTH_OVERLAYS: |
| return overlays |
| else: |
| assert overlay_type is None |
| return [] |
| |
| |
| def FindOverlaysForBoards(overlay_type: str, boards: List[str]) -> List[str]: |
| """Convenience function to find the overlays for multiple boards. |
| |
| Unlike FindOverlays, there is no guarantee about the overlay ordering. |
| Produces a unique list of overlays. |
| |
| Args: |
| overlay_type: Type of overlays to search. See FindOverlays. |
| boards: The list of boards to compile a the overlays for. |
| |
| Returns: |
| The list of unique overlays from all boards. |
| """ |
| overlays = set() |
| for board in boards: |
| board_overlays = FindOverlays(overlay_type, board=board) |
| overlays |= set(board_overlays) |
| |
| return list(overlays) |
| |
| |
| def FindOverlayFile( |
| filename: str, |
| overlay_type: str = constants.BOTH_OVERLAYS, |
| board: Optional[str] = None, |
| buildroot: BuildrootType = constants.SOURCE_ROOT, |
| ) -> Optional[str]: |
| """Attempt to find a file in the overlay directories. |
| |
| Searches through this board's overlays for the specified file. The |
| overlays are searched in child -> parent order. |
| |
| Args: |
| filename: Path to search for inside the overlay. |
| overlay_type: A string describing which overlays you want. |
| 'private': Just the private overlays. |
| 'public': Just the public overlays. |
| 'both': Both the public and private overlays. |
| board: Board to look at. |
| buildroot: Source root to find overlays. |
| |
| Returns: |
| Path to the first file found in the search. None if the file is not |
| found. |
| """ |
| for overlay in reversed(FindOverlays(overlay_type, board, buildroot)): |
| if os.path.isfile(os.path.join(overlay, filename)): |
| return os.path.join(overlay, filename) |
| return None |
| |
| |
| def ReadOverlayFile( |
| filename: str, |
| overlay_type: str = constants.BOTH_OVERLAYS, |
| board: Optional[str] = None, |
| buildroot: BuildrootType = constants.SOURCE_ROOT, |
| ) -> Optional[str]: |
| """Attempt to open a file in the overlay directories. |
| |
| Searches through this board's overlays for the specified file. The |
| overlays are searched in child -> parent order. |
| |
| Args: |
| filename: Path to open inside the overlay. |
| overlay_type: A string describing which overlays you want. |
| 'private': Just the private overlays. |
| 'public': Just the public overlays. |
| 'both': Both the public and private overlays. |
| board: Board to look at. |
| buildroot: Source root to find overlays. |
| |
| Returns: |
| The contents of the file, or None if no files could be opened. |
| """ |
| file_found = FindOverlayFile(filename, overlay_type, board, buildroot) |
| if file_found is None: |
| return None |
| return osutils.ReadText(file_found) |
| |
| |
| @functools.lru_cache(maxsize=None) |
| def GetOverlayName(overlay: str) -> Optional[str]: |
| """Get the self-declared repo name for the |overlay| path.""" |
| try: |
| return key_value_store.LoadFile( |
| os.path.join(overlay, "metadata", "layout.conf") |
| )["repo-name"] |
| except (KeyError, IOError): |
| # Not all layout.conf files have a repo-name, so don't make a fuss. |
| try: |
| with open( |
| os.path.join(overlay, "profiles", "repo_name"), encoding="utf-8" |
| ) as f: |
| return f.readline().rstrip() |
| except IOError: |
| # Not all overlays have a repo_name, so don't make a fuss. |
| return None |
| |
| |
| def get_cache_file(ebuild_path: Path) -> Path: |
| """Find the cache file for |ebuild_path|.""" |
| pinfo = package_info.parse(ebuild_path) |
| md5_cache_file_path = ( |
| ebuild_path.parents[2] / "metadata" / "md5-cache" / pinfo.cpvr |
| ) |
| edb_cache_file_path = ( |
| constants.CHROOT_EDB_CACHE_ROOT |
| / "dep" |
| / ebuild_path.parents[2].relative_to("/") |
| / pinfo.cpvr |
| ) |
| |
| if edb_cache_file_path.is_file(): |
| return edb_cache_file_path |
| elif md5_cache_file_path.is_file(): |
| return md5_cache_file_path |
| else: |
| raise MissingCacheEntry( |
| f"No cache entry found for package: {pinfo.pvr}" |
| ) |
| |
| |
| def _GetSysrootTool( |
| tool: str, |
| board: Optional[str] = None, |
| sysroot: Optional[Union[str, os.PathLike]] = None, |
| ) -> str: |
| """Return the |tool| to use for a sysroot/board/host.""" |
| # If there is no board or sysroot, return the host tool. |
| if sysroot is None and board is None: |
| return tool |
| |
| # Prefer the sysroot tool if it exists. |
| if sysroot is None: |
| sysroot = build_target_lib.get_default_sysroot_path(board) |
| tool_path = cros_build_lib.GetSysrootToolPath(sysroot, tool) |
| if os.path.exists(tool_path): |
| return tool_path |
| |
| # Fallback to the general PATH wrappers if possible. |
| return tool if board is None else f"{tool}-{board}" |
| |
| |
| class EBuildVersionFormatError(Error): |
| """Exception for bad ebuild version string format.""" |
| |
| def __init__(self, filename) -> None: |
| self.filename = filename |
| message = ( |
| "Ebuild file name %s does not match expected format." % filename |
| ) |
| super().__init__(message) |
| |
| |
| class EbuildFormatIncorrectError(Error): |
| """Exception for bad ebuild format.""" |
| |
| def __init__(self, filename, message) -> None: |
| message = "Ebuild %s has invalid format: %s " % (filename, message) |
| super().__init__(message) |
| |
| |
| # Container for Classify return values. |
| EBuildClassifyAttributes = collections.namedtuple( |
| "EBuildClassifyAttributes", |
| ("is_workon", "is_stable", "is_manually_uprevved", "has_test"), |
| ) |
| |
| |
| class EBuild: |
| """Wrapper class for information about an ebuild.""" |
| |
| VERBOSE = False |
| _PACKAGE_VERSION_PATTERN = re.compile( |
| r".*-(([0-9][0-9a-z_.]*)(-r[0-9]+)?)[.]ebuild" |
| ) |
| _WORKON_COMMIT_PATTERN = re.compile(r"^CROS_WORKON_COMMIT=") |
| |
| # TODO(crbug.com/1125947): Drop CROS_WORKON_BLACKLIST. We can do this once |
| # we no longer support branches <R89 / 13623.0.0. |
| _RE_MANUAL_UPREV = re.compile( |
| r"""^CROS_WORKON_(MANUAL_UPREV|BLACKLIST)=(['"])?1\2?$""" |
| ) |
| |
| # These eclass files imply that src_test is defined for an ebuild. |
| _ECLASS_IMPLIES_TEST = { |
| "cros-common.mk", |
| "cros-firmware", |
| "cros-go", # defines src_test |
| "cros-rust", # defines src_test |
| "tast-bundle", # inherits cros-go |
| } |
| |
| @classmethod |
| def _RunGit(cls, cwd, command, **kwargs): |
| result = git.RunGit(cwd, command, print_cmd=cls.VERBOSE, **kwargs) |
| return None if result is None else result.stdout |
| |
| def IsSticky(self) -> bool: |
| """Returns True if the ebuild is sticky.""" |
| return self.is_stable and self.current_revision == 0 |
| |
| @classmethod |
| def UpdateEBuild( |
| cls, |
| ebuild_path: str, |
| variables: Dict, |
| make_stable: bool = True, |
| ) -> None: |
| """Static function that updates WORKON information in the ebuild. |
| |
| Args: |
| ebuild_path: The path of the ebuild. |
| variables: Dictionary of variables to update in ebuild. |
| make_stable: Actually make the ebuild stable. |
| """ |
| written = False |
| old_lines = osutils.ReadText(ebuild_path).splitlines() |
| new_lines = [] |
| for line in old_lines: |
| # Always add variables at the top of the ebuild, before the first |
| # nonblank line other than the EAPI line. |
| if not written and not _blank_or_eapi_re.match(line): |
| for key, value in sorted(variables.items()): |
| assert key is not None and value is not None |
| new_lines.append("%s=%s" % (key, value)) |
| written = True |
| |
| # Mark KEYWORDS as stable by removing ~'s. |
| if line.startswith("KEYWORDS=") and make_stable: |
| new_lines.append(line.replace("~", "")) |
| continue |
| |
| varname, eq, _ = line.partition("=") |
| if ( |
| not (eq == "=" and varname.strip() in variables) |
| or "portage_util: no edit" in line |
| ): |
| # Don't write out the old value of the variable. |
| new_lines.append(line) |
| |
| osutils.WriteFile(ebuild_path, "\n".join(new_lines) + "\n") |
| |
| @classmethod |
| def MarkAsStable( |
| cls, |
| unstable_ebuild_path: str, |
| new_stable_ebuild_path: str, |
| variables: Dict, |
| make_stable: bool = True, |
| ) -> None: |
| """Static function that creates a revved stable ebuild. |
| |
| This function assumes you have already figured out the name of the new |
| stable ebuild path and then creates that file from the given unstable |
| ebuild and marks it as stable. If the commit_value is set, it also |
| set the commit_keyword=commit_value pair in the ebuild. |
| |
| Args: |
| unstable_ebuild_path: The path to the unstable ebuild. |
| new_stable_ebuild_path: The path you want to use for the new stable |
| ebuild. |
| variables: Dictionary of variables to update in ebuild. |
| make_stable: Actually make the ebuild stable. |
| """ |
| shutil.copyfile(unstable_ebuild_path, new_stable_ebuild_path) |
| EBuild.UpdateEBuild(new_stable_ebuild_path, variables, make_stable) |
| |
| @classmethod |
| def CommitChange( |
| cls, message: str, overlay: Union[str, os.PathLike] |
| ) -> None: |
| """Commits current changes in git locally with given commit message. |
| |
| Args: |
| message: the commit string to write when committing to git. |
| overlay: directory in which to commit the changes. |
| |
| Raises: |
| RunCommandError: Error occurred while committing. |
| """ |
| logging.info("Committing changes with commit message: %s", message) |
| git_commit_cmd = ["commit", "-a", "-m", message] |
| cls._RunGit(overlay, git_commit_cmd) |
| |
| def __init__( |
| self, |
| path, |
| subdir_support=False, |
| use_flags: Optional[List[str]] = None, |
| cache_file: Optional[Path] = None, |
| ) -> None: |
| """Sets up data about an ebuild from its path. |
| |
| Args: |
| path: Path to the ebuild. |
| subdir_support: Support obsolete CROS_WORKON_SUBDIR. Intended for |
| branches older than 10363.0.0. |
| use_flags: USE flags to apply. |
| cache_file: Path to the ebuild's md5-cache file. We assume it |
| exists and is up-to-date already. |
| """ |
| self.subdir_support = subdir_support |
| self._ebuild_cache = ( |
| portage_md5_cache.Md5Cache(path=cache_file) if cache_file else None |
| ) |
| |
| self.overlay, self.category, self.pkgname, filename = path.rsplit( |
| "/", 3 |
| ) |
| m = self._PACKAGE_VERSION_PATTERN.match(filename) |
| if not m: |
| raise EBuildVersionFormatError(filename) |
| self.version, self.version_no_rev, revision = m.groups() |
| if revision is not None: |
| self.current_revision = int(revision.replace("-r", "")) |
| else: |
| self.current_revision = 0 |
| self.package = "%s/%s" % (self.category, self.pkgname) |
| |
| self._ebuild_path_no_version = os.path.join( |
| os.path.dirname(path), self.pkgname |
| ) |
| self.ebuild_path_no_revision = "%s-%s" % ( |
| self._ebuild_path_no_version, |
| self.version_no_rev, |
| ) |
| self._unstable_ebuild_path = "%s%s" % ( |
| self._ebuild_path_no_version, |
| WORKON_EBUILD_SUFFIX, |
| ) |
| self.ebuild_path = path |
| |
| self.is_workon = False |
| self.is_stable = False |
| self.is_manually_uprevved = False |
| self.has_test = False |
| self._ReadEBuild(path, use_flags, self._ebuild_cache) |
| |
| # Grab the latest project settings. |
| new_vars = None |
| if self._unstable_ebuild_path != self.ebuild_path and os.path.exists( |
| self._unstable_ebuild_path |
| ): |
| try: |
| new_vars = EBuild.GetCrosWorkonVars( |
| self._unstable_ebuild_path, self.pkgname |
| ) |
| except EbuildFormatIncorrectError as e: |
| logging.warning("%s: %s", self._unstable_ebuild_path, e) |
| |
| # Grab the current project settings. |
| try: |
| old_vars = EBuild.GetCrosWorkonVars(self.ebuild_path, self.pkgname) |
| except EbuildFormatIncorrectError: |
| old_vars = None |
| |
| # Merge the two settings. |
| self.cros_workon_vars = old_vars |
| if new_vars is not None and old_vars is not None: |
| merged_vars = new_vars._replace(commit=old_vars.commit) |
| # If the project settings have changed, throw away existing vars |
| # (use new_vars). |
| if merged_vars != old_vars: |
| self.cros_workon_vars = new_vars |
| |
| @staticmethod |
| def Classify( |
| ebuild_path, |
| use_flags: Optional[List[str]] = None, |
| ebuild_cache: Optional[portage_md5_cache.Md5Cache] = None, |
| ): |
| """Return the workon, stable, and manually uprev status for the ebuild. |
| |
| workon is determined by whether the ebuild inherits from the |
| 'cros-workon' eclass. stable is determined by whether there's a '~' |
| in the KEYWORDS setting in the ebuild. An ebuild is only manually |
| uprevved if a line in it starts with 'CROS_WORKON_MANUAL_UPREV='. |
| """ |
| is_workon = False |
| is_stable = False |
| is_manually_uprevved = False |
| has_test = False |
| restrict_tests = False |
| with open(ebuild_path, mode="rb") as fp: |
| for i, line in enumerate(fp): |
| # If the file has bad encodings, produce a helpful diagnostic |
| # for the user. The default encoding exception lacks direct |
| # file context. |
| try: |
| line = line.decode("utf-8") |
| except UnicodeDecodeError: |
| logging.exception( |
| "%s: line %i: invalid UTF-8: %s", ebuild_path, i, line |
| ) |
| raise |
| |
| if line.startswith("inherit "): |
| eclasses = set(line.split()) |
| if "cros-workon" in eclasses: |
| is_workon = True |
| if EBuild._ECLASS_IMPLIES_TEST & eclasses: |
| has_test = True |
| elif line.startswith("KEYWORDS="): |
| # Strip off the comments, then extract the value of the |
| # variable, then strip off any quotes. |
| line = line.split("#", 1)[0].split("=", 1)[1].strip("\"'") |
| for keyword in line.split(): |
| if not keyword.startswith("~") and keyword != "-*": |
| is_stable = True |
| elif EBuild._RE_MANUAL_UPREV.match(line.strip()): |
| is_manually_uprevved = True |
| elif ( |
| line.startswith("src_test()") |
| or line.startswith("platform_pkg_test()") |
| or line.startswith("multilib_src_test()") |
| ): |
| has_test = True |
| elif line.startswith("RESTRICT="): |
| # Strip off the comments, then extract the value of the |
| # variable, then strip off any quotes. |
| value = ( |
| line.split("#", 1)[0].split("=", 1)[1].strip(' \t\n"') |
| ) |
| try: |
| node = pms_dependency.parse(value) |
| # Remember if an earlier RESTRICT line disabled tests. |
| restrict_tests = ( |
| "test" in node.reduce(use_flags) or restrict_tests |
| ) |
| except pms_dependency.PmsSyntaxError as e: |
| logging.warning( |
| "%s: unable to parse RESTRICT: %s", ebuild_path, e |
| ) |
| |
| # If we have a cache file, trust it over any ad-hoc ebuild parsing. |
| if ebuild_cache: |
| restrict_tests = "test" in ebuild_cache.restrict.reduce(use_flags) |
| |
| return EBuildClassifyAttributes( |
| is_workon, |
| is_stable, |
| is_manually_uprevved, |
| has_test and not restrict_tests, |
| ) |
| |
| def _ReadEBuild( |
| self, |
| path, |
| use_flags: Optional[List[str]] = None, |
| cache: Optional[portage_md5_cache.Md5Cache] = None, |
| ) -> None: |
| """Determine is_workon, is_stable and is_manually_uprevved settings. |
| |
| These are determined using the static Classify function. |
| """ |
| ( |
| self.is_workon, |
| self.is_stable, |
| self.is_manually_uprevved, |
| self.has_test, |
| ) = EBuild.Classify(path, use_flags, cache) |
| |
| @staticmethod |
| def _GetAutotestTestsFromSettings(settings) -> List[str]: |
| """Return a list of test names, when given a settings dictionary. |
| |
| Args: |
| settings: A dictionary containing ebuild variables contents. |
| |
| Returns: |
| A list of test name strings. |
| """ |
| # We do a bit of string wrangling to extract directory names from test |
| # names. First, get rid of special characters. |
| test_list: List[str] = [] |
| raw_tests_str = settings["IUSE_TESTS"] |
| if not raw_tests_str: |
| return test_list |
| |
| test_list.extend(_autotest_re.findall(raw_tests_str)) |
| return test_list |
| |
| @staticmethod |
| def GetAutotestSubdirsToRev(ebuild_path, srcdir) -> List[str]: |
| """Return list of subdirs to be watched while deciding whether to uprev. |
| |
| This logic is specific to autotest related ebuilds, that derive from the |
| autotest eclass. |
| |
| Args: |
| ebuild_path: Path to the ebuild file (e.g |
| autotest-tests-graphics-9999.ebuild). |
| srcdir: The path of the source for the test. |
| |
| Returns: |
| A list of strings mentioning directory paths. |
| """ |
| results: List[str] = [] |
| test_vars = ("IUSE_TESTS",) |
| |
| if not ebuild_path or not srcdir: |
| return results |
| |
| # TODO(pmalani): Can we get this from get_test_list in autotest.eclass ? |
| settings = osutils.SourceEnvironment( |
| ebuild_path, test_vars, env=None, multiline=True |
| ) |
| if "IUSE_TESTS" not in settings: |
| return results |
| |
| test_list = EBuild._GetAutotestTestsFromSettings(settings) |
| |
| location = ["client", "server"] |
| test_type = ["tests", "site_tests"] |
| |
| # Check the existence of every directory combination of location and |
| # test_type for each test. |
| # This is a re-implementation of the same logic in autotest_src_prepare |
| # in the chromiumos-overlay/eclass/autotest.eclass. |
| for cur_test in test_list: |
| for x, y in list(itertools.product(location, test_type)): |
| a = os.path.join(srcdir, x, y, cur_test) |
| if os.path.isdir(a): |
| results.append(os.path.join(x, y, cur_test)) |
| |
| return results |
| |
| @staticmethod |
| def GetCrosWorkonVars(ebuild_path: Union[str, os.PathLike], pkg_name: str): |
| """Return the finalized values of CROS_WORKON vars in an ebuild script. |
| |
| Args: |
| ebuild_path: Path to the ebuild file (e.g: platform2-9999.ebuild). |
| pkg_name: The package name (e.g.: platform2). |
| |
| Returns: |
| A CrosWorkonVars tuple. |
| """ |
| cros_workon_vars = EBuild._ReadCrosWorkonVars(ebuild_path, pkg_name) |
| return EBuild._FinalizeCrosWorkonVars(cros_workon_vars, ebuild_path) |
| |
| @staticmethod |
| def _ReadCrosWorkonVars( |
| ebuild_path: Union[str, os.PathLike], |
| pkg_name: str, |
| ) -> CrosWorkonVars: |
| """Return the raw values of CROS_WORKON vars in an ebuild script. |
| |
| Args: |
| ebuild_path: Path to the ebuild file (e.g: platform2-9999.ebuild). |
| pkg_name: The package name (e.g.: platform2). |
| |
| Returns: |
| A CrosWorkonVars tuple. |
| """ |
| workon_vars = ( |
| "CROS_WORKON_LOCALNAME", |
| "CROS_WORKON_PROJECT", |
| "CROS_WORKON_SUBDIR", # Obsolete, used for older branches. |
| "CROS_WORKON_ALWAYS_LIVE", |
| "CROS_WORKON_COMMIT", |
| "CROS_WORKON_SUBTREE", |
| ) |
| env = { |
| "CROS_WORKON_LOCALNAME": pkg_name, |
| "CROS_WORKON_ALWAYS_LIVE": "", |
| } |
| settings = osutils.SourceEnvironment(ebuild_path, workon_vars, env=env) |
| # Try to detect problems extracting the variables by checking whether |
| # CROS_WORKON_PROJECT is set. If it isn't, something went wrong, |
| # possibly because we're simplistically sourcing the ebuild without most |
| # of portage being available. That still breaks this script and needs to |
| # be flagged as an error. We won't catch problems setting |
| # CROS_WORKON_LOCALNAME or if CROS_WORKON_PROJECT is set to the wrong |
| # thing, but at least this covers some types of failures. |
| projects = [] |
| subdirs = [] |
| if "CROS_WORKON_PROJECT" in settings: |
| projects = settings["CROS_WORKON_PROJECT"].split(",") |
| if "CROS_WORKON_SUBDIR" in settings: |
| subdirs = settings["CROS_WORKON_SUBDIR"].split(",") |
| |
| if not projects: |
| raise EbuildFormatIncorrectError( |
| ebuild_path, |
| "Unable to determine CROS_WORKON_PROJECT values.", |
| ) |
| |
| localnames = settings["CROS_WORKON_LOCALNAME"].split(",") |
| live = settings["CROS_WORKON_ALWAYS_LIVE"] |
| commit = settings.get("CROS_WORKON_COMMIT") |
| subtrees = [ |
| tuple(subtree.split() or [""]) |
| for subtree in settings.get("CROS_WORKON_SUBTREE", "").split(",") |
| ] |
| |
| return CrosWorkonVars( |
| localname=localnames, |
| project=projects, |
| subdir=subdirs, |
| always_live=live, |
| commit=commit, |
| subtrees=subtrees, |
| ) |
| |
| @staticmethod |
| def _FinalizeCrosWorkonVars( |
| cros_workon_vars: CrosWorkonVars, |
| ebuild_path: Union[str, os.PathLike], |
| ): |
| """Finalize CrosWorkonVars tuple. |
| |
| It is allowed to set different number of entries in CROS_WORKON array |
| variables. In that case, this function completes those variable so that |
| all variables have the same number of entries. |
| |
| Args: |
| cros_workon_vars: A CrosWorkonVars tuple. |
| ebuild_path: Path to the ebuild file (e.g: platform2-9999.ebuild). |
| |
| Returns: |
| A completed CrosWorkonVars tuple. |
| """ |
| localnames = cros_workon_vars.localname |
| projects = cros_workon_vars.project |
| subdirs = cros_workon_vars.subdir |
| subtrees = cros_workon_vars.subtrees |
| |
| # Sanity checks and completion. |
| num_projects = len(projects) |
| |
| # Each project specification has to have the same amount of items. |
| if num_projects != len(localnames): |
| raise EbuildFormatIncorrectError( |
| ebuild_path, |
| "Number of _PROJECT and _LOCALNAME items don't match.", |
| ) |
| |
| if subdirs: |
| if len(subdirs) != len(projects): |
| raise EbuildFormatIncorrectError( |
| ebuild_path, |
| "_SUBDIR is defined inconsistently with _PROJECT (%d vs %d)" |
| % (len(subdirs), len(projects)), |
| ) |
| else: |
| subdirs = [""] * len(projects) |
| |
| # We better have at least one PROJECT at this point. |
| if num_projects == 0: |
| raise EbuildFormatIncorrectError( |
| ebuild_path, "No _PROJECT value found." |
| ) |
| |
| # Subtree must be either 1 or len(project). |
| if num_projects != len(subtrees): |
| if len(subtrees) > 1: |
| raise EbuildFormatIncorrectError( |
| ebuild_path, "Incorrect number of _SUBTREE items." |
| ) |
| # Multiply by num_projects. Note that subtrees is a list of tuples, |
| # and there should be at least one element. |
| subtrees *= num_projects |
| |
| return cros_workon_vars._replace( |
| localname=localnames, |
| project=projects, |
| subdir=subdirs, |
| subtrees=subtrees, |
| ) |
| |
| def GetSourceInfo(self, srcroot, manifest, reject_self_repo=True): |
| """Get source information for this ebuild. |
| |
| Args: |
| srcroot: Full path to the "src" subdirectory in the source |
| repository. |
| manifest: git.ManifestCheckout object. |
| reject_self_repo: Whether to abort if the ebuild lives in the same |
| git repo as it is tracking for uprevs. |
| |
| Returns: |
| EBuild.SourceInfo namedtuple. |
| |
| Raises: |
| raise Error if there are errors with extracting/validating data |
| required for constructing SourceInfo. |
| """ |
| if self.cros_workon_vars is None: |
| if self.is_workon: |
| raise Error(f"{self.ebuild_path} missing workon vars") |
| return SourceInfo(projects=[], srcdirs=[], subdirs=[], subtrees=[]) |
| |
| localnames = self.cros_workon_vars.localname |
| projects = self.cros_workon_vars.project |
| subdirs = self.cros_workon_vars.subdir |
| always_live = self.cros_workon_vars.always_live |
| subtrees = self.cros_workon_vars.subtrees |
| |
| if always_live: |
| return SourceInfo(projects=[], srcdirs=[], subdirs=[], subtrees=[]) |
| |
| # Calculate srcdir (used for core packages). |
| if self.category in ("chromeos-base", "brillo-base"): |
| dir_ = "" |
| else: |
| dir_ = "third_party" |
| |
| # See what git repo the ebuild lives in to make sure the ebuild isn't |
| # tracking the same repo. https://crbug.com/1050663 |
| # We set strict=False if we're running under pytest because it's valid |
| # for ebuilds created in tests to not live in a git tree. |
| under_test = os.environ.get("CHROMITE_INSIDE_PYTEST") == "1" |
| ebuild_git_tree = manifest.FindCheckoutFromPath( |
| self.ebuild_path, strict=not under_test |
| ) |
| if ebuild_git_tree: |
| ebuild_git_tree_path = ebuild_git_tree.get("local_path") |
| else: |
| ebuild_git_tree_path = None |
| |
| subdir_paths = [] |
| subtree_paths = [] |
| for local, project, subdir, subtree in zip( |
| localnames, projects, subdirs, subtrees |
| ): |
| subdir_path = os.path.realpath(os.path.join(srcroot, dir_, local)) |
| if dir_ == "" and not os.path.isdir(subdir_path): |
| subdir_path = os.path.realpath( |
| os.path.join(srcroot, "platform", local) |
| ) |
| |
| if self.subdir_support and subdir: |
| subdir_path = os.path.join(subdir_path, subdir) |
| |
| # Verify that we're grabbing the commit id from the right |
| # project name. |
| real_project = manifest.FindCheckoutFromPath(subdir_path)["name"] |
| if project != real_project: |
| raise Error( |
| "Project name mismatch for %s (found %s, expected %s)" |
| % (subdir_path, real_project, project) |
| ) |
| |
| if subdir_path == ebuild_git_tree_path: |
| msg = ( |
| "%s: ebuilds may not live in & track the same source " |
| "repository (%s); use the empty-project instead" |
| % (self.ebuild_path, subdir_path) |
| ) |
| if reject_self_repo: |
| raise Error(msg) |
| else: |
| logging.warning("Ignoring error: %s", msg) |
| subdir_paths.append(subdir_path) |
| subtree_paths.extend( |
| os.path.join(subdir_path, s) if s else subdir_path |
| for s in subtree |
| ) |
| |
| return SourceInfo( |
| projects=projects, |
| srcdirs=subdir_paths, |
| subdirs=[], |
| subtrees=subtree_paths, |
| ) |
| |
| def GetCommitId(self, srcdir, ref: str = "HEAD"): |
| """Get the commit id for this ebuild. |
| |
| Returns: |
| Commit id (string) for this ebuild. |
| |
| Raises: |
| raise Error if git fails to return the HEAD commit id. |
| """ |
| if not os.path.exists(srcdir): |
| raise SourceDirectoryDoesNotExistError( |
| "Source repository %s for project %s does not exist." |
| % (srcdir, self.pkgname) |
| ) |
| output = self._RunGit(srcdir, ["rev-parse", "%s^{commit}" % ref]) |
| if not output: |
| raise Error("Cannot determine %s commit for %s" % (ref, srcdir)) |
| return output.rstrip() |
| |
| def GetTreeId(self, path, ref: str = "HEAD"): |
| """Get the SHA1 of the source tree for this ebuild. |
| |
| Unlike the commit hash, the SHA1 of the source tree is unaffected by the |
| history of the repository, or by commit messages. |
| |
| Given path can point a regular file, not a directory. If it does not |
| exist, None is returned. |
| |
| Raises: |
| raise Error if git fails to determine the HEAD tree hash. |
| """ |
| if not os.path.exists(path): |
| return None |
| if os.path.isdir(path): |
| basedir = path |
| relpath = "" |
| else: |
| basedir = os.path.dirname(path) |
| relpath = os.path.basename(path) |
| output = self._RunGit(basedir, ["rev-parse", ref + ":./%s" % relpath]) |
| if not output: |
| raise Error("Cannot determine %s tree hash for %s" % (ref, path)) |
| return output.rstrip() |
| |
| def GetVersion(self, srcroot, manifest, default): |
| """Get the base version number for this ebuild. |
| |
| The version is provided by the ebuild through a specific script in |
| the $FILESDIR (chromeos-version.sh). |
| |
| Raises: |
| raise Error when chromeos-version.sh script fails to return the raw |
| version number. |
| """ |
| vers_script = os.path.join( |
| os.path.dirname(self._ebuild_path_no_version), |
| "files", |
| "chromeos-version.sh", |
| ) |
| |
| if not os.path.exists(vers_script): |
| return default |
| |
| if not self.is_workon: |
| raise EbuildFormatIncorrectError( |
| self._ebuild_path_no_version, |
| "Package has a chromeos-version.sh script but is not " |
| "workon-able.", |
| ) |
| |
| srcdirs = self.GetSourceInfo(srcroot, manifest).srcdirs |
| |
| # The chromeos-version script will output a usable raw version number, |
| # or nothing in case of error or no available version |
| result = cros_build_lib.run( |
| ["bash", "-x", vers_script] + srcdirs, |
| capture_output=True, |
| check=False, |
| encoding="utf-8", |
| ) |
| |
| output = result.stdout.strip() |
| if result.returncode or not output: |
| raise Error( |
| "Package %s has a chromeos-version.sh script but failed:\n" |
| "return code = %s\nstdout = %s\nstderr = %s\n" |
| % ( |
| self.pkgname, |
| result.returncode, |
| result.stdout, |
| result.stderr, |
| ) |
| ) |
| |
| # Sanity check: disallow versions that will be larger than the 9999 |
| # ebuild used by cros-workon. |
| main_pv = output.split(".", 1)[0] |
| try: |
| main_pv = int(main_pv) |
| except ValueError: |
| raise ValueError( |
| "%s: PV returned is invalid: %s" % (vers_script, output) |
| ) |
| if main_pv >= int(WORKON_EBUILD_VERSION): |
| raise ValueError( |
| "%s: cros-workon packages must have a PV < %s; not %s" |
| % (vers_script, WORKON_EBUILD_VERSION, output) |
| ) |
| |
| # Sanity check: We should be able to parse a CPV string with the |
| # produced version number. |
| verify_pkg_str = f"foo/bar-{output}" |
| verify_pkg = package_info.parse(verify_pkg_str) |
| if verify_pkg.cpvr != verify_pkg_str: |
| raise EbuildVersionError( |
| "PV returned does not match the version spec: %s" % output |
| ) |
| |
| return output |
| |
| @staticmethod |
| def FormatBashArray(unformatted_list): |
| """Returns a python list in a bash array format. |
| |
| If the list only has one item, format as simple quoted value. |
| That is both backwards-compatible and more readable. |
| |
| Args: |
| unformatted_list: an iterable to format as a bash array. This |
| variable has to be sanitized first, as we don't do any safeties. |
| |
| Returns: |
| A text string that can be used by bash as array declaration. |
| """ |
| if len(unformatted_list) > 1: |
| return '("%s")' % '" "'.join(unformatted_list) |
| else: |
| return '"%s"' % unformatted_list[0] |
| |
| def RevWorkOnEBuild( |
| self, srcroot, manifest, reject_self_repo=True, new_version=None |
| ) -> None: |
| """Revs a workon ebuild given the git commit hash. |
| |
| By default, this class overwrites a new ebuild given the normal |
| ebuild rev'ing logic. |
| |
| Args: |
| srcroot: full path to the 'src' subdirectory in the source |
| repository. |
| manifest: git.ManifestCheckout object. |
| reject_self_repo: Whether to abort if the ebuild lives in the same |
| git repo as it is tracking for uprevs. |
| new_version: The new version number for this ebuild. No revision |
| number. |
| |
| Returns: |
| If the revved package is different than the old ebuild, return a |
| tuple of (full revved package name (including the version number), |
| new stable ebuild path to add to git, old ebuild path to remove from |
| git (if any)). Otherwise, return None. |
| |
| Raises: |
| OSError: Error occurred while creating a new ebuild. |
| IOError: Error occurred while writing to the new revved ebuild file. |
| """ |
| if new_version is not None: |
| if not pms.version_valid(new_version): |
| raise ValueError(f"Invalid version {new_version}") |
| if re.search(r"-r[0-9]+$", new_version): |
| raise ValueError( |
| f"Revision is not allowed, given {new_version}" |
| ) |
| |
| if self.is_stable: |
| starting_pv = self.version_no_rev |
| else: |
| # If given unstable ebuild, use preferred version rather than 9999. |
| starting_pv = "0.0.1" |
| |
| try: |
| stable_version_no_rev = self.GetVersion( |
| srcroot, manifest, starting_pv |
| ) |
| except Exception as e: |
| logging.critical("%s: uprev failed: %s", self.ebuild_path, e) |
| raise |
| |
| old_version = "%s-r%d" % (stable_version_no_rev, self.current_revision) |
| old_stable_ebuild_path = "%s-%s.ebuild" % ( |
| self._ebuild_path_no_version, |
| old_version, |
| ) |
| |
| if new_version or not self.is_stable: |
| revision = ( |
| 1 |
| if new_version != stable_version_no_rev |
| else self.current_revision + 1 |
| ) |
| version = f"{new_version or stable_version_no_rev}-r{revision}" |
| new_stable_ebuild_path = "%s-%s.ebuild" % ( |
| self._ebuild_path_no_version, |
| version, |
| ) |
| else: |
| current_pkg_info = package_info.parse(Path(self.ebuild_path)) |
| stable_version_pkg_info = current_pkg_info.with_version( |
| stable_version_no_rev |
| ) |
| # Use {stable_version}-r1 if there's a new version, otherwise |
| # revbump the current stable ebuild. |
| new_stable_pkg = ( |
| stable_version_pkg_info.revision_bump() |
| if stable_version_pkg_info > current_pkg_info.with_rev0() |
| else current_pkg_info.revision_bump() |
| ) |
| version = new_stable_pkg.vr |
| new_stable_ebuild_path = "%s-%s.ebuild" % ( |
| self._ebuild_path_no_version, |
| new_stable_pkg.vr, |
| ) |
| |
| info = self.GetSourceInfo( |
| srcroot, manifest, reject_self_repo=reject_self_repo |
| ) |
| srcdirs = info.srcdirs |
| subtrees = info.subtrees |
| commit_ids = [self.GetCommitId(x) for x in srcdirs] |
| if not commit_ids: |
| raise InvalidUprevSourceError( |
| "No commit_ids found for %s" % srcdirs |
| ) |
| tree_ids = [self.GetTreeId(x) for x in subtrees] |
| # Make sure they are all valid (e.g. a deleted repo). |
| tree_ids = [tree_id for tree_id in tree_ids if tree_id] |
| if not tree_ids: |
| raise InvalidUprevSourceError("No tree_ids found for %s" % subtrees) |
| variables = dict( |
| CROS_WORKON_COMMIT=self.FormatBashArray(commit_ids), |
| CROS_WORKON_TREE=self.FormatBashArray(tree_ids), |
| ) |
| |
| # We use |self._unstable_ebuild_path| because that will contain the |
| # newest changes to the ebuild (and potentially changes to test subdirs |
| # themselves). |
| subdirs_to_rev = self.GetAutotestSubdirsToRev( |
| self._unstable_ebuild_path, srcdirs[0] |
| ) |
| old_subdirs_to_rev = self.GetAutotestSubdirsToRev( |
| self.ebuild_path, srcdirs[0] |
| ) |
| test_dirs_changed = False |
| if sorted(subdirs_to_rev) != sorted(old_subdirs_to_rev): |
| logging.info( |
| "The list of subdirs in the ebuild %s has changed, uprevving.", |
| self.pkgname, |
| ) |
| test_dirs_changed = True |
| |
| def _CheckForChanges(): |
| # It is possible for chromeos-version.sh to have changed leading to |
| # a non-existent old_stable_ebuild_path. If this is the case, a |
| # change happened so perform the uprev. |
| if not os.path.exists(old_stable_ebuild_path): |
| return True |
| |
| old_stable_commit = self._RunGit( |
| self.overlay, |
| ["log", "--pretty=%H", "-n1", "--", old_stable_ebuild_path], |
| ).rstrip() |
| output = self._RunGit( |
| self.overlay, |
| [ |
| "log", |
| "%s..HEAD" % old_stable_commit, |
| "--", |
| self._unstable_ebuild_path, |
| os.path.join(os.path.dirname(self.ebuild_path), "files"), |
| os.path.join(os.path.dirname(self.ebuild_path), "cros"), |
| ], |
| ) |
| return bool(output) |
| |
| unstable_ebuild_or_files_changed = _CheckForChanges() |
| |
| # If there has been any change in the tests list, choose to uprev. |
| if ( |
| not test_dirs_changed |
| and not unstable_ebuild_or_files_changed |
| and not self._ShouldRevEBuild(commit_ids, srcdirs, subdirs_to_rev) |
| ): |
| logging.info( |
| "Skipping uprev of ebuild %s, none of the rev_subdirs have " |
| "been modified, no files/, nor has the -9999 ebuild.", |
| self.pkgname, |
| ) |
| return |
| |
| logging.info( |
| "Determining whether to create new ebuild %s", |
| new_stable_ebuild_path, |
| ) |
| |
| assert os.path.exists(self._unstable_ebuild_path), ( |
| "Missing unstable ebuild: %s" % self._unstable_ebuild_path |
| ) |
| |
| self.MarkAsStable( |
| self._unstable_ebuild_path, new_stable_ebuild_path, variables |
| ) |
| |
| old_ebuild_path = self.ebuild_path |
| if ( |
| EBuild._AlmostSameEBuilds(old_ebuild_path, new_stable_ebuild_path) |
| and not unstable_ebuild_or_files_changed |
| ): |
| logging.info( |
| "Old and new ebuild %s are exactly identical; skipping uprev", |
| new_stable_ebuild_path, |
| ) |
| os.unlink(new_stable_ebuild_path) |
| return |
| else: |
| logging.info( |
| "Creating new stable ebuild %s", new_stable_ebuild_path |
| ) |
| logging.info( |
| "New ebuild commit id: %s", self.FormatBashArray(commit_ids) |
| ) |
| ebuild_path_to_remove = old_ebuild_path if self.is_stable else None |
| return ( |
| "%s-%s" % (self.package, version), |
| new_stable_ebuild_path, |
| ebuild_path_to_remove, |
| ) |
| |
| def _ShouldRevEBuild(self, commit_ids, srcdirs, subdirs_to_rev): |
| """Determine whether we should attempt to rev |ebuild|. |
| |
| If subdirs_to_rev is empty, this function trivially returns True. |
| |
| Args: |
| commit_ids: Commit ID of the tip of tree for the source dir. |
| srcdirs: Source directory where the git repo is located. |
| subdirs_to_rev: Test subdirectories which have to be checked for |
| modifications since the last stable commit hash. |
| |
| Returns: |
| True is an Uprev is needed, False otherwise. |
| """ |
| if not self.cros_workon_vars: |
| return True |
| if not self.cros_workon_vars.commit: |
| return True |
| if len(commit_ids) != 1: |
| return True |
| if len(srcdirs) != 1: |
| return True |
| if not subdirs_to_rev: |
| return True |
| |
| current_commit_hash = commit_ids[0] |
| stable_commit_hash = self.cros_workon_vars.commit |
| srcdir = srcdirs[0] |
| logrange = "%s..%s" % (stable_commit_hash, current_commit_hash) |
| dirs = [] |
| dirs.extend(subdirs_to_rev) |
| if dirs: |
| # Any change to the unstable ebuild must generate an uprev. If there |
| # are no dirs then this happens automatically (since the git log has |
| # no file list). Otherwise, we must ensure that it works here. |
| dirs.append("*9999.ebuild") |
| git_args = ["log", "--oneline", logrange, "--"] + dirs |
| |
| try: |
| output = EBuild._RunGit(srcdir, git_args) |
| except cros_build_lib.RunCommandError as ex: |
| logging.warning(str(ex)) |
| return True |
| |
| if output: |
| logging.info( |
| " Rev: Determined that one+ of the ebuild %s rev_subdirs " |
| "was touched %s", |
| self.pkgname, |
| list(subdirs_to_rev), |
| ) |
| return True |
| else: |
| logging.info( |
| "Skip: Determined that none of the ebuild %s rev_subdirs " |
| "was touched %s", |
| self.pkgname, |
| list(subdirs_to_rev), |
| ) |
| return False |
| |
| @classmethod |
| def GitRepoHasChanges(cls, directory): |
| """Returns True if there are changes in the given directory.""" |
| # Refresh the index first. This squashes just metadata changes. |
| cls._RunGit(directory, ["update-index", "-q", "--refresh"]) |
| output = cls._RunGit(directory, ["diff-index", "--name-only", "HEAD"]) |
| return output not in [None, ""] |
| |
| @classmethod |
| def _AlmostSameEBuilds(cls, ebuild_path1, ebuild_path2): |
| """Checks if two ebuilds are the same except CROS_WORKON_COMMIT line. |
| |
| Even if CROS_WORKON_COMMIT is different, as long as CROS_WORKON_TREE is |
| the same, we can guarantee the source tree is identical. |
| """ |
| return cls._LoadEBuildForComparison( |
| ebuild_path1 |
| ) == cls._LoadEBuildForComparison(ebuild_path2) |
| |
| @classmethod |
| def _LoadEBuildForComparison(cls, ebuild_path): |
| """Loads an ebuild file dropping CROS_WORKON_COMMIT line.""" |
| lines = osutils.ReadText(ebuild_path).splitlines() |
| return "\n".join( |
| line |
| for line in lines |
| if not cls._WORKON_COMMIT_PATTERN.search(line) |
| ) |
| |
| @classmethod |
| def List(cls, package_dir: Union[str, os.PathLike]): |
| """Generate the path to each ebuild in |package_dir|. |
| |
| Args: |
| package_dir: The package directory. |
| """ |
| for entry in os.listdir(package_dir): |
| if entry.endswith(".ebuild"): |
| yield os.path.join(package_dir, entry) |
| |
| |
| class PortageDBError(Error): |
| """Generic PortageDB error.""" |
| |
| |
| class PortageDB: |
| """Wrapper class to access the portage database located in var/db/pkg.""" |
| |
| _ebuilds: Dict[str, "InstalledPackage"] |
| |
| def __init__( |
| self, |
| root: Union[os.PathLike, str] = "/", |
| vdb: Optional[os.PathLike] = None, |
| package_install_path: Optional[os.PathLike] = None, |
| ) -> None: |
| """Initialize the internal structure for the database in the given root. |
| |
| Args: |
| root: The path to the root to inspect, for example "/build/foo". |
| vdb: Known path to package database in the partition. Defaults to |
| VDB_PATH. |
| package_install_path: The subdirectory path from the root where |
| installed files are stored. e.g. for stateful partitions on test |
| images, 'dev_image'. |
| """ |
| self.root = root |
| self.package_install_path = os.path.join( |
| root, package_install_path or "" |
| ) |
| self.db_path = os.path.join(root, vdb or VDB_PATH) |
| self._ebuilds = {} |
| |
| def GetInstalledPackage(self, category, pv): |
| """Get the InstalledPackage instance for the passed package. |
| |
| Args: |
| category: The category of the package. For example "chromeos-base". |
| pv: The package name with the version (and revision) of the |
| installed package. For example "libchrome-271506-r5". |
| |
| Returns: |
| An InstalledPackage instance for the requested package or None if |
| the requested package is not found. |
| """ |
| pkg_key = "%s/%s" % (category, pv) |
| if pkg_key in self._ebuilds: |
| return self._ebuilds[pkg_key] |
| |
| # Create a new InstalledPackage instance and cache it. |
| pkgdir = os.path.join(self.db_path, category, pv) |
| try: |
| pkg = InstalledPackage(self, pkgdir, category, pv) |
| except PortageDBError: |
| return None |
| self._ebuilds[pkg_key] = pkg |
| return pkg |
| |
| def InstalledPackages(self): |
| """Lists all portage packages in the database. |
| |
| Returns: |
| A list of InstalledPackage instances for each package in the |
| database. |
| """ |
| ebuild_pattern = os.path.join(self.db_path, "*/*/*.ebuild") |
| packages = [] |
| |
| for path in glob.glob(ebuild_pattern): |
| category, pf, packagecheck = SplitEbuildPath(path) |
| if not _category_re.match(category): |
| continue |
| if pf != packagecheck: |
| continue |
| pkg_key = "%s/%s" % (category, pf) |
| if pkg_key not in self._ebuilds: |
| self._ebuilds[pkg_key] = InstalledPackage( |
| self, os.path.join(self.db_path, category, pf), category, pf |
| ) |
| packages.append(self._ebuilds[pkg_key]) |
| |
| return packages |
| |
| |
| class InstalledPackage: |
| """Wrapper class for information about an installed package. |
| |
| This class accesses the information provided by var/db/pkg for an installed |
| ebuild, such as the list of files installed by this package. |
| """ |
| |
| # "type" constants for the ListContents() return value. |
| OBJ = "obj" |
| SYM = "sym" |
| DIR = "dir" |
| |
| def __init__(self, portage_db, pkgdir, category=None, pf=None) -> None: |
| """Initialize the installed ebuild wrapper. |
| |
| Args: |
| portage_db: The PortageDB instance where the ebuild is installed. |
| This is used to query the database about other installed |
| ebuilds, for example, the ones listed in DEPEND, but otherwise |
| it isn't used. |
| pkgdir: The directory where the installed package resides. This |
| could be for example a directory like "var/db/pkg/category/pf" |
| or the "build-info" directory in the portage temporary directory |
| where the package is being built. |
| category: The category of the package. If omitted, it will be loaded |
| from the package contents. |
| pf: The package and version of the package. If omitted, it will be |
| loaded from the package contents. This avoids unnecessary lookup |
| when this value is known. |
| |
| Raises: |
| PortageDBError if the pkgdir doesn't contain a valid package. |
| """ |
| self._portage_db = portage_db |
| self.pkgdir = pkgdir |
| self._fields = {} |
| # Prepopulate the field cache with the category and pf (if provided). |
| if not category is None: |
| self._fields["CATEGORY"] = category |
| if not pf is None: |
| self._fields["PF"] = pf |
| |
| if self.pf is None: |
| raise PortageDBError( |
| "Package doesn't contain package-version value." |
| ) |
| |
| # Check that the ebuild is present. |
| ebuild_path = os.path.join(self.pkgdir, "%s.ebuild" % self.pf) |
| if not os.path.exists(ebuild_path): |
| raise PortageDBError( |
| "Package doesn't contain an ebuild file; expected " |
| f"{ebuild_path}." |
| ) |
| |
| split_pv = package_info.parse(self.pf) |
| if not split_pv.pv: |
| raise PortageDBError( |
| 'Package and version "%s" doesn\'t have a valid format.' |
| % self.pf |
| ) |
| self.package = split_pv.package |
| self.version = split_pv.vr |
| self._pkg_info = None |
| |
| def _ReadField(self, field_name): |
| """Reads the contents of the file in the installed package directory. |
| |
| Args: |
| field_name: The name of the field to read, for example, 'SLOT' or |
| 'LICENSE'. |
| |
| Returns: |
| A string with the contents of the file. The contents of the file are |
| cached in _fields. If the file doesn't exists returns None. |
| """ |
| if field_name not in self._fields: |
| try: |
| value = osutils.ReadText( |
| os.path.join(self.pkgdir, field_name) |
| ).strip() |
| except IOError as e: |
| if e.errno != errno.ENOENT: |
| raise |
| value = None |
| self._fields[field_name] = value |
| return self._fields[field_name] |
| |
| def _read_depend_field(self, field_name: str) -> pms_dependency.RootNode: |
| """Read a dependency field. |
| |
| NB: We don't reduce for the caller. While the package manager will |
| flatten USE conditionals like flag?(), it retains ||(). This has been |
| a bit of a debate upstream as to what's "correct", but it's hard for the |
| PM to guess what the ebuild ended up actually using at build time, and |
| what it plans on doing at runtime. We too will defer the flattening to |
| the caller to try and evaluate what's best. |
| |
| A lazy implementation would just call `.reduce()` on the result and hope |
| for the best, and most of the time it probably would be correct. Either |
| way, it really needs the complete installed state (e.g. PortageDB) to be |
| able to produce a better answer. |
| """ |
| value = self._ReadField(field_name) |
| if not value: |
| # Returning an empty node rather than None makes callers easier to |
| # implement, and we probably will never care about "file exists at |
| # all" vs "file existed but was empty". |
| return pms_dependency.RootNode() |
| return pms_dependency.parse(value) |
| |
| @property |
| def category(self): |
| return self._ReadField("CATEGORY") |
| |
| @property |
| def homepage(self): |
| return self._ReadField("HOMEPAGE") |
| |
| @property |
| def license(self): |
| return self._ReadField("LICENSE") |
| |
| @property |
| def requires(self) -> Dict[str, List[str]]: |
| """Get the REQUIRES field. |
| |
| The raw file format is: |
| arch1: lib1.so lib2.so |
| arch2: lib3.so lib4.so |
| |
| This gets parsed into a dictionary mapping architectures to libraries. |
| """ |
| result: Dict[str, List[str]] = {} |
| field = self._ReadField("REQUIRES") |
| if not field: |
| return result |
| for line in field.splitlines(): |
| arch, _, libs = line.partition(":") |
| result[arch] = libs.split() |
| return result |
| |
| @property |
| def pf(self): |
| return self._ReadField("PF") |
| |
| @property |
| def repository(self): |
| return self._ReadField("repository") |
| |
| @property |
| def size(self): |
| return self._ReadField("SIZE") |
| |
| @property |
| def needed(self): |
| """Returns a mapping of files to the libraries they need.""" |
| needed = self._ReadField("NEEDED") |
| if needed is None: |
| return needed |
| # Example: |
| # /usr/sbin/bootlockboxd libmetrics.so,libhwsec.so,... |
| # /usr/sbin/bootlockboxtool libbootlockbox-client.so,... |
| # ... |
| mapping = {} |
| for line in needed.split("\n"): |
| parts = line.split(" ", 1) |
| if len(parts) > 1: |
| mapping[parts[0]] = parts[1].split(",") if parts[1] else [] |
| return mapping |
| |
| @property |
| def package_info(self): |
| if not self._pkg_info: |
| self._pkg_info = package_info.parse(f"{self.category}/{self.pf}") |
| |
| return self._pkg_info |
| |
| @property |
| def depend(self) -> pms_dependency.RootNode: |
| """The packages we built against.""" |
| return self._read_depend_field("DEPEND") |
| |
| @property |
| def rdepend(self) -> pms_dependency.RootNode: |
| """The packages we need to run with.""" |
| return self._read_depend_field("RDEPEND") |
| |
| @property |
| def bdepend(self) -> pms_dependency.RootNode: |
| """The packages we compiled with.""" |
| return self._read_depend_field("BDEPEND") |
| |
| @property |
| def use(self) -> Set[str]: |
| """The effective USE flags from the package build.""" |
| return set(self._ReadField("USE").split(" ")) |
| |
| def ListContents(self): |
| """List of files and directories installed by this package. |
| |
| Returns: |
| A list of tuples (file_type, path) where the file_type is a string |
| determining the type of the installed file: InstalledPackage.OBJ |
| (regular files), InstalledPackage.SYM (symlinks) or |
| InstalledPackage.DIR (directory), and path is the relative path of |
| the file to the root like 'usr/bin/ls'. |
| """ |
| path = os.path.join(self.pkgdir, "CONTENTS") |
| if not os.path.exists(path): |
| return [] |
| |
| result = [] |
| # TODO(b/236161656): Fix. |
| # pylint: disable-next=consider-using-with |
| for line in open(path, encoding="utf-8"): |
| line = line.strip() |
| # Line format is: "type file_path [more space-separated fields]". |
| # Discard any other line without at least the first two fields. The |
| # remaining fields depend on the type. |
| typ, data = line.split(" ", 1) |
| if typ == self.OBJ: |
| file_path, _file_hash, _mtime = data.rsplit(" ", 2) |
| elif typ == self.DIR: |
| file_path = data |
| elif typ == self.SYM: |
| file_path, _ = data.split(" -> ", 1) |
| else: |
| # Unknown type. |
| continue |
| result.append((typ, file_path.lstrip("/"))) |
| |
| return result |
| |
| |
| def BestEBuild(ebuilds: List[EBuild]) -> Optional[EBuild]: |
| """Returns the newest EBuild from a list of EBuild objects.""" |
| # pylint: disable-next=import-error |
| from portage.versions import vercmp # type: ignore |
| |
| if not ebuilds: |
| return None |
| winner = ebuilds[0] |
| for ebuild in ebuilds[1:]: |
| if vercmp(winner.version, ebuild.version) < 0: |
| winner = ebuild |
| return winner |
| |
| |
| def _FindUprevCandidates( |
| files: Iterable[str], |
| allow_manual_uprev: bool, |
| subdir_support: bool, |
| ): |
| """Return the uprev candidate ebuild from a specified list of files. |
| |
| Usually an uprev candidate is a the stable ebuild in a cros_workon |
| directory. However, if no such stable ebuild exists (someone just |
| checked in the 9999 ebuild), this is the unstable ebuild. |
| |
| If the package isn't a cros_workon package, return None. |
| |
| Args: |
| files: List of files in a package directory. |
| allow_manual_uprev: If False, discard manually uprevved packages. |
| subdir_support: Support obsolete CROS_WORKON_SUBDIR. Intended for |
| branches older than 10363.0.0. |
| |
| Raises: |
| raise Error if there is error with validating the ebuild files. |
| """ |
| stable_ebuilds = [] |
| unstable_ebuilds = [] |
| # Track if we found an unstable ebuild that is a manual uprev. This is to |
| # allow adding CROS_WORKON_MANUAL_UPREV to only the unstable ebuild without |
| # causing an error to be thrown here. |
| unstable_manual_uprev = False |
| for path in files: |
| if not path.endswith(".ebuild") or os.path.islink(path): |
| continue |
| ebuild = EBuild(path, subdir_support) |
| if not ebuild.is_workon: |
| continue |
| elif ebuild.is_manually_uprevved and not allow_manual_uprev: |
| if not ebuild.is_stable: |
| unstable_manual_uprev = True |
| continue |
| |
| if ebuild.is_stable: |
| if ebuild.version == WORKON_EBUILD_VERSION: |
| raise Error( |
| "KEYWORDS in %s ebuild should not be stable %s" |
| % (WORKON_EBUILD_VERSION, path) |
| ) |
| stable_ebuilds.append(ebuild) |
| else: |
| unstable_ebuilds.append(ebuild) |
| |
| # If both ebuild lists are empty, the passed in file list was for |
| # a non-workon package. |
| if not unstable_ebuilds: |
| if stable_ebuilds and not unstable_manual_uprev: |
| path = os.path.dirname(stable_ebuilds[0].ebuild_path) |
| raise Error( |
| "Missing %s ebuild in %s" % (WORKON_EBUILD_VERSION, path) |
| ) |
| return None |
| |
| path = os.path.dirname(unstable_ebuilds[0].ebuild_path) |
| assert len(unstable_ebuilds) <= 1, ( |
| "Found multiple unstable ebuilds in %s" % path |
| ) |
| |
| if not stable_ebuilds: |
| logging.warning("Missing stable ebuild in %s", path) |
| return unstable_ebuilds[0] |
| |
| if len(stable_ebuilds) == 1: |
| return stable_ebuilds[0] |
| |
| stable_versions = set(ebuild.version_no_rev for ebuild in stable_ebuilds) |
| if len(stable_versions) > 1: |
| package = stable_ebuilds[0].package |
| message = "Found multiple stable ebuild versions in %s:" % path |
| for version in stable_versions: |
| message += "\n %s-%s" % (package, version) |
| raise Error(message) |
| |
| uprev_ebuild = max(stable_ebuilds, key=lambda eb: eb.current_revision) |
| for ebuild in stable_ebuilds: |
| if ebuild != uprev_ebuild: |
| logging.warning( |
| "Ignoring stable ebuild revision %s in %s", ebuild.version, path |
| ) |
| return uprev_ebuild |
| |
| |
| def GetOverlayEBuilds( |
| overlay, use_all, packages, allow_manual_uprev=False, subdir_support=False |
| ): |
| """Get ebuilds from the specified overlay. |
| |
| Args: |
| overlay: The path of the overlay to get ebuilds. |
| use_all: Whether to include all ebuilds in the specified directories. If |
| true, then we gather all packages in the directories regardless of |
| whether they are in our set of packages. |
| packages: A set of the packages we want to gather. If use_all is True, |
| this argument is ignored, and should be None. |
| allow_manual_uprev: Whether to consider manually uprevved ebuilds. |
| subdir_support: Support obsolete CROS_WORKON_SUBDIR. Intended for |
| branches older than 10363.0.0. |
| |
| Returns: |
| A list of ebuilds of the overlay. |
| """ |
| ebuilds = [] |
| for package_dir, _dirs, files in os.walk(overlay): |
| # If we were given a list of packages to uprev, only consider the files |
| # whose potential CP match. |
| # This allows us to manually uprev a specific ebuild without throwing |
| # errors on every badly formatted manually uprevved ebuild. |
| package_name = os.path.basename(package_dir) |
| category = os.path.basename(os.path.dirname(package_dir)) |
| |
| # If the --all option isn't used, we only want to update packages that |
| # are in packages. |
| if not (use_all or os.path.join(category, package_name) in packages): |
| continue |
| |
| paths = [os.path.join(package_dir, path) for path in files] |
| ebuild = _FindUprevCandidates(paths, allow_manual_uprev, subdir_support) |
| |
| # Add stable ebuild. |
| if ebuild: |
| ebuilds.append(ebuild) |
| |
| return ebuilds |
| |
| |
| def _Egencache( |
| repo_name: str, |
| repos_conf: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| log_output: bool = True, |
| ) -> cros_build_lib.CompletedProcess: |
| """Execute egencache for repo_name inside the chroot. |
| |
| Args: |
| repo_name: Name of the repo for the overlay. |
| repos_conf: Alternative repos.conf file. |
| chroot: Chroot to run in. |
| log_output: Log output of cros_build_run commands. |
| |
| Returns: |
| A cros_build_lib.CompletedProcess object. |
| """ |
| cmd = [ |
| "egencache", |
| "--update", |
| "--repo", |
| repo_name, |
| "--jobs", |
| str(os.cpu_count()), |
| ] |
| if repos_conf: |
| cmd += ["--repos-conf", repos_conf] |
| chroot = chroot or chroot_lib.Chroot() |
| return chroot.run(cmd, log_output=log_output) |
| |
| |
| def generate_repositories_configuration( |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ) -> str: |
| """Make a repositories configuration with all overlays for egencache. |
| |
| Generate the repositories configuration containing every overlay in the same |
| format as repos.conf so egencache can produce an md5-cache for every overlay |
| The repositories configuration can be accepted as a string in egencache. |
| |
| Returns: |
| The string of the new repositories configuration. |
| """ |
| repos_config = "" |
| overlays = FindOverlays(constants.BOTH_OVERLAYS) |
| for overlay_path in overlays: |
| overlay_name = GetOverlayName(overlay_path) |
| if chroot: |
| overlay_path = chroot.chroot_path(overlay_path) |
| repos_config += f"[{overlay_name}]\nlocation = {overlay_path}\n" |
| return repos_config |
| |
| |
| def RegenCache( |
| overlay: str, |
| commit_changes: bool = True, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| repos_conf: Optional[str] = None, |
| ) -> Optional[str]: |
| """Regenerate the cache of the specified overlay. |
| |
| Args: |
| overlay: The tree to regenerate the cache for. |
| commit_changes: Whether to commit the changes. |
| chroot: A chroot to enter. |
| repos_conf: Alternative repos.conf file. |
| |
| Returns: |
| The overlay when there are outstanding changes, or None when there were |
| no updates or all updates were committed. This is meant to be a simple, |
| parallel_lib friendly means of identifying which overlays have been |
| changed. |
| """ |
| repo_name = GetOverlayName(overlay) |
| if not repo_name: |
| return None |
| |
| layout = key_value_store.LoadFile( |
| os.path.join(overlay, "metadata", "layout.conf"), |
| ignore_missing=True, |
| ) |
| if layout.get("cache-format") != "md5-dict": |
| return None |
| |
| if chroot and repos_conf: |
| repos_conf = chroot.chroot_path(repos_conf) |
| |
| # Regen for the whole repo. |
| _Egencache(repo_name, repos_conf=repos_conf, chroot=chroot) |
| # If there was nothing new generated, then let's just bail. |
| result = git.RunGit(overlay, ["status", "-s", "metadata/"]) |
| if not result.stdout: |
| return None |
| |
| if not commit_changes: |
| return overlay |
| |
| # Explicitly add any new files to the index. |
| git.RunGit(overlay, ["add", "metadata/"]) |
| # Explicitly tell git to also include rm-ed files. |
| git.RunGit(overlay, ["commit", "-m", "regen cache", "metadata/"]) |
| return None |
| |
| |
| def RegenDependencyCache( |
| board: Optional[str] = None, |
| sysroot: Optional[Union[str, os.PathLike]] = None, |
| jobs: Optional[int] = None, |
| ) -> None: |
| """Regenerate the dependency cache in parallel. |
| |
| Use emerge --regen instead of egencache to regenerate metadata caches for |
| all overlays (egencache only updates the cache for specific overlays). |
| |
| Args: |
| board: The board to inspect. |
| sysroot: The root directory being inspected. |
| jobs: The number of regeneration jobs to run in parallel. |
| |
| Raises: |
| cros_build_lib.RunCommandError |
| """ |
| logging.info("Rebuilding Portage dependency cache.") |
| cmd = ["parallel_emerge", "--regen", "--quiet"] |
| |
| if board: |
| cmd.append(f"--board={board}") |
| if sysroot: |
| cmd.append(f"--sysroot={sysroot}") |
| if jobs: |
| cmd.append(f"--jobs={jobs}") |
| |
| cros_build_lib.run(cmd, enter_chroot=True) |
| |
| |
| def ParseBashArray(value): |
| """Parse a valid bash array into python list.""" |
| # The syntax for bash arrays is nontrivial, so let's use bash to do the |
| # heavy lifting for us. |
| sep = "," |
| # Because %s may contain bash comments (#), put a clever newline in the way. |
| cmd = 'ARR=%s\nIFS=%s; echo -n "${ARR[*]}"' % (value, sep) |
| return cros_build_lib.run( |
| cmd, print_cmd=False, shell=True, capture_output=True, encoding="utf-8" |
| ).stdout.split(sep) |
| |
| |
| def WorkonEBuildGeneratorForDirectory(base_dir, subdir_support=False): |
| """Yields cros_workon EBuilds in |base_dir|. |
| |
| Args: |
| base_dir: Path to the base directory. |
| subdir_support: Support obsolete CROS_WORKON_SUBDIR. Intended for |
| branches older than 10363.0.0. |
| |
| Yields: |
| A cros_workon EBuild instance. |
| """ |
| for root, _, files in os.walk(base_dir): |
| for filename in files: |
| # Only look at *-9999.ebuild files. |
| if filename.endswith(WORKON_EBUILD_SUFFIX): |
| full_path = os.path.join(root, filename) |
| ebuild = EBuild(full_path, subdir_support) |
| if not ebuild.is_workon: |
| continue |
| yield ebuild |
| |
| |
| def WorkonEBuildGenerator(buildroot, overlay_type): |
| """Scans all overlays and yields cros_workon EBuilds. |
| |
| Args: |
| buildroot: Path to source root to find overlays. |
| overlay_type: The type of overlay to use (one of |
| constants.VALID_OVERLAYS). |
| |
| Yields: |
| A cros_workon EBuild instance. |
| """ |
| # Get the list of all overlays. |
| overlays = FindOverlays(overlay_type, buildroot=buildroot) |
| # Iterate through overlays and gather all workon ebuilds |
| for overlay in overlays: |
| for ebuild in WorkonEBuildGeneratorForDirectory(overlay): |
| yield ebuild |
| |
| |
| def GetWorkonProjectMap(overlay, subdirectories): |
| """Get a mapping of cros_workon ebuilds to projects and source paths. |
| |
| Args: |
| overlay: Overlay to look at. |
| subdirectories: List of subdirectories to look in on the overlay. |
| |
| Yields: |
| Tuples containing (filename, projects) for cros-workon ebuilds |
| in the given overlay under the given subdirectories. |
| """ |
| # Search ebuilds for project names, ignoring non-existent directories. |
| # Also filter out ebuilds which are not cros_workon. |
| for subdir in subdirectories: |
| base_dir = os.path.join(overlay, subdir) |
| for ebuild in WorkonEBuildGeneratorForDirectory(base_dir): |
| full_path = ebuild.ebuild_path |
| workon_vars = ebuild.cros_workon_vars |
| relpath = os.path.relpath(full_path, start=overlay) |
| yield relpath, workon_vars.project |
| |
| |
| def EbuildToCP(path): |
| """Return the category/path string from an ebuild path. |
| |
| Args: |
| path: Path to an ebuild. |
| |
| Returns: |
| '$CATEGORY/$PN' (e.g. 'sys-apps/dbus') |
| """ |
| return os.path.join(*SplitEbuildPath(path)[0:2]) |
| |
| |
| def SplitEbuildPath(path): |
| """Split an ebuild path into its components. |
| |
| Given a specified ebuild filename, returns $CATEGORY, $PN, $P. It does not |
| perform any check on ebuild name elements or their validity, merely splits |
| a filename, absolute or relative, and returns the last 3 components. |
| |
| Examples: |
| For /any/path/chromeos-base/power_manager/power_manager-9999.ebuild, |
| returns ('chromeos-base', 'power_manager', 'power_manager-9999'). |
| |
| Args: |
| path: Path to the ebuild. |
| |
| Returns: |
| $CATEGORY, $PN, $P |
| """ |
| return os.path.splitext(path)[0].rsplit("/", 3)[-3:] |
| |
| |
| def FindWorkonProjects(packages): |
| """Find the projects associated with the specified cros_workon packages. |
| |
| Args: |
| packages: List of cros_workon packages. |
| |
| Returns: |
| The set of projects associated with the specified cros_workon packages. |
| """ |
| all_projects = set() |
| buildroot, both = constants.SOURCE_ROOT, constants.BOTH_OVERLAYS |
| for overlay in FindOverlays(both, buildroot=buildroot): |
| for _, projects in GetWorkonProjectMap(overlay, packages): |
| all_projects.update(projects) |
| return all_projects |
| |
| |
| def ListInstalledPackages(sysroot): |
| """[DEPRECATED] Lists all portage packages in a given portage-managed root. |
| |
| Assumes the existence of a /var/db/pkg package database. |
| |
| This function is DEPRECATED, please use PortageDB.InstalledPackages instead. |
| |
| Args: |
| sysroot: The root directory being inspected. |
| |
| Returns: |
| A list of (cp,v) tuples in the given sysroot. |
| """ |
| return [ |
| ("%s/%s" % (pkg.category, pkg.package), pkg.version) |
| for pkg in PortageDB(sysroot).InstalledPackages() |
| ] |
| |
| |
| def IsPackageInstalled(package, sysroot="/"): |
| """Return whether a portage package is in a given portage-managed root. |
| |
| Args: |
| package: The CP to look for. |
| sysroot: The root being inspected. |
| """ |
| for key, _version in ListInstalledPackages(sysroot): |
| if key == package: |
| return True |
| |
| return False |
| |
| |
| def _Equery( |
| module: str, |
| *args: str, |
| board: Optional[str] = None, |
| sysroot: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| quiet: bool = True, |
| extra_env: Optional[Dict[str, str]] = None, |
| check: bool = True, |
| ) -> cros_build_lib.CompletedProcess: |
| """Executes equery commands. |
| |
| Args: |
| module: The equery module to run. |
| *args: Arguments to the equery module. |
| board: The board to inspect. |
| sysroot: The root directory being inspected. |
| chroot: The chroot to work with. |
| quiet: Whether to run the module in quiet mode. This is module |
| specific, so consult the documentation for behavior. |
| extra_env: Extra environment settings to pass down. |
| check: Whether to throw an exception if the command fails. |
| |
| Returns: |
| A cros_build_lib.CompletedProcess object. |
| """ |
| chroot = chroot or chroot_lib.Chroot() |
| # There is no situation where we want color or pipe detection. |
| cmd = [_GetSysrootTool("equery", board, sysroot), "--no-color", "--no-pipe"] |
| if quiet: |
| cmd += ["--quiet"] |
| cmd += [module] |
| cmd += args |
| |
| return chroot.run( |
| cmd, |
| debug_level=logging.DEBUG, |
| capture_output=True, |
| extra_env=extra_env, |
| check=check, |
| encoding="utf-8", |
| ) |
| |
| |
| def _EqueryList( |
| pkg_str: str, |
| board: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ) -> cros_build_lib.CompletedProcess: |
| """Executes equery list command. |
| |
| Args: |
| pkg_str: The package name with optional category, version, and slot. |
| board: The board to inspect. |
| chroot: The chroot to work with. |
| |
| Returns: |
| A cros_build_lib.CompletedProcess object. |
| """ |
| return _Equery( |
| "list", |
| pkg_str, |
| board=board, |
| chroot=chroot, |
| check=False, |
| ) |
| |
| |
| def FindPackageNameMatches( |
| pkg_str: str, |
| board: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ) -> List[package_info.PackageInfo]: |
| """Finds a list of installed packages matching |pkg_str|. |
| |
| Args: |
| pkg_str: The package name with optional category, version, and slot. |
| board: The board to inspect. |
| chroot: The chroot to work with. |
| |
| Returns: |
| An iterable of matched PackageInfo objects. |
| """ |
| result = _EqueryList(pkg_str, board=board, chroot=chroot) |
| |
| matches = [] |
| if result.returncode == 0: |
| matches = [package_info.parse(x) for x in result.stdout.splitlines()] |
| |
| return matches |
| |
| |
| def FindPackageNamesForFiles(*args: str) -> List[package_info.PackageInfo]: |
| """Finds the list of packages that own the files in |args|. |
| |
| Returns: |
| An iterable of matched PackageInfo objects. |
| """ |
| result = _Equery("belongs", quiet=True, check=False, *args) |
| return [package_info.parse(x) for x in result.stdout.strip().splitlines()] |
| |
| |
| def FindOverlaysForPackages(*args: str) -> List[str]: |
| """Find the corresponding overlays that installed the provided packages. |
| |
| Returns: |
| A same-sized list of strings with corresponding overlay repo names. |
| """ |
| if not args: |
| return [] |
| equery_args = ["--format=$repo"] |
| equery_args += args |
| result = _Equery("list", quiet=True, check=True, *equery_args) |
| overlays = result.stdout.strip().splitlines() |
| # If returncode was 0, output lines should match arg count. But not if it's |
| # been mocked incorrectly. |
| if len(args) != len(overlays): |
| raise ValueError( |
| "Incorrect line count in `equery list` output." |
| f" Args={args}, stdout='{result.stdout}'" |
| ) |
| return overlays |
| |
| |
| def FindEbuildForBoardPackage( |
| pkg_str: str, board: str, buildroot: BuildrootType = constants.SOURCE_ROOT |
| ): |
| """Returns a path to an ebuild for a particular board.""" |
| cmd = [f"equery-{board}", "which", pkg_str] |
| return cros_build_lib.run( |
| cmd, |
| cwd=buildroot, |
| enter_chroot=True, |
| capture_output=True, |
| encoding="utf-8", |
| ).stdout.strip() |
| |
| |
| def _EqueryWhich( |
| packages_list: List[str], |
| sysroot: str, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| include_masked: bool = False, |
| extra_env: Optional[Dict[str, str]] = None, |
| check: bool = False, |
| ) -> cros_build_lib.CompletedProcess: |
| """Executes an equery command, returns the result of the cmd. |
| |
| Args: |
| packages_list: The list of package (string) names with optional |
| category, version, and slot. |
| sysroot: The root directory being inspected. |
| chroot: The chroot to work with. |
| include_masked: True iff we should include masked ebuilds in our query. |
| extra_env: optional dictionary of extra string/string pairs to use as |
| the environment of equery command. |
| check: If False, do not raise an exception when run returns a non-zero |
| exit code. If any package does not exist causing the run to fail, we |
| will return information for none of the packages, i.e: return an |
| empty dictionary. |
| |
| Returns: |
| result (cros_build_lib.CompletedProcess) |
| """ |
| args = [] |
| if include_masked: |
| args += ["--include-masked"] |
| args += packages_list |
| return _Equery( |
| "which", |
| *args, |
| sysroot=sysroot, |
| chroot=chroot, |
| quiet=False, |
| extra_env=extra_env, |
| check=check, |
| ) |
| |
| |
| def FindEbuildsForPackages( |
| packages_list, |
| sysroot, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| include_masked=False, |
| extra_env=None, |
| check=False, |
| ): |
| """Returns paths to the ebuilds for the packages in |packages_list|. |
| |
| Args: |
| packages_list: The list of package (string) names with optional |
| category, version, and slot. |
| sysroot: The root directory being inspected. |
| chroot: The chroot to work with. |
| include_masked: True iff we should include masked ebuilds in our query. |
| extra_env: optional dictionary of extra string/string pairs to use as |
| the environment of equery command. |
| check: If False, do not raise an exception when run returns a non-zero |
| exit code. If any package does not exist causing the run to fail, we |
| will return information for none of the packages, i.e: return an |
| empty dictionary. |
| |
| Returns: |
| A map from packages in |packages_list| to their corresponding ebuilds. |
| """ |
| if not packages_list: |
| return {} |
| |
| result = _EqueryWhich( |
| packages_list, |
| sysroot, |
| chroot=chroot, |
| include_masked=include_masked, |
| extra_env=extra_env, |
| check=check, |
| ) |
| if result.returncode: |
| return {} |
| |
| ebuilds_results = result.stdout.strip().splitlines() |
| # Asserting the directory name of the ebuild matches the package name. |
| mismatches = [] |
| ret = dict(zip(packages_list, ebuilds_results)) |
| for full_package_name, ebuild_path in ret.items(): |
| cpv = package_info.parse(full_package_name) |
| path_category, path_package_name, _ = SplitEbuildPath(ebuild_path) |
| if not ( |
| (cpv.category is None or path_category == cpv.category) |
| and cpv.package.startswith(path_package_name) |
| ): |
| mismatches.append( |
| "%s doesn't match %s" % (ebuild_path, full_package_name) |
| ) |
| |
| assert not mismatches, ( |
| "Detected mismatches between the package & corresponding ebuilds: %s" |
| % "\n".join(mismatches) |
| ) |
| |
| return ret |
| |
| |
| def FindEbuildForPackage( |
| pkg_str, |
| sysroot, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| include_masked=False, |
| extra_env=None, |
| check=False, |
| ): |
| """Returns a path to an ebuild responsible for package matching |pkg_str|. |
| |
| Args: |
| pkg_str: The package name with optional category, version, and slot. |
| sysroot: The root directory being inspected. |
| chroot: The chroot to work with. |
| include_masked: True iff we should include masked ebuilds in our query. |
| extra_env: optional dictionary of extra string/string pairs to use as |
| the environment of equery command. |
| check: If False, do not raise an exception when run returns a non-zero |
| exit code. Instead, return None. |
| |
| Returns: |
| Path to ebuild for this package. |
| """ |
| ebuilds_map = FindEbuildsForPackages( |
| [pkg_str], |
| sysroot, |
| chroot=chroot, |
| include_masked=include_masked, |
| extra_env=extra_env, |
| check=check, |
| ) |
| if not ebuilds_map: |
| return None |
| return ebuilds_map[pkg_str] |
| |
| |
| def FindEbuildsForOverlays( |
| overlays: List[Union[str, os.PathLike]] |
| ) -> Iterator[Path]: |
| """Get paths to ebuilds using the given overlay paths. |
| |
| Args: |
| overlays: A list of overlay paths to get ebuilds for. |
| |
| Returns: |
| A generator of paths to ebuild files. |
| """ |
| return itertools.chain.from_iterable( |
| Path(x).rglob("*.ebuild") for x in overlays |
| ) |
| |
| |
| def _EqueryDepgraph( |
| pkg_str: str, |
| sysroot: str, |
| board: Optional[str], |
| chroot: Optional[chroot_lib.Chroot] = None, |
| depth: int = 0, |
| ) -> cros_build_lib.CompletedProcess: |
| """Executes equery depgraph to find dependencies. |
| |
| Args: |
| pkg_str: The package name with optional category, version, and slot. |
| sysroot: The root directory being inspected. |
| board: The board to inspect. |
| chroot: The chroot to work with. |
| depth: The depth of the transitive dependency tree to explore. 0 for |
| unlimited. |
| |
| Returns: |
| result (cros_build_lib.CompletedProcess) |
| """ |
| return _Equery( |
| "depgraph", |
| f"--depth={depth}", |
| pkg_str, |
| sysroot=sysroot, |
| board=board, |
| chroot=chroot, |
| ) |
| |
| |
| def GetFlattenedDepsForPackage( |
| pkg_str, sysroot="/", board: Optional[str] = "", depth=0 |
| ): |
| """Returns a depth-limited list of the dependencies for a given package. |
| |
| Args: |
| pkg_str: The package name with optional category, version, and slot. |
| sysroot: The root directory being inspected. |
| board: The board being inspected. |
| depth: The depth of the transitive dependency tree to explore. 0 for |
| unlimited. |
| |
| Returns: |
| List[str]: A list of the dependencies of the package. Includes the |
| package itself. |
| """ |
| if not pkg_str: |
| raise ValueError("pkg_str must be non-empty") |
| |
| result = _EqueryDepgraph(pkg_str, sysroot, board, depth=depth) |
| |
| return _ParseDepTreeOutput(result.stdout) |
| |
| |
| def _ParseDepTreeOutput(equery_output): |
| """Parses the output of `equery -CQn depgraph` in to a list of package CPVs. |
| |
| Args: |
| equery_output: A string containing the output of the `equery depgraph` |
| command as formatted by the -C/--nocolor and -q/--quiet command line |
| options. The contents should roughly resemble: |
| ``` |
| app-editors/vim-8.1.1486: |
| [ 0] app-editors/vim-8.1.1486 |
| [ 1] app-eselect/eselect-vi-1.1.9 |
| ``` |
| |
| Returns: |
| List[str]: A list of package CPVs parsed from the command output. |
| """ |
| equery_output_regex = r"\[[\d ]+\]\s*([^\s]+)" |
| return re.findall(equery_output_regex, equery_output) |
| |
| |
| def GetReverseDependencies( |
| packages: List[str], |
| sysroot: Union[str, os.PathLike] = "/", |
| chroot: Optional[chroot_lib.Chroot] = None, |
| indirect: bool = False, |
| ) -> List[Optional[package_info.PackageInfo]]: |
| """List all reverse dependencies for the given list of packages. |
| |
| Args: |
| packages: Packages with optional category, version, and slot. |
| sysroot: The root directory being inspected. |
| chroot: The chroot to work with. |
| indirect: If True, search for both the direct and indirect dependencies |
| on the specified packages. |
| |
| Returns: |
| List[package_info.PackageInfo]: Packages that depend on the given |
| packages. |
| |
| Raises: |
| ValueError when no packages are provided. |
| cros_build_lib.RunCommandError when the equery depends command errors. |
| """ |
| if not packages: |
| raise ValueError("Must provide at least one package.") |
| |
| sysroot = Path(sysroot) |
| |
| args = [] |
| if indirect: |
| args += ["--indirect"] |
| args += packages |
| |
| result = _Equery( |
| "depends", |
| *args, |
| sysroot=str(sysroot), |
| chroot=chroot, |
| check=False, |
| ) |
| return [package_info.parse(x) for x in result.stdout.strip().splitlines()] |
| |
| |
| def _Qlist( |
| args: List[str], |
| board: Optional[str] = None, |
| buildroot: BuildrootType = constants.SOURCE_ROOT, |
| ) -> cros_build_lib.CompletedProcess: |
| """Run qlist with the given args. |
| |
| Args: |
| args: The qlist arguments. |
| board: The board to inspect. |
| buildroot: Source root to find overlays. |
| |
| Returns: |
| The command result. |
| """ |
| cmd = [_GetSysrootTool("qlist", board=board)] |
| # Simplify output. |
| cmd += ["-Cq"] |
| cmd += args |
| |
| return cros_build_lib.run( |
| cmd, |
| enter_chroot=True, |
| capture_output=True, |
| check=False, |
| encoding="utf-8", |
| cwd=buildroot, |
| ) |
| |
| |
| def get_installed_packages(board=None) -> List[str]: |
| """Get the installed packages.""" |
| return _Qlist(["-Iv"], board).stdout.splitlines() |
| |
| |
| def GetInstalledPackageUseFlags( |
| pkg_str, board=None, buildroot=constants.SOURCE_ROOT |
| ): |
| """Gets the list of USE flags for installed packages matching |pkg_str|. |
| |
| Args: |
| pkg_str: The package name with optional category, version, and slot. |
| board: The board to inspect. |
| buildroot: Source root to find overlays. |
| |
| Returns: |
| A dictionary with the key being a package CP and the value being the |
| list of USE flags for that package. |
| """ |
| result = _Qlist(["-U", pkg_str], board, buildroot) |
| use_flags = {} |
| if result.returncode == 0: |
| for line in result.stdout.splitlines(): |
| tokens = line.split() |
| use_flags[tokens[0]] = tokens[1:] |
| |
| return use_flags |
| |
| |
| def GetBinaryPackageDir(sysroot="/", packages_dir=None): |
| """Returns the binary package directory of |sysroot|.""" |
| dir_name = packages_dir if packages_dir else "packages" |
| return os.path.join(sysroot, dir_name) |
| |
| |
| def GetBinaryPackagePath(c, p, v, sysroot="/", packages_dir=None): |
| """Returns the path to the binary package. |
| |
| Args: |
| c: category. |
| p: package. |
| v: version. |
| sysroot: The root being inspected. |
| packages_dir: Name of the packages directory in |sysroot|. |
| |
| Returns: |
| The path to the binary package. |
| """ |
| pkgdir = GetBinaryPackageDir(sysroot=sysroot, packages_dir=packages_dir) |
| path = os.path.join(pkgdir, c, "%s-%s.tbz2" % (p, v)) |
| if not os.path.exists(path): |
| raise ValueError("Cannot find the binary package %s!" % path) |
| |
| return path |
| |
| |
| def GetBoardUseFlags(board: str) -> List[str]: |
| """Returns a list of USE flags in effect for a board.""" |
| return sorted( |
| build_query.Query(build_query.Board) |
| .filter(lambda x: x.name == board) |
| .one() |
| .use_flags |
| ) |
| |
| |
| def _EmergeBoard( |
| package: str, |
| board: Optional[str] = None, |
| sysroot: Optional[Union[str, os.PathLike]] = None, |
| buildroot: BuildrootType = constants.SOURCE_ROOT, |
| set_empty_root: bool = False, |
| ) -> cros_build_lib.CompletedProcess: |
| """Call emerge board to get dependences of package. |
| |
| Args: |
| board: The board to inspect. |
| sysroot: The root directory being inspected. |
| package: The package name with optional category, version, and slot. |
| buildroot: Source root to find overlays. |
| set_empty_root: Set the --root argument to /mnt/empty. This is a |
| workaround for an issue for portage versions before 2.3.75. |
| |
| Returns: |
| result (cros_build_lib.CompletedProcess) |
| """ |
| emerge = _GetSysrootTool("emerge", board=board, sysroot=sysroot) |
| cmd = [emerge, "-p", "--cols", "--quiet", "-e"] |
| if set_empty_root: |
| cmd += ["--root", "/mnt/empty"] |
| cmd.append(package) |
| |
| return cros_build_lib.run( |
| cmd, |
| cwd=buildroot, |
| enter_chroot=True, |
| capture_output=True, |
| encoding="utf-8", |
| ) |
| |
| |
| def GetPackageDependencies( |
| package: str, |
| board: Optional[str] = None, |
| sysroot: Optional[Union[str, os.PathLike]] = None, |
| buildroot: BuildrootType = constants.SOURCE_ROOT, |
| set_empty_root: bool = False, |
| ) -> List[str]: |
| """Returns the depgraph list of packages for a board and package.""" |
| output = _EmergeBoard( |
| package, board, sysroot, buildroot, set_empty_root |
| ).stdout.splitlines() |
| packages = [] |
| for line in output: |
| # The first column is ' NRfUD ' |
| columns = line[7:].split() |
| try: |
| package = columns[0] + "-" + columns[1] |
| packages.append(package) |
| except IndexError: |
| logging.error("Wrong format of output: \n%r", output) |
| raise |
| |
| return packages |
| |
| |
| def GetRepositoryFromEbuildInfo(info): |
| """Parse output of the result of `ebuild <ebuild_path> info` |
| |
| This command should return output that looks a lot like: |
| CROS_WORKON_SRCDIR=("/mnt/host/source/src/platform2") |
| CROS_WORKON_PROJECT=("chromiumos/platform2") |
| """ |
| srcdir_match = re.search( |
| r'^CROS_WORKON_SRCDIR=(\(["\'].*["\']\))$', info, re.MULTILINE |
| ) |
| project_match = re.search( |
| r'^CROS_WORKON_PROJECT=(\(["\'].*["\']\))$', info, re.MULTILINE |
| ) |
| if not srcdir_match or not project_match: |
| return None |
| |
| srcdirs = ParseBashArray(srcdir_match.group(1)) |
| projects = ParseBashArray(project_match.group(1)) |
| if len(srcdirs) != len(projects): |
| return None |
| |
| return [ |
| RepositoryInfoTuple(srcdir, project) |
| for srcdir, project in zip(srcdirs, projects) |
| ] |
| |
| |
| def _EbuildInfo( |
| ebuild_path: str, sysroot: str |
| ) -> cros_build_lib.CompletedProcess: |
| """Get ebuild info for <ebuild_path>. |
| |
| Args: |
| ebuild_path: string full path to ebuild file. |
| sysroot: The root directory being inspected. |
| |
| Returns: |
| result (cros_build_lib.CompletedProcess) |
| """ |
| cmd = (_GetSysrootTool("ebuild", sysroot=sysroot), ebuild_path, "info") |
| return cros_build_lib.dbg_run( |
| cmd, capture_output=True, check=False, encoding="utf-8" |
| ) |
| |
| |
| def GetRepositoryForEbuild(ebuild_path, sysroot): |
| """Get parsed output of `ebuild <ebuild_path> info` |
| |
| ebuild ... info runs the pkg_info step of an ebuild. |
| cros-workon.eclass defines that step and prints both variables. |
| |
| Args: |
| ebuild_path: string full path to ebuild file. |
| sysroot: The root directory being inspected. |
| |
| Returns: |
| list of RepositoryInfoTuples. |
| """ |
| result = _EbuildInfo(ebuild_path, sysroot) |
| return GetRepositoryFromEbuildInfo(result.stdout) |
| |
| |
| def CleanOutdatedBinaryPackages( |
| sysroot: Union[str, os.PathLike], |
| deep: bool = True, |
| exclusion_file: Optional[Union[str, os.PathLike]] = None, |
| ) -> cros_build_lib.CompletedProcess: |
| """Cleans outdated binary packages from |sysroot|. |
| |
| Args: |
| sysroot: The root directory being inspected. |
| deep: If set to True, keep minimal files for reinstallation by examining |
| vartree for installed packages. If set to False, use porttree, which |
| contains every ebuild in the tree, to determine which binpkgs to |
| clean. |
| exclusion_file: Path to the exclusion file. |
| |
| Returns: |
| result (cros_build_lib.CompletedProcess) |
| |
| Raises: |
| cros_build_lib.RunCommandError |
| """ |
| sysroot = Path(sysroot) |
| if exclusion_file: |
| exclusion_file = Path(exclusion_file) |
| |
| cmd = [_GetSysrootTool("eclean", sysroot=str(sysroot))] |
| if deep: |
| cmd += ["-d"] |
| if exclusion_file: |
| cmd += ["-e", str(exclusion_file)] |
| cmd += ["packages"] |
| return cros_build_lib.run(cmd) |
| |
| |
| def _CheckHasTest(cp, sysroot, require_workon: bool = False) -> None: |
| """Checks if the ebuild for |cp| has tests. |
| |
| Args: |
| cp: A portage package in the form category/package_name. |
| sysroot: Path to the sysroot. |
| require_workon: Whether to only test workon packages. |
| |
| Returns: |
| |cp| if the ebuild for |cp| defines a test stanza, None otherwise. |
| |
| Raises: |
| raise failures_lib.PackageBuildFailure if FindEbuildForPackage |
| raises a RunCommandError |
| """ |
| try: |
| path = FindEbuildForPackage(cp, sysroot, check=True) |
| except cros_build_lib.RunCommandError as e: |
| logging.error("FindEbuildForPackage error %s", e) |
| raise failures_lib.PackageBuildFailure(e, "equery", [cp]) |
| use_flags = ["test"] |
| if sysroot == "/": |
| use_flags += ["cros_host"] |
| ebuild = EBuild( |
| path, False, use_flags, cache_file=get_cache_file(Path(path)) |
| ) |
| if require_workon and not ebuild.is_workon: |
| return None |
| elif ebuild.has_test: |
| return cp |
| return None |
| |
| |
| def PackagesWithTest( |
| sysroot, packages: Iterable[str], require_workon: bool = False |
| ) -> Set[str]: |
| """Returns the subset of |packages| that have unit tests. |
| |
| Args: |
| sysroot: Path to the sysroot. |
| packages: List of packages to filter. |
| require_workon: Whether to only test workon packages. |
| |
| Returns: |
| The subset of |packages| that defines unit tests. |
| """ |
| inputs = [(cp, sysroot, require_workon) for cp in packages] |
| pkg_with_test = set(parallel.RunTasksInProcessPool(_CheckHasTest, inputs)) |
| |
| # CheckHasTest will return None for packages that do not have tests. We can |
| # discard that value. |
| pkg_with_test.discard(None) |
| return pkg_with_test |
| |
| |
| def get_die_hook_status_file() -> Path: |
| """Get the die hook status file path.""" |
| return ( |
| Path(path_util.FromChrootPath("/tmp")) |
| / constants.DIE_HOOK_STATUS_FILE_NAME |
| ) |
| |
| |
| def ParseDieHookStatusFile() -> List[package_info.PackageInfo]: |
| """Parse the status file generated by the failed packages die_hook |
| |
| Returns: |
| Packages that failed in the build attempt. |
| """ |
| status_file = get_die_hook_status_file() |
| if not status_file.exists(): |
| return [] |
| |
| failed_pkgs = [] |
| for line in status_file.read_text(encoding="utf-8").splitlines(): |
| if not line.strip(): |
| continue |
| cpv, _phase = line.split() |
| failed_pkgs.append(package_info.parse(cpv)) |
| |
| return failed_pkgs |
| |
| |
| def HasPrebuilt(atom, board=None, extra_env=None): |
| """Check if the atom's best visible version has a prebuilt available.""" |
| cmd = [ |
| os.path.join( |
| constants.CHROOT_SOURCE_ROOT, "chromite", "scripts", "has_prebuilt" |
| ), |
| atom, |
| ] |
| if board: |
| cmd += ["--build-target", board] |
| |
| if logging.getLogger().isEnabledFor(logging.DEBUG): |
| cmd += ["--debug"] |
| |
| with osutils.TempDir() as tempdir: |
| output_file = os.path.join(tempdir, "has_prebuilt.json") |
| cmd += ["--output", output_file] |
| result = cros_build_lib.run( |
| cmd, enter_chroot=True, extra_env=extra_env, check=False |
| ) |
| |
| if result.returncode: |
| logging.warning( |
| "Error when checking for prebuilts: %s", result.stderr |
| ) |
| return False |
| |
| raw = osutils.ReadText(output_file) |
| logging.debug("Raw result: %s", raw) |
| prebuilts = json.loads(raw) |
| |
| if atom not in prebuilts: |
| logging.warning("%s not found in has_prebuilt output.", atom) |
| return False |
| |
| return prebuilts[atom] |
| |
| |
| class PortageqError(Error): |
| """Portageq command error.""" |
| |
| |
| def _Portageq( |
| command: List[str], |
| board: Optional[str] = None, |
| sysroot: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| **kwargs, |
| ) -> cros_build_lib.CompletedProcess: |
| """Run a portageq command. |
| |
| Args: |
| command: Portageq command to run excluding portageq. |
| board: Specific board to query. |
| sysroot: The sysroot to query. |
| chroot: Chroot to work with. |
| **kwargs: Additional run arguments. |
| |
| Returns: |
| cros_build_lib.CompletedProcess |
| |
| Raises: |
| cros_build_lib.RunCommandError |
| """ |
| if "capture_output" not in kwargs: |
| kwargs.setdefault("stdout", True) |
| kwargs.setdefault("stderr", True) |
| kwargs.setdefault("cwd", constants.SOURCE_ROOT) |
| kwargs.setdefault("debug_level", logging.DEBUG) |
| kwargs.setdefault("encoding", "utf-8") |
| |
| chroot = chroot or chroot_lib.Chroot() |
| portageq = _GetSysrootTool("portageq", board, sysroot) |
| return chroot.run([portageq] + command, **kwargs) |
| |
| |
| def PortageqBestVisible( |
| atom: str, |
| board: Optional[str] = None, |
| sysroot: Optional[str] = None, |
| pkg_type: str = "ebuild", |
| cwd: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ) -> package_info.PackageInfo: |
| """Get the best visible ebuild CPV for the given atom. |
| |
| Args: |
| atom: Portage atom. |
| board: Board to look at. By default, look in chroot. |
| sysroot: The sysroot to query. |
| pkg_type: Package type (ebuild, binary, or installed). |
| cwd: Path to use for the working directory for run. |
| chroot: Chroot to work with. |
| |
| Returns: |
| The parsed package information, which may be empty. |
| |
| Raises: |
| NoVisiblePackageError when no version of the package can be found. |
| """ |
| if sysroot is None: |
| sysroot = build_target_lib.get_default_sysroot_path(board) |
| cmd = ["best_visible", sysroot, pkg_type, atom] |
| try: |
| result = _Portageq( |
| cmd, board=board, sysroot=sysroot, cwd=cwd, chroot=chroot |
| ) |
| except cros_build_lib.RunCommandError as e: |
| logging.error(e) |
| raise NoVisiblePackageError( |
| f'No best visible package for "{atom}" could be found.' |
| ) from e |
| |
| return package_info.parse(result.stdout.strip()) |
| |
| |
| def PortageqEnvvar( |
| variable, |
| board=None, |
| sysroot=None, |
| allow_undefined=False, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ): |
| """Run portageq envvar for a single variable. |
| |
| Like PortageqEnvvars, but returns the value of the single variable rather |
| than a mapping. |
| |
| Args: |
| variable: str - The variable to retrieve. |
| board: str|None - See PortageqEnvvars. |
| sysroot: The sysroot to query. |
| allow_undefined: bool - See PortageqEnvvars. |
| chroot: Chroot to work with. |
| |
| Returns: |
| str - The value retrieved from portageq envvar. |
| |
| Raises: |
| See PortageqEnvvars. |
| TypeError when variable is not a valid type. |
| ValueError when variable is empty. |
| """ |
| if not isinstance(variable, str): |
| raise TypeError("Variable must be a string.") |
| elif not variable: |
| raise ValueError("Variable must not be empty.") |
| |
| result = PortageqEnvvars( |
| [variable], |
| board=board, |
| sysroot=sysroot, |
| allow_undefined=allow_undefined, |
| chroot=chroot, |
| ) |
| return result.get(variable) |
| |
| |
| def PortageqEnvvars( |
| variables: List[str], |
| board: Optional[str] = None, |
| sysroot: Optional[str] = None, |
| allow_undefined: bool = False, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ) -> Dict[str, str]: |
| """Run portageq envvar for the given variables. |
| |
| Args: |
| variables: Variables to query. |
| board: Specific board to query. |
| sysroot: The sysroot to query. |
| allow_undefined: True to quietly allow empty strings when the variable |
| is undefined. False to raise an error. |
| chroot: Chroot to work with. |
| |
| Returns: |
| Variable to envvar value mapping for each of the |variables|. |
| |
| Raises: |
| TypeError if variables is a string. |
| PortageqError when a variable is undefined and not allowed to be. |
| cros_build_lib.RunCommandError when the command does not run |
| successfully. |
| """ |
| if isinstance(variables, str): |
| raise TypeError( |
| "Variables must not be a string. " |
| "See PortageqEnvvar for single variable support." |
| ) |
| |
| if not variables: |
| return {} |
| |
| try: |
| result = _Portageq( |
| ["envvar", "-v"] + variables, |
| board=board, |
| sysroot=sysroot, |
| chroot=chroot, |
| ) |
| output = result.stdout |
| except cros_build_lib.RunCommandError as e: |
| if e.returncode != 1: |
| # Actual error running command, raise. |
| raise e |
| elif not allow_undefined: |
| # Error for undefined variable. |
| raise PortageqError(f"One or more variables undefined: {e.stdout}") |
| else: |
| # Undefined variable but letting it slide. |
| output = e.stdout |
| |
| return key_value_store.LoadData(output, multiline=True) |
| |
| |
| def PortageqHasVersion( |
| category_package: str, |
| board: Optional[str] = None, |
| sysroot: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ) -> bool: |
| """Run portageq has_version. |
| |
| Args: |
| category_package: The atom whose version is to be verified. |
| board: Specific board to query. |
| sysroot: Root directory to consider. |
| chroot: Chroot to work with. |
| |
| Returns: |
| bool |
| |
| Raises: |
| cros_build_lib.RunCommandError when the command fails to run. |
| """ |
| if sysroot is None: |
| sysroot = build_target_lib.get_default_sysroot_path(board) |
| # Exit codes 0/1+ indicate "have"/"don't have". |
| # Normalize them into True/False values. |
| result = _Portageq( |
| ["has_version", os.path.abspath(sysroot), category_package], |
| board=board, |
| sysroot=sysroot, |
| check=False, |
| chroot=chroot, |
| ) |
| return not result.returncode |
| |
| |
| def PortageqMatch( |
| atom: str, |
| board: Optional[str] = None, |
| sysroot: Optional[str] = None, |
| chroot: Optional[chroot_lib.Chroot] = None, |
| ): |
| """Run portageq match. |
| |
| Find the full category/package-version for the specified atom. |
| |
| Args: |
| atom: Portage atom. |
| board: Specific board to query. |
| sysroot: The sysroot to query. |
| chroot: Chroot to work with. |
| |
| Returns: |
| package_info.PackageInfo|None |
| """ |
| if sysroot is None: |
| sysroot = build_target_lib.get_default_sysroot_path(board) |
| result = _Portageq( |
| ["match", sysroot, atom], board=board, sysroot=sysroot, chroot=chroot |
| ) |
| return package_info.parse(result.stdout.strip()) if result.stdout else None |
| |
| |
| class PackageNotFoundError(Error): |
| """Error indicating that the package asked for was not found.""" |
| |
| |
| def GenerateInstalledPackages(db, root, packages): |
| """Generate a sequence of installed package objects from package names.""" |
| for package in packages: |
| category, pv = package.split("/") |
| installed_package = db.GetInstalledPackage(category, pv) |
| if not installed_package: |
| raise PackageNotFoundError( |
| "Unable to locate package %s in %s" % (package, root) |
| ) |
| yield installed_package |
| |
| |
| class PackageSizes(NamedTuple): |
| """Size of an installed package on disk.""" |
| |
| apparent_size: int |
| disk_utilization_size: int |
| |
| |
| def GeneratePackageSizes(db, root, installed_packages): |
| """Collect package sizes and generate package size pairs. |
| |
| Yields: |
| (str, int): A pair of cpv and total package size. |
| """ |
| visited_cpvs = set() |
| for installed_package in installed_packages: |
| package_cpv = "%s/%s/%s" % ( |
| installed_package.category, |
| installed_package.package, |
| installed_package.version, |
| ) |
| |
| assert package_cpv not in visited_cpvs |
| visited_cpvs.add(package_cpv) |
| |
| if not installed_package: |
| raise PackageNotFoundError( |
| "Unable to locate installed_package %s in %s" |
| % (package_cpv, root) |
| ) |
| package_filesize = CalculatePackageSize( |
| installed_package.ListContents(), db.package_install_path |
| ) |
| logging.debug( |
| "%s installed_package size is %d", |
| package_cpv, |
| package_filesize.apparent_size, |
| ) |
| yield package_cpv, package_filesize.apparent_size |
| |
| |
| def CalculatePackageSize( |
| files_in_package: Iterable[Tuple[str, str]], |
| package_install_path: os.PathLike, |
| ) -> PackageSizes: |
| """Given a fileset for a Portage package, calculate the package size. |
| |
| This function provides the size of the given package on disk in both |
| apparent size and disk utilization. It is provided in bytes (not blocks). |
| |
| By implication, these two figures may be different when files are very |
| small, or when the apparent size is much larger than allocated blocks on |
| disk (as is the case with sparse files). |
| |
| This function also only calculates the size of package files which are |
| actual OBJ-type files listed in the CONTENTS file for the package in the |
| package db. As such, the storage needs for any existing symlinks or relevant |
| inodes is not accounted for in this function. |
| |
| Args: |
| files_in_package: A list of file information for all files installed by |
| a single package. |
| package_install_path: The path prefix for the installation location. |
| |
| Returns: |
| A PackageSizes object containing the calculated apparent size and disk |
| utilization. |
| """ |
| total_apparent_size = 0 |
| total_disk_utilization = 0 |
| for content_type, path in files_in_package: |
| if content_type == InstalledPackage.OBJ: |
| filename = os.path.join(package_install_path, path) |
| try: |
| file_details = os.lstat(filename) |
| apparent_size = file_details.st_size |
| disk_utilization = file_details.st_blocks * 512 |
| except FileNotFoundError as e: |
| logging.warning( |
| "unable to compute the size of %s because the file " |
| "doesn't exist (skipping): %s", |
| filename, |
| e, |
| ) |
| continue |
| except PermissionError as e: |
| logging.warning( |
| "cannot compute size of %s due to inadequate " |
| "permissions (skipping): %s", |
| filename, |
| e, |
| ) |
| continue |
| except OSError as e: |
| logging.warning( |
| "unable to compute the size of %s (skipping): %s", |
| filename, |
| e, |
| ) |
| continue |
| logging.debug( |
| "apparent size of %s = %d bytes, du = %d bytes", |
| filename, |
| apparent_size, |
| disk_utilization, |
| ) |
| total_apparent_size += apparent_size |
| total_disk_utilization += disk_utilization |
| return PackageSizes( |
| apparent_size=total_apparent_size, |
| disk_utilization_size=total_disk_utilization, |
| ) |
| |
| |
| def UpdateEbuildManifest( |
| ebuild_path: Union[str, os.PathLike], |
| chroot: chroot_lib.Chroot, |
| ) -> cros_build_lib.CompletedProcess: |
| """Updates the ebuild manifest for the provided ebuild path. |
| |
| Args: |
| ebuild_path: The outside-chroot absolute path to the ebuild. |
| chroot: The chroot to enter. |
| |
| Returns: |
| The command result. |
| """ |
| |
| chroot_ebuild_path = chroot.chroot_path(ebuild_path) |
| command = ["ebuild", chroot_ebuild_path, "manifest", "--force"] |
| return chroot.run(command) |
| |
| |
| def EbuildManifestFileHash( |
| ebuild_dir: Union[str, os.PathLike], |
| archive_file: str, |
| hash_type: str, |
| entry_type: str = "DIST", |
| ) -> str: |
| manifest_path = os.path.join(ebuild_dir, "Manifest") |
| manifest_content = osutils.ReadText(manifest_path) |
| for line in manifest_content.splitlines(): |
| # The general form of each line is: |
| # <type> <filename> <size> <hash-type> <hash> [<hash-type> <hash> ...] |
| # Example: "DIST <filename> <size> BLAKE2B <hash> SHA512 <hash>" |
| fields = line.split() |
| if fields[0] != entry_type or fields[1] != archive_file: |
| continue |
| for found_type, digest in zip(fields[3::2], fields[4::2]): |
| if found_type == hash_type: |
| return digest |
| |
| # Log the entire manifest file content, since this is rare. |
| logging.error( |
| 'no %s hash for %s (type %s) in "%s": file content:\n%s', |
| hash_type, |
| archive_file, |
| entry_type, |
| manifest_path, |
| manifest_content, |
| ) |
| raise ValueError("unexpected Manifest file content") |
| |
| |
| def read_depgraph_counters( |
| content: str, combine: bool = True |
| ) -> Union[List[Dict[str, Any]], Dict[str, Any]]: |
| """Transform contents of /var/log/portage/depgraph_counters.log to JSON. |
| |
| We can expect each line of this file to be a stringified dict. Convert back |
| to json to make querying easier. We also combine attributes by default when |
| multiple lines are present, since the primary use case is to add this data |
| to OpenTelemetry spans, which don't accept arbitrary dicts. |
| |
| Args: |
| content: The log file contents |
| combine: Whether to combine multiple entries into one dict. |
| """ |
| if not content: |
| return {} |
| |
| def add(a: Dict[str, Any], b: Dict[str, Any]): |
| # We make a (possibly faulty) assumption that the inputs are |
| # well-formed. |
| # All values are either int, str, or list(str), and that the data type |
| # per key is consistent. All these items should correctly use the + |
| # operator without issue. |
| # Dict members that aren't present in both inputs are dropped in the |
| # intersection operation, so we can safely expect no key errors. |
| return dict((k, a[k] + b[k]) for k in set(b) & set(a)) |
| |
| results = [] |
| for line in content.splitlines(): |
| results.append(json.loads(line)) |
| if combine: |
| r = results[0] |
| for i in range(1, len(results)): |
| r = add(r, results[i]) |
| return r |
| return results |
| |
| |
| def write_depgraph_counters_to_span(span): |
| try: |
| contents = osutils.ReadFile(constants.PORTAGE_DEPGRAPH_COUNTERS_LOG) |
| except FileNotFoundError: |
| return |
| if contents: |
| transformed = read_depgraph_counters(contents) |
| span.set_attributes( |
| {f"depgraph_counters_{k}": v for k, v in transformed.items()} |
| ) |