blob: 89f52b223456c9c19abbc3d1d8d6bf4c19b0b7f3 [file] [log] [blame]
# Copyright 2023 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Python API for `cros query`."""
from __future__ import annotations
import abc
import enum
import functools
import logging
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
Iterator,
List,
Optional,
Set,
Tuple,
Type,
Union,
)
from chromite.lib import constants
from chromite.lib import portage_util
from chromite.lib.parser import package_info
from chromite.utils import key_value_store
from chromite.utils.parser import make_defaults
from chromite.utils.parser import portage_profile_conf
# We use docstrings in this file frequently for property documentation, which
# doesn't follow the traditional format of leaving the second line blank used
# with traditional functions/methods.
# pylint: disable=docstring-second-line-blank
class Stability(enum.Enum):
"""Enumeration for package stability.
UNSPECIFIED: The stability for this architecture was not listed in KEYWORDS.
STABLE: The package is well tested on this architecture.
UNSTABLE: The package may not have complete testing on this architecture.
BAD: The package has known issues with this architecture.
"""
UNSPECIFIED = enum.auto()
STABLE = enum.auto()
UNSTABLE = enum.auto()
BAD = enum.auto()
class QueryTarget(abc.ABC):
"""Abstract base class for all query targets."""
@classmethod
@abc.abstractmethod
def find_all(
cls,
board: Optional[str] = None,
overlays: str = constants.BOTH_OVERLAYS,
) -> Iterator[QueryTarget]:
"""Find all instances of this target.
Args:
board: Limit results to only those relevant to a named
board.
overlays: Limit overlays to the specified type.
Yields:
Instances of this class.
"""
def tree(self) -> Iterator[QueryTarget]:
"""Yield any relevant children for the tree-view from the CLI."""
yield from ()
class Overlay(QueryTarget):
"""An overlay, e.g., src/third_party/chromiumos-overlay."""
def __init__(self, path):
self.path = Path(path)
@classmethod
def find_all(
cls, board=None, overlays=constants.BOTH_OVERLAYS
) -> Iterator[Overlay]:
for overlay_path in portage_util.FindOverlays(overlays, board=board):
yield cls(overlay_path)
def tree(self):
yield from self.parents
@functools.cached_property
def layout_conf(self) -> Dict[str, str]:
"""The layout.conf variables."""
return key_value_store.LoadFile(
self.path / "metadata" / "layout.conf", ignore_missing=True
)
@functools.cached_property
def parents(self) -> List[Overlay]:
"""The Portage masters of this overlay. Note the COIL rename."""
all_overlays = _get_all_overlays_by_name()
parents = []
for name in self.layout_conf.get("masters", "").split():
if name:
parents.append(all_overlays[name])
# portage_util implicitly adds chromeos-overlay and chromeos-*-overlay
# if found. Mimic that here by considering them implicit parents of
# board-level overlays.
if self.board_name:
if "chromeos" in all_overlays:
parents.append(all_overlays["chromeos"])
for path in (
constants.SOURCE_ROOT / "src" / "private-overlays"
).glob("chromeos-*-overlay"):
parents.append(Overlay(path))
return parents
@functools.cached_property
def name(self) -> str:
"""The repo-name in metadata/layout.conf."""
return portage_util.GetOverlayName(self.path)
@property
def is_private(self) -> bool:
"""True if the overlay appears to be private, false otherwise."""
return self.name.endswith("-private")
@property
def board_name(self) -> Optional[str]:
"""If this overlay is a top-level overlay for a board, the name of that
board. Otherwise, this is None.
"""
if not self.path.name.startswith("overlay-"):
return None
if self.is_private:
board_name, _, _ = self.name.rpartition("-")
return board_name
return self.name
@property
def profiles_dir(self) -> Path:
"""The profiles directory for this overlay."""
return self.path / "profiles"
@property
def metadata_dir(self) -> Path:
"""The metadata directory for this overlay."""
return self.path / "metadata"
@property
def md5_cache_dir(self) -> Path:
"""The md5-cache directory for this overlay."""
return self.metadata_dir / "md5-cache"
@functools.cached_property
def profiles(self) -> List[Profile]:
"""A list of all profiles defined in this overlay."""
if not self.profiles_dir.is_dir():
return []
def _scan_profiles(path):
is_profile = False
for ent in path.iterdir():
if ent.is_dir():
yield from _scan_profiles(ent)
elif ent.is_file():
if ent.name in (
"parent",
"eapi",
"deprecated",
"make.defaults",
"packages",
"packages.build",
"packages.mask",
"package.provided",
"package.use",
"use.mask",
"use.force",
"use.stable.mask",
"use.stable.force",
"package.use.mask",
"package.use.force",
"package.use.stable.mask",
"package.use",
):
is_profile = True
if is_profile:
yield Profile(
str(path.relative_to(self.profiles_dir)), path, self
)
return list(_scan_profiles(self.profiles_dir))
def get_profile(self, name: Union[Path, str]) -> Optional[Profile]:
"""Get a specific profile by name.
Args:
name: The name of the profile (e.g., "base").
Returns:
The Profile object with this name, or None if this profile does not
exist.
"""
profile_dir = self.path / "profiles" / name
if profile_dir.is_dir():
return Profile(str(name), profile_dir, self)
return None
@functools.cached_property
def ebuilds(self) -> List[Ebuild]:
"""A list of all ebuilds in this overlay."""
ebuilds = []
for ebuild_path in self.path.glob("*/*/*.ebuild"):
ebuilds.append(Ebuild(ebuild_file=ebuild_path, overlay=self))
return ebuilds
@functools.cached_property
def make_conf_vars(self) -> Dict[str, str]:
"""The variables defined in make.conf."""
make_conf_path = self.path / "make.conf"
if make_conf_path.is_file():
contents = make_conf_path.read_text(encoding="utf-8")
return make_defaults.parse(contents)
return {}
def __repr__(self):
return str(self.path)
def __eq__(self, other):
return self.path == other.path
@functools.lru_cache(maxsize=None)
def _get_all_overlays_by_name() -> Dict[str, Overlay]:
"""Get all overlays, in a dictionary by name.
Returns:
A dictionary mapping overlay names to the Overlay objects.
"""
overlays = {}
for overlay in Overlay.find_all():
overlays[overlay.name] = overlay
return overlays
class Profile(QueryTarget):
"""A portage profile, e.g., chromiumos:base."""
_obj_cache: Dict[Path, Profile] = {}
def __new__(cls, _name: str, path: Path, _overlay: Overlay):
# Caching the construction of profiles prevents a re-parse of
# make.defaults when a profile is inherited multiple times,
# which provides a significant speed-up during many queries.
if path not in cls._obj_cache:
cls._obj_cache[path] = super().__new__(cls)
return cls._obj_cache[path]
def __init__(self, name: str, path: Path, overlay: Overlay):
self.name = name
self.path = path
self.overlay = overlay
@classmethod
def find_all(cls, board=None, overlays=constants.BOTH_OVERLAYS):
for overlay in Overlay.find_all(board=board, overlays=overlays):
yield from overlay.profiles
@functools.cached_property
def make_defaults_vars(self) -> Dict[str, str]:
"""A dictionary of the raw make.defaults variables."""
make_defaults_path = self.path / "make.defaults"
if make_defaults_path.is_file():
contents = make_defaults_path.read_text(encoding="utf-8")
return make_defaults.parse(contents)
return {}
def resolve_var(self, var: str, default: Optional[str] = None) -> Any:
"""Resolve a variable for this profile, similar to how Portage would.
Note: this function resolves variables non-incrementally. For
incremental variables (e.g., USE, USE_EXPAND, etc), use
resolve_var_incremental.
Args:
var: The variable to resolve.
default: What to return if the variable is not set.
Returns:
The resolved variable.
"""
if var in self.make_defaults_vars:
return self.make_defaults_vars[var]
for profile in reversed(self.parents):
result = profile.resolve_var(var)
if result is not None:
return result
return default
def resolve_var_incremental(self, var: str) -> Set[str]:
"""Resolve a variable incrementally, similar to how Portage would.
This will traverse the profiles depth-first, adding tokens which are not
prefixed with a dash, and removing those which are.
Args:
var: The variable to resolve.
Returns:
The resolved variable, as a set of the tokens.
"""
result: Set[str] = set()
# portage_testables creates recursive overlays. We don't technically
# need to track visited overlays for well-formed overlays.
visited_overlays = set()
def _process_tokens(tokens):
for token in tokens:
if not token:
# Variable was unset, empty, or just whitespace.
continue
if token == "-*":
result.clear()
elif token.startswith("-"):
result.discard(token[1:])
else:
result.add(token)
def _rec_profile(profile):
tokens = profile.make_defaults_vars.get(var, "").split()
if "-*" not in tokens:
for parent in profile.parents:
_rec_profile(parent)
_process_tokens(tokens)
def _rec_overlay(overlay):
if overlay.name in visited_overlays:
return
visited_overlays.add(overlay.name)
for parent in overlay.parents:
_rec_overlay(parent)
tokens = overlay.make_conf_vars.get(var, "").split()
_process_tokens(tokens)
_rec_profile(self)
_rec_overlay(self.overlay)
return result
@property
def arch(self) -> str:
"""The machine architecture of this profile."""
return self.resolve_var("ARCH")
def _use_flag_changes(self) -> Tuple[Set[str], Set[str]]:
"""Compute the USE flags changed by this profile.
Returns:
A 2-tuple: the set of flags set, and the flags unset.
"""
flags_set = set()
flags_unset = set()
def _process_flag(flag, prefix=""):
flag_set = True
if not flag:
return
if flag.startswith("-"):
flag = flag[1:]
flag_set = False
flag = prefix + flag
if flag_set:
flags_unset.discard(flag)
flags_set.add(flag)
else:
flags_unset.add(flag)
flags_set.discard(flag)
for flag in self.make_defaults_vars.get("USE", "").split():
_process_flag(flag)
use_expand = self.resolve_var_incremental("USE_EXPAND")
for var in use_expand:
for val in self.make_defaults_vars.get(var.upper(), "").split():
_process_flag(val, prefix=f"{var.lower()}_")
flags_set.update(self.forced_use_flags)
flags_unset.difference_update(self.forced_use_flags)
return flags_set, flags_unset
@property
def use_flags_set(self) -> Set[str]:
"""A set of what USE flags this profile enables."""
flags_set, _ = self._use_flag_changes()
return flags_set
@property
def use_flags_unset(self) -> Set[str]:
"""A set of what USE flags this profile disables."""
_, flags_unset = self._use_flag_changes()
return flags_unset
# TODO(b/236161656): Fix.
# pylint: disable-next=cache-max-size-none
@functools.lru_cache(maxsize=None)
def _parse_conf(self, name: str) -> List[List[str]]:
"""Parse a basic conf file, such as parent, use.mask, or use.force."""
file_path = self.path / name
if not file_path.is_file():
return []
contents = file_path.read_text(encoding="utf-8")
return list(portage_profile_conf.parse(contents))
def _resolve_use_conf(self, name: str) -> Set[str]:
"""Resolve a use.mask or use.force file."""
result = set()
def _rec(profile):
for parent in profile.parents:
_rec(parent)
# pylint: disable=protected-access
for (flag,) in profile._parse_conf(name):
if flag.startswith("-"):
result.discard(flag[1:])
else:
result.add(flag)
_rec(self)
return result
@property
def masked_use_flags(self) -> Set[str]:
"""The resolved set of masked USE flags for this profile."""
return self._resolve_use_conf("use.mask")
@property
def forced_use_flags(self) -> Set[str]:
"""The resolved set of forced USE flags for this profile."""
return self._resolve_use_conf("use.force")
@property
def use_flags(self) -> Set[str]:
"""A set of the fully-resolved USE flags for this profile."""
use_flags = set(self.resolve_var_incremental("USE"))
use_expand = self.resolve_var_incremental("USE_EXPAND")
for var in use_expand:
expansions = self.resolve_var_incremental(var.upper())
for val in expansions:
use_flags.add(f"{var.lower()}_{val}")
use_flags.difference_update(self.masked_use_flags)
use_flags.update(self.forced_use_flags)
return use_flags
@functools.cached_property
def parents(self) -> List[Profile]:
"""A list of the immediate parent profiles of this profile."""
parents = []
for tokens in self._parse_conf("parent"):
if len(tokens) != 1:
logging.warning(
"Profile %r has invalid parent configuration: %r",
self,
tokens,
)
continue
if ":" in tokens[0]:
repo_name, _, profile_name = tokens[0].partition(":")
overlays = _get_all_overlays_by_name()
overlay = overlays.get(repo_name)
if not overlay:
logging.warning(
"Profile %r has parent %r, but %r isn't an overlay.",
self,
tokens[0],
repo_name,
)
continue
profile = overlay.get_profile(profile_name)
else:
path = (self.path / Path(tokens[0])).resolve()
path = path.relative_to(self.overlay.profiles_dir)
profile = self.overlay.get_profile(path)
if not profile:
continue
parents.append(profile)
return parents
def tree(self):
yield from self.parents
def __repr__(self):
return f"{self.overlay.name}:{self.name}"
class Ebuild(QueryTarget):
"""An ebuild, fully qualified by ebuild file.
Note: This implementation relies on the md5-cache files, which are updated
by a builder every 30 minutes. Thus, if you've made changes locally, they
may not be reflected in the query immediately. Run egencache in the overlay
to regenerate these cache files manually, should you require it.
"""
def __init__(self, ebuild_file: Path, overlay: Overlay):
self.ebuild_file = ebuild_file
self.overlay = overlay
@classmethod
def find_all(cls, board=None, overlays=constants.BOTH_OVERLAYS):
for overlay in Overlay.find_all(board=board, overlays=overlays):
yield from overlay.ebuilds
@functools.cached_property
def package_info(self) -> package_info.PackageInfo:
"""The PackageInfo for this ebuild."""
return package_info.parse(self.ebuild_file)
@property
def md5_cache_file(self) -> Path:
"""The path to the md5-cache file for this ebuild."""
return self.overlay.md5_cache_dir / self.package_info.cpvr
@functools.cached_property
def vars(self) -> Dict[str, str]:
"""The raw variables from the md5-cache file."""
if not self.md5_cache_file.is_file():
return {}
result = {}
with open(self.md5_cache_file, encoding="utf-8") as f:
for line in f:
key, _, value = line.partition("=")
result[key] = value.rstrip("\n")
return result
@property
def eapi(self) -> int:
"""The EAPI for the package."""
return int(self.vars.get("EAPI", 0))
@property
def iuse(self) -> Set[str]:
"""A set of the flags in IUSE."""
iuse = self.vars.get("IUSE")
if not iuse:
return set()
result = set()
for var in iuse.split(" "):
if var.startswith("-"):
continue
if var.startswith("+"):
var = var[1:]
result.add(var)
return result
@property
def iuse_default(self) -> Set[str]:
"""A set of the flags enabled by default in IUSE."""
iuse = self.vars.get("IUSE", "")
result = set()
for var in iuse.split(" "):
if var.startswith("+"):
var = var[1:]
result.add(var)
return result
@property
def eclasses(self) -> List[str]:
"""A list of the eclasses inherited by this package and its eclasses."""
eclasses = self.vars.get("_eclasses_")
if not eclasses:
return []
return eclasses.split("\t")[::2]
@property
def keywords(self) -> List[str]:
"""The KEYWORDS of this package."""
keywords = self.vars.get("KEYWORDS")
if not keywords:
return []
return keywords.split()
def get_stability(self, arch: str) -> Stability:
"""Get the stability of this package on a given architecture.
Args:
arch: The architecture to consider for stability.
Returns:
The stability on this architecture.
"""
stability = Stability.UNSPECIFIED
for keyword in self.keywords:
stability = {
arch: Stability.STABLE,
"*": Stability.STABLE,
f"~{arch}": Stability.UNSTABLE,
"~*": Stability.UNSTABLE,
f"-{arch}": Stability.BAD,
"-*": Stability.BAD,
}.get(keyword, stability)
return stability
def __repr__(self):
return f"{self.package_info.cpvr}::{self.overlay.name}"
def __eq__(self, other):
return self.ebuild_file == other.ebuild_file
class Board(QueryTarget):
"""A board, as in what would be passed to setup_board."""
def __init__(
self,
name: str,
private_overlay: Optional[Overlay] = None,
public_overlay: Optional[Overlay] = None,
):
self.name = name
self.private_overlay = private_overlay
self.public_overlay = public_overlay
@classmethod
def find_all(cls, board=None, overlays=constants.BOTH_OVERLAYS):
boards = {}
for overlay in Overlay.find_all(board=board, overlays=overlays):
board_name = overlay.board_name
if not board_name:
continue
if board_name not in boards:
boards[board_name] = Board(board_name)
if overlay.is_private:
boards[board_name].private_overlay = overlay
else:
boards[board_name].public_overlay = overlay
yield from boards.values()
@property
def top_level_overlay(self) -> Optional[Overlay]:
"""The top-level overlay for this board."""
return self.private_overlay or self.public_overlay
@property
def top_level_profile(self) -> Optional[Profile]:
"""The top-level profile for this board."""
if self.top_level_overlay:
return self.top_level_overlay.get_profile("base")
return None
def tree(self):
if self.top_level_profile:
yield self.top_level_profile
@property
def use_flags(self) -> Set[str]:
"""The fully-evaluated USE flags for this board."""
result = {f"board_use_{self.name}"}
if self.top_level_profile:
result.update(self.top_level_profile.use_flags)
return result
@property
def arch(self) -> Optional[str]:
"""The machine architecture of this board."""
if self.top_level_profile:
return self.top_level_profile.arch
return None
@property
def is_variant(self) -> bool:
"""True if this board has another board's top level overlay in its
overlays parents.
"""
if self.top_level_overlay:
for overlay in self.top_level_overlay.parents:
if overlay.board_name and overlay.board_name != self.name:
return True
return False
def __repr__(self):
return self.name
def __eq__(self, other):
return self.name == other.name
class Query:
"""The Python-level interface for cros query.
For example:
board = Query(Board).filter(lambda board: board.name == "volteer").one()
"""
_filters: List[Callable[[QueryTarget], bool]]
def __init__(
self,
target: Type[QueryTarget],
board: Optional[str] = None,
overlays: str = constants.BOTH_OVERLAYS,
):
self._filters = []
self._iter = target.find_all(board=board, overlays=overlays)
self._consumed = False
def filter(self, func: Callable[[QueryTarget], bool]) -> Query:
"""Add a filter to the results."""
if self._consumed:
raise RuntimeError("Filters cannot be added after consumption.")
self._filters.append(func)
return self
def __iter__(self) -> Iterator[QueryTarget]:
"""Iterate through all results."""
if self._consumed:
raise RuntimeError("Query iterator has already been consumed.")
self._consumed = True
for result in self._iter:
if all(x(result) for x in self._filters):
yield result
def one(self) -> QueryTarget:
"""Assert there is exactly one result and return it."""
iterator = iter(self)
result = next(iterator)
try:
next(iterator)
except StopIteration:
return result
raise ValueError("Query returned multiple results.")
def one_or_none(self) -> Optional[QueryTarget]:
"""Return the result if there's exactly one, or None if zero."""
try:
return self.one()
except StopIteration:
return None
def all(self) -> List[QueryTarget]:
"""Return all results in a list."""
return list(self)