| # Copyright 2015 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Utilities for setting up and cleaning up the chroot environment.""" |
| |
| from __future__ import annotations |
| |
| import ast |
| import collections |
| import dataclasses |
| import functools |
| import grp |
| import io |
| import logging |
| import os |
| from pathlib import Path |
| import pwd |
| import re |
| import resource |
| import shutil |
| import sys |
| from typing import Any, List, Optional, Set, Union |
| import urllib.parse |
| import urllib.request |
| |
| from chromite.lib import build_target_lib |
| from chromite.lib import chroot_lib |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import locking |
| from chromite.lib import metrics_lib |
| from chromite.lib import osutils |
| from chromite.lib import path_util |
| from chromite.lib import retry_util |
| from chromite.lib import sysroot_lib |
| from chromite.lib import timeout_util |
| from chromite.utils import gs_urls_util |
| from chromite.utils import key_value_store |
| |
| |
| # Version file location inside chroot. |
| CHROOT_VERSION_FILE = "/etc/cros_chroot_version" |
| # Version hooks directory. |
| _CHROOT_VERSION_HOOKS_DIR = ( |
| constants.CHROMITE_DIR / "sdk" / "chroot_version_hooks.d" |
| ) |
| |
| # Bash completion directory. |
| _BASH_COMPLETION_DIR = ( |
| f"{constants.CHROOT_SOURCE_ROOT}/chromite/sdk/etc/bash_completion.d" |
| ) |
| |
| |
| class Error(Exception): |
| """Base cros sdk error class.""" |
| |
| |
| class ChrootDeprecatedError(Error): |
| """Raised when the chroot is too old to update.""" |
| |
| def __init__(self, version) -> None: |
| # Message defined here because it's long and gives specific |
| # instructions. |
| super().__init__( |
| f"Upgrade hook missing for your chroot version {version}.\n" |
| "Your chroot is so old that some updates have been deprecated and " |
| "it will need to be recreated. A fresh chroot can be built " |
| "with:\n" |
| " cros_sdk --replace" |
| ) |
| |
| |
| class ChrootUpdateError(Error): |
| """Error encountered when updating the chroot.""" |
| |
| |
| class InvalidChrootVersionError(Error): |
| """Chroot version is not a valid version.""" |
| |
| |
| class UninitializedChrootError(Error): |
| """Chroot has not been initialized.""" |
| |
| |
| class VersionHasMultipleHooksError(Error): |
| """When it is found that a single version has multiple hooks.""" |
| |
| |
| def is_inside_chroot() -> bool: |
| """Returns True if we are inside chroot.""" |
| return os.path.exists(CHROOT_VERSION_FILE) |
| |
| |
| def is_outside_chroot() -> bool: |
| """Returns True if we are outside chroot.""" |
| return not is_inside_chroot() |
| |
| |
| def assert_inside_chroot(name: Optional[str] = None) -> None: |
| """Die if we are outside the chroot""" |
| name = name or Path(sys.argv[0]).name |
| assert is_inside_chroot(), f"{name}: please run inside the chroot" |
| |
| |
| def assert_outside_chroot(name: Optional[str] = None) -> None: |
| """Die if we are inside the chroot""" |
| name = name or Path(sys.argv[0]).name |
| assert is_outside_chroot(), f"{name}: please run outside the chroot" |
| |
| |
| def require_inside_chroot(_reason: str = ""): |
| """Decorator to assert a function must be called when inside the SDK.""" |
| |
| def outer(func): |
| @functools.wraps(func) |
| def wrapper(*args, **kwargs): |
| assert_inside_chroot(func.__name__) |
| return func(*args, **kwargs) |
| |
| return wrapper |
| |
| return outer |
| |
| |
| def require_outside_chroot(_reason: str = ""): |
| """Decorator to assert a function must be called when outside the SDK.""" |
| |
| def outer(func): |
| @functools.wraps(func) |
| def wrapper(*args, **kwargs): |
| assert_outside_chroot(func.__name__) |
| return func(*args, **kwargs) |
| |
| return wrapper |
| |
| return outer |
| |
| |
| def require_chroot(_reason: str = ""): |
| """Decorator to note the function requires the SDK. |
| |
| The function can be called from inside or outside the SDK and the function |
| handles entering as needed, but a chroot must have been instantiated. |
| This is currently only for documentation purposes. |
| """ |
| |
| def outer(func): |
| @functools.wraps(func) |
| def wrapper(*args, **kwargs): |
| return func(*args, **kwargs) |
| |
| return wrapper |
| |
| return outer |
| |
| |
| def chroot_not_required(func): |
| """Decorator to note the SDK has no effect on the function. |
| |
| The function does not use SDK specific functionality, and behaves |
| identically inside and outside the SDK. This is currently only for |
| documentation purposes. |
| """ |
| |
| @functools.wraps(func) |
| def wrapper(*args, **kwargs): |
| return func(*args, **kwargs) |
| |
| return wrapper |
| |
| |
| def GetChrootVersion(chroot): |
| """Extract the version of the chroot. |
| |
| Args: |
| chroot: Full path to the chroot to examine. |
| |
| Returns: |
| The version of the chroot dir, or None if the version is |
| missing/invalid. |
| """ |
| if chroot: |
| ver_path = os.path.join(chroot, CHROOT_VERSION_FILE.lstrip(os.sep)) |
| else: |
| ver_path = CHROOT_VERSION_FILE |
| |
| updater = ChrootUpdater(version_file=ver_path) |
| try: |
| return updater.GetVersion() |
| except (IOError, Error) as e: |
| logging.debug(e) |
| |
| return None |
| |
| |
| def IsChrootVersionValid(chroot_path, hooks_dir=None): |
| """Check if the chroot version exists and is a valid version.""" |
| version = GetChrootVersion(chroot_path) |
| return version and version <= LatestChrootVersion(hooks_dir) |
| |
| |
| def LatestChrootVersion(hooks_dir=None): |
| """Get the most recent update hook version.""" |
| hook_files = os.listdir(hooks_dir or _CHROOT_VERSION_HOOKS_DIR) |
| |
| # Hook file names must follow the "version_short_description" convention. |
| # Pull out just the version number and find the max. |
| return max(int(hook.split("_", 1)[0]) for hook in hook_files) |
| |
| |
| def EarliestChrootVersion(hooks_dir=None): |
| """Get the oldest update hook version.""" |
| hook_files = os.listdir(hooks_dir or _CHROOT_VERSION_HOOKS_DIR) |
| |
| # Hook file names must follow the "version_short_description" convention. |
| # Pull out just the version number and find the max. |
| return min(int(hook.split("_", 1)[0]) for hook in hook_files) |
| |
| |
| def IsChrootDirValid(chroot_path): |
| """Check the permissions and owner on a chroot directory. |
| |
| Args: |
| chroot_path: The path to a chroot. |
| |
| Returns: |
| bool - False iff there are incorrect values on an existing directory. |
| """ |
| if not os.path.exists(chroot_path): |
| # No directory == no incorrect values. |
| return True |
| |
| return IsChrootOwnerValid(chroot_path) and IsChrootPermissionsValid( |
| chroot_path |
| ) |
| |
| |
| def IsChrootOwnerValid(chroot_path): |
| """Check if the chroot owner is root.""" |
| chroot_stat = os.stat(chroot_path) |
| return not chroot_stat.st_uid and not chroot_stat.st_gid |
| |
| |
| def IsChrootPermissionsValid(chroot_path): |
| """Check if the permissions on the directory are correct.""" |
| chroot_stat = os.stat(chroot_path) |
| return chroot_stat.st_mode & 0o7777 == 0o755 |
| |
| |
| def IsChrootReady(chroot): |
| """Checks if the chroot is mounted and set up. |
| |
| /etc/cros_chroot_version is set to the current version of the chroot at the |
| end of the setup process. If this file exists and contains a non-zero |
| value, the chroot is ready for use. |
| |
| Args: |
| chroot: Full path to the chroot to examine. |
| |
| Returns: |
| True iff the chroot contains a valid version. |
| """ |
| version = GetChrootVersion(chroot) |
| return version is not None and version > 0 |
| |
| |
| @dataclasses.dataclass(frozen=True) |
| class SdkVersionConfig: |
| """Container for configs found in sdk_version.conf.""" |
| |
| latest_version: str |
| bootstrap_version: Optional[str] = None |
| bucket: Optional[str] = None |
| |
| @classmethod |
| def from_file( |
| cls, file: Union[str, "os.PathLike[str]", io.TextIOWrapper] |
| ) -> SdkVersionConfig: |
| """Load a SdkVersionConfig from a sdk_version.conf file. |
| |
| Args: |
| file: The file path or file-like object to load. |
| |
| Returns: |
| A SdkVersionConfig. |
| """ |
| conf = key_value_store.LoadFile(file) |
| return cls( |
| latest_version=conf["SDK_LATEST_VERSION"], |
| bootstrap_version=conf.get("BOOTSTRAP_FROZEN_VERSION"), |
| bucket=conf.get("SDK_BUCKET"), |
| ) |
| |
| @classmethod |
| def load(cls) -> SdkVersionConfig: |
| """Convenience method to call from_file() with default path. |
| |
| Returns: |
| A SdkVersionConfig. |
| """ |
| return cls.from_file(constants.SDK_VERSION_FILE_FULL_PATH) |
| |
| def get_default_version(self, bootstrap: bool = False) -> str: |
| """Get the default version that should be used. |
| |
| Args: |
| bootstrap: If true, provide the bootstrap version if defined. |
| |
| Returns: |
| The SDK version to be used. |
| """ |
| if bootstrap and self.bootstrap_version: |
| return self.bootstrap_version |
| return self.latest_version |
| |
| |
| def get_prefetch_sdk_versions() -> Set[str]: |
| """Get a reasonable set of SDK versions to have cached locally. |
| |
| This set includes: |
| 1. The current version in sdk_version.conf. |
| 2. If the user is tracking snapshot or main, the version from |
| cros-sdk-latest.conf. |
| |
| Returns: |
| A set of SDK versions. |
| """ |
| result = {SdkVersionConfig.load().latest_version} |
| checkout = path_util.DetermineCheckout() |
| if checkout.tracks_main: |
| url = get_sdk_latest_conf_file_url() |
| try: |
| with urllib.request.urlopen(url) as f: |
| data = key_value_store.LoadData(f.read().decode("utf-8")) |
| except urllib.error.URLError as e: |
| logging.warning( |
| "GET %s (error %s): ignoring for SDK prefetch version.", |
| url, |
| e, |
| ) |
| else: |
| result.add(data["LATEST_SDK"]) |
| return result |
| |
| |
| def get_sdk_gs_url( |
| suburl: str = "", |
| for_gsutil: bool = False, |
| override_bucket: Optional[str] = None, |
| ) -> str: |
| """Construct a Google Storage URL for an arbitrary file in the SDK bucket. |
| |
| Args: |
| suburl: The path to the file within the SDK bucket. |
| for_gsutil: Whether to return a URL for passing to `gsutil`. |
| override_bucket: If given and non-empty, use this URL instead of the |
| standard SDK bucket. |
| |
| Returns: |
| The fully constructed URL. |
| """ |
| return gs_urls_util.GetGsURL( |
| override_bucket or constants.SDK_GS_BUCKET, |
| for_gsutil=for_gsutil, |
| suburl=suburl, |
| ) |
| |
| |
| def get_sdk_tarball_url( |
| sdk_version: str, |
| file_extension: str = "tar.xz", |
| **kwargs: Any, |
| ) -> str: |
| """Return a Google Storage URL pointing to an SDK tarball. |
| |
| Args: |
| sdk_version: The SDK version to fetch a manifest for. |
| file_extension: The tarball's file extension. |
| **kwargs: Additional keyword arguments for get_sdk_gs_url(). |
| """ |
| tarball_basename = f"cros-sdk-{sdk_version}.{file_extension}" |
| return get_sdk_gs_url(suburl=tarball_basename, **kwargs) |
| |
| |
| def get_sdk_manifest_url(sdk_version: str, **kwargs: Any) -> str: |
| """Return a Google Storage URL pointing to an SDK manifest file. |
| |
| Args: |
| sdk_version: The SDK version to fetch a manifest for. |
| **kwargs: Additional keyword arguments for get_sdk_gs_url(). |
| """ |
| manifest_basename = f"cros-sdk-{sdk_version}.tar.xz.Manifest" |
| return get_sdk_gs_url(suburl=manifest_basename, **kwargs) |
| |
| |
| def get_sdk_latest_conf_file_url(**kwargs: Any) -> str: |
| """Return a Google Storage URL for the remote cros-sdk-latest.conf file. |
| |
| Args: |
| **kwargs: Additional keyword arguments for get_sdk_gs_url(). |
| """ |
| return get_sdk_gs_url(suburl="cros-sdk-latest.conf", **kwargs) |
| |
| |
| def fetch_remote_tarballs( |
| storage_dir: Path, |
| urls: List[str], |
| prefetch_versions: Optional[Set[str]] = None, |
| ) -> Path: |
| """Fetch a tarball given by url, and place it in |storage_dir|. |
| |
| Args: |
| storage_dir: Path in which to save the tarball. |
| urls: List of URLs to try to download. Download will stop on first |
| success. |
| prefetch_versions: Set of SDK versions which should not be discarded. |
| If not specified, get_prefetch_sdk_versions() will be used. |
| |
| Returns: |
| Full path to the downloaded file. |
| |
| Raises: |
| ValueError: None of the URLs worked. |
| """ |
| # Note we track content length ourselves since certain versions of curl |
| # fail if asked to resume a complete file. |
| # https://sourceforge.net/tracker/?func=detail&atid=100976&aid=3482927&group_id=976 |
| status_re = re.compile(rb"^HTTP/[0-9]+(\.[0-9]+)? 200") |
| for url in urls: |
| parsed = urllib.parse.urlparse(url) |
| tarball_name = os.path.basename(parsed.path) |
| if parsed.scheme in ("", "file"): |
| if os.path.exists(parsed.path): |
| return parsed.path |
| continue |
| content_length = 0 |
| logging.debug("Attempting download from %s", url) |
| result = retry_util.RunCurl( |
| ["-I", url], |
| print_cmd=False, |
| debug_level=logging.NOTICE, |
| capture_output=True, |
| ) |
| successful = False |
| for header in result.stdout.splitlines(): |
| # We must walk the output to find the 200 code for use cases where |
| # a proxy is involved and may have pushed down the actual header. |
| if status_re.match(header): |
| successful = True |
| elif header.lower().startswith(b"content-length:"): |
| content_length = int(header.split(b":", 1)[-1].strip()) |
| if successful: |
| break |
| if successful: |
| break |
| else: |
| raise ValueError("No valid URLs found!") |
| |
| osutils.SafeMakedirsNonRoot(storage_dir) |
| tarball_dest = storage_dir / tarball_name |
| lock_file = tarball_dest.with_name(f".{tarball_dest.name}.lock") |
| |
| with locking.FileLock(lock_file) as lock: |
| lock.write_lock(f"{tarball_dest} download lock") |
| current_size = 0 |
| if os.path.exists(tarball_dest): |
| current_size = os.path.getsize(tarball_dest) |
| if current_size > content_length: |
| osutils.SafeUnlink(tarball_dest) |
| current_size = 0 |
| |
| if current_size < content_length: |
| logging.notice("Downloading tarball %s ...", tarball_dest.name) |
| retry_util.RunCurl( |
| [ |
| "--fail", |
| "-L", |
| "-y", |
| "30", |
| "-C", |
| "-", |
| "--output", |
| tarball_dest, |
| url, |
| ], |
| print_cmd=False, |
| debug_level=logging.NOTICE, |
| ) |
| |
| # Cleanup old tarballs now since we've successfully fetched; only cleanup |
| # the tarballs for our prefix, or unknown ones. This gets a bit tricky |
| # because we might have partial overlap between known prefixes. |
| prefetch_versions = prefetch_versions or get_prefetch_sdk_versions() |
| for p in Path(storage_dir).glob("cros-sdk-*"): |
| if p.name == tarball_name: |
| continue |
| if any(p.name.startswith(f"cros-sdk-{x}") for x in prefetch_versions): |
| continue |
| logging.info("Cleaning up old tarball: %s", p) |
| osutils.SafeUnlink(p) |
| |
| return tarball_dest |
| |
| |
| def MountChrootPaths(chroot: chroot_lib.Chroot) -> None: |
| """Setup all the mounts for the |chroot|. |
| |
| NB: This assumes running in a unique mount namespace. If it is running in |
| the root mount namespace, then it will probably change settings for the |
| worse. |
| """ |
| KNOWN_FILESYSTEMS = set( |
| x.split()[-1] |
| for x in osutils.ReadFile("/proc/filesystems").splitlines() |
| ) |
| |
| path = Path(chroot.path).resolve() |
| out_dir = chroot.out_path |
| |
| logging.debug("Mounting chroot paths at %s", path) |
| |
| # Mark all existing mounts as slave mounts: that means changes made to |
| # mounts in the parent mount namespace will propagate down (like unmounts). |
| osutils.Mount(None, "/", None, osutils.MS_REC | osutils.MS_SLAVE) |
| |
| # If the mount path is already mounted, make it private so we can make |
| # changes without it propagating back out. |
| for info in osutils.IterateMountPoints(): |
| if info.destination == str(path): |
| osutils.Mount(None, path, None, osutils.MS_REC | osutils.MS_PRIVATE) |
| break |
| |
| # The source checkout must be mounted first. We'll be mounting paths into |
| # the chroot, and that chroot may live inside SOURCE_ROOT, so if we did |
| # the recursive bind at the end, we'd double bind things. |
| osutils.Mount( |
| constants.SOURCE_ROOT, |
| path / constants.CHROOT_SOURCE_ROOT.relative_to("/"), |
| "~/chromiumos", |
| osutils.MS_BIND | osutils.MS_REC, |
| ) |
| |
| # Prepare for pivot_root(2). `man 2 pivot_root` says new_root must be a |
| # mount point. |
| osutils.Mount( |
| path, |
| path, |
| None, |
| osutils.MS_BIND | osutils.MS_REC, |
| ) |
| |
| osutils.SafeMakedirsNonRoot(out_dir) |
| osutils.SafeMakedirs(path / constants.CHROOT_OUT_ROOT.relative_to("/")) |
| osutils.Mount( |
| out_dir, |
| path / constants.CHROOT_OUT_ROOT.relative_to("/"), |
| None, |
| osutils.MS_BIND | osutils.MS_REC, |
| ) |
| |
| for source_dir, dest_dir, mode in ( |
| ("tmp", "tmp", 0o1777), |
| ("home", "home", None), |
| ("build", "build", None), |
| ("sdk/bin", "usr/local/bin", None), |
| ("sdk/cache", "var/cache", None), |
| ("sdk/run", "run", None), |
| ("sdk/logs", "var/log", None), |
| ("sdk/tmp", "var/tmp", 0o1777), |
| ): |
| kwargs = {} |
| if mode is not None: |
| kwargs["mode"] = mode |
| |
| osutils.SafeMakedirsNonRoot(out_dir / source_dir, **kwargs) |
| osutils.SafeMakedirs(path / dest_dir) |
| osutils.Mount( |
| out_dir / source_dir, |
| path / dest_dir, |
| None, |
| osutils.MS_BIND | osutils.MS_REC, |
| ) |
| |
| # Bind mount a few /etc files, so sysroots can add their own users/groups. |
| for src, dst in ( |
| ("sdk/passwd", "etc/passwd"), |
| ("sdk/group", "etc/group"), |
| ("sdk/shadow", "etc/shadow"), |
| ): |
| if not (out_dir / src).exists(): |
| # Grab a unique lock here, as we only need fine-grained coverage |
| # over these passwd/group/shadow files, in the infrequent |
| # (first-time initialization) case that they haven't been copied |
| # over before. |
| with locking.FileLock( |
| out_dir / ".passwd_lock", |
| "passwd lock", |
| blocking_timeout=30, |
| ) as lock: |
| lock.write_lock() |
| # Check again now that we have the lock. If it exists now, we |
| # lost the race, but that's OK. |
| if (out_dir / src).exists(): |
| break |
| osutils.SafeMakedirsNonRoot((out_dir / src).parent) |
| shutil.copy2(path / dst, out_dir / src) |
| osutils.Mount(out_dir / src, path / dst, None, osutils.MS_BIND) |
| |
| defflags = ( |
| osutils.MS_NOSUID |
| | osutils.MS_NODEV |
| | osutils.MS_NOEXEC |
| | osutils.MS_RELATIME |
| ) |
| osutils.Mount("proc", path / "proc", "proc", defflags) |
| osutils.Mount("sysfs", path / "sys", "sysfs", defflags) |
| |
| if "binfmt_misc" in KNOWN_FILESYSTEMS: |
| try: |
| osutils.Mount( |
| "binfmt_misc", |
| path / "proc/sys/fs/binfmt_misc", |
| "binfmt_misc", |
| defflags, |
| ) |
| except PermissionError: |
| # We're in an environment where we can't mount binfmt_misc (e.g. a |
| # container), so ignore it for now. We need it for unittests via |
| # qemu, but nothing else currently. |
| pass |
| |
| # We expose /dev so we can access loopback & USB drives for flashing. |
| osutils.Mount("/dev", path / "dev", None, osutils.MS_BIND | osutils.MS_REC) |
| |
| |
| FileSystemDebugInfo = collections.namedtuple( |
| "FileSystemDebugInfo", ("fuser", "lsof", "ps") |
| ) |
| |
| |
| def GetFileSystemDebug(path: str, run_ps: bool = True) -> FileSystemDebugInfo: |
| """Collect filesystem debugging information. |
| |
| Dump some information to help find processes that may still be sing |
| files. Running ps auxf can also be done to see what processes are |
| still running. |
| |
| Args: |
| path: Full path for directory we want information on. |
| run_ps: When true, show processes running. |
| |
| Returns: |
| FileSystemDebugInfo with debug info. |
| """ |
| cmd_kwargs = { |
| "check": False, |
| "capture_output": True, |
| "encoding": "utf-8", |
| "errors": "replace", |
| } |
| fuser = cros_build_lib.sudo_run(["fuser", path], **cmd_kwargs) |
| lsof = cros_build_lib.sudo_run(["lsof", path], **cmd_kwargs) |
| if run_ps: |
| ps = cros_build_lib.run(["ps", "auxf"], **cmd_kwargs) |
| ps_stdout = ps.stdout |
| else: |
| ps_stdout = None |
| return FileSystemDebugInfo(fuser.stdout, lsof.stdout, ps_stdout) |
| |
| |
| # Raise an exception if cleanup takes more than 10 minutes. |
| @timeout_util.TimeoutDecorator(600) |
| def CleanupChroot( |
| chroot: chroot_lib.Chroot, |
| delete_out: bool = True, |
| ) -> None: |
| """Deletes a chroot, and possibly its output directory. |
| |
| Args: |
| chroot: The chroot to examine. |
| delete_out: Whether to also delete the chroot output directory. |
| """ |
| with metrics_lib.timer("cros_sdk_lib.CleanupChroot.RmDir.Chroot"): |
| osutils.RmDir(chroot.path, ignore_missing=True, sudo=True) |
| if delete_out: |
| with metrics_lib.timer("cros_sdk_lib.CleanupChroot.RmDir.out"): |
| osutils.RmDir(chroot.out_path, ignore_missing=True, sudo=True) |
| |
| |
| def RunChrootVersionHooks(version_file=None, hooks_dir=None) -> None: |
| """Run the chroot version hooks to bring the chroot up to date.""" |
| if not cros_build_lib.IsInsideChroot(): |
| command = ["run_chroot_version_hooks"] |
| cros_build_lib.run(command, enter_chroot=True) |
| else: |
| chroot = ChrootUpdater(version_file=version_file, hooks_dir=hooks_dir) |
| chroot.ApplyUpdates() |
| |
| |
| def InitLatestVersion(version_file=None, hooks_dir=None) -> None: |
| """Initialize the chroot version to the latest version.""" |
| if not cros_build_lib.IsInsideChroot(): |
| # Run the command in the chroot. |
| command = ["run_chroot_version_hooks", "--init-latest"] |
| cros_build_lib.run(command, enter_chroot=True) |
| else: |
| # Initialize the version. |
| chroot = ChrootUpdater(version_file=version_file, hooks_dir=hooks_dir) |
| if chroot.IsInitialized(): |
| logging.info( |
| "Chroot is already initialized to %s.", chroot.GetVersion() |
| ) |
| else: |
| logging.info( |
| "Initializing chroot to version %s.", chroot.latest_version |
| ) |
| chroot.SetVersion(chroot.latest_version) |
| |
| |
| class ChrootUpdater: |
| """Chroot version and update related functionality.""" |
| |
| def __init__(self, version_file=None, hooks_dir=None) -> None: |
| if version_file: |
| # We have one. Just here to skip the logic below since we don't need |
| # it. |
| default_version_file = None |
| elif cros_build_lib.IsInsideChroot(): |
| # Use the absolute path since we're inside the chroot. |
| default_version_file = CHROOT_VERSION_FILE |
| else: |
| # Otherwise convert to the path outside the chroot. |
| default_version_file = path_util.FromChrootPath(CHROOT_VERSION_FILE) |
| |
| self._version_file = version_file or default_version_file |
| self._hooks_dir = hooks_dir or _CHROOT_VERSION_HOOKS_DIR |
| |
| self._version = None |
| self._latest_version = None |
| self._hook_files = None |
| |
| @property |
| def latest_version(self): |
| """Get the highest available version for the chroot.""" |
| if self._latest_version is None: |
| self._latest_version = LatestChrootVersion(self._hooks_dir) |
| return self._latest_version |
| |
| def GetVersion(self): |
| """Get the chroot version. |
| |
| Returns: |
| int |
| |
| Raises: |
| InvalidChrootVersionError: when the file contents are not a valid |
| version. |
| IOError: when the file cannot be read. |
| UninitializedChrootError: when the version file does not exist. |
| """ |
| if self._version is None: |
| # Check for existence so IOErrors from osutils.ReadFile are limited |
| # to permissions problems. |
| if not os.path.exists(self._version_file): |
| raise UninitializedChrootError( |
| "Version file does not exist: %s" % self._version_file |
| ) |
| |
| version = osutils.ReadFile(self._version_file) |
| |
| try: |
| self._version = int(version) |
| except ValueError: |
| raise InvalidChrootVersionError( |
| "Invalid chroot version in %s: %s" |
| % (self._version_file, version) |
| ) |
| else: |
| logging.debug("Found chroot version %s", self._version) |
| |
| return self._version |
| |
| def SetVersion(self, version) -> None: |
| """Set and store the chroot version.""" |
| self._version = version |
| osutils.WriteFile(self._version_file, str(version), sudo=True) |
| |
| # TODO(2023-11-01): Owner by default should be root, but older chroots |
| # used to set this to the user. Force this to be root to keep all |
| # chroots in a consistent state. We can drop this after we stop caring |
| # about chroots that are too old. |
| osutils.Chown(self._version_file, user="root") |
| |
| def IsInitialized(self): |
| """Initialized Check.""" |
| try: |
| return self.GetVersion() > 0 |
| except (Error, IOError): |
| return False |
| |
| def ApplyUpdates(self) -> None: |
| """Apply all necessary updates to the chroot.""" |
| if self.GetVersion() > self.latest_version: |
| raise InvalidChrootVersionError( |
| "Missing upgrade hook for version %s.\n" |
| "Chroot is too new. Consider running:\n" |
| " cros_sdk --replace\n" |
| "If the chroot is brand new, retrieve latest hooks with:\n" |
| " repo sync" % self.GetVersion() |
| ) |
| |
| for hook, version in self.GetChrootUpdates(): |
| result = cros_build_lib.run( |
| ["bash", hook], enter_chroot=True, check=False |
| ) |
| if not result.returncode: |
| self.SetVersion(version) |
| else: |
| raise ChrootUpdateError( |
| "Error running chroot version hook: %s" % hook |
| ) |
| |
| def GetChrootUpdates(self): |
| """Get all (update file, version) pairs that have not been run. |
| |
| Returns: |
| list of (/path/to/hook/file, version) pairs in order. |
| |
| Raises: |
| ChrootDeprecatedError when one or more required update files have |
| been deprecated. |
| """ |
| hooks = self._GetHookFilesByVersion() |
| |
| # Create the relevant ChrootUpdates. |
| updates = [] |
| # Current version has already been run and we need to run the latest, so |
| # +1 for each end of the version range. |
| for version in range(self.GetVersion() + 1, self.latest_version + 1): |
| # Deprecation check: Deprecation is done by removing old scripts. |
| # Updates must form a continuous sequence. If the sequence is broken |
| # between the chroot's current version and the most recent, then the |
| # chroot must be recreated. |
| if version not in hooks: |
| raise ChrootDeprecatedError(self.GetVersion()) |
| |
| updates.append((hooks[version], version)) |
| |
| return updates |
| |
| def _GetHookFilesByVersion(self): |
| """Find and store the hooks by their version number. |
| |
| Returns: |
| dict - {version: /path/to/hook/file} mapping. |
| |
| Raises: |
| VersionHasMultipleHooksError when multiple hooks exist for a |
| version. |
| """ |
| if self._hook_files: |
| return self._hook_files |
| |
| hook_files = {} |
| for hook in os.listdir(self._hooks_dir): |
| version = int(hook.split("_", 1)[0]) |
| |
| # Sanity check: Each version may only have a single script. Multiple |
| # CLs landed at the same time and no one noticed the version |
| # overlap. |
| if version in hook_files: |
| raise VersionHasMultipleHooksError( |
| "Version %s has multiple hooks." % version |
| ) |
| |
| hook_files[version] = os.path.join(self._hooks_dir, hook) |
| |
| self._hook_files = hook_files |
| return self._hook_files |
| |
| |
| class ChrootCreator: |
| """Creates a new chroot from a given SDK. |
| |
| Note: For the lifetime of this class, no paths are mounted in the chroot. |
| Thus, some standard path conversion functions like path_util.FromChrootPath |
| and chroot.full_path might return paths that don't exist. Instead, use |
| self._from_chroot_path(). |
| """ |
| |
| # If the host timezone isn't set, we'll use this inside the SDK. |
| DEFAULT_TZ = "usr/share/zoneinfo/PST8PDT" |
| |
| # Groups to add the user to inside the chroot. |
| # This group list is a bit dated and probably contains a number of items |
| # that no longer make sense. |
| # TODO(crbug.com/762445): Remove "adm". |
| # TODO(build): Remove cdrom & floppy. |
| # TODO(build): See if audio is still needed. Host distros might use diff |
| # "audio" group, so we wouldn't get access to /dev/snd/ nodes directly. |
| # TODO(build): See if video is still needed. Host distros might use diff |
| # "video" group, so we wouldn't get access to /dev/dri/ nodes directly. |
| DEFGROUPS = {"adm", "cdrom", "floppy", "audio", "video", "portage"} |
| |
| def __init__( |
| self, |
| chroot: chroot_lib.Chroot, |
| sdk_tarball: Path, |
| ) -> None: |
| """Initialize. |
| |
| Args: |
| chroot: Chroot object representing the parameters for the chroot to |
| create. |
| sdk_tarball: Path to a downloaded Chromium OS SDK tarball. |
| """ |
| self.chroot = chroot |
| self.sdk_tarball = sdk_tarball |
| self._sysroot = sysroot_lib.Sysroot(chroot.path) |
| |
| @metrics_lib.timed("cros_sdk_lib.ChrootCreator._make_chroot") |
| def _make_chroot(self) -> None: |
| """Create the chroot.""" |
| # TODO(zbehan): Configure stuff that is usually done in postinst's, |
| # but wasn't. Fix the postinst's. |
| # NB: We don't use self.chroot.run because that requires the SDK be |
| # initialized since it uses `cros_sdk` to enter. |
| cros_build_lib.dbg_run( |
| ["chroot", self.chroot.path, "env-update", "--no-ldconfig"], |
| extra_env={ |
| # We need to use the PATH that makes sense inside the SDK, |
| # not whatever the host env is using. |
| "PATH": "/bin:/sbin:/usr/bin:/usr/sbin", |
| }, |
| ) |
| |
| def init_timezone(self) -> None: |
| """Setup the timezone info inside the chroot.""" |
| tz_path = Path("etc/localtime") |
| host_tz = "/" / tz_path |
| chroot_tz = Path(self.chroot.full_path(host_tz)) |
| # Nuke it in case it's a broken symlink. |
| osutils.SafeUnlink(chroot_tz) |
| if host_tz.exists(): |
| logging.debug("%s: copying from %s", chroot_tz, host_tz) |
| chroot_tz.write_bytes(host_tz.read_bytes()) |
| else: |
| logging.debug("%s: symlinking to %s", chroot_tz, self.DEFAULT_TZ) |
| chroot_tz.symlink_to(self.DEFAULT_TZ) |
| |
| def init_user( |
| self, |
| user: Optional[str] = None, |
| uid: Optional[int] = None, |
| gid: Optional[int] = None, |
| ) -> None: |
| """Setup the current user inside the chroot. |
| |
| The user account name & id are synced with the active account outside of |
| the SDK. This helps facilitate copying of files in & out of the SDK |
| without the need of sudo. |
| |
| The user account must not already exist inside the SDK (as a |
| pre-existing reserved name) otherwise we can't create it with the right |
| uid. |
| |
| Args: |
| user: The username to create. |
| uid: The new account's userid. |
| gid: The new account's groupid. |
| """ |
| if not user: |
| user = os.getenv("SUDO_USER") |
| assert user is not None |
| if uid is None: |
| uid_str = os.getenv("SUDO_UID") |
| assert uid_str is not None |
| uid = int(uid_str) |
| if gid is None: |
| gid = pwd.getpwnam(user).pw_gid |
| |
| path = Path(self.chroot.full_path("/etc/passwd")) |
| lines = path.read_text(encoding="utf-8").splitlines() |
| |
| # Make sure the user isn't one the existing reserved ones. |
| for line in lines: |
| existing_user = line.split(":", 1)[0] |
| if existing_user == user: |
| cros_build_lib.Die( |
| f"{user}: this account cannot be used to build CrOS" |
| ) |
| |
| # Create the account. |
| home = f"/home/{user}" |
| line = f"{user}:x:{uid}:{gid}:ChromeOS Developer:{home}:/bin/bash" |
| logging.debug("%s: adding user: %s", path, line) |
| lines.insert(0, line) |
| path.write_text("\n".join(lines) + "\n", encoding="utf-8") |
| |
| home_path = Path(self.chroot.full_path(home)) |
| # If |home_path| exists, a chroot has already been established for this |
| # tree. Skip reestablishing. |
| if not home_path.exists(): |
| self.init_user_home(home_path, uid, gid) |
| |
| def init_group( |
| self, |
| user: Optional[str] = None, |
| groups: Optional[Set[str]] = None, |
| group: Optional[str] = None, |
| gid: Optional[int] = None, |
| ) -> None: |
| """Setup the current user's groups inside the chroot. |
| |
| This will create the user's primary group and add them to a bunch of |
| supplemental groups. The primary group is synced with the active |
| account outside the SDK to help facilitate accessing of files & |
| resources (e.g. any /dev nodes). |
| |
| The primary group must not already exist inside the SDK (as a |
| pre-existing reserved name) otherwise we can't create it with the right |
| gid. |
| |
| Args: |
| user: The username to add to groups. |
| groups: The account's supplemental groups. |
| group: The account's primary group (to be created). |
| gid: The primary group's gid. |
| """ |
| if not user: |
| user = os.getenv("SUDO_USER") |
| assert user is not None |
| if groups is None: |
| groups = self.DEFGROUPS |
| if gid is None: |
| gid = pwd.getpwnam(user).pw_gid |
| if group is None: |
| group = grp.getgrgid(gid).gr_name |
| |
| path = Path(self.chroot.full_path("/etc/group")) |
| lines = path.read_text(encoding="utf-8").splitlines() |
| |
| # Make sure the group isn't one the existing reserved ones. |
| # Add the user to all the existing ones too. |
| for i, line in enumerate(lines): |
| entry = line.split(":") |
| if entry[0] == group: |
| # If the group exists with the same gid, no need to add a new |
| # one. This often comes up with e.g. the "users" group. |
| if entry[2] == str(gid): |
| return |
| cros_build_lib.Die( |
| f"{group}: this group cannot be used to build CrOS" |
| ) |
| if entry[0] in groups: |
| if entry[-1]: |
| entry[-1] += "," |
| entry[-1] += user |
| lines[i] = ":".join(entry) |
| |
| line = f"{group}:x:{gid}:{user}" |
| logging.debug("%s: adding group: %s", path, line) |
| lines.insert(0, line) |
| path.write_text("\n".join(lines) + "\n", encoding="utf-8") |
| |
| def init_user_home(self, home: Path, uid: int, gid: int) -> None: |
| """Initialize the user's /home dir.""" |
| shutil.copytree(self.chroot.full_path("/etc/skel"), home) |
| |
| (home / "chromiumos").symlink_to(constants.CHROOT_SOURCE_ROOT) |
| |
| bash_profile = home / ".bash_profile" |
| osutils.Touch(bash_profile) |
| data = bash_profile.read_text(encoding="utf-8").rstrip() |
| if data: |
| data += "\n\n" |
| # Automatically change to scripts directory. |
| data += 'cd "${CHROOT_CWD:-${HOME}/chromiumos/src/scripts}"\n\n' |
| bash_profile.write_text(data, encoding="utf-8") |
| |
| osutils.Chown(home, uid, group=gid, recursive=True) |
| |
| def init_filesystem_basic(self) -> None: |
| """Setup various dirs & simple config files.""" |
| # Create mount point directories. NB: we don't want to translate them |
| # via chroot.full_path(), because that would map to, e.g., the source |
| # directory (/path/to/chromiumos/src) instead of the mount-point we |
| # want to create (/path/to/chromiumos/chroot/mnt/host/source). |
| for path in ( |
| constants.CHROOT_SOURCE_ROOT, |
| constants.CHROOT_OUT_ROOT, |
| ): |
| (Path(self.chroot.path) / path.relative_to("/")).mkdir( |
| mode=0o755, parents=True, exist_ok=True |
| ) |
| |
| def init_etc(self, user: Optional[str] = None) -> None: |
| """Setup the /etc paths.""" |
| if user is None: |
| user = os.getenv("SUDO_USER") |
| |
| etc_dir = Path(self.chroot.full_path("/etc")) |
| |
| # Setup some symlinks. |
| mtab = etc_dir / "mtab" |
| if not mtab.is_symlink(): |
| osutils.SafeUnlink(mtab) |
| mtab.symlink_to("/proc/mounts") |
| |
| # Copy config from outside chroot into chroot. |
| for path in ("hosts", "resolv.conf"): |
| host_path = Path("/etc") / path |
| chroot_path = etc_dir / path |
| if host_path.exists(): |
| chroot_path.write_bytes(host_path.read_bytes()) |
| chroot_path.chmod(0o644) |
| |
| # Setup the SDK make.conf file. |
| build_target = build_target_lib.BuildTarget( |
| constants.CHROOT_BUILDER_BOARD |
| ) |
| self._sysroot.InstallMakeConfSdk(build_target) |
| |
| # Add chromite/sdk/bin and chromite/bin into the path globally. We rely |
| # on 'env-update' getting called later. |
| env_d = etc_dir / "env.d" / "99chromiumos" |
| chroot_chromite_bin = ( |
| constants.CHROOT_SOURCE_ROOT / constants.CHROMITE_BIN_SUBDIR |
| ) |
| env_d.write_text( |
| f"""\ |
| PATH="{constants.CHROOT_SOURCE_ROOT}/chromite/sdk/bin:{chroot_chromite_bin}" |
| CROS_WORKON_SRCROOT="{constants.CHROOT_SOURCE_ROOT}" |
| PORTAGE_USERNAME="{user}" |
| """, |
| encoding="utf-8", |
| ) |
| |
| profile_d = etc_dir / "profile.d" |
| profile_d.mkdir(mode=0o755, parents=True, exist_ok=True) |
| for f in ("40-chromeos-cachedir.sh", "50-chromiumos-niceties.sh"): |
| (profile_d / f).symlink_to( |
| f"{constants.CHROOT_SOURCE_ROOT}/chromite/sdk/etc/profile.d/{f}" |
| ) |
| |
| # Enable bash completion. |
| bash_completion_d = etc_dir / "bash_completion.d" |
| bash_completion_d.mkdir(mode=0o755, parents=True, exist_ok=True) |
| (bash_completion_d / "cros").symlink_to(f"{_BASH_COMPLETION_DIR}/cros") |
| |
| # Use the standardized upgrade script to setup proxied vars. |
| cros_build_lib.dbg_run( |
| [ |
| constants.CHROMITE_SHELL_DIR |
| / "sdk_lib" |
| / "rewrite-sudoers.d.sh", |
| self.chroot.path, |
| user, |
| ] |
| + list(constants.CHROOT_ENVIRONMENT_ALLOWLIST) |
| ) |
| |
| def init_var(self, uid: Optional[int] = None) -> None: |
| """Handle /var contents from SDK tarball.""" |
| if uid is None: |
| uid_str = os.getenv("SUDO_UID") |
| assert uid_str is not None |
| uid = int(uid_str) |
| |
| for chroot_path, out_path in ( |
| ("var/cache", "sdk/cache"), |
| ("var/log", "sdk/logs"), |
| ): |
| src_dir = Path(self.chroot.path) / chroot_path |
| dst_dir = self.chroot.out_path / out_path |
| # chroot source didn't have this path? Then skip it. |
| if not src_dir.exists(): |
| continue |
| |
| osutils.SafeMakedirsNonRoot(dst_dir) |
| osutils.MoveDirContents(src_dir, dst_dir, allow_nonempty=True) |
| |
| cache_dir = self.chroot.full_path(constants.CHROOT_CACHE_ROOT) |
| osutils.Chown(cache_dir, uid, group=constants.PORTAGE_GID) |
| |
| # Create edb cache stub directories. |
| edb_cache_dep = Path( |
| self.chroot.full_path(constants.CHROOT_EDB_CACHE_ROOT / "dep") |
| ) |
| osutils.SafeMakedirs(edb_cache_dep, mode=0o2775) |
| # Set users/groups. |
| osutils.Chown( |
| edb_cache_dep, |
| constants.PORTAGE_UID, |
| group=constants.PORTAGE_GID, |
| recursive=True, |
| ) |
| |
| @metrics_lib.timed("cros_sdk_lib.ChrootCreator.run") |
| def run( |
| self, |
| user: Optional[str] = None, |
| uid: Optional[int] = None, |
| group: Optional[str] = None, |
| gid: Optional[int] = None, |
| ) -> None: |
| """Create the chroot. |
| |
| Args: |
| user: The user account to use (e.g. for testing). |
| uid: The user id to use (e.g. for testing). |
| group: The group account to use (e.g. for testing). |
| gid: The group id to use (e.g. for testing). |
| """ |
| logging.notice("Creating chroot. This may take a few minutes...") |
| |
| metrics_prefix = "cros_sdk_lib.ChrootCreator.run" |
| with metrics_lib.timer(f"{metrics_prefix}.ExtractSdkTarball"): |
| # Unpack the chroot. |
| Path(self.chroot.path).mkdir( |
| mode=0o755, parents=True, exist_ok=True |
| ) |
| cros_build_lib.ExtractTarball(self.sdk_tarball, self.chroot.path) |
| |
| with metrics_lib.timer(f"{metrics_prefix}.init"): |
| self.init_timezone() |
| self.init_user(user=user, uid=uid, gid=gid) |
| self.init_group(user=user, group=group, gid=gid) |
| self.init_filesystem_basic() |
| self.init_etc(user=user) |
| self.init_var(uid=uid) |
| |
| MountChrootPaths(self.chroot) |
| |
| self._make_chroot() |
| |
| |
| @metrics_lib.timed("cros_sdk_lib.CreateChroot") |
| def CreateChroot(*args, **kwargs) -> None: |
| """Convenience method.""" |
| ChrootCreator(*args, **kwargs).run() |
| |
| |
| class ChrootEnteror: |
| """Enters an existing chroot (and syncs state we care about).""" |
| |
| ENTER_CHROOT = os.path.join( |
| constants.CHROMITE_SHELL_DIR, "sdk_lib/enter_chroot.sh" |
| ) |
| |
| # The rlimits we will lookup & pass down, in order. |
| RLIMITS_TO_PASS = ( |
| resource.RLIMIT_AS, |
| resource.RLIMIT_CORE, |
| resource.RLIMIT_CPU, |
| resource.RLIMIT_FSIZE, |
| resource.RLIMIT_MEMLOCK, |
| resource.RLIMIT_NICE, |
| resource.RLIMIT_NOFILE, |
| resource.RLIMIT_NPROC, |
| resource.RLIMIT_RSS, |
| resource.RLIMIT_STACK, |
| ) |
| |
| # We want a proc limit at least this small. |
| _RLIMIT_NPROC_MIN = 4096 |
| |
| # We want a file limit at least this small. |
| _RLIMIT_NOFILE_MIN = 262144 |
| |
| # Path to sysctl knob. Class-level constant for easy test overrides. |
| _SYSCTL_VM_MAX_MAP_COUNT = Path("/sys/vm/max_map_count") |
| |
| def __init__( |
| self, |
| chroot: "chroot_lib.Chroot", |
| chrome_root_mount: Optional[Path] = None, |
| cmd: Optional[List[str]] = None, |
| cwd: Optional[Path] = None, |
| read_only: bool = False, |
| ) -> None: |
| """Initialize. |
| |
| Args: |
| chroot: Where the new chroot will be created. |
| chrome_root_mount: Where to mount |chrome_root| inside the chroot. |
| cmd: Program to run inside the chroot. |
| cwd: Directory to change to before running |additional_args|. |
| read_only: Whether to mount the chroot read-only. |
| """ |
| self.chroot = chroot |
| self.chrome_root_mount = chrome_root_mount |
| self.cmd = cmd |
| self.read_only = read_only |
| |
| if cwd and not cwd.is_absolute(): |
| cwd = Path(chroot.chroot_path(cwd)) |
| self.cwd = cwd |
| |
| def _check_chroot(self) -> None: |
| """Verify the chroot is usable.""" |
| st = os.statvfs( |
| Path(self.chroot.full_path(Path("/") / "usr" / "bin" / "sudo")) |
| ) |
| if st.f_flag & os.ST_NOSUID: |
| cros_build_lib.Die("chroot cannot be in a nosuid mount") |
| |
| def _enter_chroot( |
| self, cmd: Optional[List[str]] = None, cwd: Optional[Path] = None |
| ) -> cros_build_lib.CompletedProcess: |
| """Enter the chroot.""" |
| self._check_chroot() |
| |
| if cmd is None: |
| cmd = self.cmd |
| if cwd is None: |
| cwd = self.cwd |
| |
| wrapper = [self.ENTER_CHROOT] + self.chroot.get_enter_args( |
| for_shell=True |
| ) |
| if self.chrome_root_mount: |
| wrapper += ["--chrome_root_mount", str(self.chrome_root_mount)] |
| if cwd: |
| wrapper += ["--working_dir", str(cwd)] |
| |
| if cmd: |
| wrapper += ["--"] + cmd |
| |
| return cros_build_lib.dbg_run(wrapper, check=False) |
| |
| @classmethod |
| def get_rlimits(cls) -> str: |
| """Serialize current rlimits.""" |
| return str(tuple(resource.getrlimit(x) for x in cls.RLIMITS_TO_PASS)) |
| |
| @classmethod |
| def set_rlimits(cls, limits: str) -> None: |
| """Deserialize rlimits.""" |
| for rlim, limit in zip(cls.RLIMITS_TO_PASS, ast.literal_eval(limits)): |
| cur_limit = resource.getrlimit(rlim) |
| if cur_limit != limit: |
| # Turn the number into a symbolic name for logging. |
| name = "RLIMIT_???" |
| for name, num in resource.__dict__.items(): |
| if name.startswith("RLIMIT_") and num == rlim: |
| break |
| logging.debug( |
| "Restoring user rlimit %s from %r to %r", |
| name, |
| cur_limit, |
| limit, |
| ) |
| |
| resource.setrlimit(rlim, limit) |
| |
| def _setup_rlimit_nproc(self) -> None: |
| """Update process rlimits.""" |
| # Some systems set the soft limit too low. Bump it to the hard limit. |
| # We don't override the hard limit because it's something the admins put |
| # in place and we want to respect such configs. http://b/234353695 |
| soft, hard = resource.getrlimit(resource.RLIMIT_NPROC) |
| if soft != resource.RLIM_INFINITY and soft < self._RLIMIT_NPROC_MIN: |
| if soft < hard or hard == resource.RLIM_INFINITY: |
| resource.setrlimit(resource.RLIMIT_NPROC, (hard, hard)) |
| |
| def _setup_vm_max_map_count(self) -> None: |
| """Update OS limits as ThinLTO opens lots of files at the same time.""" |
| soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) |
| resource.setrlimit( |
| resource.RLIMIT_NOFILE, |
| ( |
| max(soft, self._RLIMIT_NOFILE_MIN), |
| max(hard, self._RLIMIT_NOFILE_MIN), |
| ), |
| ) |
| try: |
| max_map_count = int( |
| self._SYSCTL_VM_MAX_MAP_COUNT.read_text(encoding="utf-8") |
| ) |
| except FileNotFoundError: |
| return |
| if max_map_count < self._RLIMIT_NOFILE_MIN: |
| logging.notice( |
| "Raising vm.max_map_count from %s to %s", |
| max_map_count, |
| self._RLIMIT_NOFILE_MIN, |
| ) |
| self._SYSCTL_VM_MAX_MAP_COUNT.write_text( |
| str(self._RLIMIT_NOFILE_MIN), encoding="utf-8" |
| ) |
| |
| def run( |
| self, cmd: Optional[List[str]] = None, cwd: Optional[Path] = None |
| ) -> cros_build_lib.CompletedProcess: |
| """Enter the chroot.""" |
| if "CHROMEOS_SUDO_RLIMITS" in os.environ: |
| self.set_rlimits(os.environ.pop("CHROMEOS_SUDO_RLIMITS")) |
| self._setup_rlimit_nproc() |
| self._setup_vm_max_map_count() |
| if self.read_only: |
| with ChrootReadOnly(path=self.chroot.path): |
| return self._enter_chroot(cmd=cmd, cwd=cwd) |
| else: |
| return self._enter_chroot(cmd=cmd, cwd=cwd) |
| |
| |
| def EnterChroot(*args, **kwargs) -> cros_build_lib.CompletedProcess: |
| """Convenience method.""" |
| return ChrootEnteror(*args, **kwargs).run() |
| |
| |
| class _ChrootWritable: |
| """A context manager for ensuring the Chroot mount writability.""" |
| |
| def __init__( |
| self, writable: bool, path: Union[str, os.PathLike] = "/" |
| ) -> None: |
| self._want_read_only = not writable |
| self._chroot_path = path |
| self._needs_remount = False |
| |
| def __enter__(self) -> None: |
| # This context manager doesn't make sense outside the chroot. |
| assert IsChrootReady(self._chroot_path) |
| |
| assert osutils.IsMounted(self._chroot_path) |
| self._needs_remount = ( |
| osutils.IsMountedReadOnly(self._chroot_path) != self._want_read_only |
| ) |
| |
| if self._needs_remount: |
| self._remount(read_only=self._want_read_only) |
| |
| def __exit__(self, _type, _value, _traceback) -> None: |
| if self._needs_remount: |
| # Path mounts may change (e.g., pivot_root on chroot entry), which |
| # means the path mount looks different by the time we exit. Just |
| # ignore it. |
| if not osutils.IsMounted(self._chroot_path): |
| return |
| |
| self._remount(read_only=not self._want_read_only) |
| |
| def _remount(self, read_only: bool) -> None: |
| """Perform the remount operation. |
| |
| Args: |
| read_only: if True, remount read-only; otherwise, remount |
| read/write. |
| """ |
| logging.debug( |
| "Re-mounting chroot %s", "read-only" if read_only else "read-write" |
| ) |
| try: |
| ro = osutils.MS_RDONLY if read_only else 0 |
| osutils.Mount( |
| None, |
| self._chroot_path, |
| None, |
| osutils.MS_REMOUNT | osutils.MS_BIND | ro, |
| ) |
| except PermissionError: |
| # Try via sudo instead. |
| ro = "ro" if read_only else "rw" |
| cros_build_lib.sudo_run( |
| [ |
| "mount", |
| "-o", |
| ",".join(("remount", "bind", ro)), |
| self._chroot_path, |
| ] |
| ) |
| |
| |
| class ChrootReadWrite(_ChrootWritable): |
| """Context manager for ensuring the Chroot mount is read/write. |
| |
| Operations that need to update the main chroot mount (i.e., the contents of |
| |Chroot.path|, not |Chroot.out_path|) may require the chroot be mounted in a |
| writable state. Such operations should be performed within this |
| ChrootReadWrite context manager. |
| |
| If the chroot is already mounted read/write, then this context manager is a |
| no-op. |
| |
| Note: carefully consider whether you really want to mount the chroot |
| read/write. Most writable state should go into |Chroot.out_path|, such that |
| we can avoid writing to |Chroot.path| most of the time. |
| |
| Examples: |
| |
| # Perform some chroot updates. The chroot may already be writable, but |
| # we document it anyway. |
| with ChrootReadWrite(): |
| PerformChrootUpdates() |
| |
| # Perform some chroot updates on chroot entry. The chroot is mounted |
| # read-only, and we want it read/write only for the Update operations. |
| with ChrootReadOnly(): |
| ... |
| with ChrootReadWrite(): |
| # Do a few maintenance steps read/write: |
| PerformChrootUpdates() |
| # Back to regular SDK shell, read-only. |
| """ |
| |
| def __init__(self, path: Union[str, os.PathLike] = "/") -> None: |
| """Initialize a ChrootReadWrite context manager. |
| |
| Args: |
| path: The chroot mount point. |
| """ |
| super().__init__(writable=True, path=path) |
| |
| |
| class ChrootReadOnly(_ChrootWritable): |
| """Context manager for ensuring the Chroot mount is read-only. |
| |
| Most code should assume that the chroot may be mounted read-only on chroot |
| entry, and so should be using a ChrootReadWrite manager for operations |
| where we need a writable chroot. See ChrootReadWrite for more info. |
| """ |
| |
| def __init__(self, path: Union[str, os.PathLike] = "/") -> None: |
| """Initialize a ChrootReadOnly context manager. |
| |
| Args: |
| path: The chroot mount point. |
| """ |
| super().__init__(writable=False, path=path) |