# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utilities to create sysroots."""

import logging
import multiprocessing
import os
from pathlib import Path
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    TYPE_CHECKING,
    Union,
)

from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import locking
from chromite.lib import osutils
from chromite.lib import portage_util
from chromite.lib import toolchain
from chromite.lib.parser import package_info


if TYPE_CHECKING:
    from chromite.lib import chroot_lib
    from chromite.lib import toolchain_list


class ConfigurationError(Exception):
    """Raised when an invalid configuration is found."""


CACHED_FIELD_PROFILE_OVERRIDE = "PROFILE_OVERRIDE"
STANDARD_FIELD_PORTDIR_OVERLAY = "PORTDIR_OVERLAY"
STANDARD_FIELD_CHOST = "CHOST"
STANDARD_FIELD_BOARD_OVERLAY = "BOARD_OVERLAY"
STANDARD_FIELD_BOARD_USE = "BOARD_USE"
STANDARD_FIELD_ARCH = "ARCH"

DEFAULT_PROFILE = "base"

_PORTAGE_WRAPPER_TEMPLATE = """#!/bin/sh
# Generated by chromite/lib/sysroot_lib.py.
exec "{wrapper_cmd}" \
  --build-target "{build_target}" \
  --chost "{chost}" \
  --sysroot "{sysroot}" \
  {command} \
  -- \
  {args} \
  "$@"
"""

_BOARD_WRAPPER_TEMPLATE = """#!/bin/sh
exec {command} --board="{board}" "$@"
"""

_BOARD_WRAPPER_DEPRECATED_CMD_TEMPLATE = """#!/bin/sh
echo "{deprecated}"
exec {command} --board="{board}" "$@"
"""

_BUILD_TARGET_WRAPPER_TEMPLATE = """#!/bin/sh
exec {command} --build-target="{build_target}" "$@"
"""

_PKGCONFIG_WRAPPER_TEMPLATE = """#!/bin/bash

PKG_CONFIG_LIBDIR=$(printf '%s:' "{sysroot}"/usr/*/pkgconfig)
export PKG_CONFIG_LIBDIR

export PKG_CONFIG_SYSROOT_DIR="{sysroot}"

# Portage will get confused and try to "help" us by exporting this.
# Undo that logic.
unset PKG_CONFIG_PATH

# TODO: Consider using pkgconf cross-personalities instead
# See https://github.com/pkgconf/pkgconf/issues/264
export PKG_CONFIG_SYSTEM_INCLUDE_PATH="/usr/include:{sysroot}/usr/include"

# Use full path to bypass automated wrapper checks that block `pkg-config`.
# https://crbug.com/985180
exec /usr/bin/pkg-config "$@"
"""

_wrapper_dir = "/usr/local/bin"

_IMPLICIT_SYSROOT_DEPS_KEY = "IMPLICIT_SYSROOT_DEPS"
_IMPLICIT_SYSROOT_DEPS = [
    "sys-kernel/linux-headers",
    "sys-libs/gcc-libs",
    "sys-libs/libcxx",
]

_MAKE_CONF = "etc/make.conf"
_MAKE_CONF_BOARD_SETUP = "etc/make.conf.board_setup"
_MAKE_CONF_BOARD = "etc/make.conf.board"
_MAKE_CONF_USER = "etc/make.conf.user"
_MAKE_CONF_HOST_SETUP = "etc/make.conf.host_setup"

_CACHE_PATH = "var/cache/edb/chromeos"

_CHROMIUMOS_OVERLAY = os.path.join(
    constants.CHROOT_SOURCE_ROOT, constants.CHROMIUMOS_OVERLAY_DIR
)
_CHROMIUMOS_CONFIG = os.path.join(_CHROMIUMOS_OVERLAY, "chromeos", "config")

_INTERNAL_BINHOST_DIR = os.path.join(
    constants.PRIVATE_BINHOST_CONF_DIR,
    "target",
)
_EXTERNAL_BINHOST_DIR = os.path.join(
    constants.PUBLIC_BINHOST_CONF_DIR,
    "target",
)

_CHROMEOS_INTERNAL_BOTO_PATH = os.path.join(
    constants.SOURCE_ROOT,
    "src",
    "private-overlays",
    "chromeos-overlay",
    "googlestorage_account.boto",
)

_ARCH_MAPPING = {
    "amd64": "amd64-generic",
    "x86": "x86-generic",
    "arm": "arm-generic",
    "arm64": "arm64-generic",
    "mips": "mipsel-o32-generic",
}


class Error(Exception):
    """Module base error class."""


# This error is meant to be used with `cros build-packages`.  This exists here
# so the setup_board (ToolchainInstallError) and `cros build-packages` errors
# exist in a common, sensible location.
class PackageInstallError(Error, cros_build_lib.RunCommandError):
    """An error installing packages."""

    def __init__(
        self,
        msg: str,
        result: "cros_build_lib.CompletedProcess",
        exception: BaseException = None,
        packages: Optional[Iterable[package_info.PackageInfo]] = None,
    ):
        """Init method.

        Args:
            msg: The message.
            result: The command result.
            exception: An origin exception.
            packages: The list of failed packages.
        """
        super().__init__(msg, result, exception)
        self.failed_packages = packages
        self.args = (self.args, packages)

    def Stringify(self, stdout: bool = True, stderr: bool = True) -> str:
        """Stringify override to include the failed package info.

        See:
          cros_build_lib.RunCommandError.Stringify
        """
        items = [super().Stringify(stdout, stderr)]

        pkgs = []
        for cpv in self.failed_packages:
            if cpv.cpf:
                pkgs.append(cpv.cpf)
            elif cpv.cp:
                pkgs.append(cpv.cp)
            elif cpv.package:
                pkgs.append(cpv.package)

        if pkgs:
            items.append("Failed Packages: %s" % " ".join(pkgs))

        return "\n".join(items)


class ToolchainInstallError(PackageInstallError):
    """An error when installing a toolchain package.

    Essentially identical to PackageInstallError, but has names that better
    reflect that the packages are toolchain packages.
    """

    def __init__(
        self,
        msg: str,
        result: "cros_build_lib.CompletedProcess",
        exception: BaseException = None,
        tc_info: Optional[Iterable[package_info.PackageInfo]] = None,
    ):
        """Init method.

        Args:
            msg: The message.
            result: The command result.
            exception: An origin exception.
            tc_info: The list of failed toolchain packages.
        """
        super().__init__(msg, result, exception, packages=tc_info)

    @property
    def failed_toolchain_info(
        self,
    ) -> Optional[Iterable[package_info.PackageInfo]]:
        return self.failed_packages


def _CreateWrapper(wrapper_path: str, template: str, **kwargs: Any) -> None:
    """Creates a wrapper from a given template.

    Args:
        wrapper_path: path to the wrapper.
        template: wrapper template.
        **kwargs: fields to be set in the template.
    """
    osutils.WriteFile(
        wrapper_path,
        template.format(**kwargs),
        makedirs=True,
        sudo=True,
        chmod=0o755,
    )


def _NotEmpty(filepath: str) -> bool:
    """Returns True if |filepath| is not empty.

    Args:
        filepath: path to a file.
    """
    return os.path.exists(filepath) and osutils.ReadFile(filepath).strip()


def _DictToKeyValue(dictionary: Dict) -> str:
    """Formats dictionary in to a key=value string.

    Args:
        dictionary: a python dictionary.
    """
    output = []
    for key in sorted(dictionary.keys()):
        output.append('%s="%s"' % (key, dictionary[key]))

    return "\n".join(output)


def _GetMakeConfGenericPath() -> str:
    """Get the path to the make.conf.generic-target file."""
    return os.path.join(_CHROMIUMOS_CONFIG, "make.conf.generic-target")


def _GetChrootMakeConfUserPath() -> str:
    """Get the path to the chroot's make.conf.user file."""
    return "/%s" % _MAKE_CONF_USER


class Profile:
    """Class that encapsulates the profile name for a sysroot."""

    def __init__(self, name: str = ""):
        self._name = name

    @property
    def name(self) -> str:
        return self._name

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.name == other.name


class Sysroot:
    """Class that encapsulate the interaction with sysroots."""

    def __init__(self, path: Union[Path, str]):
        self.path = str(path)

        # Read config from _MAKE_CONF which also pulls in config from
        # _MAKE_CONF_BOARD_SETUP, but only write any config overrides directly
        # to _MAKE_CONF_BOARD_SETUP.
        self._config_file_read = self.Path(_MAKE_CONF)
        self._config_file_write = self.Path(_MAKE_CONF_BOARD_SETUP)

        self._cache_file = self.Path(_CACHE_PATH)
        self._cache_file_lock = self._cache_file + ".lock"

    def __eq__(self, other):
        """Equality check."""
        if not isinstance(other, self.__class__):
            return False
        return self.path == other.path

    def Exists(self, chroot: "chroot_lib.Chroot" = None) -> bool:
        """Check if the sysroot exists.

        Args:
            chroot: Optionally check if the sysroot exists inside the specified
                chroot.

        Returns:
            True if the sysroot exists.
        """
        if chroot:
            return chroot.has_path(self.path)

        return os.path.exists(self.path)

    def Path(self, *args: str) -> str:
        """Helper to build out a path within the sysroot.

        Pass args as if calling os.path.join().

        Args:
            *args: path components to join.

        Returns:
            The path within the sysroot.
        """
        return os.path.join(self.path, *args)

    def GetStandardField(self, field: str) -> Optional[Any]:
        """Returns the value of a standard field.

        Args:
            field: Field from the standard configuration file to get.
                One of STANDARD_FIELD_* from above.
        """
        # We want to source from within the config's directory as the config
        # itself may source other scripts using a relative path.
        with osutils.ChdirContext(Path(self._config_file_read).parent):
            return osutils.SourceEnvironment(
                self._config_file_read, [field], multiline=True
            ).get(field)

    def GetCachedField(self, field: str) -> Optional[str]:
        """Returns the value of |field| in the sysroot cache file.

        Access to the cache is thread-safe as long as we access it through this
        methods or the bash helper in common.sh.

        Args:
            field: name of the field.
        """
        if not os.path.exists(self._cache_file):
            return None

        with locking.FileLock(
            self._cache_file_lock, locktype=locking.FLOCK, world_writable=True
        ).read_lock():
            return osutils.SourceEnvironment(self._cache_file, [field]).get(
                field
            )

    def SetCachedField(self, field: str, value: Optional[str]):
        """Sets |field| to |value| in the sysroot cache file.

        Access to the cache is thread-safe as long as we access it through this
        methods or the bash helper in common.sh.

        Args:
            field: name of the field.
            value: value to set. If |value| is None, the field is unset.
        """
        # TODO(bsimonnet): add support for values with quotes and newlines.
        # crbug.com/476764.
        for symbol in '\n`$"\\':
            if value and symbol in value:
                raise ValueError(
                    'Cannot use \\n, `, $, \\ or " in cached value.'
                )

        with locking.FileLock(
            self._cache_file_lock, locktype=locking.FLOCK, world_writable=True
        ).write_lock():
            lines = []
            if os.path.exists(self._cache_file):
                lines = osutils.ReadFile(self._cache_file).splitlines()

                # Remove the old value for field if it exists.
                lines = [l for l in lines if not l.startswith(field + "=")]

            if value is not None:
                lines.append('%s="%s"' % (field, value))
            osutils.WriteFile(self._cache_file, "\n".join(lines), sudo=True)

    @property
    def build_target_name(self) -> str:
        """Get the name of the build target this sysroot was created for."""
        return self.GetStandardField(STANDARD_FIELD_BOARD_USE)

    @property
    def profile_name(self) -> str:
        """Get the name of the sysroot's profile."""
        return (
            self.GetCachedField(CACHED_FIELD_PROFILE_OVERRIDE)
            or DEFAULT_PROFILE
        )

    @property
    def board_overlay(self) -> List[str]:
        """The BOARD_OVERLAY standard field as a list.

        The BOARD_OVERLAY field is set on creation, and stores the list of
        overlays more directly associated with the build target itself. In an
        ideal world, this would be the single, top level overlay for the build
        target (e.g. overlay-eve-private) and everything else could be derived
        from that. In practice, this is currently every available overlay that
        is not in src/third_party.
        """
        return self.GetStandardField(STANDARD_FIELD_BOARD_OVERLAY).split()

    @property
    def _build_target_overlays(self) -> List[Path]:
        """Overlays for the build target itself."""
        prefix = f"overlay-{self.build_target_name}"
        return [x for x in self.get_overlays() if x.name.startswith(prefix)]

    @property
    def build_target_overlay(self) -> Optional[Path]:
        """The most specific build target overlay for the sysroot."""
        # Choose the longest as a proxy for the most specific. This should only
        # ever be choosing between overlay-x and overlay-x-private, but we'll
        # need better logic here if we have any cases with more than that.
        overlays = self._build_target_overlays
        overlay = max(overlays, key=lambda x: len(x.name)) if overlays else None
        return overlay

    @property
    def chipset(self) -> Optional[str]:
        """The chipset for the sysroot's build target."""
        overlays = [
            x for x in self.get_overlays() if x.name.startswith("chipset-")
        ]
        if not overlays:
            return None

        # Choose the longest as a proxy for the most specific. This should at
        # most be choosing between chipset-x and chipset-x-private, but we'll
        # need better logic here if we have any cases with more than that.
        overlay = max(overlays, key=lambda x: len(x.name))
        chipset = overlay.name

        # TODO(python 3.9): string.removeprefix & string.removesuffix instead.
        if chipset.startswith("chipset-"):
            chipset = chipset[len("chipset-") :]
        if chipset.endswith("-private"):
            chipset = chipset[: -len("-private")]
        return chipset

    @property
    def portdir_overlay(self) -> List[str]:
        """The PORTDIR_OVERLAY field as a list.

        The PORTDIR_OVERLAY field is set on creation, and stores the list of all
        overlays available to the sysroot.
        """
        return self.GetStandardField(STANDARD_FIELD_PORTDIR_OVERLAY).split()

    @property
    def use_flags(self) -> List[str]:
        """Get all USE flags for the sysroot."""
        return portage_util.PortageqEnvvar("USE", sysroot=self.path).split()

    @property
    def features(self) -> List[str]:
        """Get all FEATURES for the sysroot."""
        return portage_util.PortageqEnvvar(
            "FEATURES", sysroot=self.path
        ).split()

    @property
    def portage_logdir(self) -> str:
        """Get the PORTAGE_LOGDIR property for this sysroot."""
        return portage_util.PortageqEnvvar("PORTAGE_LOGDIR", sysroot=self.path)

    def get_overlays(
        self, build_target_only: bool = False, relative: bool = False
    ) -> List[Path]:
        """Get a list of the overlays available to the sysroot.

        Note: The overlay paths are always inside the SDK. If the outside the
        SDK paths are needed, we should add an option to transform them here.

        Args:
            build_target_only: Only fetch the overlays more relevant to the
                build target. By default, fetch all overlays available to the
                sysroot.
            relative: Get the overlay paths relative to the source root rather
                than as absolute paths.
        """
        overlays = (
            self.board_overlay if build_target_only else self.portdir_overlay
        )
        overlay_paths = [Path(x) for x in overlays]
        if relative:
            return [
                x.relative_to(constants.CHROOT_SOURCE_ROOT)
                for x in overlay_paths
            ]

        return overlay_paths

    def _WrapperPath(self, command: str, friendly_name: str = None) -> str:
        """Returns the path to the wrapper for |command|.

        Args:
            command: command to wrap.
            friendly_name: suffix to add to the command name. If None, the
                wrapper will be created in the sysroot.
        """
        if friendly_name:
            return os.path.join(
                _wrapper_dir, "%s-%s" % (command, friendly_name)
            )
        return self.Path("build", "bin", command)

    def CreateAllWrappers(self, friendly_name: str = None) -> None:
        """Creates all the wrappers.

        Creates all portage tools wrappers, plus wrappers for gdb, cros_workon
        and pkg-config.

        Args:
            friendly_name: if not None, create friendly wrappers with
                |friendly_name| added to the command.
        """
        chost = self.GetStandardField(STANDARD_FIELD_CHOST)
        portage_wrapper_base_args = {
            "build_target": self.build_target_name,
            "chost": chost,
            "sysroot": self.path,
            "wrapper_cmd": (
                constants.CHROMITE_SCRIPTS_DIR / "portage_cmd_wrapper"
            ),
            "args": "",
        }
        for cmd in (
            "ebuild",
            "eclean",
            "emaint",
            "equery",
            "portageq",
            "qcheck",
            "qdepends",
            "qfile",
            "qlist",
            "qmerge",
            "qsize",
        ):
            args = portage_wrapper_base_args.copy()
            args["command"] = cmd
            if friendly_name:
                _CreateWrapper(
                    self._WrapperPath(cmd, friendly_name),
                    _PORTAGE_WRAPPER_TEMPLATE,
                    **args,
                )
            _CreateWrapper(
                self._WrapperPath(cmd), _PORTAGE_WRAPPER_TEMPLATE, **args
            )

        if friendly_name:
            args = portage_wrapper_base_args.copy()
            args["command"] = "emerge"
            args["args"] = "--root-deps"
            _CreateWrapper(
                self._WrapperPath("emerge", friendly_name),
                _PORTAGE_WRAPPER_TEMPLATE,
                **args,
            )
            # TODO(crbug.com/1108874): Delete the deprecated wrapper.
            _CreateWrapper(
                self._WrapperPath("cros_workon", friendly_name),
                _BOARD_WRAPPER_DEPRECATED_CMD_TEMPLATE,
                board=friendly_name,
                command="cros_workon",
                deprecated=(
                    "cros_workon-%s is deprecated, use cros-workon-%s instead."
                    % (friendly_name, friendly_name)
                ),
            )
            _CreateWrapper(
                self._WrapperPath("cros-workon", friendly_name),
                _BUILD_TARGET_WRAPPER_TEMPLATE,
                build_target=friendly_name,
                command="cros workon",
            )
            _CreateWrapper(
                self._WrapperPath("gdb", friendly_name),
                _BOARD_WRAPPER_TEMPLATE,
                board=friendly_name,
                command="cros_gdb",
            )
            _CreateWrapper(
                self._WrapperPath("pkg-config", friendly_name),
                _PKGCONFIG_WRAPPER_TEMPLATE,
                sysroot=self.path,
            )

        _CreateWrapper(
            self._WrapperPath("pkg-config"),
            _PKGCONFIG_WRAPPER_TEMPLATE,
            sysroot=self.path,
        )

        args = portage_wrapper_base_args.copy()
        args["command"] = "emerge"
        args["args"] = "--root-deps"
        _CreateWrapper(
            self._WrapperPath("emerge"), _PORTAGE_WRAPPER_TEMPLATE, **args
        )

        # Create a link to the debug symbols in the chroot so that gdb can
        # detect them.
        debug_symlink = os.path.join("/usr/lib/debug", self.path.lstrip("/"))
        sysroot_debug = self.Path("usr/lib/debug")
        osutils.SafeMakedirs(os.path.dirname(debug_symlink), sudo=True)
        osutils.SafeMakedirs(os.path.dirname(sysroot_debug), sudo=True)

        osutils.SafeSymlink(sysroot_debug, debug_symlink, sudo=True)

    def InstallMakeConf(self) -> None:
        """Make sure the make.conf file exists and is up to date."""
        config_file = _GetMakeConfGenericPath()
        osutils.SafeSymlink(config_file, self.Path(_MAKE_CONF), sudo=True)

    def InstallMakeConfBoard(
        self,
        accepted_licenses: str = None,
        local_only: bool = False,
        package_indexes: List["PackageIndexInfo"] = None,
        use_cq_prebuilts: bool = False,
        expanded_binhost_inheritance: bool = False,
    ) -> None:
        """Make sure the make.conf.board file exists and is up to date.

        Args:
            accepted_licenses: Any additional accepted licenses.
            local_only: Whether prebuilts can be fetched from remote sources.
            package_indexes: List of information about available prebuilts,
                youngest first, or None.
            use_cq_prebuilts: Whether to use the prebuilts generated by CQ.
            expanded_binhost_inheritance: Whether to enable expanded binhost
                inheritance, which searches for additional binhosts to include
                to attempt to improve binhost hit rates.
        """
        board_conf = self.GenerateBoardMakeConf(
            accepted_licenses=accepted_licenses
        )
        make_conf_path = self.Path(_MAKE_CONF_BOARD)
        osutils.WriteFile(make_conf_path, board_conf, sudo=True)

        # Once make.conf.board has been generated, generate the binhost config.
        # We need to do this in two steps as the binhost generation step needs
        # portageq to be available.
        binhost_conf = self.GenerateBinhostConf(
            local_only=local_only,
            package_indexes=package_indexes,
            use_cq_prebuilts=use_cq_prebuilts,
            expanded_binhost_inheritance=expanded_binhost_inheritance,
        )
        osutils.WriteFile(
            make_conf_path, "%s\n%s\n" % (board_conf, binhost_conf), sudo=True
        )

    def InstallMakeConfBoardSetup(self, board: str) -> None:
        """Make sure the sysroot has the make.conf.board_setup file.

        Args:
            board: The name of the board being setup in the sysroot.
        """
        self.WriteConfig(self.GenerateBoardSetupConfig(board))

    def InstallMakeConfUser(self) -> None:
        """Make sure the sysroot has the make.conf.user file.

        This method assumes the chroot's make.conf.user file exists.
        See chroot_util.CreateMakeConfUser() to create one if needed.
        Only works inside the chroot.
        """
        make_user = _GetChrootMakeConfUserPath()
        link_path = self.Path(_MAKE_CONF_USER)
        if not os.path.exists(link_path):
            osutils.SafeSymlink(make_user, link_path, sudo=True)

    def _GenerateConfig(
        self,
        toolchains: "toolchain_list.ToolchainList",
        board_overlays: List[str],
        portdir_overlays: List[str],
        header: str,
        **kwargs: Any,
    ) -> str:
        """Create common config settings for boards and bricks.

        Args:
            toolchains: ToolchainList object to use.
            board_overlays: List of board overlays.
            portdir_overlays: List of portage overlays.
            header: Header comment string; must start with #.
            **kwargs: Additional configuration values to set.

        Returns:
            Configuration string.

        Raises:
            ConfigurationError: Could not generate a valid configuration.
        """
        config = {}

        default_toolchains = toolchain.FilterToolchains(
            toolchains, "default", True
        )
        if not default_toolchains:
            raise ConfigurationError("No default toolchain could be found.")
        config["CHOST"] = list(default_toolchains)[0]
        config["ARCH"] = toolchain.GetArchForTarget(config["CHOST"])

        config["BOARD_OVERLAY"] = "\n".join(board_overlays)
        config["PORTDIR_OVERLAY"] = "\n".join(portdir_overlays)

        config["MAKEOPTS"] = "-j%s" % str(multiprocessing.cpu_count())
        config["ROOT"] = self.path + "/"
        config["PKG_CONFIG"] = self._WrapperPath("pkg-config")

        config.update(kwargs)

        return "\n".join((header, _DictToKeyValue(config)))

    def GenerateBoardSetupConfig(self, board: str) -> str:
        """Generates the setup configuration for a given board.

        Args:
            board: board name to use to generate the configuration.
        """
        toolchains = toolchain.GetToolchainsForBoard(board)

        # Compute the overlay list.
        portdir_overlays = portage_util.FindOverlays(
            constants.BOTH_OVERLAYS, board
        )
        prefix = os.path.join(constants.SOURCE_ROOT, "src", "third_party")
        board_overlays = [
            o for o in portdir_overlays if not o.startswith(prefix)
        ]

        header = "# Created by cros_sysroot_utils from --board=%s." % board
        return self._GenerateConfig(
            toolchains,
            board_overlays,
            portdir_overlays,
            header,
            BOARD_USE=board,
        )

    def WriteConfig(self, config: str) -> None:
        """Writes the configuration.

        Args:
            config: configuration to use.
        """
        osutils.WriteFile(
            self._config_file_write, config, makedirs=True, sudo=True
        )

    def GenerateBoardMakeConf(self, accepted_licenses: str = None) -> str:
        """Generates the board specific make.conf.

        Args:
            accepted_licenses: Licenses accepted by portage.

        Returns:
            The make.conf file as a python string.
        """
        config = [
            """# AUTO-GENERATED FILE. DO NOT EDIT.

# Source make.conf from each overlay."""
        ]

        overlay_list = self.GetStandardField(STANDARD_FIELD_BOARD_OVERLAY)
        boto_config = ""
        for overlay in overlay_list.splitlines():
            make_conf = os.path.join(overlay, "make.conf")
            boto_file = os.path.join(overlay, "googlestorage_account.boto")
            if os.path.isfile(make_conf):
                config.append("source %s" % make_conf)

            if os.path.isfile(boto_file):
                boto_config = boto_file

        # If there is a boto file in the chromeos internal overlay, use it as it
        # will have access to the most stuff.
        if os.path.isfile(_CHROMEOS_INTERNAL_BOTO_PATH):
            boto_config = _CHROMEOS_INTERNAL_BOTO_PATH
        else:
            # NB: Do not touch this w/out build consult.  Pretend this doesn't
            # exist.
            config.append('USE="$USE -ondevice_speech"')

        gs_fetch_binpkg = os.path.join(
            constants.SOURCE_ROOT, "chromite", "bin", "gs_fetch_binpkg"
        )
        gsutil_cmd = (
            '%s \\"${URI}\\" \\"${DISTDIR}/${FILE}\\"' % gs_fetch_binpkg
        )
        config.append('BOTO_CONFIG="%s"' % boto_config)
        config.append(
            "FETCHCOMMAND_GS=\"bash -c 'BOTO_CONFIG=%s %s'\""
            % (boto_config, gsutil_cmd)
        )
        config.append('RESUMECOMMAND_GS="$FETCHCOMMAND_GS"')

        if accepted_licenses:
            config.append('ACCEPT_LICENSE="%s"' % accepted_licenses)

        return "\n".join(config)

    def GenerateBinhostConf(
        self,
        local_only: bool = False,
        package_indexes: List["PackageIndexInfo"] = None,
        expanded_binhost_inheritance: bool = False,
        use_cq_prebuilts: bool = False,
        source_root: str = constants.SOURCE_ROOT,
    ) -> str:
        """Returns the binhost configuration.

        Args:
            local_only: If True, use binary packages from local boards only.
            package_indexes: List of information about available prebuilts,
                youngest first, or None.
            expanded_binhost_inheritance: Look for additional binhosts to
                inherit.
            use_cq_prebuilts: Whether to use the prebuilts generated by CQ.
            source_root: Root directory for the source files.

        Returns:
            The config contents.
        """
        board = self.GetStandardField(STANDARD_FIELD_BOARD_USE)
        if local_only:
            if not board:
                return ""
            # TODO(bsimonnet): Refactor cros_generate_local_binhosts into a
            #   function here and remove the following call.
            local_binhosts = cros_build_lib.run(
                [
                    constants.CHROMITE_BIN_DIR / "cros_generate_local_binhosts",
                    "--board=%s" % board,
                ],
                print_cmd=False,
                capture_output=True,
                encoding="utf-8",
            ).stdout
            return "\n".join(
                [local_binhosts, 'PORTAGE_BINHOST="$LOCAL_BINHOST"']
            )

        config = []
        if package_indexes:
            # TODO(crbug/1088059): Drop all use of overlay commits, once the
            #   solution is in place for non-snapshot checkouts.
            # If present, this defines PORTAGE_BINHOST.  These are independent
            # of the overlay commits.
            config.append("# This is the list of binhosts provided by the API.")
            config.append(
                'PASSED_BINHOST="%s"'
                % " ".join(x.location for x in reversed(package_indexes))
            )
            config.append('PORTAGE_BINHOST="$PASSED_BINHOST"')
        else:
            config.append(
                """
# FULL_BINHOST is populated by the full builders. It is listed first because it
# is the lowest priority binhost. It is better to download packages from the
# postsubmit/cq binhost because they are fresher packages.
PORTAGE_BINHOST="$FULL_BINHOST"
"""
            )

            config.extend(
                self._ContinuousBinhostConfigs(
                    "POSTSUBMIT",
                    board,
                    expanded_binhost_inheritance,
                    source_root,
                )
            )

        # CQ BINHOSTs in the repository are effective if |package_indexes| is
        # not set or |use_cq_prebuilts| is explicitly specified.
        if use_cq_prebuilts:
            config.extend(
                self._ContinuousBinhostConfigs(
                    "CQ", board, expanded_binhost_inheritance, source_root
                )
            )

        return "\n".join(config)

    def _ContinuousBinhostConfigs(
        self,
        builder_type: str,
        board: Union[str, None],
        expanded_binhost_inheritance: bool,
        source_root: str,
    ) -> List[str]:
        config = []
        (binhost_public, binhost_internal) = self._ContinuousBinhosts(
            builder_type, board, expanded_binhost_inheritance, source_root
        )
        if binhost_public:
            config.append(
                f"""
# {builder_type}_BINHOST is populated by the public {builder_type} builders.
# The packages here takes higher priority than the packages provided by the
# above binhosts.
source {binhost_public}
PORTAGE_BINHOST="$PORTAGE_BINHOST ${builder_type}_BINHOST"
"""
            )
        if binhost_internal:
            config.append(
                f"""
# {builder_type}_BINHOST is populated by the internal {builder_type} builders.
# The packages here takes higher priority than the packages provided by the
# above binhosts.
source {binhost_internal}
PORTAGE_BINHOST="$PORTAGE_BINHOST ${builder_type}_BINHOST"
"""
            )

        return config

    def _ContinuousBinhosts(
        self,
        builder_type: str,
        board: Union[str, None],
        expanded_binhost_inheritance: bool,
        source_root: str,
    ) -> Tuple[Optional[str], Optional[str]]:
        """Returns the postsubmit or CQ binhost to use."""
        boards = []
        # The preference of picking the binhost file for a board is in the same
        # order of boards, so it's critical to make sure
        # <board>-<builder_type>_BINHOST.conf is at the top of |boards| list.
        if board:
            boards = [board]
            # Add reference board if applicable.
            if "_" in board:
                boards.append(board.split("_")[0])
            elif expanded_binhost_inheritance:
                # Search the public parent overlays for the given board, and
                # include the parents' binhosts; e.g. eve for eve-kvm.
                overlays = portage_util.FindOverlays(
                    constants.PUBLIC_OVERLAYS, board=board
                )
                names = [portage_util.GetOverlayName(x) for x in overlays]
                boards.extend(x for x in names if x != board)

        # Add base architecture board.
        arch = self.GetStandardField(STANDARD_FIELD_ARCH)
        if arch in _ARCH_MAPPING:
            boards.append(_ARCH_MAPPING[arch])

        filenames = [f"{p}-{builder_type}_BINHOST.conf" for p in boards]

        external = internal = None
        for filename in filenames:
            # The binhost file must exist and not be empty, both for internal
            # and external binhosts. When a builder is deleted and no longer
            # publishes prebuilts, we need developers to pick up the next set of
            # prebuilts. Clearing the binhost files triggers this.
            candidate = os.path.join(
                source_root, _INTERNAL_BINHOST_DIR, filename
            )
            if not internal and _NotEmpty(candidate):
                internal = candidate

            candidate = os.path.join(
                source_root, _EXTERNAL_BINHOST_DIR, filename
            )
            if not external and _NotEmpty(candidate):
                external = candidate

        return external, internal

    def CreateSkeleton(self) -> None:
        """Creates a sysroot skeleton."""
        needed_dirs = [
            self.Path("etc", "portage", "hooks"),
            self.Path("etc", "portage", "profile"),
            "/usr/local/bin",
        ]
        for d in needed_dirs:
            osutils.SafeMakedirs(d, sudo=True)

        # Create links for portage hooks.
        for filename in (constants.CROSUTILS_DIR / "hooks").glob("*"):
            linkpath = self.Path(
                "etc",
                "portage",
                "hooks",
                filename.name,
            )
            osutils.SafeSymlink(filename, linkpath, sudo=True)

    def UpdateToolchain(self, board: str, local_init: bool = True) -> None:
        """Updates the toolchain packages.

        This will install both the toolchains and the packages that are
        implicitly needed (gcc-libs, linux-headers).

        Args:
            board: The name of the board.
            local_init: Whether to use local packages to bootstrap the implicit
                dependencies.
        """
        try:
            toolchain.InstallToolchain(self)
        except toolchain.ToolchainInstallError as e:
            raise ToolchainInstallError(
                str(e),
                e.result,
                exception=e.exception,
                tc_info=e.failed_toolchain_info,
            ) from e

        if not self.IsToolchainInstalled():
            # Emerge the implicit dependencies.
            emerge = self._UpdateToolchainCommand(board, local_init)

            # Use a tempdir to handle the status file cleanup.
            with osutils.TempDir() as tempdir:
                extra_env = {constants.CROS_METRICS_DIR_ENVVAR: tempdir}

                try:
                    cros_build_lib.sudo_run(
                        emerge, preserve_env=True, extra_env=extra_env
                    )
                except cros_build_lib.RunCommandError as e:
                    # Include failed packages from the status file in the error.
                    failed_pkgs = portage_util.ParseDieHookStatusFile(tempdir)
                    raise ToolchainInstallError(
                        str(e), e.result, exception=e, tc_info=failed_pkgs
                    )

            # Record we've installed them so we don't call emerge each time.
            self.SetCachedField(_IMPLICIT_SYSROOT_DEPS_KEY, "yes")

    def _UpdateToolchainCommand(self, board: str, local_init: bool) -> str:
        """Helper function to build the emerge command for UpdateToolchain."""
        emerge = [
            constants.CHROMITE_BIN_DIR / "parallel_emerge",
            "--board=%s" % board,
            "--root-deps=rdeps",
            "--select",
            "--quiet",
        ]

        if local_init:
            emerge += ["--getbinpkg", "--usepkg"]

        emerge += _IMPLICIT_SYSROOT_DEPS

        return emerge

    def IsToolchainInstalled(self) -> bool:
        """Check if the toolchain has been installed."""
        return self.GetCachedField(_IMPLICIT_SYSROOT_DEPS_KEY) == "yes"

    def Delete(self, background: bool = False) -> None:
        """Delete the sysroot.

        Optionally run asynchronously. Async delete moves the sysroot into a
        temp directory and then deletes the tempdir with a background task.

        Args:
            background: Whether to run the delete as a background operation.
        """
        rm = ["rm", "-rf", "--one-file-system", "--"]
        if background:
            # Make the temporary directory in the same folder as the sysroot
            # were deleting to avoid crossing disks, mounts, etc. that'd cause
            # us to synchronously copy the entire thing before we delete it.
            cwd = os.path.normpath(self.Path(".."))
            try:
                result = cros_build_lib.sudo_run(
                    ["mktemp", "-d", "-p", cwd],
                    encoding="utf-8",
                    stdout=True,
                    cwd=cwd,
                    debug_level=logging.DEBUG,
                )
            except cros_build_lib.RunCommandError:
                # Fall back to a synchronous delete just in case.
                logging.notice(
                    "Error deleting sysroot asynchronously. Deleting "
                    "synchronously instead. This may take a minute."
                )
                return self.Delete(background=False)

            tempdir = result.stdout.strip()
            cros_build_lib.sudo_run(
                ["mv", self.path, tempdir],
                capture_output=True,
                debug_level=logging.DEBUG,
            )
            if not os.fork():
                # Child process, just delete the sysroot root and _exit.
                result = cros_build_lib.sudo_run(
                    rm + [tempdir],
                    capture_output=True,
                    check=False,
                    debug_level=logging.DEBUG,
                )
                if result.returncode:
                    # Log it so it can be handled manually.
                    logging.warning(
                        "Unable to delete old sysroot now at %s: %s",
                        tempdir,
                        result.stderr,
                    )
                # pylint: disable=protected-access
                os._exit(result.returncode)
        else:
            cros_build_lib.sudo_run(
                rm + [self.path], capture_output=True, debug_level=logging.DEBUG
            )

    def get_sdk_provided_packages(self) -> Iterable[package_info.PackageInfo]:
        """Find all packages provided by the SDK (i.e. package.provided)."""
        # Look at packages in package.provided.
        sdk_file_path = self.Path(
            "etc", "portage", "profile", "package.provided"
        )
        for line in osutils.ReadFile(sdk_file_path).splitlines():
            # Skip comments and empty lines.
            line = line.split("#", 1)[0].strip()
            if not line:
                continue
            yield package_info.parse(line)


def get_sdk_provided_packages(
    sysroot_path: str,
) -> Iterable[package_info.PackageInfo]:
    """Find all packages provided by the SDK (i.e. package.provided).

    Convenience wrapper for the Sysroot method.

    Args:
        sysroot_path: The sysroot to use when finding SDK packages.

    Returns:
        The provided packages.
    """
    sysroot = Sysroot(sysroot_path)
    return sysroot.get_sdk_provided_packages()
