blob: ee1455966a1ce2b0e87d66349d57f68cbc53125c [file] [log] [blame]
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""DependencyGraph class.
At time of writing, the DependencyGraph is only used by the depgraph lib to
produce graphs from portage, but this class is decoupled from that file to
allow it to be used elsewhere. For example, the package index file in binhosts
contain enough information to generate a graph, and does not require portage
imports to parse.
"""
import collections
import enum
import os
from pathlib import Path
import sys
from typing import Iterable, Iterator, List, Optional, Set, Union
from chromite.lib import cros_build_lib
from chromite.lib import cros_logging as logging
from chromite.lib.parser import package_info
assert sys.version_info >= (3, 6), 'This module requires Python 3.6+'
class Error(Exception):
"""Base module error."""
class NodeCollisionError(Error):
"""Found multiple nodes representing the same package."""
class NoSourcePathsError(Error):
"""Source paths were queried but none were provided."""
class SysrootValueError(Error, ValueError):
"""Sysroot argument value not valid."""
class PackageNode:
"""A package vertex in the depgraph.
Attributes:
pkg_info: The package the node represents.
root: The root where the package is to be installed.
_deps: The set of packages this package depends on.
_rev_deps: The set of packages that depend on this package.
"""
pkg_info: package_info.PackageInfo
root: str
def __init__(self, pkg_info: package_info.PackageInfo, root: str,
src_paths: Optional[Iterable[Union[str, os.PathLike]]] = None):
self.pkg_info = pkg_info
self.root = root
self._deps = set()
self._rev_deps = set()
self.source_paths = list(src_paths) if src_paths else []
def __eq__(self, other: Union['PackageNode', package_info.PackageInfo]):
if isinstance(other, PackageNode):
return (self.pkg_info == other.pkg_info and
self.root == other.root and
self._deps == set(other.dependencies) and
self._rev_deps == set(other.reverse_dependencies))
elif isinstance(other, package_info.PackageInfo):
return self.pkg_info == other
else:
return False
def __hash__(self):
return hash((self.name, self.root))
def __str__(self):
return f'PackageNode<{self.pkg_info} in {self.root}>'
def __repr__(self):
# Build truncated strings for deps, rdeps, and source paths.
truncated_len = 3
deplen = min(len(self._deps), truncated_len)
deps = ', '.join(str(p) for p in list(self._deps)[:deplen])
if len(self._deps) > deplen:
deps += ', ...'
rdeplen = min(len(self._rev_deps), truncated_len)
rdeps = ', '.join(str(p) for p in list(self._rev_deps)[:rdeplen])
if len(self._rev_deps) > rdeplen:
rdeps += ', ...'
src_paths_len = min(len(self.source_paths), truncated_len)
src_paths = ', '.join(x for x in self.source_paths[:src_paths_len])
if len(self.source_paths) > src_paths_len:
src_paths += ', ...'
data = ', '.join([
f'pkg_info: "{self.pkg_info}"',
f'root: "{self.root}"',
f'deps: "{deps}"',
f'reverse deps: "{rdeps}"',
f'source paths: "{src_paths}"',
])
return f'PackageNode<{data}>'
@property
def dependencies(self) -> Iterable['PackageNode']:
"""Get the packages this one depends on."""
yield from self._deps
@property
def reverse_dependencies(self) -> Iterable['PackageNode']:
"""Get the packages that depend on this one."""
yield from self._rev_deps
@property
def atom(self):
"""Get the package atom ("category/package")."""
return self.pkg_info.atom
@property
def cpvr(self):
return self.pkg_info.cpvr
@property
def name(self):
"""Get the node name, a unique identifier for the package itself."""
return self.cpvr
def add_dependency(self, dependency: 'PackageNode'):
"""Add a package as a dependency of this node.
Also registers this package as a reverse dependency of the other.
"""
self._deps.add(dependency)
dependency.add_reverse_dependency(self)
def add_reverse_dependency(self, dependency: 'PackageNode'):
"""Add a reverse dependency.
This method is not generally meant to be used directly. Use add_dependency
to build out a DependencyGraph.
"""
self._rev_deps.add(dependency)
def is_relevant(self, src_path: Union[str, os.PathLike]) -> bool:
"""Check if |src_path| is relevant to the node.
Relevant here means a change in a file in the source paths could affect the
result of building the package.
"""
if not self.source_paths:
# By definition every package has at least 1 relevant path, the ebuild
# itself, so no paths must mean the info wasn't provided.
raise NoSourcePathsError(
'Must add source paths to nodes to perform this operation.')
path = Path(src_path)
for p in self.source_paths:
# TODO(python3.9): Change to use Path.is_realtive_to().
try:
path.relative_to(p)
return True
except ValueError:
pass
return False
def any_relevant(self, src_paths: Iterable[Union[str, os.PathLike]]) -> bool:
"""Check if any of the paths in |src_paths| are relevant to the node."""
return any(self.is_relevant(p) for p in src_paths)
@enum.unique
class RootType(enum.Enum):
"""Root type enum for DependencyGraph."""
SYSROOT = enum.auto()
SDK = enum.auto()
ALL = enum.auto()
class DependencyGraph:
"""Dependency graph data structure.
Data structure to enable more easily querying and traversing the depgraphs.
Querying the depgraph currently supports the full CPVR package spec, and
package atoms. The behavior of other package specs (e.g. CPV) is undefined.
"""
def __init__(self, nodes: Iterable[PackageNode],
sysroot: Union[str, 'sysroot_lib.Sysroot'],
root_packages: List[Union[str, package_info.PackageInfo]]):
"""DependencyGraph init.
The |nodes| are expected to already have their dependencies added.
Args:
nodes: The PackageNodes for the depgraph.
sysroot: The sysroot the dependency graph was built from.
root_packages: The packages emerged to create this depgraph.
"""
### Add all of the nodes to our internal structure.
# The pkg/atom_dict are nested dictionaries. The first key is the package
# CPF (category/package-version-revision). The next key is the package's
# target root. This allows identifying packages that are installed to both
# the sysroot and the SDK.
self._pkg_dict = collections.defaultdict(dict)
# The atom_list is a dictionary containing the list of packages that map
# to a specific atom. Many packages will have a single entry, but the list
# may contain multiple when a package is installed to both the sysroot
# and the SDK root, and/or could contain multiple versions of a package.
self._atom_list = collections.defaultdict(list)
self._roots = set()
self._len = 0
for node in nodes:
self._add_node(node)
### Verify found roots and store roots information.
# Each entry should be installed to either the SDK (BDEPEND) or to
# the sysroot (DEPEND/RDEPEND). Zero roots is possible only when the
# depgraph is empty.
assert 0 <= len(self._roots) <= 2, 'Unexpected roots: {self._roots}'
# Figure out which roots we have roots, and that they're the ones we expect.
given_sysroot = getattr(sysroot, 'path', sysroot)
sdk_root = cros_build_lib.GetSysroot()
is_sdk_graph = given_sysroot == sdk_root
# Store the SDK (bdeps) root when present and it's not the SDK's depgraph.
self.sdk_root = (
sdk_root if sdk_root in self._roots and not is_sdk_graph else None)
self.sysroot_path = given_sysroot if given_sysroot in self._roots else None
if not self.sysroot_path:
# Given sysroot could not be confirmed. It could mean we were given a bad
# value, or just that nothing needed to be installed to the sysroot.
# found_sysroot should always be either empty, or a set with exactly one
# element in it.
found_sysroot = self._roots - {self.sdk_root}
if found_sysroot:
raise SysrootValueError(
f'Found sysroot {found_sysroot} does not match given sysroot '
f'({given_sysroot}).')
else:
# Since it's valid no packages could be installed to it, just notify
# the user.
logging.info('Given %s but depgraph did not contain any packages '
'installed to it.', given_sysroot)
### Save the references to the root package nodes.
# This must be done after the root information processing for the root
# filtering functionality in _get_nodes().
self._root_package_nodes = []
for pkg in [package_info.parse(p) for p in root_packages]:
self._root_package_nodes.extend(self.get_nodes(pkg, RootType.SYSROOT))
def __contains__(
self, item: Union[str, package_info.PackageInfo, PackageNode]) -> bool:
"""Check if the given package is in the depgraph."""
if isinstance(item, PackageNode):
return item.atom in self._atom_list and item in self._atom_list[item.atom]
elif isinstance(item, package_info.PackageInfo):
return item.cpvr in self._pkg_dict or item.atom in self._atom_list
else:
return item in self._pkg_dict or item in self._atom_list
def __iter__(self):
"""BFS traversal of the depgraph."""
pkgs = collections.deque(self._root_package_nodes)
seen = set()
while pkgs:
pkg = pkgs.popleft()
if pkg in seen:
continue
seen.add(pkg)
pkgs.extend(pkg.dependencies)
yield pkg
def __len__(self):
"""Size of the depgraph."""
return self._len
def _add_node(self, node: PackageNode):
"""Add a package and its dependencies to the graph."""
if node.root in self._pkg_dict[node.name]:
if self._pkg_dict[node.name][node.root] == node:
# Ignore if re-adding.
return
else:
raise NodeCollisionError(f'Different node already exists for {node}.')
# Add node.
self._pkg_dict[node.name][node.root] = node
self._atom_list[node.atom].append(node)
self._roots.add(node.root)
self._len += 1
for dep in node.dependencies:
self._add_node(dep)
def _get_roots(self, root: RootType = RootType.ALL) -> Set[str]:
"""Helper to translate RootType value to a set of roots."""
if root is RootType.SDK:
return self._roots & {self.sdk_root}
elif root is RootType.SYSROOT:
return self._roots & {self.sysroot_path}
else:
return self._roots
def get_nodes(self, pkg: Union[package_info.PackageInfo, str],
root_type: RootType = RootType.ALL) -> List[PackageNode]:
"""Get all nodes matching the given package and root type."""
pkg_info = package_info.parse(pkg)
roots = self._get_roots(root_type)
if pkg_info.cpvr in self._pkg_dict:
# Have exact match.
nodes = []
for root in roots:
if root in self._pkg_dict[pkg_info.cpvr]:
nodes.append(self._pkg_dict[pkg_info.cpvr][root])
return nodes
elif pkg_info.atom in self._atom_list:
# Given an atom string.
return [x for x in self._atom_list[pkg_info.atom] if x.root in roots]
else:
return []
def is_dependency(self, dep_pkg: Union[package_info.PackageInfo, str],
src_pkg: Union[package_info.PackageInfo, str],
dep_root_type: RootType = RootType.ALL,
src_root_type: RootType = RootType.ALL,
direct: bool = False) -> bool:
"""Check if |dep_pkg| is in the dependency graph of |src_pkg|.
Args:
dep_pkg: The potential dependency package.
src_pkg: The root of the subgraph to search for |dep_pkg|.
dep_root_type: Filters the root when finding |dep_pkg|.
src_root_type: Filters the root when finding |src_pkg|.
direct: Only search direct dependencies when set to True.
"""
dep_pkg_nodes = self.get_nodes(dep_pkg, dep_root_type)
src_pkg_nodes = self.get_nodes(src_pkg, src_root_type)
if not dep_pkg_nodes or not src_pkg_nodes:
# One or both not in the graph at all.
return False
elif direct:
# Only check the direct dependencies.
return any(d in s.dependencies
for d in dep_pkg_nodes
for s in src_pkg_nodes)
else:
# Check if it's in the subgraph(s) rooted at src_pkg.
subgraph = DependencyGraph(src_pkg_nodes, self.sysroot_path, [src_pkg])
return any(x in subgraph for x in dep_pkg_nodes)
def get_dependencies(
self,
pkg: Union[package_info.PackageInfo, str],
root_type: RootType = RootType.ALL) -> Iterator[PackageNode]:
"""Get the dependencies for a package.
Use |root_type| to differentiate between the instances installed to the SDK
and sysroot. Generates an empty list for packages not in the graph (or when
filtered to a root the package isn't installed to). Does not ensure a
unique list when multiple nodes are found.
"""
for node in self.get_nodes(pkg, root_type):
yield from node.dependencies
def get_reverse_dependencies(
self,
pkg: Union[package_info.PackageInfo, str],
root_type: RootType = RootType.ALL) -> Iterator[PackageNode]:
"""Get the reverse dependencies for a package.
Like get_dependencies(), but get the reverse dependencies for the package.
See get_dependencies() for more information.
"""
for node in self.get_nodes(pkg, root_type):
yield from node.reverse_dependencies
def is_relevant(self, src_path: Union[str, os.PathLike]) -> bool:
"""Check if |src_path| is relevant to any nodes in the graph."""
return any(x.is_relevant(src_path) for x in self)
def any_relevant(self, src_paths: Iterable[Union[str, os.PathLike]]) -> bool:
"""Check if any of the paths in |src_paths| are relevant to any nodes."""
return any(self.is_relevant(p) for p in src_paths)