blob: d8f48dbc8f0c4b5cd0b4cf48c6715cff2a014a32 [file] [log] [blame]
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Routines which facilitate data collection for build operations.
Service module to parse or extract data about build behavior for structured
data storage. Complementary to chromite.lib.metrics, chromite.api.metrics, and
related library usage.
"""
import functools
import logging
import os
import re
from typing import Dict, Iterable, NamedTuple, Pattern, Tuple
from chromite.lib import constants
from chromite.lib import image_lib
from chromite.lib import osutils
from chromite.lib import portage_util
from chromite.lib.parser import package_info
# Partitions whose installed-package sizes are reported, keyed by image type.
# get_installed_package_data() logs a warning and returns an empty result for
# any image type that is not a key of this mapping.
_SUPPORTED_ISCP_PARTITIONS = {
    constants.IMAGE_TYPE_BASE: [constants.PART_ROOT_A],
    constants.IMAGE_TYPE_TEST: [constants.PART_ROOT_A, constants.PART_STATE],
    constants.IMAGE_TYPE_DEV: [constants.PART_ROOT_A, constants.PART_STATE],
}
# Package database location, passed as |vdb| to portage_util.PortageDB together
# with the stateful partition's mount point as the root.
_STATEFUL_PARTITION_VDB = "var_overlay/db/pkg"
# Subdirectory of the stateful partition's mount point used as the installation
# path when sizing packages on that partition (also handed to PortageDB as
# |package_install_path|).
_STATEFUL_PARTITION_INSTALL_PATH = "dev_image"
class PackageVersion(NamedTuple):
    """Version details of a package.

    Container class akin to the chromite.observability.PackageVersion proto.
    """

    # Numeric components of a version laid out as
    # major[.minor[.patch[.extended]]].
    major: int
    minor: int
    patch: int
    extended: int
    # Revision of the package, kept separately from the version components.
    revision: int
    # Unparsed version string for the package.
    full_version: str
class PackageName(NamedTuple):
    """Naming details of a package.

    Container class akin to the chromite.observability.PackageName proto.
    """

    # Package atom string.
    atom: str
    # Category portion of the package's name.
    category: str
    # Package portion of the package's name.
    package_name: str
class PackageIdentifier(NamedTuple):
    """Name and version of a package, bundled together.

    Container class like the chromite.observability.PackageIdentifier proto.
    Being a tuple of tuples it is hashable, so it can serve as a dict key.
    """

    # Naming details (atom, category, package name).
    package_name: PackageName
    # Version details (numeric components, revision, full version string).
    package_version: PackageVersion
@functools.lru_cache(maxsize=None)
def _get_version_component_regex() -> Pattern:
# Parse version number, expecting up to 4 integer components arranged
# like: major[.minor[.patch[.extended]]]
extended = r"(?:\.(?P<extended>\d+))?"
patch = rf"(?:\.(?P<patch>\d+){extended})?"
minor = rf"(?:\.(?P<minor>\d+){patch})?"
complete = rf"^(?P<major>\d+){minor}"
return re.compile(complete)
def get_image_size_data(
    image_details: Dict[os.PathLike, str]
) -> Dict[str, Dict[str, Dict[PackageIdentifier, portage_util.PackageSizes]]]:
    """Collect installed package size data for every supplied image.

    Args:
        image_details: A mapping of the path to the image and the image type
            (dev, test, etc).

    Returns:
        A mapping of the image type (base, dev, test) to the partition:package
        mapping produced by get_installed_package_data() for that image. When
        several paths share an image type, the last one processed wins.
    """
    return {
        kind: get_installed_package_data(image_type=kind, image_path=path)
        for path, kind in image_details.items()
    }
def get_installed_package_data(
    image_type: str, image_path: os.PathLike
) -> Dict[str, Dict[PackageIdentifier, portage_util.PackageSizes]]:
    """Mount an image and measure its installed packages per partition.

    Mounts the supported partitions of the image (plus the stateful partition,
    which holds the package database), reads the installed packages from that
    database and sizes them against each supported partition.

    Args:
        image_type: The type of image being queried (base, dev, test).
        image_path: The path to the image in question.

    Returns:
        A mapping of the partition type (stateful, rootfs) to package details.
        Empty when |image_type| is not a supported image type.
    """
    wanted_partitions = _SUPPORTED_ISCP_PARTITIONS.get(image_type)
    if wanted_partitions is None:
        logging.warning("Provided image type is not supported.")
        return {}

    per_partition = {}
    with osutils.TempDir() as mount_root:
        # The stateful partition is mounted for every image type, even when
        # its own contents are not being measured, because the package db
        # lives there. The db is read once and reused for all partitions.
        to_mount = {*wanted_partitions, constants.PART_STATE}
        with image_lib.LoopbackPartitions(
            path=image_path, destination=mount_root, part_ids=to_mount
        ) as image:
            # Indexed below by partition to obtain each mount point.
            mounted_at = image.Mounted()
            package_db = portage_util.PortageDB(
                root=mounted_at[constants.PART_STATE],
                vdb=_STATEFUL_PARTITION_VDB,
                package_install_path=_STATEFUL_PARTITION_INSTALL_PATH,
            )
            # Pair every installed package with its file listing up front so
            # the same data can be sized against each partition.
            packages_with_files = [
                (pkg, pkg.ListContents())
                for pkg in package_db.InstalledPackages()
            ]

            # With the installed packages known, calculate how much of each
            # one resides on every partition of interest (if the package is
            # installed on that partition at all).
            for partition in wanted_partitions:
                if partition == constants.PART_STATE:
                    subdir = _STATEFUL_PARTITION_INSTALL_PATH
                else:
                    subdir = ""
                per_partition[partition] = get_package_details_for_partition(
                    os.path.join(mounted_at[partition], subdir),
                    packages_with_files,
                )
    return per_partition
# TODO(zland): refactor scripts/pkg_size (and this function) to use common
# library. This implementation does not want to recreate metrics records for the
# base image's rootfs, and we may not want to append all of the partition &
# image information to metrics, so a simplified approach to
# chromite/scripts/pkg_size is being used here.
def get_package_details_for_partition(
    installation_path: os.PathLike,
    pkgs: Iterable[
        Tuple[portage_util.InstalledPackage, Iterable[Tuple[str, str]]]
    ],
) -> Dict[PackageIdentifier, portage_util.PackageSizes]:
    """Compute sizes and structured identifiers for |pkgs| on one partition.

    Args:
        installation_path: The path to the partition's root that the package's
            installed files are relative to.
        pkgs: The packages of interest in the partition (typically, the entire
            contents of the package db), each paired with its file listing.

    Returns:
        A mapping of each package's PackageIdentifier to the value returned by
        portage_util.CalculatePackageSize for its files and
        |installation_path|.
    """
    size_by_pkg = {}
    for pkg, files in pkgs:
        pkg_sizes = portage_util.CalculatePackageSize(files, installation_path)
        size_by_pkg[parse_package_name(pkg.package_info)] = pkg_sizes
    return size_by_pkg
def parse_package_name(pkg_info: package_info.PackageInfo) -> PackageIdentifier:
    """Produce detailed NamedTuple for a package from a PackageInfo object.

    Args:
        pkg_info: The package to describe. Its version, revision, vr, atom,
            category and package attributes are read.

    Returns:
        A PackageIdentifier combining the package's name details with its
        version details. Numeric version components that are absent from the
        version string are reported as 0; if the version string has no
        leading numeric component at all, every component is 0 and only
        full_version carries the original text.
    """
    # Version number parsing, looking for 1-4 numerical components and ignoring
    # any alphanumeric suffixes (e.g. _alpha1).
    match = _get_version_component_regex().match(pkg_info.version or "")
    if match is None:
        # Pattern.match() returns None instead of raising when the version is
        # empty or does not begin with a digit. Fall back to zeroed
        # components rather than failing on the missing match object.
        logging.warning(
            "Unable to parse numeric version components from %r.",
            pkg_info.version,
        )
        components = {}
    else:
        components = match.groupdict()

    # Groups that did not participate in the match are None; treat those
    # (and missing keys in the fallback case) as 0.
    major, minor, patch, extended = (
        int(components.get(name) or 0)
        for name in ("major", "minor", "patch", "extended")
    )

    version = PackageVersion(
        major=major,
        minor=minor,
        patch=patch,
        extended=extended,
        revision=pkg_info.revision,
        full_version=pkg_info.vr,
    )
    name = PackageName(
        atom=pkg_info.atom,
        category=pkg_info.category,
        package_name=pkg_info.package,
    )
    return PackageIdentifier(package_name=name, package_version=version)