| # Copyright 2023 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Methods for reading and building manifests exported by the subtools builder. |
| |
| Loads and interprets subtools export manifests defined by the proto at |
| https://crsrc.org/o/src/config/proto/chromiumos/build/api/subtools.proto |
| """ |
| |
| from pathlib import Path |
| import re |
| import shutil |
| from typing import List, Literal, Optional, Set |
| |
| from chromite.third_party.google import protobuf |
| from chromite.third_party.google.protobuf import text_format |
| |
| import chromite |
| from chromite.api.gen.chromiumos.build.api import subtools_pb2 |
| from chromite.lib import cipd |
| from chromite.lib import osutils |
| from chromite.lib import portage_util |
| |
| |
# Module-level logger. It is obtained through ChromiteLogger rather than the
# plain `logging` module; code in this file calls `logger.notice()` on it in
# addition to the usual debug/info levels.
logger = chromite.ChromiteLogger.getLogger(__name__)
| |
| |
class Error(Exception):
    """Module base error class.

    The rendered exception text is `message` followed by a dump of the
    subtool that was being processed when the error occurred.

    Attributes:
        message: The bare error message, without the subtool dump.
        subtool: The object (normally a Subtool) the error relates to.
    """

    def __init__(self, message: str, subtool: object):
        """Builds the error text from `message` and `str(subtool)`.

        Args:
            message: Human-readable description of the failure.
            subtool: Context object appended to the message for debugging.
        """
        # TODO(build): Use self.add_note when Python 3.11 is available.
        super().__init__(f"{message}\nSubtool:\n{subtool}")
        self.message = message
        self.subtool = subtool

    def __reduce__(self):
        """Supports copy/pickle by replaying the original constructor args.

        The default Exception reduction re-invokes the class with
        `self.args`, which holds only the single formatted string, so
        `__init__` would fail with a missing `subtool` argument.
        """
        return (type(self), (self.message, self.subtool))
| |
| |
class ManifestInvalidError(Error):
    """The contents of the subtool package manifest proto are invalid.

    Raised during `Subtool` validation: when the .textproto failed to parse,
    when the package name does not match the allowed pattern, or when the
    manifest lists no paths.
    """
| |
| |
class ManifestBundlingError(Error):
    """The subtool could not be bundled.

    Raised for failures while collecting or exporting files, e.g. a package
    query that does not resolve to exactly one installed package, an input
    that matches no files, a destination that already exists, too many
    files, or an export attempted before bundling completed.
    """
| |
| |
# Default glob to find export package manifests under the config_dir.
SUBTOOLS_EXPORTS_GLOB = "**/*.textproto"

# Valid names. A stricter version of `packageNameRe` in
# https://crsrc.org/i/go/src/go.chromium.org/luci/cipd/common/common.go
# Disallows slashes and starting with a ".".
_PACKAGE_NAME_RE = re.compile(r"^[a-z0-9_\-]+[a-z0-9_\-\.]*$")

# Default destination path in the bundle when not specified on a PathMapping.
# Interpreted relative to the subtool's bundle directory.
_DEFAULT_DEST = "bin"

# Default regex to apply to input paths when bundling. Whatever it matches is
# removed from the source path; this default drops every leading directory so
# that only the file name remains.
_DEFAULT_STRIP_PREFIX_REGEX = "^.*/"

# Default CIPD prefix when unspecified.
_DEFAULT_CIPD_PREFIX = "chromiumos/infra/tools"
| |
| |
def get_installed_package(
    query: str, error_context: "Subtool"
) -> portage_util.InstalledPackage:
    """Resolves `query` to the single installed package it identifies.

    Args:
        query: Package query handed to portage_util.FindPackageNameMatches.
        error_context: The subtool attached to any error that is raised.

    Returns:
        The InstalledPackage found in the Portage DB for the unique match.

    Raises:
        ManifestBundlingError: If `query` does not match exactly one package,
            or if the matched package has no installed counterpart.
    """
    matches = portage_util.FindPackageNameMatches(query)
    match_count = len(matches)
    if match_count != 1:
        raise ManifestBundlingError(
            f"Package '{query}' must match exactly one package."
            f" Matched {match_count} -> {matches}.",
            error_context,
        )

    matched = matches[0]
    logger.debug("%s matched %s", query, matched)

    portage_db = portage_util.PortageDB()
    installed = portage_db.GetInstalledPackage(matched.category, matched.pvr)
    if installed:
        return installed

    # A name match alone is not enough: the package must actually be
    # installed for its contents to be bundled.
    raise ManifestBundlingError(
        f"Failed to map {query}=>{matched.atom} to an *installed* package.",
        error_context,
    )
| |
| |
class Subtool:
    """A subtool, backed by a .textproto manifest.

    The manifest is parsed in the constructor, but a parse failure is only
    recorded there; it is raised (as ManifestInvalidError) the first time the
    manifest is validated by bundle() or export().

    Attributes:
        manifest_path: The source .textproto, used for debug output.
        package: The parsed protobuf message.
        work_root: Root path in which to build bundles for export.
        is_valid: Set after validation to indicate an export may be attempted.
        parse_error: Protobuf parse error held until validation.
    """

    def __init__(self, message: str, path: Path, work_root: Path):
        """Loads from a .textproto file contents.

        Args:
            message: The contents of the .textproto file.
            path: The source file (for logging).
            work_root: Location on disk where packages are built.
        """
        self.manifest_path = path
        self.package = subtools_pb2.SubtoolPackage()
        self.work_root = work_root
        self.is_valid: Optional[bool] = None
        self.parse_error: Optional[text_format.ParseError] = None

        # Set of c/p-v-r strings that provided the bundle contents.
        self._source_ebuilds: Set[str] = set()
        # Paths bundled, but not yet attributed to a source ebuild.
        self._unmatched_paths: List[str] = []

        # Hold on to a parse failure instead of raising so that construction
        # always succeeds; _validate() reports it with this subtool attached.
        try:
            text_format.Parse(message, self.package)
        except text_format.ParseError as e:
            self.parse_error = e

    @classmethod
    def from_file(cls, path: Path, work_root: Path) -> "Subtool":
        """Helper to construct a Subtool from a path on disk."""
        return cls(path.read_text(encoding="utf-8"), path, work_root)

    def __str__(self) -> str:
        """Debug output; emits the parsed textproto and source filename."""
        header = f"{'=' * 10} {self.manifest_path} {'=' * 10}"
        textproto = text_format.MessageToString(self.package)
        # Close with a rule exactly as wide as the header line.
        return f"{header}\n{textproto}{'=' * len(header)}"

    def _work_dir(self) -> Path:
        """Returns the path under work_root for creating files for export."""
        return self.work_root / self.package.name

    @property
    def metadata_dir(self) -> Path:
        """Path holding all work relating specifically to this package."""
        return self._work_dir()

    @property
    def bundle_dir(self) -> Path:
        """Path (under metadata) holding files to form the exported bundle."""
        return self._work_dir() / "bundle"

    @property
    def cipd_package(self) -> str:
        """Full path to the CIPD package name."""
        prefix = (
            self.package.cipd_prefix
            if self.package.HasField("cipd_prefix")
            else _DEFAULT_CIPD_PREFIX
        )
        # Tolerate a prefix written with a trailing slash.
        return f"{prefix.rstrip('/')}/{self.package.name}"

    @property
    def summary(self) -> str:
        """A one-line summary describing this package."""
        return f"{self.package.name} (http://go/cipd/p/{self.cipd_package})"

    def stamp(self, kind: Literal["bundled", "exported"]) -> Path:
        """Returns the path to a "stamp" file that tracks export progress."""
        return self.metadata_dir / f".{kind}"

    def clean(self) -> None:
        """Resets export progress and removes the temporary bundle tree."""
        self.stamp("bundled").unlink(missing_ok=True)
        self.stamp("exported").unlink(missing_ok=True)
        osutils.RmDir(self.bundle_dir, ignore_missing=True)

    def bundle(self) -> None:
        """Collect and bundle files described in `package` in the work dir.

        Any previous bundle for this package is removed first. On success the
        "bundled" stamp file is written.

        Raises:
            ManifestInvalidError: If the manifest fails validation.
            ManifestBundlingError: If a path mapping matches no files, a
                destination file already exists, or more than `max_files`
                files are collected.
        """
        self._validate()
        self.clean()
        # parents=True so that a single Subtool can be bundled even when
        # nothing else has created work_root yet.
        self.metadata_dir.mkdir(parents=True, exist_ok=True)
        self.bundle_dir.mkdir()
        logger.notice(
            "%s: Subtool bundling under %s.", self.package.name, self.bundle_dir
        )
        logger.info(self)
        file_count = 0
        self._source_ebuilds = set()
        self._unmatched_paths = []
        for path in self.package.paths:
            file_count += self._bundle_mapping(path)
            # _bundle_mapping() only checks the files of its own mapping, but
            # max_files is a field of the package: enforce it on the running
            # total as well so several mappings cannot jointly exceed it.
            self._check_counts(file_count)
        logger.notice("%s: Copied %d files.", self.package.name, file_count)
        # TODO(b/277992359): Lddtree, hashing, licenses.
        self.stamp("bundled").touch()

    def export(self, use_production: bool, cipd_path: str) -> None:
        """Export the bundle, e.g., to cipd.

        Args:
            use_production: Upload to the default (production) CIPD service
                when True, otherwise to cipd.STAGING_SERVICE_URL.
            cipd_path: Passed through as the first argument of
                cipd.CreatePackage.

        Raises:
            ManifestInvalidError: If the manifest fails validation.
            ManifestBundlingError: If bundle() has not completed, or the
                bundle cannot be attributed to exactly one source package.
        """
        self._validate()
        if not self.stamp("bundled").exists():
            raise ManifestBundlingError("Bundling incomplete.", self)
        self._match_ebuilds()
        tags = {
            "builder_source": "sdk_subtools",
            "ebuild_source": ",".join(self._source_ebuilds),
        }
        refs = ["latest"]
        cipd.CreatePackage(
            cipd_path,
            self.cipd_package,
            self.bundle_dir,
            tags,
            refs,
            service_url=None if use_production else cipd.STAGING_SERVICE_URL,
        )
        self.stamp("exported").touch()

    def _validate(self) -> None:
        """Validate fields in the proto.

        Raises:
            ManifestInvalidError: If the textproto failed to parse, the name
                is not a valid package name, or no paths are listed.
        """
        if self.is_valid:
            # Note this does not worry about validity invalidation, e.g., due to
            # changed disk state since validation.
            return

        if self.parse_error:
            raise ManifestInvalidError(
                f"ParseError in .textproto: {self.parse_error}", self
            ) from self.parse_error

        # fullmatch() rather than match(): the trailing `$` in the pattern
        # also matches just before a final newline, so match() would accept
        # a name that ends with a newline character.
        if not _PACKAGE_NAME_RE.fullmatch(self.package.name):
            raise ManifestInvalidError(
                f"Subtool name must match '{_PACKAGE_NAME_RE.pattern}'", self
            )
        if not self.package.paths:
            raise ManifestInvalidError("At least one path is required", self)

        # TODO(b/277992359): Validate more proto fields.

        self.is_valid = True

    def _copy_into_bundle(
        self, src: Path, destdir: Path, strip: re.Pattern
    ) -> int:
        """Copies a file on disk into the bundling folder.

        Copies only files (follows symlinks). Ensures files are not clobbered.

        Args:
            src: Candidate source path; anything that is not a file is skipped.
            destdir: Directory inside the bundle that receives the file.
            strip: Regex whose matches are removed from `src` to form the
                path below `destdir`.

        Returns:
            The number of files copied (0 or 1).

        Raises:
            ManifestBundlingError: If the destination already exists.
        """
        if not src.is_file():
            return 0

        # Apply the regex, and ensure the result is not an absolute path.
        dest = destdir / strip.sub("", src.as_posix()).lstrip("/")
        if dest.exists():
            raise ManifestBundlingError(
                f"{dest} exists: refusing to copy {src}.", self
            )
        osutils.SafeMakedirs(dest.parent)
        logger.debug("Copy file %s -> %s.", src, dest)
        shutil.copy2(src, dest)
        return 1

    def _check_counts(self, file_count: int) -> None:
        """Raise an error if files violate the manifest spec.

        Args:
            file_count: Number of files collected so far.

        Raises:
            ManifestBundlingError: If `file_count` exceeds `max_files`.
        """
        if file_count > self.package.max_files:
            raise ManifestBundlingError(
                f"Max file count ({self.package.max_files}) exceeded.", self
            )

    def _bundle_mapping(
        self, mapping: "subtools_pb2.SubtoolPackage.PathMapping"
    ) -> int:
        """Bundle files for the provided `mapping`.

        With an `ebuild_filter`, candidates are the contents of that installed
        package; otherwise the filesystem is globbed from "/".

        Args:
            mapping: The PathMapping entry from the manifest to process.

        Returns:
            The number of files matched.

        Raises:
            ManifestBundlingError: If nothing matched, a destination exists,
                the file limit is exceeded, or the ebuild filter does not
                resolve to one installed package.
        """
        subdir = mapping.dest if mapping.HasField("dest") else _DEFAULT_DEST
        destdir = self.bundle_dir / subdir.lstrip("/")
        strip_prefix_regex = (
            mapping.strip_prefix_regex
            if mapping.HasField("strip_prefix_regex")
            else _DEFAULT_STRIP_PREFIX_REGEX
        )
        strip = re.compile(strip_prefix_regex)

        # Any leading '/' must be stripped from the glob (pathlib only supports
        # relative patterns when matching). Steps below effectively restore it.
        glob = mapping.input.lstrip("/")

        file_count = 0

        if mapping.ebuild_filter:
            package = get_installed_package(mapping.ebuild_filter, self)
            for _file_type, relative_path in package.ListContents():
                path = Path(f"/{relative_path}")
                if not path.match(glob):
                    continue
                file_count += self._copy_into_bundle(path, destdir, strip)
                self._check_counts(file_count)
            if file_count:
                # The providing package is already known; record it directly.
                self._source_ebuilds.add(package.package_info.cpvr)
        else:
            for path in Path("/").glob(glob):
                added_files = self._copy_into_bundle(path, destdir, strip)
                if not added_files:
                    continue
                file_count += added_files
                self._check_counts(file_count)
                # Owner unknown for now; resolved later by _match_ebuilds().
                self._unmatched_paths.append(str(path))

        if file_count == 0:
            raise ManifestBundlingError(
                f"Input field {mapping.input} matched no files.", self
            )
        logger.info("Glob '%s' matched %d files.", mapping.input, file_count)
        return file_count

    def _match_ebuilds(self) -> None:
        """Match up unmatched paths to the package names that provided them.

        Raises:
            ManifestBundlingError: If the bundle contents do not come from
                exactly one package.
        """
        if self._unmatched_paths:
            ebuilds = portage_util.FindPackageNamesForFiles(
                *self._unmatched_paths
            )
            # Assume all files were matched, and that it is not an error for any
            # file to not be matched to a package.
            self._unmatched_paths = []
            self._source_ebuilds.update(e.cpvr for e in ebuilds)
        if len(self._source_ebuilds) != 1:
            # TODO(b/277992359): Support this with an extra proto field.
            candidates = sorted(self._source_ebuilds)
            raise ManifestBundlingError(
                "Bundle cannot be attributed to exactly one package."
                f" Candidates: {candidates}",
                self,
            )
        logger.notice("Contents provided by %s", self._source_ebuilds)
| |
| |
class InstalledSubtools:
    """Wraps the set of subtool manifests installed on the system.

    Attributes:
        subtools: Collection of parsed subtool manifests.
        work_root: Root folder where all packages are bundled.
    """

    def __init__(
        self,
        config_dir: Path,
        work_root: Path,
        glob: str = SUBTOOLS_EXPORTS_GLOB,
    ):
        """Loads every manifest matching `glob` under `config_dir`.

        Args:
            config_dir: Directory searched for manifest files.
            work_root: Root folder handed to each Subtool for bundling.
            glob: Pattern, relative to `config_dir`, selecting manifests.
        """
        logger.notice(
            "Loading subtools from %s/%s with Protobuf library v%s",
            config_dir,
            glob,
            protobuf.__version__,
        )
        self.work_root = work_root
        # Path.glob() yields entries in directory-listing order, which is not
        # stable across machines. Sort so that bundling/export order, and
        # therefore the first failure reported, is the same on every run.
        self.subtools = [
            Subtool.from_file(f, work_root)
            for f in sorted(config_dir.glob(glob))
        ]

    def bundle_all(self) -> None:
        """Read .textprotos and bundle blobs into `work_root`."""
        self.work_root.mkdir(exist_ok=True)
        for subtool in self.subtools:
            subtool.bundle()

    def export(
        self, use_production: bool, export_filter: Optional[List[str]] = None
    ) -> None:
        """Read .textprotos and export valid bundles in `work_root`.

        Args:
            use_production: Whether to export to production environments.
            export_filter: If provided, only export subtools with these names.
        """
        cipd_path = cipd.GetCIPDFromCache()
        for subtool in self.subtools:
            if export_filter is None or subtool.package.name in export_filter:
                subtool.export(use_production, cipd_path)