| # Copyright 2015 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Handle path inference and translation.""" |
| |
| import dataclasses |
| import enum |
| import os |
| from pathlib import Path |
| from typing import Callable, Iterator, List, Optional, Union |
| |
| from chromite.lib import chromite_config |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import git |
| from chromite.lib import osutils |
| from chromite.utils import memoize |
| from chromite.utils import xdg_util |
| |
| |
# Directory name of the cache placed directly under a repo checkout root
# (see find_cache_dir).
GENERAL_CACHE_DIR = ".cache"
# Directory name of the cache placed under <chrome src>/build for gclient
# (Chromium browser) checkouts (see find_cache_dir).
CHROME_CACHE_DIR = "cros_cache"
# Directory name of the per-checkout config dir used for repo and gclient
# checkouts (see find_config_dir_for_checkout).
GENERAL_CONFIG_DIR = ".config"
| |
| |
class CheckoutType(enum.IntEnum):
    """Kinds of source checkout chromite may be running under.

    Members are numbered sequentially starting at 1.
    """

    # A citc checkout.
    CITC = 1

    # A Chromium browser checkout.
    GCLIENT = 2

    # A standard CrOS checkout using repo.
    REPO = 3

    # We don't know what kind of checkout this is.
    UNKNOWN = 4
| |
| |
@dataclasses.dataclass
class CheckoutInfo:
    """Description of the source checkout chromite is operating on.

    Use DetermineCheckout to build one by probing the filesystem.
    """

    # Which kind of checkout this is.
    type: CheckoutType
    # TODO: Change these to a Path.
    # Root directory of the checkout, if one was identified.
    root: Optional[str]
    # Chrome src/ directory, if this is a Chrome checkout.
    chrome_src_dir: Optional[str]

    @property
    def tracks_main(self) -> bool:
        """Whether the checkout appears to roughly follow the main branch."""
        # For now, we assume CitC always tracks main.
        if self.type == CheckoutType.CITC:
            return True
        # Browser checkouts don't track our main branch (different tree), and
        # without a root there is no manifest to inspect.
        if self.type == CheckoutType.GCLIENT or not self.root:
            return False
        manifest = git.ManifestCheckout.Cached(self.root)
        return manifest.manifest_branch in ("snapshot", "main")

    @property
    def repo_binary(self) -> Optional[Path]:
        """Location of the repo tool, or None when not a rooted repo checkout."""
        if self.type == CheckoutType.REPO and self.root:
            return Path(self.root, ".repo", "repo", "repo")
        return None
| |
| |
class ChrootPathResolver:
    """Perform path resolution to/from the chroot.

    Attributes:
        source_path: Value to override default source root inference.
        source_from_path_repo: Whether to infer the source root from the
            converted path's repo parent during inbound translation; overrides
            |source_path|.
        chroot_path: Full path of the chroot to use. If chroot_path is
            specified, source_path cannot be specified.
        out_path: Full path of the output directory to use. If out_path is
            specified, source_path cannot be specified.
    """

    # When chroot_path is specified, it is assumed that any reference to
    # the chroot mount point (/mnt/host/source) points back to the
    # inferred source root determined by constants.SOURCE_ROOT. For example,
    # assuming:
    #   constants.SOURCE_ROOT == /workspace/checkout/
    # and
    #   chroot_path = /custom/chroot/path :
    #
    #   FromChroot('/mnt/host/source/my/file') -> /workspace/checkout/my/file
    #   FromChroot('/some/other/file') -> /custom/chroot/path/some/other/file
    #   ToChroot('/workspace/checkout/file') -> /mnt/host/source/file
    #   ToChroot('/custom/chroot/path/this/file') -> /this/file

    def __init__(
        self,
        source_path: Optional[Union[str, os.PathLike]] = None,
        source_from_path_repo: bool = True,
        chroot_path: Optional[Union[str, os.PathLike]] = None,
        out_path: Optional[os.PathLike] = None,
    ) -> None:
        """Set up the resolver and, outside the chroot, its root mappings.

        Args:
            source_path: Source root override; defaults to
                constants.SOURCE_ROOT.
            source_from_path_repo: Whether inbound translation may infer the
                source root from the repo checkout containing the path.
            chroot_path: Explicit chroot location; exclusive with
                |source_path|.
            out_path: Explicit output directory; exclusive with |source_path|.

        Raises:
            AssertionError: If |source_path| is combined with |chroot_path| or
                with |out_path|.
        """
        if chroot_path and source_path:
            raise AssertionError(
                "Only one of source_path or chroot_path may be specified"
            )
        if out_path and source_path:
            raise AssertionError(
                "Only one of source_path or out_path may be specified"
            )
        self._inside_chroot = cros_build_lib.IsInsideChroot()
        self._source_from_path_repo = source_from_path_repo
        self._custom_chroot_path = chroot_path
        self._source_path = (
            constants.SOURCE_ROOT if source_path is None else source_path
        )

        # The following are only needed if outside the chroot.
        if self._inside_chroot:
            self._chroot_path = None
            self._chroot_to_host_roots = None
            self._out_path = None
        else:
            self._chroot_path = self._GetSourcePathChroot(
                self._source_path, self._custom_chroot_path
            )
            if out_path is not None:
                # Coerce so that the "/" joins below also work when a caller
                # hands in a plain string.
                self._out_path = Path(out_path)
            elif self._source_path is not None:
                self._out_path = (
                    Path(self._source_path) / constants.DEFAULT_OUT_DIR
                )
            else:
                self._out_path = constants.DEFAULT_OUT_PATH

            # Initialize mapping of known root bind mounts. Order matters:
            # the first matching chroot prefix wins in _GetHostPath.
            self._chroot_to_host_roots = (
                (constants.CHROOT_SOURCE_ROOT, self._source_path),
                (constants.CHROOT_CACHE_ROOT, self._GetCachePath),
                ("/tmp", self._out_path / "tmp"),
                ("/home", self._out_path / "home"),
                ("/build", self._out_path / "build"),
                ("/run", self._out_path / "sdk" / "run"),
                ("/var/cache", self._out_path / "sdk" / "cache"),
                ("/var/log", self._out_path / "sdk" / "logs"),
                ("/var/tmp", self._out_path / "sdk" / "tmp"),
                ("/usr/local/bin", self._out_path / "sdk" / "bin"),
                (constants.CHROOT_OUT_ROOT, self._out_path),
            )

    @classmethod
    @memoize.MemoizedSingleCall
    def _GetCachePath(cls) -> str:
        """Returns the cache directory with symlinks resolved."""
        return os.path.realpath(GetCacheDir())

    def _GetSourcePathChroot(
        self,
        source_path: Optional[str],
        custom_chroot_path: Optional[str] = None,
    ) -> Optional[str]:
        """Returns path to the chroot directory of a given source root.

        Args:
            source_path: The source root the default chroot lives under.
            custom_chroot_path: If truthy, returned as-is instead.

        Returns:
            The chroot path, or None when neither input is available.
        """
        if custom_chroot_path:
            return custom_chroot_path
        if source_path is None:
            return None
        return os.path.join(source_path, constants.DEFAULT_CHROOT_DIR)

    def _TranslatePath(
        self,
        path: str,
        src_root: Union[os.PathLike, str],
        dst_root_input: Union[
            Callable[[], Optional[Union[str, os.PathLike]]],
            Optional[Union[str, os.PathLike]],
        ],
    ) -> Optional[str]:
        """If |path| starts with |src_root|, replace it using |dst_root_input|.

        Args:
            path: An absolute path we want to convert to a destination
                equivalent.
            src_root: The root that path needs to be contained in.
            dst_root_input: The root we want to relocate the relative path into,
                or a function returning this value.

        Returns:
            A translated path, or None if |src_root| is not a prefix of |path|.

        Raises:
            ValueError: If |src_root| is empty, or if |src_root| is a prefix but
                |dst_root_input| yields None, which means we don't have
                sufficient information to do the translation.
        """
        if not src_root:
            raise ValueError("No source root to translate path from")
        if not osutils.IsSubPath(path, src_root):
            return None
        # Only resolve a callable destination once we know it is needed.
        dst_root = (
            dst_root_input() if callable(dst_root_input) else dst_root_input
        )
        if dst_root is None:
            raise ValueError("No target root to translate path to")
        relative = Path(path).absolute().relative_to(Path(src_root).absolute())
        return str(Path(dst_root) / relative)

    def _GetChrootPath(self, path: str) -> str:
        """Translates a fully-expanded host |path| into a chroot equivalent.

        This checks path prefixes in order from the most to least "contained":
        the chroot itself, then the cache directory, the output directory
        mounts, and finally the source tree. The idea is to return the shortest
        possible chroot equivalent.

        Args:
            path: A host path to translate.

        Returns:
            An equivalent chroot path.

        Raises:
            ValueError: If |path| is not reachable from the chroot.
        """
        # Preliminary: compute the actual source and chroot paths to use. These
        # are generally the precomputed values, unless we're inferring the
        # source root from the path itself.
        source_path = self._source_path
        chroot_path = self._chroot_path

        if self._custom_chroot_path is None and self._source_from_path_repo:
            path_repo_dir = git.FindRepoDir(path)
            if path_repo_dir is not None:
                source_path = os.path.abspath(os.path.join(path_repo_dir, ".."))
                chroot_path = self._GetSourcePathChroot(source_path)

        # NB: This mirrors self._chroot_to_host_roots, with tweaks due to
        # per-|path| dynamic handling of |self._source_from_path_repo|. If you
        # update one, you might need to update both.
        host_to_chroot_roots = (
            # Check if the path happens to be in the chroot already.
            (chroot_path, "/"),
            # Check the cache directory.
            (self._GetCachePath(), constants.CHROOT_CACHE_ROOT),
            (self._out_path / "tmp", "/tmp"),
            (self._out_path / "home", "/home"),
            (self._out_path / "build", "/build"),
            (self._out_path / "sdk" / "run", "/run"),
            (self._out_path / "sdk" / "cache", "/var/cache"),
            (self._out_path / "sdk" / "logs", "/var/log"),
            (self._out_path / "sdk" / "tmp", "/var/tmp"),
            (self._out_path / "sdk" / "bin", "/usr/local/bin"),
            (self._out_path, constants.CHROOT_OUT_ROOT),
            # Check the current SDK checkout tree.
            (source_path, constants.CHROOT_SOURCE_ROOT),
        )

        for src_root, dst_root in host_to_chroot_roots:
            # Roots that could not be determined simply do not participate.
            if src_root is None:
                continue
            new_path = self._TranslatePath(path, src_root, dst_root)
            if new_path is not None:
                return new_path

        raise ValueError("Path is not reachable from the chroot")

    def _GetHostPath(self, path: str) -> str:
        """Translates a fully-expanded chroot |path| into a host equivalent.

        We first attempt translation of known roots (source). If any is
        successful, we check whether the result happens to point back to the
        chroot, in which case we trim the chroot path prefix and recurse. If
        neither was successful, just prepend the chroot path.

        Args:
            path: A chroot path to translate.

        Returns:
            An equivalent host path.

        Raises:
            ValueError: If |path| could not be mapped to a proper host
                destination.
        """
        new_path = None

        # Attempt resolution of known roots.
        for src_root, dst_root in self._chroot_to_host_roots:
            new_path = self._TranslatePath(path, src_root, dst_root)
            if new_path is not None:
                break

        if new_path is None:
            # If no known root was identified, just prepend the chroot path.
            new_path = self._TranslatePath(path, "/", self._chroot_path)
        else:
            # Check whether the resolved path happens to point back at the
            # chroot, in which case trim the chroot path and continue
            # recursively.
            path = self._TranslatePath(new_path, self._chroot_path, "/")
            if path is not None:
                new_path = self._GetHostPath(path)

        return new_path

    def _ConvertPath(
        self,
        path: Union[str, os.PathLike],
        get_converted_path: Callable[[str], str],
        inbound: bool,
    ) -> str:
        """Expands |path|; if outside the chroot, applies |get_converted_path|.

        Args:
            path: A path to be converted.
            get_converted_path: A conversion function.
            inbound: Whether paths are being translated into the chroot (vs out
                of the chroot).

        Returns:
            An expanded and (if needed) converted path.

        Raises:
            ValueError: If path conversion failed.
        """
        # NOTE: We do not want to expand wrapper script symlinks because this
        # prevents them from working. Therefore, if the path points to a file we
        # only resolve its dirname but leave the basename intact. This means our
        # path resolution might return unusable results for file symlinks that
        # point outside the reachable space. These are edge cases in which the
        # user is expected to resolve the realpath themselves in advance.
        #
        # And, expansion makes no sense on outbound, since the input path (an
        # "inside chroot" path) should not be resolved using the outside-chroot
        # filesystem.
        if inbound:
            expanded_path = os.path.expanduser(path)
            if os.path.isfile(expanded_path):
                expanded_path = os.path.join(
                    os.path.realpath(os.path.dirname(expanded_path)),
                    os.path.basename(expanded_path),
                )
            else:
                expanded_path = os.path.realpath(expanded_path)
        else:
            expanded_path = path

        # Inside the chroot there is nothing to translate.
        if self._inside_chroot:
            return expanded_path

        try:
            return get_converted_path(expanded_path)
        except ValueError as e:
            # Append the offending path and keep the original as the cause.
            raise ValueError("%s: %s" % (e, path)) from e

    def ToChroot(self, path: Union[str, os.PathLike]) -> str:
        """Resolves current environment |path| for use in the chroot."""
        return self._ConvertPath(path, self._GetChrootPath, inbound=True)

    def FromChroot(self, path: Union[str, os.PathLike]) -> str:
        """Resolves chroot |path| for use in the current environment."""
        return os.path.abspath(
            self._ConvertPath(path, self._GetHostPath, inbound=False)
        )
| |
| |
def DetermineCheckout(
    search_path: Optional[Union[str, "os.PathLike[str]"]] = None
) -> CheckoutInfo:
    """Gather information on the checkout we are in.

    There are several checkout types, as defined by CheckoutType.
    This function determines what checkout type the provided path is in, for
    example, it may belong to a `repo` checkout.

    Args:
        search_path: The path to a checkout or any file or directory in it to
            look upwards from. Defaults to constants.SOURCE_ROOT.

    Returns:
        CheckoutInfo object with these attributes:
            type: The type of checkout. Valid values are CheckoutType.
            root: The root of the checkout, or None if the type is unknown.
            chrome_src_dir: If the checkout is a Chrome checkout, the path to
                the Chrome src/ directory.
    """
    checkout_type = CheckoutType.UNKNOWN
    root, path = None, None

    search_path = search_path or constants.SOURCE_ROOT
    for path in osutils.IteratePathParents(search_path):
        if (path / ".gclient").exists():
            checkout_type = CheckoutType.GCLIENT
            break
        if (path / ".repo").is_dir():
            checkout_type = CheckoutType.REPO
            break
        if (path.parent / ".citc").is_dir():
            checkout_type = CheckoutType.CITC
            break

    if (
        checkout_type == CheckoutType.UNKNOWN
        and cros_build_lib.IsInsideChroot()
        and (constants.SOURCE_ROOT / ".supermanifest").exists()
    ):
        # We can safely assume .repo and .gclient are in the mounted checkout in
        # the chroot, but .citc will not be since it's in the host's parent path
        # and doesn't get mounted.
        checkout_type = CheckoutType.CITC
        # The walk above finished without a match, so |path| is merely the
        # last parent it visited rather than a checkout root. The checkout is
        # the mounted source tree whose .supermanifest was just found.
        path = constants.SOURCE_ROOT

    if checkout_type != CheckoutType.UNKNOWN:
        # TODO(vapier): Change this function to pathlib Path.
        root = str(path)

    # Determine the chrome src directory.
    chrome_src_dir = None
    if checkout_type == CheckoutType.GCLIENT:
        chrome_src_dir = os.path.join(root, "src")

    return CheckoutInfo(checkout_type, root, chrome_src_dir)
| |
| |
def get_global_cog_base_dir() -> Path:
    """Return <xdg_util.STATE_HOME>/cros/cog, the base dir for cog output."""
    return xdg_util.STATE_HOME.joinpath("cros", "cog")
| |
| |
def get_global_cache_dir() -> Path:
    """Return <xdg_util.CACHE_HOME>/cros/chromite, the global cache dir."""
    return xdg_util.CACHE_HOME.joinpath("cros", "chromite")
| |
| |
def FindCacheDir() -> str:
    """Return find_cache_dir()'s checkout-specific cache dir as a string."""
    cache_dir = find_cache_dir()
    return str(cache_dir)
| |
| |
def find_cache_dir() -> Path:
    """Pick the cache directory Path appropriate for the detected checkout.

    Returns:
        <root>/.cache for repo checkouts, <chrome src>/build/cros_cache for
        gclient checkouts, <cog base dir>/cache for citc checkouts, and the
        global cache dir when the checkout type is unknown.

    Raises:
        AssertionError: If the checkout type is none of the above.
    """
    checkout = DetermineCheckout()
    kind = checkout.type
    if kind == CheckoutType.REPO:
        return Path(checkout.root, GENERAL_CACHE_DIR)
    if kind == CheckoutType.GCLIENT:
        return Path(checkout.chrome_src_dir, "build", CHROME_CACHE_DIR)
    if kind == CheckoutType.CITC:
        return get_global_cog_base_dir() / "cache"
    if kind == CheckoutType.UNKNOWN:
        return get_global_cache_dir()
    raise AssertionError("Unexpected type %s" % checkout.type)
| |
| |
def GetCacheDir() -> str:
    """Returns the current cache dir.

    The environment variable named by constants.SHARED_CACHE_ENVVAR takes
    precedence whenever it is set (even to an empty string). The checkout is
    only probed via FindCacheDir() when the variable is absent, which avoids a
    needless filesystem walk in the common overridden case.
    """
    cache_dir = os.environ.get(constants.SHARED_CACHE_ENVVAR)
    if cache_dir is None:
        return FindCacheDir()
    return cache_dir
| |
| |
def get_cache_dir() -> Path:
    """Return the value of GetCacheDir() wrapped in a Path."""
    cache_dir = GetCacheDir()
    return Path(cache_dir)
| |
| |
def get_global_config_dir() -> Path:
    """Returns the config directory that is shared across the host machine.

    Anything stored in this directory is shared by every checkout on a given
    host machine. The value is chromite_config.DIR.

    Returns:
        A Path to a global config dir.
    """
    return chromite_config.DIR
| |
| |
def find_config_dir_for_checkout() -> Path:
    """Pick a config directory Path that is specific to this checkout.

    The result differs per checkout. When a config file should apply to every
    operation on the host regardless of checkout location or type, use
    get_global_config_dir() or chromite_config.DIR directly instead.

    Returns:
        A checkout-specific Path for config files.

    Raises:
        AssertionError: If the detected checkout type is not recognized.
    """
    checkout = DetermineCheckout()
    kind = checkout.type
    if kind == CheckoutType.REPO:
        return Path(checkout.root, GENERAL_CONFIG_DIR)
    if kind == CheckoutType.GCLIENT:
        return Path(checkout.chrome_src_dir, "build", GENERAL_CONFIG_DIR)
    if kind == CheckoutType.CITC:
        # Cog checkouts shouldn't contain config files. They still need to be
        # kept apart per checkout, so each workspace ID gets its own config
        # path under the global config dir.
        return get_global_config_dir().joinpath("cog", read_workspace_id())
    if kind == CheckoutType.UNKNOWN:
        # No source root or identifier is known here; a dedicated subdir keeps
        # checkout-specific configs from mixing with the host-global chromite
        # configs.
        return get_global_config_dir() / "unknown_checkout"
    raise AssertionError("Unexpected type %s" % checkout.type)
| |
| |
def get_log_dir() -> Path:
    """Return /var/log inside the chroot, or its host equivalent outside."""
    chroot_log_dir = "/var/log"
    if not cros_build_lib.IsInsideChroot():
        return Path(FromChrootPath(chroot_log_dir))
    return Path(chroot_log_dir)
| |
| |
def ToChrootPath(
    path: Optional[Union[str, os.PathLike]],
    source_path: Optional[Union[str, os.PathLike]] = None,
    chroot_path: Optional[Union[str, os.PathLike]] = None,
    out_path: Optional[os.PathLike] = None,
) -> str:
    """Translate |path| from the current environment into the chroot.

    Builds a one-off ChrootPathResolver from the given roots and delegates to
    its ToChroot method.

    Args:
        path: The path to translate into the chroot namespace.
        source_path: Root of the source checkout with the chroot in it.
        chroot_path: Full path of the chroot to use.
        out_path: Full path of the out directory to use.

    Returns:
        The same path converted to "inside chroot" namespace.

    Raises:
        ValueError: If the path references a location not available in the
            chroot.
    """
    resolver = ChrootPathResolver(
        source_path=source_path,
        chroot_path=chroot_path,
        out_path=out_path,
    )
    return resolver.ToChroot(path)
| |
| |
def FromChrootPath(
    path: Optional[Union[str, os.PathLike]],
    source_path: Optional[Union[str, os.PathLike]] = None,
    chroot_path: Optional[Union[str, os.PathLike]] = None,
    out_path: Optional[os.PathLike] = None,
) -> str:
    """Translate chroot |path| into the current environment.

    Builds a one-off ChrootPathResolver from the given roots and delegates to
    its FromChroot method.

    Args:
        path: The path to translate out of the chroot namespace.
        source_path: Root of the source checkout with the chroot in it.
        chroot_path: Full path of the chroot to use.
        out_path: Full path of the out directory to use.

    Returns:
        The same path converted to "outside chroot" namespace.
    """
    resolver = ChrootPathResolver(
        source_path=source_path,
        chroot_path=chroot_path,
        out_path=out_path,
    )
    return resolver.FromChroot(path)
| |
| |
def normalize_paths_to_source_root(
    source_paths: List[str], source_root: Path = constants.SOURCE_ROOT
) -> List[str]:
    """Return the "normalized" list of source paths relative to |source_root|.

    Normalizing includes:
        * Sorting the source paths in alphabetical order.
        * Removing duplicates and paths that are sub-paths of others in the
          source paths.
        * Ensuring all the directory path strings end with a trailing '/'.
        * Converting all the paths from absolute paths to relative paths
          (relative to the |source_root|).

    The input list is left unmodified.

    Args:
        source_paths: Absolute paths to normalize.
        source_root: The root the results are made relative to.

    Returns:
        A new, sorted list of normalized relative paths.
    """
    for path in source_paths:
        assert os.path.isabs(path), "path %s is not an absolute path" % path

    # Work on a de-duplicated copy: identical entries count as sub-paths of
    # each other and would otherwise all be dropped, and the caller's list
    # should not be rewritten behind its back.
    normalized = sorted({os.path.normpath(p) for p in source_paths})

    results = []
    for path in normalized:
        if any(
            other != path and osutils.IsSubPath(path, other)
            for other in normalized
        ):
            continue
        relative = os.path.relpath(path, source_root)
        # os.path.relpath() strips trailing separators, so the directory
        # marker has to be appended after the conversion.
        if os.path.isdir(path) and not relative.endswith("/"):
            relative += "/"
        results.append(relative)

    return results
| |
| |
def ExpandDirectories(files: List[Path]) -> Iterator[Path]:
    """Flatten a mix of files and directories into files only.

    Meant for tools that accept a list of file paths (e.g., cros format and
    cros lint) and benefit from having directories expanded. Directories
    inside a git checkout are listed via "git ls-files" (including untracked
    files), so gitignore'd files are left out. Other directories are walked
    recursively. Entries that are not directories are passed through as-is.

    Args:
        files: The list of files to process.

    Yields:
        Paths to files.
    """
    for entry in files:
        if not entry.is_dir():
            yield entry
            continue
        if git.FindGitTopLevel(entry):
            yield from git.LsFiles(files=[entry], untracked=True)
        else:
            for candidate in entry.rglob("*"):
                if candidate.is_file():
                    yield candidate
| |
| |
def read_workspace_id() -> str:
    """Look up the citc workspace ID used for path construction.

    A non-empty CROS_COG_WORKSPACE_ID environment variable takes precedence
    (per the original notes, inside the chroot it is provided through
    /etc/env.d/99chromiumos). Otherwise this must run outside the chroot in a
    citc checkout, and the ID is read from the checkout's
    .citc/workspace_id file.

    Returns:
        The current workspace's ID (in the form of ${USER}/<int>).
    """
    env_id = os.getenv("CROS_COG_WORKSPACE_ID")
    if not env_id:
        cros_build_lib.AssertOutsideChroot()
        checkout = DetermineCheckout()
        assert checkout.type == CheckoutType.CITC
        citc_dir = Path(checkout.root).parent / ".citc"
        return (citc_dir / "workspace_id").read_text(encoding="utf-8")
    return env_id
| |
| |
def is_citc_checkout() -> bool:
    """Report whether DetermineCheckout() detects a Cog/citc checkout.

    Returns:
        True if a citc checkout, false otherwise.
    """
    return DetermineCheckout().type == CheckoutType.CITC
| |
| |
def get_citc_workspace_path() -> Path:
    """Return the base path for stateful files of the current citc workspace.

    Returns:
        Path to workspace-specific files (e.g. chroot, config files), located
        at <cog base dir>/workspaces/<workspace id>.
    """
    # Per-workspace state is kept under the global cog base dir.
    workspaces_dir = get_global_cog_base_dir() / "workspaces"
    return workspaces_dir / read_workspace_id()
| |
| |
def get_citc_chroot_path() -> Path:
    """Return the chroot filesystem location for the current citc workspace.

    Returns:
        Path to chroot/ folder for a given workspace.
    """
    workspace = get_citc_workspace_path()
    return workspace / constants.DEFAULT_CHROOT_DIR
| |
| |
def get_citc_out_path() -> Path:
    """Return the build-artifact location for the current citc workspace.

    Returns:
        Path to out/ folder for a given workspace.
    """
    workspace = get_citc_workspace_path()
    return workspace / constants.DEFAULT_OUT_DIR