# blob: 6e1cc5cf8ee77c05dd626eb1eee96ae63b04171f [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Handle path inference and translation."""
from __future__ import print_function
import collections
import os
import tempfile
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import git
from chromite.lib import osutils
from chromite.utils import memoize
# Directory name of the Chrome cache; FindCacheDir() places it under
# <chrome src dir>/build for gclient checkouts.
CHROME_CACHE_DIR = 'cros_cache'

# Record returned by DetermineCheckout():
#   type: The checkout type.
#   root: The root directory of the checkout.
#   chrome_src_dir: The Chrome src/ directory, when applicable.
CheckoutInfo = collections.namedtuple(
    typename='CheckoutInfo',
    field_names=('type', 'root', 'chrome_src_dir'),
)
class ChrootPathResolver(object):
  """Perform path resolution to/from the chroot.

  Args:
    source_path: Value to override default source root inference.
    source_from_path_repo: Whether to infer the source root from the converted
      path's repo parent during inbound translation; overrides |source_path|.
  """

  # TODO(garnold) We currently infer the source root based on the path's own
  # encapsulating repository. This is a heuristic catering to paths that are
  # being translated to be used in a chroot that's not associated with the
  # currently executing code (for example, cbuildbot run on a build root or a
  # foreign tree checkout). This approach might result in arbitrary
  # repo-contained paths being translated to invalid chroot paths where they
  # actually should not, and other valid source paths failing to translate
  # because they are not repo-contained. Eventually we'll want to make this
  # behavior explicit, by either passing a source_root value, or requesting to
  # infer it from the path (source_from_path_repo=True), but otherwise
  # defaulting to the executing code's source root in the normal case. When
  # that happens, we'll be switching source_from_path_repo to False by default.
  # See chromium:485746.

  def __init__(self, source_path=None, source_from_path_repo=True):
    self._inside_chroot = cros_build_lib.IsInsideChroot()
    self._source_path = (constants.SOURCE_ROOT if source_path is None
                         else source_path)
    self._source_from_path_repo = source_from_path_repo

    # The following are only needed if outside the chroot.
    if self._inside_chroot:
      self._chroot_path = None
      self._chroot_link = None
      self._chroot_to_host_roots = None
    else:
      self._chroot_path = self._GetSourcePathChroot(self._source_path)
      # The chroot link allows us to resolve paths when the chroot is symlinked
      # to the default location. This is generally not used, but it is useful
      # for CI for optimization purposes. We will trust them not to do
      # something dumb, like symlink to /, but this doesn't enable that kind of
      # behavior anyway, just allows resolving paths correctly from outside the
      # chroot.
      self._chroot_link = self._ReadChrootLink(self._chroot_path)

      # Initialize mapping of known root bind mounts. Each entry pairs a chroot
      # root with its host equivalent; the host side may be a callable, which
      # _TranslatePath only invokes when that root actually matches.
      self._chroot_to_host_roots = (
          (constants.CHROOT_SOURCE_ROOT, self._source_path),
          (constants.CHROOT_CACHE_ROOT, self._GetCachePath),
      )

  # NOTE(review): `memoize` is imported by this file but not used anywhere
  # visible; presumably this lookup was meant to be memoized -- confirm.
  @classmethod
  def _GetCachePath(cls):
    """Returns the cache directory."""
    return os.path.realpath(GetCacheDir())

  def _GetSourcePathChroot(self, source_path):
    """Returns path to the chroot directory of a given source root.

    Args:
      source_path: The source root, or None.

    Returns:
      The default chroot directory under |source_path|, or None if
      |source_path| is None.
    """
    if source_path is None:
      return None
    return os.path.join(source_path, constants.DEFAULT_CHROOT_DIR)

  def _ReadChrootLink(self, path):
    """Convert a chroot symlink to its absolute path.

    This contains defaults/edge cases assumptions for chroot paths. Not
    recommended for non-chroot paths.

    Args:
      path (str|None): The path to resolve.

    Returns:
      str|None: The resolved path if the provided path is a symlink, None
        otherwise.
    """
    # Mainly for the "if self._source_from_path_repo:" branch in _GetChrootPath.
    # _GetSourcePathChroot can return None, so double check it here.
    if not path:
      return None

    abs_path = os.path.abspath(path)
    link = osutils.ResolveSymlink(abs_path)

    # ResolveSymlink returns the passed path when the path isn't a symlink. We
    # can skip some redundant work when its falling back on the link when the
    # chroot is not a symlink.
    if link == abs_path:
      return None

    return link

  def _TranslatePath(self, path, src_root, dst_root_input):
    """If |path| starts with |src_root|, replace it using |dst_root_input|.

    Args:
      path: An absolute path we want to convert to a destination equivalent.
      src_root: The root that path needs to be contained in.
      dst_root_input: The root we want to relocate the relative path into, or a
        function returning this value.

    Returns:
      A translated path, or None if |src_root| is not a prefix of |path|.

    Raises:
      ValueError: If |src_root| is a prefix but |dst_root_input| yields None,
        which means we don't have sufficient information to do the translation.
    """
    # Joining with '' appends a trailing separator, so '/a' only matches '/a'
    # itself or paths below '/a/', never a sibling such as '/ab'.
    if not path.startswith(os.path.join(src_root, '')) and path != src_root:
      return None
    dst_root = dst_root_input() if callable(dst_root_input) else dst_root_input
    if dst_root is None:
      raise ValueError('No target root to translate path to')
    # Strip leading separators from the remainder; otherwise os.path.join would
    # treat it as absolute and discard |dst_root|.
    return os.path.join(dst_root, path[len(src_root):].lstrip(os.path.sep))

  def _GetChrootPath(self, path):
    """Translates a fully-expanded host |path| into a chroot equivalent.

    This checks path prefixes in order from the most to least "contained": the
    chroot itself, then the cache directory, and finally the source tree. The
    idea is to return the shortest possible chroot equivalent.

    Args:
      path: A host path to translate.

    Returns:
      An equivalent chroot path.

    Raises:
      ValueError: If |path| is not reachable from the chroot.
    """
    new_path = None

    # Preliminary: compute the actual source and chroot paths to use. These are
    # generally the precomputed values, unless we're inferring the source root
    # from the path itself.
    source_path = self._source_path
    chroot_path = self._chroot_path
    chroot_link = self._chroot_link

    if self._source_from_path_repo:
      path_repo_dir = git.FindRepoDir(path)
      if path_repo_dir is not None:
        source_path = os.path.abspath(os.path.join(path_repo_dir, '..'))
      chroot_path = self._GetSourcePathChroot(source_path)
      chroot_link = self._ReadChrootLink(chroot_path)

    # First, check if the path happens to be in the chroot already.
    if chroot_path is not None:
      new_path = self._TranslatePath(path, chroot_path, '/')
      # Or in the symlinked dir.
      if new_path is None and chroot_link is not None:
        new_path = self._TranslatePath(path, chroot_link, '/')

    # Second, check the cache directory.
    if new_path is None:
      new_path = self._TranslatePath(path, self._GetCachePath(),
                                     constants.CHROOT_CACHE_ROOT)

    # Finally, check the current SDK checkout tree.
    if new_path is None and source_path is not None:
      new_path = self._TranslatePath(path, source_path,
                                     constants.CHROOT_SOURCE_ROOT)

    if new_path is None:
      raise ValueError('Path is not reachable from the chroot')

    return new_path

  def _GetHostPath(self, path):
    """Translates a fully-expanded chroot |path| into a host equivalent.

    We first attempt translation of known roots (source). If any is successful,
    we check whether the result happens to point back to the chroot, in which
    case we trim the chroot path prefix and recurse. If neither was successful,
    just prepend the chroot path.

    Args:
      path: A chroot path to translate.

    Returns:
      An equivalent host path.

    Raises:
      ValueError: If |path| could not be mapped to a proper host destination.
    """
    new_path = None

    # Attempt resolution of known roots; the first matching root wins.
    for src_root, dst_root in self._chroot_to_host_roots:
      new_path = self._TranslatePath(path, src_root, dst_root)
      if new_path is not None:
        break

    if new_path is None:
      # If no known root was identified, just prepend the chroot path. An empty
      # |src_root| is a prefix of every path, so this always translates.
      new_path = self._TranslatePath(path, '', self._chroot_path)
    else:
      # Check whether the resolved path happens to point back at the chroot, in
      # which case trim the chroot path or link prefix and continue recursively.
      path = self._TranslatePath(new_path, self._chroot_path, '/')
      if path is None and self._chroot_link:
        path = self._TranslatePath(new_path, self._chroot_link, '/')

      if path is not None:
        new_path = self._GetHostPath(path)

    return new_path

  def _ConvertPath(self, path, get_converted_path):
    """Expands |path|; if outside the chroot, applies |get_converted_path|.

    Args:
      path: A path to be converted.
      get_converted_path: A conversion function.

    Returns:
      An expanded and (if needed) converted path.

    Raises:
      ValueError: If path conversion failed.
    """
    # NOTE: We do not want to expand wrapper script symlinks because this
    # prevents them from working. Therefore, if the path points to a file we
    # only resolve its dirname but leave the basename intact. This means our
    # path resolution might return unusable results for file symlinks that
    # point outside the reachable space. These are edge cases in which the user
    # is expected to resolve the realpath themselves in advance.
    expanded_path = os.path.expanduser(path)
    if os.path.isfile(expanded_path):
      expanded_path = os.path.join(
          os.path.realpath(os.path.dirname(expanded_path)),
          os.path.basename(expanded_path))
    else:
      expanded_path = os.path.realpath(expanded_path)

    if self._inside_chroot:
      return expanded_path

    try:
      return get_converted_path(expanded_path)
    except ValueError as e:
      # Re-raise with the caller's original (unexpanded) path appended.
      raise ValueError('%s: %s' % (e, path))

  def ToChroot(self, path):
    """Resolves current environment |path| for use in the chroot."""
    return self._ConvertPath(path, self._GetChrootPath)

  def FromChroot(self, path):
    """Resolves chroot |path| for use in the current environment."""
    return self._ConvertPath(path, self._GetHostPath)
def DetermineCheckout(cwd=None):
  """Gather information on the checkout we are in.

  There are several checkout types, as defined by CHECKOUT_TYPE_XXX variables.
  This function determines what checkout type |cwd| is in, for example, if |cwd|
  belongs to a `repo` checkout.

  Args:
    cwd: The directory to inspect; defaults to the current working directory
      when not given (or empty).

  Returns:
    A CheckoutInfo object with these attributes:
      type: The type of checkout.  Valid values are CHECKOUT_TYPE_*.
      root: The root of the checkout.
      chrome_src_dir: If the checkout is a Chrome checkout, the path to the
        Chrome src/ directory.
  """
  checkout_type = CHECKOUT_TYPE_UNKNOWN
  root, path = None, None

  cwd = cwd or os.getcwd()
  # Presumably IteratePathParents yields |cwd| and then each ancestor in turn
  # -- confirm against osutils. Stop at the first directory carrying a checkout
  # marker so that |path| is left pointing at the checkout root.
  for path in osutils.IteratePathParents(cwd):
    gclient_file = os.path.join(path, '.gclient')
    if os.path.exists(gclient_file):
      checkout_type = CHECKOUT_TYPE_GCLIENT
      break
    repo_dir = os.path.join(path, '.repo')
    if os.path.isdir(repo_dir):
      checkout_type = CHECKOUT_TYPE_REPO
      break

  # |path| is only meaningful as a root when a marker was actually found.
  if checkout_type != CHECKOUT_TYPE_UNKNOWN:
    root = path

  # Determine the chrome src directory.
  chrome_src_dir = None
  if checkout_type == CHECKOUT_TYPE_GCLIENT:
    chrome_src_dir = os.path.join(root, 'src')

  return CheckoutInfo(checkout_type, root, chrome_src_dir)
def FindCacheDir():
  """Returns the cache directory location based on the checkout type.

  Returns:
    The general cache dir under the checkout root for repo checkouts, the
    Chrome cache dir under <chrome src>/build for gclient checkouts, or a
    'chromeos-cache' dir under the system temp dir for unknown checkouts.

  Raises:
    AssertionError: If the detected checkout type is not one of the above.
  """
  info = DetermineCheckout()
  kind = info.type

  if kind == CHECKOUT_TYPE_REPO:
    return os.path.join(info.root, GENERAL_CACHE_DIR)

  if kind == CHECKOUT_TYPE_GCLIENT:
    return os.path.join(info.chrome_src_dir, 'build', CHROME_CACHE_DIR)

  if kind == CHECKOUT_TYPE_UNKNOWN:
    return os.path.join(tempfile.gettempdir(), 'chromeos-cache')

  raise AssertionError('Unexpected type %s' % kind)
def GetCacheDir():
  """Returns the current cache dir.

  The shared-cache environment variable takes precedence when it is set (even
  to an empty string); otherwise the location is derived from the checkout.

  Returns:
    The cache directory path.
  """
  # Only fall back to FindCacheDir() when the variable is unset: passing it as
  # the dict.get() default would evaluate it on every call, needlessly walking
  # the directory tree (and calling os.getcwd(), which can raise).
  cache_dir = os.environ.get(constants.SHARED_CACHE_ENVVAR)
  if cache_dir is None:
    cache_dir = FindCacheDir()
  return cache_dir
def ToChrootPath(path, source_path=None):
  """Resolves current environment |path| for use in the chroot.

  Args:
    path: string path to translate into chroot namespace.
    source_path: string path to root of source checkout with chroot in it.

  Returns:
    The same path converted to "inside chroot" namespace.

  Raises:
    ValueError: If the path references a location not available in the chroot.
  """
  return ChrootPathResolver(source_path=source_path).ToChroot(path)
def FromChrootPath(path, source_path=None):
  """Resolves chroot |path| for use in the current environment.

  Args:
    path: string path to translate out of chroot namespace.
    source_path: string path to root of source checkout with chroot in it.

  Returns:
    The same path converted to "outside chroot" namespace.
  """
  return ChrootPathResolver(source_path=source_path).FromChroot(path)