blob: 76e96c3dcc583b66416340e8cd0dd509316312b9 [file] [log] [blame]
# Copyright 2020 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Library to generate a DLC (Downloadable Content) artifact."""
from __future__ import division
import hashlib
import json
import logging
import math
import os
import re
import shutil
from chromite.lib import build_target_lib
from chromite.lib import cros_build_lib
from chromite.lib import dlc_allowlist
from chromite.lib import osutils
from chromite.lib import verity
from chromite.licensing import licenses_lib
from chromite.scripts import cros_set_lsb_release
from chromite.utils import pformat
DLC_BUILD_DIR = "build/rootfs/dlc"
DLC_BUILD_DIR_SCALED = "build/rootfs/dlc-scaled"
DLC_FACTORY_INSTALL_DIR = "unencrypted/dlc-factory-images"
DLC_DIR = "dlc"
DLC_DIR_SCALED = "dlc-scaled"
DLC_GID = 20118
DLC_IMAGE = "dlc.img"
DLC_LOADPIN_FILE_HEADER = "# LOADPIN_TRUSTED_VERITY_ROOT_DIGESTS"
DLC_LOADPIN_TRUSTED_VERITY_DIGESTS = "_trusted_verity_digests"
DLC_META_DIR = "opt/google/dlc"
DLC_PACKAGE = "package"
DLC_TMP_META_DIR = "meta"
DLC_UID = 20118
DLC_VERITY_TABLE = "table"
EBUILD_PARAMETERS = "ebuild_parameters.json"
IMAGELOADER_JSON = "imageloader.json"
LICENSE = "LICENSE"
LSB_RELEASE = "etc/lsb-release"
DLC_ID_RE = r"[a-zA-Z0-9][a-zA-Z0-9-]*"
# This file has major and minor version numbers that the update_engine client
# supports. These values are needed for generating a delta/full payload.
UPDATE_ENGINE_CONF = "etc/update_engine.conf"
_EXTRA_RESOURCES = (UPDATE_ENGINE_CONF,)
# The following boards don't have AppIds, but we allow DLCs to be generated for
# those boards for testing purposes.
_TEST_BOARDS_ALLOWLIST = (
"amd64-generic",
"amd64-generic-koosh",
"arm64-generic",
"arm-generic",
"galaxy",
)
DLC_ID_KEY = "DLC_ID"
DLC_PACKAGE_KEY = "DLC_PACKAGE"
DLC_NAME_KEY = "DLC_NAME"
DLC_APPID_KEY = "DLC_RELEASE_APPID"
SQUASHFS_TYPE = "squashfs"
EXT4_TYPE = "ext4"
USED_BY_USER = "user"
USED_BY_SYSTEM = "system"
_MAX_ID_NAME = 40
_IMAGE_SIZE_NEARING_RATIO = 1.05
_IMAGE_SIZE_GROWTH_RATIO = 1.2
def HashFile(file_path):
"""Calculate the sha256 hash of a file.
Args:
file_path: (str) path to the file.
Returns:
[str]: The sha256 hash of the file.
"""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for b in iter(lambda: f.read(2048), b""):
sha256.update(b)
return sha256.hexdigest()
def GetValueInJsonFile(json_path: str, key: str, default_value=None):
"""Reads file containing JSON and returns value or default_value for key.
Args:
json_path: File containing JSON.
key: The desired key to lookup.
default_value: The default value returned in case of missing key.
"""
with open(json_path, "rb") as fd:
return json.load(fd).get(key, default_value)
class EbuildParams(object):
"""Object to store and retrieve DLC ebuild parameters.
Attributes:
dlc_id: (str) DLC ID.
dlc_package: (str) DLC package.
fs_type: (str) file system type.
pre_allocated_blocks: (int) number of blocks pre-allocated on device.
version: (str) DLC version.
name: (str) DLC name.
description: (str) DLC description.
preload: (bool) allow for preloading DLC.
factory_install: (bool) allow factory installing the DLC.
mount_file_required: (bool) allow for mount file generation for DLC.
used_by: (str) The user of this DLC, e.g. "system" or "user"
days_to_purge: (int) The number of days to keep a DLC after uninstall
and before it is purged.
reserved: (bool) always reserve space for DLC on disk.
critical_update: (bool) DLC always updates with the OS.
fullnamerev: (str) The full package & version name.
loadpin_verity_digest: (bool) DLC digest is part of LoadPin trusted
dm-verity digest.
scaled: (bool) DLC will be fed through scaling design.
"""
def __init__(
self,
dlc_id,
dlc_package,
fs_type,
pre_allocated_blocks,
version,
name,
description,
preload,
used_by,
mount_file_required,
fullnamerev,
reserved=False,
critical_update=False,
days_to_purge=0,
factory_install=False,
loadpin_verity_digest=False,
scaled=False,
):
"""Initializes the object.
When adding a new variable in here, always set a default value. The
reason is that this class is sometimes used to load a pre-existing
ebuild params JSON file (through bin packages) and that file may not
contain the new argument. So the build will fail.
"""
self.dlc_id = dlc_id
self.dlc_package = dlc_package
self.fs_type = fs_type
self.pre_allocated_blocks = pre_allocated_blocks
self.version = version
self.name = name
self.description = description
self.preload = preload
self.factory_install = factory_install
self.used_by = used_by
self.mount_file_required = mount_file_required
self.fullnamerev = fullnamerev
self.days_to_purge = days_to_purge
self.reserved = reserved
self.critical_update = critical_update
self.loadpin_verity_digest = loadpin_verity_digest
self.scaled = scaled
def VerifyDlcParameters(self):
"""Verifies certain DLC parameters are valid and allowed."""
if self.factory_install:
if not dlc_allowlist.IsFactoryInstallAllowlisted(self.dlc_id):
err_msg = (
f"DLC={self.dlc_id} is not allowed to be factory installed."
)
logging.error(err_msg)
raise Exception(err_msg)
def StoreDlcParameters(self, install_root_dir, sudo):
"""Store DLC parameters defined in the ebuild.
Store DLC parameters defined in the ebuild in a temporary file so they
can be retrieved in the build_image phase.
Args:
install_root_dir: (str) The path to the root installation directory.
sudo: (bool) Use sudo to write the file.
"""
ebuild_params_path = EbuildParams.GetParamsPath(
install_root_dir,
self.dlc_id,
self.dlc_package,
self.scaled,
)
osutils.WriteFile(
ebuild_params_path,
json.dumps(self.__dict__),
makedirs=True,
sudo=sudo,
)
@staticmethod
def GetParamsPath(install_root_dir, dlc_id, dlc_package, scaled):
"""Get the path to the file storing the ebuild parameters.
Args:
install_root_dir: (str) The path to the root installation directory.
dlc_id: (str) DLC ID.
dlc_package: (str) DLC package.
scaled: (bool) Scaled DLC option.
Returns:
[str]: Path to |EBUILD_PARAMETERS|.
"""
return os.path.join(
install_root_dir,
DLC_BUILD_DIR_SCALED if scaled else DLC_BUILD_DIR,
dlc_id,
dlc_package,
EBUILD_PARAMETERS,
)
@classmethod
def LoadEbuildParams(cls, sysroot, dlc_id, dlc_package, scaled):
"""Read the stored ebuild parameters file and return a class instance.
Args:
dlc_id: (str) DLC ID.
dlc_package: (str) DLC package.
sysroot: (str) The path to the build root directory.
scaled: (bool) Scaled DLC option.
Returns:
[bool] : True if |ebuild_params_path| exists, False otherwise.
"""
path = cls.GetParamsPath(sysroot, dlc_id, dlc_package, scaled)
if not os.path.exists(path):
return None
with open(path, "rb") as fp:
return cls(**json.load(fp))
def __str__(self):
return str(self.__dict__)
class DlcGenerator(object):
"""Object to generate DLC artifacts."""
# Block size for the DLC image.
# We use 4K for various reasons:
# 1. it's what imageloader (linux kernel) supports.
# 2. it's what verity supports.
_BLOCK_SIZE = 4096
# Blocks in the initial sparse image.
_BLOCKS = 500000
# Version of manifest file.
_MANIFEST_VERSION = 1
# The DLC root path inside the DLC module.
_DLC_ROOT_DIR = "root"
def __init__(self, ebuild_params, sysroot, board, src_dir=None):
"""Object initializer.
Args:
sysroot: (str) The path to the build root directory.
ebuild_params: (EbuildParams) Ebuild variables.
board: (str) The target board we are building for.
src_dir: (str) Optional path to the DLC source root directory. When
None, the default directory in |DLC_BUILD_DIR| is used.
"""
# Use a temporary directory to avoid having to use sudo every time we
# write into the build directory.
self.temp_root = osutils.TempDir(prefix="dlc", sudo_rm=True)
self.src_dir = src_dir
self.sysroot = sysroot
self.board = board
self.ebuild_params = ebuild_params
build_dir = (
DLC_BUILD_DIR_SCALED if ebuild_params.scaled else DLC_BUILD_DIR
)
# If the client is not overriding the src_dir, use the default one.
if not self.src_dir:
self.src_dir = os.path.join(
self.sysroot,
build_dir,
self.ebuild_params.dlc_id,
self.ebuild_params.dlc_package,
self._DLC_ROOT_DIR,
)
self.image_dir = os.path.join(
self.temp_root.tempdir,
build_dir,
self.ebuild_params.dlc_id,
self.ebuild_params.dlc_package,
)
self.meta_dir = os.path.join(self.image_dir, DLC_TMP_META_DIR)
# Create path for all final artifacts.
self.dest_image = os.path.join(self.image_dir, DLC_IMAGE)
self.dest_table = os.path.join(self.meta_dir, "table")
self.dest_imageloader_json = os.path.join(
self.meta_dir, IMAGELOADER_JSON
)
# Log out the member variable values initially set.
logging.debug(
"Initial internal values of DlcGenerator: %s",
repr({k: str(i) for k, i in self.__dict__.items()}),
)
def CopyTempContentsToBuildDir(self):
"""Copy the temp files to the build directory using sudo."""
src = self.temp_root.tempdir.rstrip("/") + "/."
dst = self.sysroot
logging.debug(
"Copy files from temporary directory (%s) to build directory (%s).",
src,
dst,
)
cros_build_lib.sudo_run(["cp", "-dR", src, dst])
def SquashOwnerships(self, path):
"""Squash the owernships & permissions for files.
Args:
path: (str) path that contains all files to be processed.
"""
cros_build_lib.sudo_run(["chown", "-R", "0:0", path])
cros_build_lib.sudo_run(
[
"find",
path,
"-exec",
"touch",
"-h",
"-t",
"197001010000.00",
"{}",
"+",
]
)
def CreateExt4Image(self):
"""Create an ext4 image."""
with osutils.TempDir(prefix="dlc_") as temp_dir:
mount_point = os.path.join(temp_dir, "mount_point")
# Create a raw image file.
osutils.AllocateFile(
self.dest_image, self._BLOCKS * self._BLOCK_SIZE, makedirs=True
)
# Create an ext4 file system on the raw image.
cros_build_lib.run(
[
"/sbin/mkfs.ext4",
"-b",
str(self._BLOCK_SIZE),
"-O",
"^has_journal",
self.dest_image,
],
capture_output=True,
)
# Create the mount_point directory.
osutils.SafeMakedirs(mount_point)
# Mount the ext4 image.
osutils.MountDir(
self.dest_image, mount_point, mount_opts=("loop", "rw")
)
try:
self.SetupDlcImageFiles(mount_point)
finally:
# Unmount the ext4 image.
osutils.UmountDir(mount_point)
# Shrink to minimum size.
cros_build_lib.run(
["/sbin/e2fsck", "-y", "-f", self.dest_image],
capture_output=True,
)
cros_build_lib.run(
["/sbin/resize2fs", "-M", self.dest_image], capture_output=True
)
def CreateSquashfsImage(self):
"""Create a squashfs image."""
with osutils.TempDir(prefix="dlc_") as temp_dir:
squashfs_root = os.path.join(temp_dir, "squashfs-root")
self.SetupDlcImageFiles(squashfs_root)
cros_build_lib.run(
[
"mksquashfs",
squashfs_root,
self.dest_image,
"-4k-align",
"-noappend",
],
capture_output=True,
)
# Verity cannot create hashes for device images which are less than
# two pages in size. So fix this squashfs image if it's too small.
# Check out b/187725419 for details.
if os.path.getsize(self.dest_image) < self._BLOCK_SIZE * 2:
logging.warning(
"Increasing DLC image size to at least two pages."
)
os.truncate(self.dest_image, self._BLOCK_SIZE * 2)
# We changed the ownership and permissions of the squashfs_root
# directory. Now we need to remove it manually.
osutils.RmDir(squashfs_root, sudo=True)
def SetupDlcImageFiles(self, dlc_dir):
"""Prepares the directory dlc_dir with all the files a DLC needs.
Args:
dlc_dir: (str) The path to where to setup files inside the DLC.
"""
dlc_root_dir = os.path.join(dlc_dir, self._DLC_ROOT_DIR)
osutils.SafeMakedirs(dlc_root_dir)
osutils.CopyDirContents(self.src_dir, dlc_root_dir, symlinks=True)
self.PrepareLsbRelease(dlc_dir)
self.AddLicensingFile(dlc_dir)
self.CollectExtraResources(dlc_dir)
self.SquashOwnerships(dlc_dir)
def PrepareLsbRelease(self, dlc_dir):
"""Prepare the file /etc/lsb-release in the DLC module.
This file is used dropping some identification parameters for the DLC.
Args:
dlc_dir: (str) The path to the mounted point during image creation.
"""
app_id = None
platform_lsb_rel_path = os.path.join(self.sysroot, LSB_RELEASE)
if os.path.isfile(platform_lsb_rel_path):
# Reading the platform APPID and creating the DLC APPID.
platform_lsb_release = osutils.ReadFile(platform_lsb_rel_path)
for line in platform_lsb_release.split("\n"):
if line.startswith(cros_set_lsb_release.LSB_KEY_APPID_RELEASE):
app_id = line.split("=")[1]
if app_id is None and self.board not in _TEST_BOARDS_ALLOWLIST:
raise Exception(
"%s does not have a valid key %s"
% (
platform_lsb_rel_path,
cros_set_lsb_release.LSB_KEY_APPID_RELEASE,
)
)
fields = (
(DLC_ID_KEY, self.ebuild_params.dlc_id),
(DLC_PACKAGE_KEY, self.ebuild_params.dlc_package),
(DLC_NAME_KEY, self.ebuild_params.name),
# The DLC appid is generated by concatenating the platform appid
# with the DLC ID using an underscore. This pattern should never be
# changed once set otherwise it can break a lot of things!
(
DLC_APPID_KEY,
"%s_%s" % (app_id if app_id else "", self.ebuild_params.dlc_id),
),
)
lsb_release = os.path.join(dlc_dir, LSB_RELEASE)
osutils.SafeMakedirs(os.path.dirname(lsb_release))
content = "".join("%s=%s\n" % (k, v) for k, v in fields)
osutils.WriteFile(lsb_release, content)
def AddLicensingFile(self, dlc_dir):
"""Add the licensing file for this DLC.
Args:
dlc_dir: (str) The path to the mounted point during image creation.
"""
if not self.ebuild_params.fullnamerev:
return
sysroot = build_target_lib.get_default_sysroot_path(self.board)
licensing = licenses_lib.Licensing(
sysroot, [self.ebuild_params.fullnamerev], True
)
licensing.LoadPackageInfo()
licensing.ProcessPackageLicenses()
license_path = os.path.join(dlc_dir, LICENSE)
licenses = licensing.GenerateLicenseText()
# The first (and only) item contains the values for |self.fullnamerev|.
if licenses:
_, license_txt = next(iter(licenses.items()))
osutils.WriteFile(license_path, license_txt)
else:
logging.info(
"LICENSE text is empty. Skipping LICENSE file creation."
)
def CollectExtraResources(self, dlc_dir):
"""Collect the extra resources needed by the DLC module.
Look at the documentation around _EXTRA_RESOURCES.
Args:
dlc_dir: (str) The path to the mounted point during image creation.
"""
for r in _EXTRA_RESOURCES:
source_path = os.path.join(self.sysroot, r)
target_path = os.path.join(dlc_dir, r)
osutils.SafeMakedirs(os.path.dirname(target_path))
shutil.copyfile(source_path, target_path)
def CreateImage(self):
"""Create the image and copy the DLC files to it."""
logging.debug("Creating the DLC image.")
if self.ebuild_params.fs_type == EXT4_TYPE:
self.CreateExt4Image()
elif self.ebuild_params.fs_type == SQUASHFS_TYPE:
self.CreateSquashfsImage()
else:
raise ValueError(
"Wrong fs type: %s used:" % self.ebuild_params.fs_type
)
def VerifyImageSize(self):
"""Verify the image can fit to the reserved file."""
logging.debug("Verifying the DLC image size.")
image_bytes = os.path.getsize(self.dest_image)
preallocated_bytes = (
self.ebuild_params.pre_allocated_blocks * self._BLOCK_SIZE
)
# Verifies the actual size of the DLC image is NOT larger than the
# preallocated space.
if preallocated_bytes < image_bytes:
raise ValueError(
"The DLC_PREALLOC_BLOCKS (%s) value set in DLC ebuild resulted "
"in a max size of DLC_PREALLOC_BLOCKS * 4K (%s) bytes the DLC "
"image is allowed to occupy. The value is smaller than the "
"actual image size (%s) required. Increase DLC_PREALLOC_BLOCKS "
"in your ebuild to at least %d."
% (
self.ebuild_params.pre_allocated_blocks,
preallocated_bytes,
image_bytes,
self.GetOptimalImageBlockSize(image_bytes),
)
)
image_size_ratio = preallocated_bytes / image_bytes
# Warn if the DLC image size is nearing the preallocated size.
if image_size_ratio <= _IMAGE_SIZE_NEARING_RATIO:
logging.warning(
"The %s DLC image size (%s) is nearing the preallocated size "
"(%s).",
self.ebuild_params.dlc_id,
image_bytes,
preallocated_bytes,
)
# Warn if the DLC preallocated size is too much.
if image_size_ratio >= _IMAGE_SIZE_GROWTH_RATIO:
logging.warning(
"The %s DLC image size (%s) is significantly less than the "
"preallocated size (%s). Reduce the DLC_PREALLOC_BLOCKS in "
"your ebuild",
self.ebuild_params.dlc_id,
image_bytes,
preallocated_bytes,
)
def GetOptimalImageBlockSize(self, image_bytes):
"""Given the image bytes, get the least amount of blocks required."""
return int(math.ceil(image_bytes / self._BLOCK_SIZE))
def GetImageloaderJsonContent(self, image_hash, table_hash, blocks):
"""Return the content of imageloader.json file.
Args:
image_hash: (str) sha256 hash of the DLC image.
table_hash: (str) sha256 hash of the DLC table file.
blocks: (int) number of blocks in the DLC image.
Returns:
[str]: content of imageloader.json file.
"""
return {
"fs-type": self.ebuild_params.fs_type,
"id": self.ebuild_params.dlc_id,
"package": self.ebuild_params.dlc_package,
"image-sha256-hash": image_hash,
"image-type": "dlc",
"is-removable": True,
"manifest-version": self._MANIFEST_VERSION,
"name": self.ebuild_params.name,
"description": self.ebuild_params.description,
"pre-allocated-size": str(
self.ebuild_params.pre_allocated_blocks * self._BLOCK_SIZE
),
"size": str(blocks * self._BLOCK_SIZE),
"table-sha256-hash": table_hash,
"version": self.ebuild_params.version,
"preload-allowed": self.ebuild_params.preload,
"factory-install": self.ebuild_params.factory_install,
"used-by": self.ebuild_params.used_by,
"days-to-purge": self.ebuild_params.days_to_purge,
"mount-file-required": self.ebuild_params.mount_file_required,
"reserved": self.ebuild_params.reserved,
"critical-update": self.ebuild_params.critical_update,
"loadpin-verity-digest": self.ebuild_params.loadpin_verity_digest,
"scaled": self.ebuild_params.scaled,
# Initial rollout is to have all scaled DLCs use logical volumes on
# devices that support LVM stateful.
"use-logical-volume": self.ebuild_params.scaled,
}
def GenerateVerity(self):
"""Generate verity parameters and hashes for the image."""
logging.debug("Generating DLC image verity.")
with osutils.TempDir(prefix="dlc_") as temp_dir:
hash_tree = os.path.join(temp_dir, "hash_tree")
# Get blocks in the image.
blocks = math.ceil(
os.path.getsize(self.dest_image) / self._BLOCK_SIZE
)
result = cros_build_lib.run(
[
"verity",
"mode=create",
"alg=sha256",
"payload=" + self.dest_image,
"payload_blocks=" + str(blocks),
"hashtree=" + hash_tree,
"salt=random",
],
capture_output=True,
)
table = result.stdout
# Append the merkle tree to the image.
osutils.WriteFile(
self.dest_image,
osutils.ReadFile(hash_tree, mode="rb"),
mode="a+b",
)
# Write verity parameter to table file.
osutils.WriteFile(self.dest_table, table, mode="wb")
# Compute image hash.
image_hash = HashFile(self.dest_image)
table_hash = HashFile(self.dest_table)
# Write image hash to imageloader.json file.
blocks = math.ceil(
os.path.getsize(self.dest_image) / self._BLOCK_SIZE
)
imageloader_json_content = self.GetImageloaderJsonContent(
image_hash, table_hash, int(blocks)
)
pformat.json(
imageloader_json_content, fp=self.dest_imageloader_json
)
def GenerateDLC(self):
"""Generate a DLC artifact."""
# Create directories.
osutils.SafeMakedirs(self.image_dir)
osutils.SafeMakedirs(self.meta_dir)
# Create the image into |self.temp_root| and copy the DLC files to it.
self.CreateImage()
# Verify the image created is within pre-allocated size.
self.VerifyImageSize()
# Generate hash tree and other metadata and save them under
# |self.temp_root|.
self.GenerateVerity()
# Copy the files from |self.temp_root| into the build directory.
self.CopyTempContentsToBuildDir()
# Now that the image was successfully generated, delete
# |ebuild_params_path| to indicate that the image in the build directory
# is in sync with the files installed during the build_package phase.
ebuild_params_path = EbuildParams.GetParamsPath(
self.sysroot,
self.ebuild_params.dlc_id,
self.ebuild_params.dlc_package,
self.ebuild_params.scaled,
)
osutils.SafeUnlink(ebuild_params_path, sudo=True)
def IsFieldAllowed(dlc_id: str, dlc_build_dir: str, field: str):
"""Checks if a field is allowed.
Args:
dlc_id: TheDLC ID.
dlc_build_dir: The root path where DLC build files reside.
field: The field name in the metadata json.
"""
dlc_id_dir = os.path.join(dlc_build_dir, dlc_id)
if not os.path.exists(dlc_id_dir):
return False
for package in os.listdir(dlc_id_dir):
image_loader_json = os.path.join(
dlc_id_dir, package, DLC_TMP_META_DIR, IMAGELOADER_JSON
)
if not os.path.exists(image_loader_json):
return False
if not GetValueInJsonFile(
json_path=image_loader_json, key=field, default_value=False
):
return False
return True
def IsDlcPreloadingAllowed(dlc_id: str, dlc_build_dir: str):
"""Validates that DLC is built with DLC_PRELOAD=true.
Args:
dlc_id: The DLC ID.
dlc_build_dir: The root path where DLC build files reside.
"""
return IsFieldAllowed(dlc_id, dlc_build_dir, "preload-allowed")
def IsFactoryInstallAllowed(dlc_id: str, dlc_build_dir: str) -> bool:
"""Validates that DLC is built with DLC_FACTORY_INSTALL=true.
Args:
dlc_id: The DLC ID.
dlc_build_dir: The root path where DLC build files reside.
Returns:
Whether the factory installation for the DLC is allowed.
"""
if not IsFieldAllowed(dlc_id, dlc_build_dir, "factory-install"):
return False
if not dlc_allowlist.IsFactoryInstallAllowlisted(dlc_id):
err_msg = f"DLC={dlc_id} is not allowed to be factory installed."
logging.error(err_msg)
raise Exception(err_msg)
return True
def IsLoadPinVerityDigestAllowed(dlc_id: str, dlc_build_dir: str) -> bool:
"""Checks that DLC is built with DLC_LOADPIN_VERITY_DIGEST set to true.
Args:
dlc_id: The DLC ID.
dlc_build_dir: The root path where DLC build files reside.
Returns:
Boolean based on field being true or false.
"""
return IsFieldAllowed(dlc_id, dlc_build_dir, "loadpin-verity-digest")
def InstallDlcImages(
sysroot,
board,
dlc_id=None,
install_root_dir=None,
preload=False,
factory_install=False,
rootfs=None,
stateful=None,
src_dir=None,
):
"""Copies all DLC image files into the images directory.
Copies the DLC image files in the given build directory into the given DLC
image directory. If the DLC build directory does not exist, or there is no
DLC for that board, this function does nothing.
Args:
sysroot: Path to directory containing DLC images, e.g /build/<board>.
board: The target board we are building for.
dlc_id: (str) DLC ID. If None, all the DLCs will be installed.
install_root_dir: Path to DLC output directory, e.g.
src/build/images/<board>/<version>. If None, the image will be
generated but will not be copied to a destination.
preload: When true, only copies DLC(s) if built with DLC_PRELOAD=true.
factory_install: When true, copies DLC(s) built with
DLC_FACTORY_INSTALL=true.
rootfs: (str) Path to the platform rootfs.
stateful: (str) Path to the platform stateful.
src_dir: (str) Path to the DLC source root directory.
"""
build_dir = os.path.join(sysroot, DLC_BUILD_DIR)
build_dir_scaled = os.path.join(sysroot, DLC_BUILD_DIR_SCALED)
if not os.path.exists(build_dir) and not os.path.exists(build_dir_scaled):
logging.debug(
"DLC build directories (%s) (%s) do not exist, ignoring.",
build_dir,
build_dir_scaled,
)
return
for scaled in (False, True):
dlc_build_dir = build_dir_scaled if scaled else build_dir
if not os.path.exists(dlc_build_dir):
logging.debug("Skipping build directory %s.", dlc_build_dir)
continue
if dlc_id is not None:
if not os.path.exists(os.path.join(dlc_build_dir, dlc_id)):
logging.warning(
"DLC '%s' does not exist in the build directory %s.",
dlc_id,
dlc_build_dir,
)
continue
dlc_ids = [dlc_id]
else:
# Process all DLCs.
# Sort to ease testing.
dlc_ids = sorted(os.listdir(dlc_build_dir))
if not dlc_ids:
logging.info("There are no DLC(s) to copy to output, ignoring.")
return
logging.info(
"Detected the following DLCs (scaled=%s): %s",
scaled,
", ".join(dlc_ids),
)
for d_id in dlc_ids:
dlc_id_path = os.path.join(dlc_build_dir, d_id)
dlc_packages = [
direct
for direct in os.listdir(dlc_id_path)
if os.path.isdir(os.path.join(dlc_id_path, direct))
]
for d_package in dlc_packages:
logging.debug("Building image: DLC %s", d_id)
params = EbuildParams.LoadEbuildParams(
sysroot=sysroot,
dlc_id=d_id,
dlc_package=d_package,
scaled=scaled,
)
# Because portage sandboxes every ebuild package during
# build_packages phase, we cannot delete the old image during
# that phase, but we can use the existence of the file
# |EBUILD_PARAMETERS| to know if the image has to be generated
# or not.
if not params:
logging.debug(
"The ebuild parameters file (%s) for DLC (%s) does not "
"exist. This means that the image was already "
"generated and there is no need to create it again.",
EbuildParams.GetParamsPath(
sysroot, d_id, d_package, scaled=scaled
),
d_id,
)
else:
dlc_generator = DlcGenerator(
src_dir=src_dir,
sysroot=sysroot,
board=board,
ebuild_params=params,
)
dlc_generator.GenerateDLC()
# Copy the dlc images to install_root_dir.
if install_root_dir:
if preload and not IsDlcPreloadingAllowed(
d_id, dlc_build_dir
):
logging.debug(
"Skipping installation of DLC %s because the "
"preload flag is set and the DLC does not "
"support preloading.",
d_id,
)
else:
osutils.SafeMakedirsNonRoot(install_root_dir)
install_dlc_dir = os.path.join(
install_root_dir, d_id, d_package
)
osutils.SafeMakedirsNonRoot(install_dlc_dir)
source_dlc_dir = os.path.join(
dlc_build_dir, d_id, d_package
)
for filepath in (
os.path.join(source_dlc_dir, fname)
for fname in os.listdir(source_dlc_dir)
if fname.endswith(".img")
):
logging.debug(
"Copying DLC(%s) image from %s to %s: ",
d_id,
filepath,
install_dlc_dir,
)
shutil.copy(filepath, install_dlc_dir)
logging.debug(
"Done copying DLC to %s.", install_dlc_dir
)
else:
logging.debug(
"install_root_dir value was not provided. Copying dlc"
" image skipped."
)
# Factory install DLCs.
if (
stateful
and factory_install
and IsFactoryInstallAllowed(d_id, dlc_build_dir)
):
install_stateful_root = os.path.join(
stateful, DLC_FACTORY_INSTALL_DIR
)
install_stateful_dir = os.path.join(
install_stateful_root, d_id, d_package
)
osutils.SafeMakedirs(
install_stateful_dir, mode=0o755, sudo=True
)
source_dlc_dir = os.path.join(
dlc_build_dir, d_id, d_package
)
for filepath in (
os.path.join(source_dlc_dir, fname)
for fname in os.listdir(source_dlc_dir)
if fname.endswith(".img")
):
logging.debug(
"Factory installing DLC(%s) image from %s to %s: ",
d_id,
filepath,
install_stateful_dir,
)
cros_build_lib.sudo_run(
["cp", filepath, install_stateful_dir],
print_cmd=False,
stderr=True,
)
# Change the owner + group of factory install directory.
# Refer to
# http://cs/chromeos_public/src/third_party/eclass-overlay
# or DLC/dlcservice related uid + gid.
cros_build_lib.sudo_run(
[
"chown",
"-R",
"%d:%d" % (DLC_UID, DLC_GID),
install_stateful_root,
]
)
# Create metadata directory in rootfs.
if rootfs:
meta_rootfs = os.path.join(
rootfs, DLC_META_DIR, d_id, d_package
)
osutils.SafeMakedirs(meta_rootfs, sudo=True)
# Copy the metadata files to rootfs.
meta_dir_src = os.path.join(
dlc_build_dir, d_id, d_package, DLC_TMP_META_DIR
)
logging.debug(
"Copying DLC(%s) metadata from %s to %s: ",
d_id,
meta_dir_src,
meta_rootfs,
)
# Use sudo_run since osutils.CopyDirContents doesn't support
# sudo.
cros_build_lib.sudo_run(
[
"cp",
"-dR",
meta_dir_src.rstrip("/") + "/.",
meta_rootfs,
],
print_cmd=False,
stderr=True,
)
# Only allow if explicitly set when emerge'ing the DLC
# ebuild.
if IsLoadPinVerityDigestAllowed(d_id, dlc_build_dir):
# Append the DLC root dm-verity digest.
root_hexdigest = verity.ExtractRootHexdigest(
os.path.join(meta_rootfs, DLC_VERITY_TABLE)
)
if not root_hexdigest:
raise Exception(
f"Could not find root dm-verity digest of "
f"{d_id} in dm-verity table"
)
trusted_verity_digests = os.path.join(
rootfs,
DLC_META_DIR,
DLC_LOADPIN_TRUSTED_VERITY_DIGESTS,
)
# Create the initial digests file with correct LoadPin
# header.
if not os.path.exists(trusted_verity_digests):
osutils.WriteFile(
trusted_verity_digests,
DLC_LOADPIN_FILE_HEADER + "\n",
mode="w",
sudo=True,
)
# Handle duplicates.
if (
root_hexdigest
not in osutils.ReadFile(
trusted_verity_digests
).split()
):
osutils.WriteFile(
trusted_verity_digests,
root_hexdigest + "\n",
mode="a",
sudo=True,
)
else:
logging.debug(
"Skipping addition of LoadPin dm-verity digest of "
"%s.",
d_id,
)
else:
logging.debug(
"rootfs value was not provided. Copying metadata "
"skipped."
)
logging.debug("Done installing DLCs.")
def ValidateDlcIdentifier(name):
"""Validates the DLC identifiers like ID and package names.
The name specifications are:
- No underscore.
- First character should be only alphanumeric.
- Other characters can be alphanumeric and '-' (dash).
- Maximum length of 40 (_MAX_ID_NAME) characters.
For more info see:
https://chromium.googlesource.com/chromiumos/platform2/+/HEAD/dlcservice/docs/developer.md#create-a-dlc-module
Args:
name: The value of the string to be validated.
"""
errors = []
if not name:
errors.append("Must not be empty.")
if not name[0].isalnum():
errors.append("Must start with alphanumeric character.")
if not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9-]*$", name):
errors.append("Must only use alphanumeric and - (dash).")
if len(name) > _MAX_ID_NAME:
errors.append("Must be within %d characters." % _MAX_ID_NAME)
if errors:
msg = "%s is invalid:\n%s" % (name, "\n".join(errors))
raise Exception(msg)