blob: ca646aaf51459d6c4d9e717dcab80c5fbc737cd2 [file] [log] [blame]
# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Routines and a delegate for dealing with locally worked on packages."""
import collections
import glob
import logging
import os
from pathlib import Path
import re
from typing import Iterable, List, Set
from chromite.lib import build_target_lib
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import dependency_graph
from chromite.lib import git
from chromite.lib import osutils
from chromite.lib import path_util
from chromite.lib import portage_util
from chromite.lib import sysroot_lib
if cros_build_lib.IsInsideChroot():
from chromite.lib import depgraph
# A package is a canonical CP atom.
# A package may have 0 or more repositories, given as strings.
# Each repository may be mapped into our workspace at some path.
PackageInfo = collections.namedtuple(
"PackageInfo", ("package", "repos", "src_paths")
)
# Vars that we want to pass through for the user.
EBUILD_PASS_THROUGH_VARS = frozenset(
[
# Common test vars.
"GTEST_ARGS",
# Platform eclass vars.
"P2_TEST_FILTER",
"P2_VMODULE",
]
)
def _IsWorkonEbuild(include_chrome, ebuild_path, ebuild_contents=None):
"""Returns True iff the ebuild at |ebuild_path| is a workon ebuild.
This means roughly that the ebuild is compatible with our cros_workon based
system. For most packages, this means that it inherits the cros-workon
overlay.
Args:
include_chrome: True iff we should include Chrome and chromium-source
packages.
ebuild_path: path an ebuild in question.
ebuild_contents: None, or the contents of the ebuild at |ebuild_path|.
If None, _IsWorkonEbuild will read the contents of the ebuild when
necessary.
Returns:
True iff the ebuild can be used with cros_workon.
"""
# TODO(rcui): remove special casing of chromeos-chrome here when we make it
# inherit from cros-workon / chromium-source class (chromium-os:19259).
if (
include_chrome
and portage_util.EbuildToCP(ebuild_path) == constants.CHROME_CP
):
return True
workon_eclasses = "cros-workon"
if include_chrome:
workon_eclasses += "|chromium-source"
ebuild_contents = ebuild_contents or osutils.ReadFile(ebuild_path)
if re.search(
"^inherit (.|\\\n)*(%s)" % workon_eclasses, ebuild_contents, re.M
):
return True
return False
def _GetLinesFromFile(path: Path, line_prefix, line_suffix) -> Set[str]:
"""Get a unique set of lines from a file, stripping off a prefix and suffix.
Rejects lines that do not start with |line_prefix| or end with
|line_suffix|.
Returns an empty set if the file at |path| does not exist.
Discards duplicate lines.
Args:
path: path to file.
line_prefix: prefix of line to look for and strip if found.
line_suffix: suffix of line to look for and strip if found.
Returns:
A set of filtered lines from the file at |path|.
"""
if not path.exists():
return set()
# Note that there is an opportunity to race with the file system here.
lines = set()
for line in path.read_text(encoding="utf-8").splitlines():
if not line.startswith(line_prefix) or not line.endswith(line_suffix):
logging.warning("Filtering out malformed line: %s", line)
continue
lines.add(line[len(line_prefix) : -len(line_suffix)])
return lines
def _WriteLinesToFile(path, lines, line_prefix, line_suffix) -> None:
"""Write a set of lines to a file, adding prefixes, suffixes and newlines.
Args:
path: path to file.
lines: iterable of lines to write.
line_prefix: string to prefix each line with.
line_suffix: string to append to each line before a newline.
"""
contents = "".join(
["%s%s%s\n" % (line_prefix, line, line_suffix) for line in lines]
)
if not contents:
osutils.SafeUnlink(path)
else:
osutils.WriteFile(path, contents, makedirs=True)
def GetWorkonPath(source_root=constants.SOURCE_ROOT, sub_path=None):
"""Get the path to files related to packages we're working locally on.
Args:
source_root: path to source root inside chroot.
sub_path: optional path to file relative to the workon root directory.
Returns:
path to the workon root directory or file within the root directory.
"""
ret = os.path.join(source_root, ".config/cros_workon")
if sub_path:
ret = os.path.join(ret, sub_path)
return ret
class WorkonError(Exception):
"""Raised when invariants of the WorkonHelper are violated."""
def _FilterWorkonOnlyEbuilds(ebuilds):
"""Filter a list of ebuild paths to only with those no stable version.
Args:
ebuilds: list of string paths to ebuild files (e.g.
['/prefix/sys-app/app/app-9999.ebuild'])
Returns:
list of ebuild paths meeting this criterion.
"""
result = []
for ebuild_path in ebuilds:
ebuild_pattern = os.path.join(os.path.dirname(ebuild_path), "*.ebuild")
stable_ebuilds = [
path
for path in glob.glob(ebuild_pattern)
if not path.endswith("-9999.ebuild")
]
if not stable_ebuilds:
result.append(ebuild_path)
return result
def ListAllWorkedOnAtoms(src_root=constants.SOURCE_ROOT):
"""Get a list of all atoms we're currently working on.
Args:
src_root: path to source root inside chroot.
Returns:
Dictionary of atoms marked as worked on (e.g. ['chromeos-base/shill'])
for each system.
"""
workon_dir = GetWorkonPath(source_root=src_root)
if not os.path.isdir(workon_dir):
return {}
system_to_atoms = {}
for file_name in os.listdir(workon_dir):
if file_name.endswith(".mask"):
continue
file_contents = osutils.ReadFile(os.path.join(workon_dir, file_name))
atoms = []
for line in file_contents.splitlines():
match = re.match("=(.*)-9999", line)
if match:
atoms.append(match.group(1))
if atoms:
system_to_atoms[os.path.basename(file_name)] = atoms
return system_to_atoms
class WorkonHelper:
"""Delegate that knows how to mark packages as being worked on locally.
This class assumes that we're executing in the build root.
"""
def __init__(
self,
sysroot,
friendly_name=None,
verbose=False,
src_root=constants.SOURCE_ROOT,
) -> None:
"""Construct an instance.
Args:
sysroot: path to sysroot to work on packages within.
friendly_name: friendly name of the system (e.g. 'host', <board
name>, or a brick friendly name). Defaults to 'host' if sysroot
is '/' or the last component of the sysroot path.
verbose: boolean True iff we should print a lot more command output.
This is intended for debugging, and you should never cause a
script to depend on behavior enabled by this flag.
src_root: path to source root inside chroot.
"""
self._sysroot = sysroot
if friendly_name:
self._system = friendly_name
else:
self._system = (
"host"
if sysroot == "/"
else os.path.basename(sysroot.rstrip("/"))
)
self._verbose = verbose
self._src_root = src_root
self._cached_overlays = None
self._cached_arch = None
self._depgraph = None
profile = os.path.join(self._sysroot, "etc", "portage")
self._unmasked_symlink = os.path.join(
profile, "package.unmask", "cros-workon"
)
self._keywords_symlink = os.path.join(
profile, "package.accept_keywords", "cros-workon"
)
self._masked_symlink = os.path.join(
profile, "package.mask", "cros-workon"
)
# Clobber and re-create the WORKON_FILE symlinks every time. This is a
# trivial operation and eliminates all kinds of corner cases as well as
# any possible future renames of WORKON_FILE.
# In particular, we build the chroot as a board (amd64-host), bundle it
# and unpack it on /. After unpacking, the symlinks will point to
# .config/cros_workon/amd64-host instead of .config/cros_workon/host.
# Regenerating the symlinks here corrects it. crbug.com/23096.
# Note: This is currently also relied upon as an indirect fix for
# crbug.com/679831. Search the bug number for instance(s).
self._RefreshSymlinks()
@property
def workon_file_path(self):
"""Returns path to the file holding our currently worked on atoms."""
return GetWorkonPath(source_root=self._src_root, sub_path=self._system)
@property
def masked_file_path(self):
"""Returns path to file masking non-9999 ebuilds for worked on atoms."""
return self.workon_file_path + ".mask"
@property
def _arch(self):
if self._cached_arch is None:
self._cached_arch = sysroot_lib.Sysroot(
self._sysroot
).GetStandardField(sysroot_lib.STANDARD_FIELD_ARCH)
return self._cached_arch
@property
def _overlays(self):
"""Returns overlays installed for the selected system."""
if self._cached_overlays is None:
sysroot = sysroot_lib.Sysroot(self._sysroot)
portdir_overlay = sysroot.GetStandardField("PORTDIR_OVERLAY")
self._cached_overlays = [
x.strip() for x in portdir_overlay.splitlines()
]
return self._cached_overlays
def _SetWorkedOnAtoms(self, atoms) -> None:
"""Sets the unmasked atoms.
This will generate both the unmasked atom list and the masked atoms list
as the two files mention the same atom list.
Args:
atoms: Atoms to unmask.
"""
_WriteLinesToFile(self.workon_file_path, atoms, "=", "-9999")
_WriteLinesToFile(self.masked_file_path, atoms, "<", "-9999")
self._RefreshSymlinks()
def _RefreshSymlinks(self) -> None:
"""Recreates the symlinks.
This will create the symlinks needed:
* package.mask/cros-workon: list of packages to mask.
* package.keywords/cros-workon: list of hidden packages to accept.
"""
if not os.path.exists(self._sysroot):
return
# We used to generate this, but stopped as it's not actually needed.
# Clean up any old links that might still be around.
osutils.SafeUnlink(self._unmasked_symlink, sudo=True)
for target, symlink in (
(self.masked_file_path, self._masked_symlink),
(self.workon_file_path, self._keywords_symlink),
):
if not os.path.exists(target):
logging.debug("Config %s doesn't exist; ignoring.", target)
elif not os.path.exists(symlink):
osutils.SafeMakedirs(os.path.dirname(symlink), sudo=True)
osutils.SafeSymlink(target, symlink, sudo=True)
else:
logging.debug(
"Symlink %s already exists. Don't recreate it.", symlink
)
def _AtomsToEbuilds(self, atoms):
"""Maps a list of CP atoms to a list of corresponding -9999 ebuilds.
Args:
atoms: iterable of portage atoms (e.g. ['sys-apps/dbus']).
Returns:
list of ebuilds corresponding to those atoms.
"""
atoms_to_ebuilds = {atom: None for atom in atoms}
for overlay in self._overlays:
ebuild_paths = glob.glob(
os.path.join(overlay, "*-*", "*", "*-9999.ebuild")
)
for ebuild_path in ebuild_paths:
atom = portage_util.EbuildToCP(ebuild_path)
if atom in atoms_to_ebuilds:
atoms_to_ebuilds[atom] = ebuild_path
ebuilds = []
for atom, ebuild in atoms_to_ebuilds.items():
if ebuild is None:
raise WorkonError("Could not find ebuild for atom %s" % atom)
ebuilds.append(ebuild)
return ebuilds
def _GetCanonicalAtom(self, package_fragment: str, find_stale=False):
"""Transform a pkg source path or name fragment to the canonical atom.
If there are multiple atoms that a package fragment could map to,
picks an arbitrary one and prints a warning.
Args:
package_fragment: Package source path or name fragment.
find_stale: if True, allow stale (missing) worked on package.
Returns:
string canonical atom name (e.g. 'sys-apps/dbus')
"""
# Attempt to not hit portage if at all possible for speed.
if package_fragment in self._GetWorkedOnAtoms():
return package_fragment
# Ask portage directly what it thinks about that package.
ebuild_path = self._FindEbuildForPackage(package_fragment)
# If portage didn't know about that package, try and autocomplete it.
if ebuild_path is None:
possible_atoms = set()
for ebuild in self._GetWorkonEbuilds(filter_on_arch=False):
pkg_atom = portage_util.EbuildToCP(ebuild)
if package_fragment in pkg_atom:
possible_atoms.add(pkg_atom)
# Also autocomplete from the worked-on list, in case the ebuild was
# deleted.
if find_stale:
for pkg_atom in self._GetWorkedOnAtoms():
if package_fragment in pkg_atom:
possible_atoms.add(pkg_atom)
if not possible_atoms:
# Try finding the packages affected by a given path.
path_atoms = sorted(self._GetPathAtoms(package_fragment))
if not path_atoms:
logging.warning(
'Could not find canonical package for "%s"',
package_fragment,
)
return None
if len(path_atoms) > 1:
logging.warning(
"Multiple affected packages found for %s:",
package_fragment,
)
for p in path_atoms:
logging.warning(" %s", p)
logging.warning("Using %s", path_atoms[0])
logging.notice(
"cros workon start command for the rest of the "
"packages:"
)
logging.notice(
f"cros workon -b {self._system} start "
f'{" ".join(path_atoms[1:])}'
)
logging.notice(
"Package %s found for path %s",
path_atoms[0],
package_fragment,
)
return path_atoms[0]
# We want some consistent order for making our selection below.
possible_atoms = sorted(possible_atoms)
if len(possible_atoms) > 1:
logging.warning("Multiple autocompletes found:")
for possible_atom in possible_atoms:
logging.warning(" %s", possible_atom)
autocompleted_package = portage_util.EbuildToCP(possible_atoms[0])
# Sanity check to avoid infinite loop.
if package_fragment == autocompleted_package:
logging.error("Resolved %s to itself", package_fragment)
return None
logging.info(
'Autocompleted "%s" to: "%s"',
package_fragment,
autocompleted_package,
)
return self._GetCanonicalAtom(autocompleted_package)
if not _IsWorkonEbuild(True, ebuild_path):
msg = (
"In order to cros_workon a package, it must have a -9999 "
"ebuild that inherits from cros-workon.\n"
)
if "-9999" in ebuild_path:
msg += (
'"%s" is a -9999 ebuild, make sure it inherits from '
"cros-workon.\n" % ebuild_path
)
else:
msg += '"%s" is not a -9999 ebuild.\n' % ebuild_path
logging.warning(msg)
return None
return portage_util.EbuildToCP(ebuild_path)
def _GetCanonicalAtoms(
self, package_fragments: Iterable[str], find_stale=False
):
"""Transforms a list of package name fragments into a list of CP atoms.
Args:
package_fragments: list of package source paths and/or name
fragments.
find_stale: if True, allow stale (missing) worked on package.
Returns:
list of canonical portage atoms corresponding to the given
fragments.
"""
if not package_fragments:
raise WorkonError("No packages specified")
atoms = []
for package_fragment in package_fragments:
atom = self._GetCanonicalAtom(
package_fragment, find_stale=find_stale
)
if atom is None:
raise WorkonError("Error parsing package list")
atoms.append(atom)
return atoms
def _GetDepGraph(self):
"""Get the dependency graph."""
if self._depgraph:
return self._depgraph
try:
# Get the graph for our target.
if self._sysroot == build_target_lib.get_default_sysroot_path():
self._depgraph = depgraph.get_sdk_dependency_graph(
with_src_paths=True
)
else:
self._depgraph = depgraph.get_build_target_dependency_graph(
self._sysroot, with_src_paths=True
)
except dependency_graph.Error as e:
logging.error(e)
raise WorkonError("Error generating dependency graph.")
return self._depgraph
def _GetPathAtoms(self, raw_path) -> Iterable:
"""Get workon atoms affected by the current path."""
# Make sure we're in a source path.
path = Path(raw_path).resolve()
if not path.exists():
# The path doesn't exist. To avoid the long lookup when the dev
# misspells a package name, lets just assume it's that and return no
# packages.
logging.warning(
"%s (%s) does not exist. Is it a misspelled package?",
path,
raw_path,
)
return []
try:
path = path.relative_to(constants.SOURCE_ROOT)
except ValueError as e:
logging.error(e)
raise WorkonError(
"Current path not in the source root: "
f"{path} not in {constants.SOURCE_ROOT}"
)
graph = self._GetDepGraph()
# Get the relevant packages from the dep graph.
logging.debug("Getting packages relevant to %s", path)
relevant_atoms = set(x.atom for x in graph.get_relevant_nodes([path]))
if not relevant_atoms:
return []
logging.debug("Found relevant packages: %s", relevant_atoms)
# Filter out any non-cros-workon packages.
canonical = [self._GetCanonicalAtom(x) for x in relevant_atoms]
workon_atoms = [x for x in canonical if x]
logging.debug("Found relevant workon packages: %s", workon_atoms)
return workon_atoms
def _GetWorkedOnAtoms(self) -> Set[str]:
"""Returns the set of package atoms that we're currently working on."""
return _GetLinesFromFile(Path(self.workon_file_path), "=", "-9999")
def _FindEbuildForPackage(self, package):
"""Find an ebuild for a given atom (accepting even masked ebuilds).
Args:
package: package string.
Returns:
path to ebuild for given package.
"""
return portage_util.FindEbuildForPackage(
package,
self._sysroot,
include_masked=True,
extra_env={"ACCEPT_KEYWORDS": "~%s" % self._arch},
)
def _GetWorkonEbuilds(
self, filter_workon=False, filter_on_arch=True, include_chrome=True
):
"""Get a list of all cros-workon ebuilds in the current system.
Args:
filter_workon: True iff we should filter the list of ebuilds to
those packages which define only a workon ebuild (i.e. no stable
version).
filter_on_arch: True iff we should only return ebuilds which are
marked as unstable for the architecture of the system we're
interested in.
include_chrome: True iff we should also include chromeos-chrome and
related ebuilds. These ebuilds can be worked on, but don't work
like normal cros-workon ebuilds.
Returns:
list of paths to ebuilds meeting the above criteria.
"""
result = []
if filter_on_arch:
keyword_pat = re.compile(
r'^KEYWORDS=".*~(\*|%s).*"$' % self._arch, re.M
)
for overlay in self._overlays:
ebuild_paths = glob.glob(
os.path.join(overlay, "*-*", "*", "*-9999.ebuild")
)
for ebuild_path in ebuild_paths:
ebuild_contents = osutils.ReadFile(ebuild_path)
if filter_on_arch and not keyword_pat.search(ebuild_contents):
continue
if not _IsWorkonEbuild(
include_chrome, ebuild_path, ebuild_contents=ebuild_contents
):
continue
result.append(ebuild_path)
if filter_workon:
result = _FilterWorkonOnlyEbuilds(result)
return result
def _GetLiveAtoms(self, filter_workon=False):
"""Get a list of atoms currently marked as being locally compiled.
Args:
filter_workon: True iff the list should be filtered to only those
atoms without a stable version (i.e. the -9999 ebuild is the
only ebuild).
Returns:
list of canonical portage atoms.
"""
atoms = self._GetWorkedOnAtoms()
if filter_workon:
ebuilds = _FilterWorkonOnlyEbuilds(self._AtomsToEbuilds(atoms))
return [portage_util.EbuildToCP(ebuild) for ebuild in ebuilds]
return atoms
def _AddProjectsToPartialManifests(self, atoms) -> None:
"""Add projects corresponding to a list of atoms to the local manifest.
If we mark projects as workon that we don't have in our local checkout,
it is convenient to have them added to the manifest. Note that users
will need to `repo sync` to pull down repositories added in this way.
Args:
atoms: iterable of atoms to ensure are in the manifest.
"""
# If this is a Cog checkout, we do not care about local manifests at
# this time, as external access to Cider is a long ways down the road.
# A strategy to equate a Cog superproject checkout to a local manifest
# is also a requirement to re-enable this.
# TODO: b/334950349 - Support workon modifications of local manifests
if path_util.DetermineCheckout().type == path_util.CheckoutType.CITC:
return
manifest = git.ManifestCheckout.Cached(self._src_root)
should_repo_sync = False
ebuilds = portage_util.FindEbuildsForPackages(atoms, self._sysroot)
for ebuild_path in ebuilds.values():
infos = portage_util.GetRepositoryForEbuild(
ebuild_path, self._sysroot
)
if not infos:
logging.warning(
"Unable to determine project information for %s. "
"If you already have the project(s) for the package "
"synced, no action is needed.",
ebuild_path,
)
continue
for info in infos:
if not info.project or manifest.FindCheckouts(info.project):
continue
cmd = ["loman", "add", "--workon", info.project]
try:
cros_build_lib.dbg_run(cmd)
should_repo_sync = True
except cros_build_lib.RunCommandError as e:
logging.warning(
"Error adding project %s for %s to the manifest. "
"If you already have the project synced, no "
"action is needed.",
info.project,
ebuild_path,
)
logging.debug(e)
if should_repo_sync:
print('Please run "repo sync" now.')
def ListAtoms(self, use_all=False, use_workon_only=False):
"""Returns a list of interesting atoms.
By default, return a list of the atoms marked as being locally worked on
for the system in question.
Args:
use_all: If true, return a list of all atoms we could possibly work
on for the system in question.
use_workon_only: If true, return a list of all atoms we could
possibly work on that have no stable ebuild.
Returns:
a list of atoms (e.g. ['chromeos-base/shill', 'sys-apps/dbus']).
"""
if use_workon_only or use_all:
ebuilds = self._GetWorkonEbuilds(filter_workon=use_workon_only)
packages = [portage_util.EbuildToCP(ebuild) for ebuild in ebuilds]
else:
packages = self._GetLiveAtoms()
return sorted(packages)
def StartWorkingOnPackages(
self,
packages,
use_all: bool = False,
use_workon_only: bool = False,
quiet: bool = False,
) -> None:
"""Mark a list of packages as being worked on locally.
Args:
packages: list of package name fragments. While each fragment could
be a complete portage atom, this helper will attempt to infer
intent by looking for fragments in a list of all possible atoms
for the system in question.
use_all: True iff we should ignore the package list, and instead
consider all possible atoms that we could mark as worked on
locally.
use_workon_only: True iff we should ignore the package list, and
instead consider all possible atoms for the system in question
that define only the -9999 ebuild.
quiet: Does not log the started atoms when True. Used to avoid
confusion in cases where the list must be toggled to compute the
new packages.
"""
if not os.path.exists(self._sysroot):
raise WorkonError("Sysroot %s is not setup." % self._sysroot)
if use_all or use_workon_only:
ebuilds = self._GetWorkonEbuilds(filter_workon=use_workon_only)
atoms = [portage_util.EbuildToCP(ebuild) for ebuild in ebuilds]
else:
atoms = self._GetCanonicalAtoms(packages)
atoms = set(atoms)
logging.debug("Atoms: %s", sorted(atoms))
# Read out what atoms we're already working on.
existing_atoms = self._GetWorkedOnAtoms()
# Warn the user if they're requested to work on an atom that's already
# marked as being worked on.
for atom in atoms & existing_atoms:
logging.warning("Already working on %s", atom)
# If we have no new atoms to work on, we can quit now.
new_atoms = atoms - existing_atoms
if not new_atoms:
return
# Write out all these atoms to the appropriate files.
current_atoms = new_atoms | existing_atoms
self._SetWorkedOnAtoms(current_atoms)
self._AddProjectsToPartialManifests(new_atoms)
if not quiet:
# Legacy scripts used single quotes in their output, and we carry on
# this honorable tradition.
logging.notice(
"Started working on '%s' for '%s'",
" ".join(new_atoms),
self._system,
)
def StopWorkingOnPackages(
self,
packages,
use_all: bool = False,
use_workon_only: bool = False,
quiet: bool = False,
) -> None:
"""Stop working on a list of pkgs currently marked as locally worked on.
Args:
packages: list of package name fragments. These will be mapped to
canonical portage atoms via the same process as
StartWorkingOnPackages().
use_all: True iff instead of the provided package list, we should
just stop working on all currently worked on atoms for the
system in question.
use_workon_only: True iff instead of the provided package list, we
should stop working on all currently worked on atoms that define
only a -9999 ebuild.
quiet: Does not log the started atoms when True. Used to avoid
confusion in cases where the list must be toggled to compute the
new packages.
"""
if use_all or use_workon_only:
atoms = self._GetLiveAtoms(filter_workon=use_workon_only)
else:
atoms = self._GetCanonicalAtoms(packages, find_stale=True)
current_atoms = self._GetWorkedOnAtoms()
stopped_atoms = []
for atom in atoms:
if not atom in current_atoms:
logging.warning("Not working on %s", atom)
continue
current_atoms.discard(atom)
stopped_atoms.append(atom)
self._SetWorkedOnAtoms(current_atoms)
if stopped_atoms and not quiet:
# Legacy scripts used single quotes in their output, and we carry on
# this honorable tradition.
logging.notice(
"Stopped working on '%s' for '%s'",
" ".join(stopped_atoms),
self._system,
)
def GetWorkonAtom(self, package: str) -> str:
"""Find the canonical atom name for the workon package.
The function finds the canonical atom name (e.g. 'sys-apps/dbus')
for the given workon package. The function will throw WorkonError
if the package does not exist or is not on the current workon list
for the board.
Args:
package: The package to check in workon list for board.
Returns:
The canonical portage atom corresponding to the given fragment
(e.g. 'sys-apps/dbus')
"""
atom = self._GetCanonicalAtom(package, find_stale=False)
if not atom:
raise WorkonError(f"error looking up package {package}")
if not atom in self._GetWorkedOnAtoms():
raise WorkonError(
f"run 'cros workon --board {self._system} "
f"start {package}' first!"
)
return atom
def _AssertNotHost(self) -> None:
"""Fails if the sysroot is for host."""
if self._sysroot == build_target_lib.get_default_sysroot_path():
raise WorkonError("cannot run for host. expecting board target.")
def BuildPackage(
self,
package: str,
clean: bool = False,
test: bool = False,
) -> None:
"""Build a workon package.
Args:
package: The name of the package to build. This must be a workon
else an error is raised.
clean: True if we want to re run configure and prepare steps.
test: True if we want to run tests.
"""
cros_build_lib.AssertInsideChroot()
self._AssertNotHost()
atom = self.GetWorkonAtom(package)
pkgfile = self._FindEbuildForPackage(package)
if not pkgfile:
raise WorkonError(f"cannot find ebuild for {package}")
workpath = Path(
sysroot_lib.Sysroot(self._sysroot).JoinPath(
"tmp", "portage", f"{atom}-9999"
)
)
features = os.environ.get("FEATURES", "").split()
features.append("-noauto")
if test:
features.append("test")
(workpath / ".tested").unlink(missing_ok=True)
workdir = workpath / "work" / f"{atom}-9999"
ebuild_vars = osutils.SourceEnvironment(
pkgfile, ["CROS_WORKON_OUTOFTREE_BUILD"]
)
if not (
workdir.is_symlink()
or ebuild_vars.get("CROS_WORKON_OUTOFTREE_BUILD", "") == "1"
):
logging.warning("Cleaning up stale workdir: %s", workdir)
clean = True
if not clean:
(workpath / ".compiled").unlink(missing_ok=True)
envf = workpath / "temp" / "environment"
if envf.exists():
lines = [
(
f"declare -x {v}="
f"{cros_build_lib.ShellQuote(os.environ.get(v, ''))}"
f"{os.linesep}"
)
for v in EBUILD_PASS_THROUGH_VARS
]
osutils.WriteFile(
envf,
lines,
encoding="utf-8",
mode="a",
)
cmd = [f"ebuild-{self._system}", pkgfile]
if clean:
cmd.append("clean")
cmd.append("test" if test else "compile")
cros_build_lib.run(
cmd,
print_cmd=True,
extra_env={
"FEATURES": " ".join(features).strip(),
"SANDBOX_WRITE": self._src_root,
"CROS_WORKON_INPLACE": "1",
},
)
def InstallPackage(self, package: str) -> None:
"""Install a workon package.
Args:
package: The name of the package to be installed. This must be a
workon else an error is raised.
"""
cros_build_lib.AssertInsideChroot()
self._AssertNotHost()
atom = self.GetWorkonAtom(package)
cros_build_lib.run(
[f"emerge-{self._system}", "--nodeps", atom], print_cmd=True
)
def ScrubPackage(self, package: str) -> None:
"""Scrub a workon package.
Args:
package: The name of the package to be scrubbed.
"""
cros_build_lib.AssertInsideChroot()
self._AssertNotHost()
atom = self.GetWorkonAtom(package)
self.RunCommandInAtomSourceDirectory(atom, ["git", "clean", "-dxf"])
def GetPackageInfo(self, packages, use_all=False, use_workon_only=False):
"""Get information about packages.
Args:
packages: list of package name fragments. These will be mapped to
canonical portage atoms via the same process as
StartWorkingOnPackages().
use_all: True iff we should ignore the package list, and instead
consider all possible workon-able atoms.
use_workon_only: True iff we should ignore the package list, and
instead consider all possible atoms for the system in question
that define only the -9999 ebuild.
Returns:
Returns a list of PackageInfo tuples.
"""
if use_all or use_workon_only:
# You can't use info to find the source code from Chrome, since that
# workflow is different.
ebuilds = self._GetWorkonEbuilds(
filter_workon=use_workon_only, include_chrome=False
)
else:
atoms = self._GetCanonicalAtoms(packages)
ebuilds = [self._FindEbuildForPackage(atom) for atom in atoms]
build_root = self._src_root
src_root = os.path.join(build_root, "src")
manifest = git.ManifestCheckout.Cached(build_root)
ebuild_to_repos = {}
ebuild_to_src_paths = collections.defaultdict(list)
for ebuild in ebuilds:
ebuild_obj = portage_util.EBuild(ebuild)
workon_vars = ebuild_obj.cros_workon_vars
projects = workon_vars.project if workon_vars else []
ebuild_to_repos[ebuild] = projects
if ebuild_obj.is_manually_uprevved:
# Manually uprevved ebuild is pinned to a specific git sha1, so
# change in that repo matter to the ebuild.
continue
src_paths = ebuild_obj.GetSourceInfo(src_root, manifest).srcdirs
src_paths = [
os.path.relpath(path, build_root) for path in src_paths
]
ebuild_to_src_paths[ebuild] = src_paths
result = []
for ebuild in ebuilds:
package = portage_util.EbuildToCP(ebuild)
repos = ebuild_to_repos.get(ebuild, [])
src_paths = ebuild_to_src_paths.get(ebuild, [])
result.append(PackageInfo(package, repos, src_paths))
result.sort()
return result
def RunCommandInAtomSourceDirectory(self, atom, command: List[str]) -> None:
"""Run a command in the source directory of an atom.
Args:
atom: string atom to run the command in (e.g.
'chromeos-base/shill').
command: Command to run in the source directory of |atom|.
"""
logging.info('Running "%s" on %s', command, atom)
ebuild_path = self._FindEbuildForPackage(atom)
if ebuild_path is None:
raise WorkonError("Error looking for atom %s" % atom)
for info in portage_util.GetRepositoryForEbuild(
ebuild_path, self._sysroot
):
cros_build_lib.run(command, cwd=info.srcdir, print_cmd=False)
def RunCommandInPackages(
self, packages, command: List[str], use_all=False, use_workon_only=False
) -> None:
"""Run a command in the source directory of a list of packages.
Args:
packages: list of package name fragments.
command: Command to run in the source directory of |atom|.
use_all: True iff we should ignore the package list, and instead
consider all possible workon-able atoms.
use_workon_only: True iff we should ignore the package list, and
instead consider all possible atoms for the system in question
that define only the -9999 ebuild.
"""
if use_all or use_workon_only:
atoms = self._GetLiveAtoms(filter_workon=use_workon_only)
else:
atoms = self._GetCanonicalAtoms(packages)
for atom in atoms:
self.RunCommandInAtomSourceDirectory(atom, command)
def InstalledWorkonAtoms(self):
"""Returns the set of installed cros_workon packages."""
installed_cp = set()
for pkg in portage_util.PortageDB(self._sysroot).InstalledPackages():
installed_cp.add("%s/%s" % (pkg.category, pkg.package))
return set(a for a in self.ListAtoms(use_all=True) if a in installed_cp)
class WorkonScope:
"""Context manager to assist managing workon status for packages."""
def __init__(
self,
build_target: build_target_lib.BuildTarget,
pkgs: Iterable[str] = tuple(),
) -> None:
"""Construct an instance.
Args:
build_target: The build target (board) being built.
pkgs: The workon packages to be used in the context manager dunder
methods.
"""
self.helper = WorkonHelper(build_target.root, build_target.name)
self.pkgs = pkgs
self.target = build_target
self.stop_packages = []
self.start_packages = []
self.before_workon = self.helper.ListAtoms()
def __enter__(self: "WorkonScope") -> "WorkonScope":
"""Commence context manager tasks for starting and stopping packages.
Returns:
The initialized WorkonScope context manager.
"""
self.start(self.pkgs)
after_workon = self.helper.ListAtoms()
# Stop = the set we actually started. Preserves workon started status
# for any in the packages that were already worked on.
self.stop_packages = sorted(set(after_workon) - set(self.before_workon))
return self
def __exit__(self, exc_type, exc_val, tb) -> None:
"""Clean up context manager tasks for starting and stopping packages.
Args:
exc_type: The exception type passed when the runtime context raises
an exception.
exc_val: The exception value raised by the runtime context.
tb: The exception traceback raised by the runtime context.
Raises:
Any exception raised in the runtime context will be raised here
after cleanup. Beyond that, all WorkonHelper methods are expected to
be safe operations.
"""
# Reset the environment.
logging.notice("Restoring cros_workon status.")
if self.stop_packages:
# Stop the packages we started.
logging.info("Stopping workon packages previously started.")
try:
self.stop(self.stop_packages)
except WorkonError:
to_stop = sorted(
set(self.stop_packages) - set(self.helper.ListAtoms())
)
logging.critical(
"Unable to stop started packages. Please stop the "
"following packages: %s",
" ".join(to_stop),
)
else:
logging.info("No packages needed to be stopped.")
if self.start_packages:
# Stop the packages we started.
logging.info("Restarting workon packages previously stopped.")
try:
self._start_packages(self.start_packages)
except WorkonError:
to_start = sorted(
set(self.start_packages) - set(self.helper.ListAtoms())
)
logging.critical(
"Unable to start stopped packages. Please start the "
"following packages: %s",
" ".join(to_start),
)
else:
logging.info("No packages needed to be restarted.")
def _start_packages(self, pkgs: Iterable[str]) -> None:
"""Wrapper for self.WorkonHelper.StartWorkingOnPackages."""
self.helper.StartWorkingOnPackages(pkgs)
def _stop_packages(self, pkgs: Iterable[str]) -> None:
self.helper.StopWorkingOnPackages(pkgs)
def start(self, pkgs: Iterable[str]) -> None:
"""Helper to allow the context manager to explicitly start packages.
Invocations of this method will track started packages and stop them
when __exit__ is invoked, even when explicitly called by a client in a
runtime context.
Args:
pkgs: A list of package name fragments.
"""
if pkgs:
logging.debug(
"cros-workon-%s start %s", self.target.name, " ".join(pkgs)
)
self._start_packages(pkgs)
after_workon = self.helper.ListAtoms()
self.stop_packages = sorted(
set(after_workon) - set(self.before_workon)
)
def stop(self, pkgs: Iterable[str]) -> None:
"""Helper to allow the context manager to explicitly stop packages.
If a package is stopped that was marked as workon before entering the
runtime context, that package will be restarted on exit.
Args:
pkgs: A list of package name fragments.
"""
if pkgs:
logging.debug(
"cros-workon-%s stop %s", self.target.name, " ".join(pkgs)
)
self._stop_packages(pkgs)
to_restart = set(pkgs) & set(self.before_workon)
self.start_packages = sorted(to_restart | set(self.start_packages))