# blob: b65920d6fd6da0b11ff450dfe2e31435816e73c8
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The Image API is the entry point for image functionality."""
import dataclasses
import errno
import glob
import json
import logging
import os
from pathlib import Path
import re
import shutil
from typing import Dict, Iterable, List, NamedTuple, Optional, Union
from chromite.api.gen.chromiumos import signing_pb2
from chromite.lib import build_target_lib
from chromite.lib import chromeos_version
from chromite.lib import chroot_lib
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import dlc_lib
from chromite.lib import flexor
from chromite.lib import image_lib
from chromite.lib import osutils
from chromite.lib import portage_util
from chromite.lib import sysroot_lib
from chromite.lib.parser import package_info
# Name of the file parallel_emerge writes failed-package info to; Build()
# reads it back to populate BuildResult.failed_packages.
PARALLEL_EMERGE_STATUS_FILE_NAME = "status_file"
# Human-readable descriptions for the post-build summary message, keyed by
# image file name (see the logging.notice() at the end of Build()).
_IMAGE_TYPE_DESCRIPTION = {
    constants.BASE_IMAGE_BIN: "Non-developer Chromium OS",
    constants.DEV_IMAGE_BIN: "Developer",
    constants.TEST_IMAGE_BIN: "Test",
    constants.FACTORY_IMAGE_BIN: "Chromium OS Factory install shim",
    constants.FLEXOR_KERNEL_IMAGE_NAME: "Flexor vmlinuz",
}
# Chroot-relative location of the termina guest-tools; used by CreateGuestVm
# to locate termina_build_image.py.
TERMINA_TOOLS_DIR = os.path.join(
    constants.CHROOT_SOURCE_ROOT, "src/platform/container-guest-tools/termina"
)
# Environment variables that must be present and are forwarded into the
# signing docker container for LUCI authentication; see _get_auth_args().
_LUCI_AUTH_ENV_VARIABLES = [
    "GCE_METADATA_HOST",
    "GCE_METADATA_IP",
    "GCE_METADATA_ROOT",
]
class Error(Exception):
    """Base module error."""


class ChrootError(Error):
    """Unexpectedly run outside the chroot."""

    # NOTE: previously declared as `ChrootError(Error, Exception)`; the
    # explicit Exception base was redundant since Error already derives
    # from Exception.


class ImageToVmError(Error):
    """Error converting the image to a vm."""


class InvalidArgumentError(Error, ValueError):
    """Invalid argument values.

    Also a ValueError so callers may catch it as such.
    """


class InvalidImageTypeError(InvalidArgumentError):
    """Invalid image type(s) specified."""


class MissingImageError(Error):
    """An image that was expected to exist was not found."""


class PushImageError(Error):
    """An error while pushing the image."""
@dataclasses.dataclass
class DlcArtifactsMetadata:
    """Dataclass holding DLC artifacts metadata.

    Produced by generate_dlc_artifacts_metadata_list() from the per-DLC
    imageloader.json and URI-prefix files found under the sysroot.

    Attributes:
        image_hash: The sha256 hash of the DLC image.
        image_name: The DLC image name.
        uri_path: The DLC artifacts URI path.
        identifier: The DLC ID.
    """

    image_hash: str
    image_name: str
    uri_path: Union[str, os.PathLike]
    identifier: str
class BuildConfig(NamedTuple):
    """Named tuple to hold the build configuration options.

    Attributes:
        builder_path: The value to which the builder path lsb key should be
            set, the build_name installed on DUT during hwtest.
        disk_layout: The disk layout type.
        enable_rootfs_verification: Whether the rootfs verification is
            enabled.
        replace: Whether to replace existing output if any exists.
        version: The version string to use for the image.
        build_attempt: The build_attempt number to pass to `cros build-image`.
        symlink: Symlink name (defaults to "latest").
        output_dir_suffix: String to append to the image build directory.
        adjust_partition: Adjustments to apply to partition table
            (LABEL:[+-=]SIZE) e.g. ROOT-A:+1G
        boot_args: Additional boot arguments to pass to the commandline.
        output_root: Directory in which to place image result directories.
        build_root: Directory in which to compose the image, before copying
            it to output_root.
        enable_serial: Enable serial port for printks. Example values: ttyS0
        kernel_loglevel: The loglevel to add to the kernel command line.
        jobs: Number of packages to process in parallel at maximum. Defaults
            to the CPU count, or 1 when it cannot be determined.
        base_is_recovery: Copy the base image to recovery_image.bin.
    """

    builder_path: Optional[str] = None
    disk_layout: Optional[str] = None
    enable_rootfs_verification: bool = True
    replace: bool = False
    version: Optional[str] = None
    build_attempt: int = 1
    symlink: str = "latest"
    output_dir_suffix: Optional[str] = None
    adjust_partition: Optional[str] = None
    boot_args: str = "noinitrd"
    output_root: Union[str, os.PathLike] = (
        constants.DEFAULT_BUILD_ROOT / "images"
    )
    build_root: Union[str, os.PathLike] = (
        constants.DEFAULT_BUILD_ROOT / "images"
    )
    enable_serial: Optional[str] = None
    kernel_loglevel: int = 7
    # os.cpu_count() may return None on platforms where it cannot determine
    # the CPU count; fall back to 1 so the declared int type (and the
    # downstream --jobs flag) always holds.
    jobs: int = os.cpu_count() or 1
    base_is_recovery: bool = False
# TODO(b/232566937): Remove the argument generation function, once the
# build_image.sh is removed.
def GetBuildImageCommand(
    config: BuildConfig, image_names: Iterable[str], board: str
) -> List[Union[str, os.PathLike]]:
    """Assemble the build_image.sh command line for the configuration.

    Args:
        config: BuildConfig to use to generate the command.
        image_names: A set of image names that need to be built.
        board: The board for which the image to be built.

    Returns:
        List with build_image command with arguments.
    """
    # The factory install shim needs an extra kernel boot argument.
    boot_args = config.boot_args
    if constants.FACTORY_IMAGE_BIN in image_names:
        boot_args += " cros_factory_install"

    cmd: List[Union[str, os.PathLike]] = [
        constants.CHROMITE_SHELL_DIR / "build_image.sh",
        "--script-is-run-only-by-chromite-and-not-users",
        "--board",
        board,
    ]
    if config.builder_path:
        cmd += ["--builder_path", config.builder_path]
    if not config.enable_rootfs_verification:
        cmd += ["--noenable_rootfs_verification"]
    if config.adjust_partition:
        cmd += ["--adjust_part", config.adjust_partition]
    if config.enable_serial:
        cmd += ["--enable_serial", config.enable_serial]
    cmd += ["--disk_layout", config.disk_layout if config.disk_layout else "default"]
    cmd += ["--boot_args", boot_args]
    cmd += ["--loglevel", str(config.kernel_loglevel)]
    cmd += ["--jobs", str(config.jobs)]
    cmd += list(image_names)
    return cmd
class BuildResult:
    """Class to record and report build image results."""

    def __init__(self, image_types: List[str]) -> None:
        """Init method.

        Args:
            image_types: A list of image names that were requested to be
                built.
        """
        # Copy the list: add_image() removes entries from it as images are
        # recorded, and the previous code mutated the caller's list.
        self._unbuilt_image_types: List[str] = list(image_types)
        # Map of image type -> built image Path, filled in by add_image().
        self.images: Dict[str, Path] = {}
        # Return code of the build; None until the build has actually run.
        self.return_code: Optional[int] = None
        self._failed_packages: List["package_info.PackageInfo"] = []
        # Directory the built images were moved to, set by Build().
        self.output_dir: Optional[Path] = None
        # Exception raised during the build, if any.
        self.exception: Optional[BaseException] = None

    @property
    def failed_packages(self) -> "List[package_info.PackageInfo]":
        """Get the failed packages."""
        return self._failed_packages

    @failed_packages.setter
    def failed_packages(self, packages: Union[Iterable[str], None]) -> None:
        """Set the failed packages from an iterable of package strings."""
        self._failed_packages = [package_info.parse(x) for x in packages or []]

    @property
    def all_built(self) -> bool:
        """Check that all the images that were meant to be built were built."""
        return not self._unbuilt_image_types

    @property
    def build_run(self) -> bool:
        """Check if build images has been run."""
        return self.return_code is not None

    @property
    def run_error(self) -> bool:
        """Check if an error occurred during the build.

        True iff build images ran and returned a non-zero return code.
        """
        return bool(self.return_code)

    @property
    def run_success(self) -> bool:
        """Check if the build was successful.

        True when the build ran, returned a zero return code, and no failed
        packages were parsed.
        """
        return self.return_code == 0 and not self.failed_packages

    def add_image(self, image_type: str, image_path: Path) -> None:
        """Add an image to the result.

        Record the image path by the image name, and remove the image type
        from the un-built image list.
        """
        if image_path and image_path.exists():
            self.images[image_type] = image_path
            logging.debug("Added %s image path %s", image_type, image_path)
            if image_type in self._unbuilt_image_types:
                self._unbuilt_image_types.remove(image_type)
                logging.debug("Removed unbuilt type %s", image_type)
            else:
                logging.warning("Unexpected Image Type %s", image_type)
        else:
            logging.error(
                "%s image path does not exist: %s", image_type, image_path
            )
def Build(
    board: str,
    images: List[str],
    config: Optional[BuildConfig] = None,
    extra_env: Optional[dict] = None,
) -> BuildResult:
    """Build an image.

    Args:
        board: The board name.
        images: The image types to build.
        config: The build configuration options.
        extra_env: Environment variables to set for build_image.

    Returns:
        BuildResult

    Raises:
        InvalidArgumentError: When no board name is provided.
    """
    cros_build_lib.AssertInsideChroot()
    if not board:
        raise InvalidArgumentError("A build target name is required.")
    # Pass a copy so BuildResult's bookkeeping can't touch the caller's list.
    build_result = BuildResult(images[:])
    if not images:
        return build_result
    config = config or BuildConfig()

    try:
        image_names = image_lib.GetImagesToBuild(images)
    except ValueError:
        build_result.exception = InvalidImageTypeError(
            f"Invalid image types requested: {' '.join(images)}"
        )
        build_result.return_code = errno.EINVAL
        logging.error(str(build_result.exception))
        return build_result

    logging.info("The following images will be built %s", " ".join(image_names))
    version_info = chromeos_version.VersionInfo(
        version_file=constants.SOURCE_ROOT / constants.VERSION_FILE
    )

    # For Flexor, we don't use build_image.sh, but instead call the build
    # command directly.
    should_build_flexor = False
    if constants.FLEXOR_KERNEL_IMAGE_NAME in image_names:
        should_build_flexor = True
        image_names.remove(constants.FLEXOR_KERNEL_IMAGE_NAME)
    cmd = (
        GetBuildImageCommand(config, image_names, board)
        if image_names
        else None
    )

    # Clear any stale shadow locks in the sysroot before building.
    cros_build_lib.ClearShadowLocks(
        build_target_lib.get_default_sysroot_path(board)
    )

    try:
        build_dir, output_dir, image_dir = image_lib.CreateBuildDir(
            config.build_root,
            config.output_root,
            version_info.chrome_branch,
            config.version or version_info.VersionStringWithDateTime(),
            board,
            config.symlink,
            config.replace,
            config.build_attempt,
            config.output_dir_suffix,
        )
    except (FileExistsError, IsADirectoryError) as e:
        # IsADirectoryError: Can occur if someone manually creates the
        # build/images/${BOARD}/latest directory.
        logging.error(e)
        build_result.return_code = errno.EEXIST
        build_result.exception = e
        return build_result
    build_result.output_dir = output_dir

    extra_env_local = image_lib.GetBuildImageEnvvars(
        image_names, board, version_info, build_dir, output_dir, extra_env
    )
    with osutils.TempDir() as tempdir:
        # parallel_emerge reports failed packages through this file.
        status_file = os.path.join(tempdir, PARALLEL_EMERGE_STATUS_FILE_NAME)
        extra_env_local[
            constants.PARALLEL_EMERGE_STATUS_FILE_ENVVAR
        ] = status_file
        try:
            # We don't need to invoke build_image.sh if we are only
            # building Flexor.
            if cmd:
                result = cros_build_lib.run(
                    cmd, enter_chroot=True, extra_env=extra_env_local
                )
                build_result.return_code = result.returncode
        except cros_build_lib.RunCommandError as e:
            build_result.exception = e
            build_result.return_code = e.returncode
        if should_build_flexor:
            try:
                # For now we only sign Flexor with UEFI keys,
                # so it is fine to use the devkeys for the
                # kernel.
                flexor.create_flexor_kernel_image(
                    build_target=build_target_lib.BuildTarget(board),
                    version=version_info.VersionString(),
                    work_dir=build_dir,
                    keys_dir=constants.VBOOT_DEVKEYS_DIR,
                    public_key=constants.KERNEL_PUBLIC_SUBKEY,
                    private_key=constants.KERNEL_DATA_PRIVATE_KEY,
                    keyblock=constants.KERNEL_KEYBLOCK,
                    serial=config.enable_serial,
                    jobs=config.jobs,
                    build_kernel=True,
                )
                # NOTE(review): this overwrites any non-zero return code from
                # the build_image.sh step above — confirm that is intended.
                build_result.return_code = 0
            except flexor.FlexorBuildError as e:
                build_result.exception = e
                build_result.return_code = e.returncode
            try:
                cros_build_lib.CreateTarball(
                    constants.FLEXOR_KERNEL_IMAGE_TAR,
                    build_dir,
                    inputs=[constants.FLEXOR_KERNEL_IMAGE_NAME],
                    compression=cros_build_lib.CompressionType.ZSTD,
                )
            except cros_build_lib.TarballError as e:
                build_result.exception = e
                build_result.return_code = e.returncode
        try:
            content = osutils.ReadFile(status_file).strip()
        except IOError:
            # No file means no packages.
            pass
        else:
            build_result.failed_packages = content.split() if content else None

    if build_result.return_code != 0:
        return build_result

    # Move the completed image to the output_root.
    osutils.MoveDirContents(
        build_dir, output_dir, remove_from_dir=True, allow_nonempty=True
    )

    # TODO(rchandrasekar): move build_dlc to a module that we can import.
    # Copy DLC images to the output_root directory.
    dlc_dir = output_dir / "dlc"
    dlc_cmd = [
        "build_dlc",
        "--sysroot",
        build_target_lib.get_default_sysroot_path(board),
        "--install-root-dir",
        dlc_dir,
        "--board",
        board,
    ]
    # During build image phase, the DLC images get finalized.
    cros_build_lib.run(dlc_cmd)

    logging.info("Done. Image(s) created in %s\n", output_dir)

    # Save the path to each image that was built.
    for image_type in images:
        filename = constants.IMAGE_TYPE_TO_NAME[image_type]
        image_path = (image_dir / filename).resolve()
        logging.debug("%s Resolved Image Path: %s", image_type, image_path)
        build_result.add_image(image_type, image_path)
        # Fixed: compare with `==`, not `is` — image_type is a caller-supplied
        # string that need not be the identical constant object.
        if image_type == constants.IMAGE_TYPE_RECOVERY:
            continue

        # Get the image path relative to the CWD.
        image_path = os.path.relpath(image_path)
        # Fixed: name the created file instead of the broken "(unknown)"
        # placeholder.
        msg = (
            f"{_IMAGE_TYPE_DESCRIPTION[filename]} image created as "
            f"{filename}\n"
        )
        if image_type != constants.IMAGE_TYPE_FLEXOR_KERNEL:
            msg += (
                "To copy the image to a USB key, use:\n"
                f" cros flash usb:// {image_path}\n"
                "To flash the image to a Chrome OS device, use:\n"
                f" cros flash ${{DUT_IP}} {image_path}\n"
                "Note that the device must be accessible over the network.\n"
                "A base image will not work in this mode, but a test or "
                "dev image will.\n"
            )
        if any(
            filename == x
            for x in [constants.DEV_IMAGE_BIN, constants.TEST_IMAGE_BIN]
        ):
            msg += (
                "To run the image in a virtual machine, use:\n"
                f" cros vm --start --image-path={image_path} --board={board}\n"
            )
        logging.notice(msg)
    return build_result
def _GetResultAndAddImage(
    board: str, cmd: list, image_path: Optional[Path] = None
) -> BuildResult:
    """Run a recovery-image command and record the resulting image.

    Args:
        board: The board name.
        cmd: An array of command-line arguments.
        image_path: The chrooted path to the image.

    Returns:
        BuildResult
    """
    build_result = BuildResult([constants.IMAGE_TYPE_RECOVERY])
    result = cros_build_lib.run(cmd, enter_chroot=True, check=False)
    build_result.return_code = result.returncode
    if result.returncode:
        # Command failed; return with only the return code recorded.
        return build_result
    image_name = constants.IMAGE_TYPE_TO_NAME[constants.IMAGE_TYPE_RECOVERY]
    if image_path:
        # The recovery image is expected next to the provided image.
        recovery_image = image_path.parent / image_name
    else:
        # No path given: resolve through the board's "latest" image symlink.
        image_dir = Path(image_lib.GetLatestImageLink(board))
        image_path = image_dir / image_name
        recovery_image = image_path.resolve()
    if recovery_image.exists():
        build_result.add_image(constants.IMAGE_TYPE_RECOVERY, recovery_image)
    return build_result
def CopyBaseToRecovery(board: str, image_path: Path) -> BuildResult:
    """Copy the first base image to recovery_image.bin.

    For build targets that do not support a recovery image: the base image
    gets copied to "recovery_image.bin" so images are available in the
    Chromebook Recovery Utility, GoldenEye and other locations.

    Args:
        board: The board name.
        image_path: The chrooted path to the base image.

    Returns:
        BuildResult
    """
    recovery_name = constants.IMAGE_TYPE_TO_NAME[constants.IMAGE_TYPE_RECOVERY]
    dest_path = image_path.parent / recovery_name
    copy_cmd = ["cp", image_path, dest_path]
    return _GetResultAndAddImage(board, copy_cmd, dest_path)
def BuildRecoveryImage(
    board: str, image_path: Optional[Path] = None
) -> BuildResult:
    """Build a recovery image.

    This must be done after a base image has been created.

    Args:
        board: The board name.
        image_path: The chrooted path to the image, defaults to
            chromiumos_image.bin.

    Returns:
        BuildResult

    Raises:
        InvalidArgumentError: When no board name is provided.
    """
    if not board:
        raise InvalidArgumentError("board is required.")
    recovery_cmd = [
        constants.CROSUTILS_DIR / "mod_image_for_recovery.sh",
        "--board",
        board,
    ]
    if image_path:
        recovery_cmd += ["--image", str(image_path)]
    return _GetResultAndAddImage(board, recovery_cmd, image_path)
def CreateVm(
    board: str,
    disk_layout: Optional[str] = None,
    is_test: bool = False,
    chroot: Optional["chroot_lib.Chroot"] = None,
    image_dir: Optional[str] = None,
) -> str:
    """Create a VM from an image.

    Args:
        board: The board for which the VM is being created.
        disk_layout: The disk layout type.
        is_test: Whether it is a test image.
        chroot: The chroot where the image lives.
        image_dir: The built image directory.

    Returns:
        str: Path to the created VM .bin file.

    Raises:
        InvalidArgumentError: When no board is provided.
        ImageToVmError: When image_to_vm.sh fails.
    """
    chroot = chroot or chroot_lib.Chroot()
    # Validate with a real exception instead of `assert board`: asserts are
    # stripped under -O, and this matches Build()'s validation style.
    if not board:
        raise InvalidArgumentError("board is required.")
    cmd = ["./image_to_vm.sh", "--board", board]
    if is_test:
        cmd.append("--test_image")
    if disk_layout:
        cmd.extend(["--disk_layout", disk_layout])
    if image_dir:
        # The script runs inside the chroot, so translate the path.
        inside_image_dir = chroot.chroot_path(image_dir)
        cmd.extend(["--from", inside_image_dir])
    result = chroot.run(cmd, check=False)
    if result.returncode:
        # Error running the command. Unfortunately we can't be much more
        # helpful than this right now.
        raise ImageToVmError(
            "Unable to convert the image to a VM. "
            "Consult the logs to determine the problem."
        )
    vm_path = os.path.join(
        image_dir or image_lib.GetLatestImageLink(board), constants.VM_IMAGE_BIN
    )
    return os.path.realpath(vm_path)
def CreateGuestVm(
    image_dir: str,
    is_test: bool = False,
    chroot: Optional[chroot_lib.Chroot] = None,
) -> str:
    """Convert an existing image into a guest VM image.

    Args:
        image_dir: The directory containing the built images.
        is_test: Flag to create a test guest VM image.
        chroot: The chroot where the cros image lives.

    Returns:
        Path to the created guest VM folder.

    Raises:
        InvalidArgumentError: When no image_dir is provided.
        ImageToVmError: When termina_build_image.py fails.
    """
    # Validate with a real exception instead of `assert` (stripped under -O).
    if not image_dir:
        raise InvalidArgumentError("image_dir is required.")
    chroot = chroot or chroot_lib.Chroot()
    cmd = [os.path.join(TERMINA_TOOLS_DIR, "termina_build_image.py")]
    image_dir = chroot.chroot_path(image_dir)
    image_file = (
        constants.TEST_IMAGE_BIN if is_test else constants.BASE_IMAGE_BIN
    )
    image_path = os.path.join(image_dir, image_file)
    output_dir = (
        constants.TEST_GUEST_VM_DIR if is_test else constants.BASE_GUEST_VM_DIR
    )
    output_path = os.path.join(image_dir, output_dir)
    cmd.append(image_path)
    cmd.append(output_path)
    result = chroot.sudo_run(cmd, check=False)
    if result.returncode:
        # Error running the command. Unfortunately we can't be much more
        # helpful than this right now.
        # Fixed: added the missing spaces so the adjacent string literals no
        # longer concatenate into "usingtermina_build_image.py.Consult".
        raise ImageToVmError(
            "Unable to convert the image to a Guest VM using "
            "termina_build_image.py. "
            "Consult the logs to determine the problem."
        )
    return os.path.realpath(output_path)
def generate_dlc_artifacts_metadata_list(
    sysroot_path: str,
) -> List[DlcArtifactsMetadata]:
    """Generates a list of `DlcArtifacts` from base_path.

    Walks the DLC artifacts metadata directory under the sysroot and builds
    one DlcArtifactsMetadata entry per DLC that has both a URI-prefix file
    and a parseable imageloader.json with an image digest.

    Args:
        sysroot_path: The sysroot path of the build.

    Returns:
        A list of `DlcArtifacts`, empty if none.
    """
    ret = []
    artifacts_meta_dir = os.path.join(
        sysroot_path, dlc_lib.DLC_BUILD_DIR_ARTIFACTS_META
    )
    if not os.path.exists(artifacts_meta_dir):
        logging.info(
            "The DLC artifacts metadata directory doesn't exist at %s",
            artifacts_meta_dir,
        )
        return ret
    # Match "<dlc-id>/<package>/<uri-prefix-file>" under the metadata dir;
    # group(1) captures the DLC ID.
    dlc_re = f"({dlc_lib.DLC_ID_RE})/{dlc_lib.DLC_PACKAGE}/{dlc_lib.URI_PREFIX}"
    pat = f"{artifacts_meta_dir}/{dlc_re}$"
    for path in osutils.DirectoryIterator(artifacts_meta_dir):
        if not path.is_file():
            continue
        m = re.search(pat, str(path))
        if not m:
            continue
        dlc_id = m.group(1)
        # Create variable for documentation of DLC URI file.
        uri_prefix_path = path
        dirname = path.parent
        # imageloader.json is expected to sit beside the URI prefix file;
        # without it the metadata entry would be incomplete.
        imageloader_json_path = dirname / dlc_lib.IMAGELOADER_JSON
        if not imageloader_json_path.exists():
            logging.error(
                "Missing part of metadata from artifacts for DLC=%s, "
                "skipping generation",
                dlc_id,
            )
            continue
        try:
            imageloader_json = json.loads(imageloader_json_path.read_bytes())
        except json.decoder.JSONDecodeError:
            logging.error(
                "Malformed imageloader json for DLC=%s, " "skipping generation",
                dlc_id,
            )
            continue
        # The sha256 digest of the DLC image as recorded by imageloader.
        image_hash = imageloader_json.get(
            dlc_lib.IMAGELOADER_IMAGE_SHA256_HASH_KEY, None
        )
        if image_hash is None:
            logging.error(
                "Missing digest from imageloader json for DLC=%s, "
                "skipping generation",
                dlc_id,
            )
            continue
        ret.append(
            DlcArtifactsMetadata(
                image_hash=image_hash,
                image_name=dlc_lib.DLC_IMAGE,
                uri_path=osutils.ReadFile(uri_prefix_path),
                identifier=dlc_id,
            )
        )
    return ret
def copy_dlc_image(base_path: str, output_dir: str) -> List[str]:
    """Copy DLC images from base_path to output_dir.

    Args:
        base_path: Base path wherein DLC images are expected to be.
        output_dir: Folder destination for DLC images folder.

    Returns:
        A list of folder paths after move or None if the source path doesn't
        exist.
    """
    ret = []
    for dlc_build_dir, dlc_dir in (
        (dlc_lib.DLC_BUILD_DIR, dlc_lib.DLC_DIR),
        (dlc_lib.DLC_BUILD_DIR_SCALED, dlc_lib.DLC_DIR_SCALED),
    ):
        dlc_source_path = os.path.join(base_path, dlc_build_dir)
        if not os.path.exists(dlc_source_path):
            continue
        # Fixed: was a redundant nested os.path.join(os.path.join(...)).
        dlc_dest_path = os.path.join(output_dir, dlc_dir)
        ret.append(dlc_dest_path)
        dlc_data_dest_path = os.path.join(
            output_dir, dlc_lib.DLC_TMP_META_DIR, dlc_dir
        )
        ret.append(dlc_data_dest_path)
        # exist_ok replaces the previous try/except FileExistsError.
        os.makedirs(dlc_dest_path, exist_ok=True)
        os.makedirs(dlc_data_dest_path, exist_ok=True)
        # Only archive DLC images, all other uncompressed files/data should
        # not be uploaded into archives.
        dlc_re = (
            f"({dlc_lib.DLC_ID_RE}/{dlc_lib.DLC_PACKAGE}/{dlc_lib.DLC_IMAGE})"
        )
        dlc_data_re = (
            f"({dlc_lib.DLC_ID_RE}/{dlc_lib.DLC_PACKAGE})/"
            f"{dlc_lib.DLC_TMP_META_DIR}/{dlc_lib.IMAGELOADER_JSON}"
        )
        pat = f"/{dlc_build_dir}/{dlc_re}$"
        pat_data = f"/{dlc_build_dir}/{dlc_data_re}$"
        for path in osutils.DirectoryIterator(dlc_source_path):
            if not path.is_file():
                continue
            m = re.search(pat, str(path))
            if m:
                img_path = os.path.join(dlc_dest_path, m.group(1))
                # exist_ok so repeated parents across files can't raise.
                os.makedirs(os.path.dirname(img_path), exist_ok=True)
                shutil.copyfile(path, img_path)
            m = re.search(pat_data, str(path))
            if m:
                data_path = os.path.join(
                    dlc_data_dest_path,
                    m.group(1),
                    dlc_lib.IMAGELOADER_JSON,
                )
                os.makedirs(os.path.dirname(data_path), exist_ok=True)
                shutil.copyfile(path, data_path)
    # Empty list returns `None`.
    return ret or None
def copy_license_credits(
    board: str, output_dir: str, symlink: Optional[str] = None
) -> List[str]:
    """Copies license_credits.html from image build dir to output_dir.

    Args:
        board: The board name.
        output_dir: Folder destination for license_credits.html.
        symlink: Symlink name to use instead of "latest".

    Returns:
        The output path or None if the source path doesn't exist.
    """
    filename = "license_credits.html"
    image_link = image_lib.GetLatestImageLink(board, pointer=symlink)
    source_path = os.path.join(image_link, filename)
    if not os.path.exists(source_path):
        return None
    dest_path = os.path.join(output_dir, filename)
    shutil.copyfile(source_path, dest_path)
    return dest_path
def Test(board: str, result_directory: str, image_dir: str = None) -> bool:
    """Run tests on an already built image.

    Currently this is just running test_image.

    Args:
        board: The board name.
        result_directory: Root directory where the results should be stored
            relative to the chroot.
        image_dir: The path to the image. Uses the board's default image
            build path when not provided.

    Returns:
        True if all tests passed, False otherwise.
    """
    if not board:
        raise InvalidArgumentError("Board is required.")
    if not result_directory:
        raise InvalidArgumentError("Result directory required.")

    # We don't handle inside/outside chroot path translation. We only need
    # to work inside the chroot, so that's OK. Enforce that assumption.
    if cros_build_lib.IsOutsideChroot():
        raise ChrootError("Image Test service only available inside chroot.")

    # Fall back to the board's latest image directory when none was given.
    test_image_dir = image_dir or image_lib.GetLatestImageLink(
        board, force_chroot=True
    )
    cmd = [
        constants.CHROMITE_BIN_DIR / "test_image",
        "--board",
        board,
        "--test_results_root",
        result_directory,
        test_image_dir,
    ]
    result = cros_build_lib.sudo_run(cmd, enter_chroot=True, check=False)
    return result.returncode == 0
def create_factory_image_zip(
    chroot: chroot_lib.Chroot,
    sysroot_class: sysroot_lib.Sysroot,
    factory_shim_dir: Path,
    version: str,
    output_dir: str,
) -> Union[str, None]:
    """Build factory_image.zip in archive_dir.

    Args:
        chroot: The chroot class used for these artifacts.
        sysroot_class: The sysroot where the original environment archive
            can be found.
        factory_shim_dir: Directory containing factory shim.
        version: if not None, version to include in factory_image.zip
        output_dir: Directory to store factory_image.zip.

    Returns:
        The path to the zipfile if it could be created, else None.
    """
    filename = "factory_image.zip"
    zipfile = os.path.join(output_dir, filename)
    # Base zip command; each invocation below appends to the same archive.
    cmd = ["zip", "-r", zipfile]

    if not factory_shim_dir or not factory_shim_dir.exists():
        logging.error(
            "create_factory_image_zip: %s not found", factory_shim_dir
        )
        return None
    # Only the install shim binaries, partition files and netboot artifacts
    # are picked out of the shim directory.
    files = [
        "*factory_install*.bin",
        "*partition*",
        os.path.join("netboot", "*"),
    ]
    cmd_files = []
    for file in files:
        cmd_files.extend(
            ["--include", os.path.join(factory_shim_dir.name, file)]
        )
    # factory_shim_dir may be a symlink. We can not use '-y' here.
    cros_build_lib.run(
        cmd + [factory_shim_dir.name] + cmd_files,
        cwd=factory_shim_dir.parent,
        capture_output=True,
    )

    # Everything in /usr/local/factory/bundle gets overlaid into the
    # bundle.
    bundle_src_dir = chroot.full_path(
        sysroot_class.path, "usr", "local", "factory", "bundle"
    )
    if os.path.exists(bundle_src_dir):
        cros_build_lib.run(
            cmd + ["-y", "."], cwd=bundle_src_dir, capture_output=True
        )
    else:
        logging.warning(
            "create_factory_image_zip: %s not found, skipping", bundle_src_dir
        )

    # Add a version file in the zip file.
    if version is not None:
        version_filename = "BUILD_VERSION"
        # Creates a staging temporary folder.
        with osutils.TempDir() as temp_dir:
            version_file = os.path.join(temp_dir, version_filename)
            osutils.WriteFile(version_file, version)
            cros_build_lib.run(
                cmd + [version_filename], cwd=temp_dir, capture_output=True
            )

    return zipfile if os.path.exists(zipfile) else None
def create_stripped_packages_tar(
    chroot: chroot_lib.Chroot,
    build_target: build_target_lib.BuildTarget,
    output_dir: str,
) -> Union[str, None]:
    """Build stripped_packages.tar in archive_dir.

    Args:
        chroot: The chroot class used for these artifacts.
        build_target: The build target.
        output_dir: Directory to store stripped_packages.tar.

    Returns:
        The path to the zipfile if it could be created, else None.
    """
    # Only Chrome and kernel packages are stripped and archived.
    package_globs = [
        "chromeos-base/chromeos-chrome",
        "sys-kernel/*kernel*",
    ]
    board = build_target.name
    # strip_package places its output under <sysroot>/stripped-packages.
    stripped_pkg_dir = chroot.full_path(build_target.root, "stripped-packages")
    tarball_paths = []
    strip_package_path = chroot.chroot_path(
        constants.CHROMITE_SCRIPTS_DIR / "strip_package"
    )
    tarball_cwd = chroot.full_path(build_target.root)
    for pattern in package_globs:
        packages = portage_util.FindPackageNameMatches(
            pattern,
            board,
            chroot=chroot,
        )
        for cpv in packages:
            chroot.run([strip_package_path, "--board", board, cpv.cpf])
            # Find the stripped package.
            files = glob.glob(os.path.join(stripped_pkg_dir, cpv.cpf) + ".*")
            if not files:
                bin_path = os.path.join(stripped_pkg_dir, cpv.cpf)
                raise AssertionError(
                    f"Silent failure to strip binary {cpv.cpf}? "
                    f"Failed to find stripped files at {bin_path}."
                )
            if len(files) > 1:
                logging.warning(
                    "Expected one stripped package for %s, found %d",
                    cpv.cpf,
                    len(files),
                )
            # When several stripped outputs exist, take the
            # lexicographically last one.
            tarball = sorted(files)[-1]
            # Store paths relative to the sysroot so the tarball layout is
            # board-independent.
            tarball_paths.append(os.path.relpath(tarball, tarball_cwd))
    if not tarball_paths:
        # tar barfs on an empty list of files, so skip tarring completely.
        return None
    tarball_output = os.path.join(output_dir, "stripped-packages.tar")
    cros_build_lib.CreateTarball(
        tarball_path=tarball_output,
        cwd=tarball_cwd,
        compression=cros_build_lib.CompressionType.NONE,
        chroot=chroot,
        inputs=tarball_paths,
    )
    return tarball_output
def create_netboot_kernel(
    board: str,
    output_dir: str,
) -> None:
    """Build netboot kernel artifacts in output_dir.

    Args:
        board: The board being built.
        output_dir: Directory to place the artifact
    """
    cros_build_lib.run(
        [
            "./make_netboot.sh",
            f"--board={board}",
            f"--image_dir={output_dir}",
        ],
        enter_chroot=True,
    )
def create_image_scripts_archive(
    build_target: build_target_lib.BuildTarget,
    output_dir: str,
) -> Union[str, None]:
    """Create image_scripts.tar.xz.

    Args:
        build_target: The build target.
        output_dir: Directory to image_scripts.tar.xz.

    Returns:
        The path to the archive, or None if it couldn't be created.
    """
    image_dir = image_lib.GetLatestImageLink(build_target.name)
    if not os.path.exists(image_dir):
        logging.warning("Image build directory not found.")
        return None
    tarball_path = os.path.join(output_dir, constants.IMAGE_SCRIPTS_TAR)
    # Archive the shell scripts from the latest image directory, using
    # basenames so the tarball has a flat layout.
    files = glob.glob(os.path.join(image_dir, "*.sh"))
    files = [os.path.basename(f) for f in files]
    cros_build_lib.CreateTarball(tarball_path, image_dir, inputs=files)
    return tarball_path
def _get_auth_args() -> List[str]:
    """Get the set of LUCI Auth properties off the environment."""
    # Check required env variables present.
    required = ["LUCI_CONTEXT", *_LUCI_AUTH_ENV_VARIABLES]
    if any(name not in os.environ for name in required):
        raise InvalidArgumentError(
            "Environment is missing required LUCI auth variables"
        )

    # First, we need LUCI_CONTEXT: mount the context file into the container
    # and point the env variable at the mounted copy.
    luci_context = os.environ.get("LUCI_CONTEXT")
    context_name = os.path.basename(luci_context)
    args = [
        "-v",
        f"{luci_context}:/tmp/luci/{context_name}",
        "-e",
        f"LUCI_CONTEXT=/tmp/luci/{context_name}",
    ]
    # Next, pipe in all the auth env variables.
    for variable in _LUCI_AUTH_ENV_VARIABLES:
        args += ["-e", f"{variable}={os.environ.get(variable)}"]
    return args
def CallDocker(
    docker_image: str,
    docker_args: List[str],
    entrypoint_args: List[str],
):
    """Call the signing docker container with the given args.

    Args:
        docker_image: docker image to run.
        docker_args: Args to be passed to docker.
        entrypoint_args: Args to be passed.

    Raises:
        cros_build_lib.RunCommandError: If the image does not exist or the
            container exits non-zero.
    """
    # First, verify that the docker image exists. run(check=True) raises on
    # failure; the previous `try: ... except Exception: raise` wrapper was a
    # no-op and has been removed.
    # TODO (b/295358776) error handling
    cros_build_lib.run(
        ["docker", "inspect", "--type=image", docker_image], check=True
    )
    auth_args = _get_auth_args()
    # Invoke the signing docker container.
    cros_build_lib.run(
        [
            "docker",
            "run",
            # We must run in privileged mode to support /dev/loop*.
            "--privileged",
            # Use the hosts networking stack so it has access to the luci
            # auth proxy.
            "--network",
            "host",
            *auth_args,
            # Args passed to docker.
            *docker_args,
            docker_image,
            # Args passed into the entrypoint.
            *entrypoint_args,
        ]
    )
def SignImage(
    signing_configs: "signing_pb2.BuildTargetSigningConfigs",
    archive_dir: Union[str, Path],
    result_path: Path,
    docker_image: str,
) -> signing_pb2.BuildTargetSignedArtifacts:
    """Sign artifacts based on the given config.

    Args:
        signing_configs: Config for each artifact to sign.
        archive_dir: Path to dir containing input artifacts.
            Path must exist on the host.
        result_path: Path to place the signed artifacts in.
        docker_image: docker image to run.

    Returns:
        Information about the signed artifacts.

    Raises:
        InvalidArgumentError: When the signing keysets are not checked out.
    """
    # Everything is going to live in a temp dir to be copied over to docker.
    with osutils.TempDir() as tempdir:
        # Serialize the proto to a file.
        osutils.WriteFile(
            os.path.join(tempdir, "proto.bin"),
            signing_configs.SerializeToString(),
            mode="wb",
        )
        # TODO (b/295358776) Copy all the paths from the configs into the
        # temp dir.
        keys_dir = constants.SOURCE_ROOT / "src/platform/signing/keys"
        if not keys_dir.exists():
            raise InvalidArgumentError(
                "Cannot sign on a host that doesn't have keysets checked out."
            )
        # Invoke the docker container to sign the artifacts.
        CallDocker(
            docker_image,
            [
                # Mount the `/dev` directory on the host into `/dev` in the
                # container.
                "-v",
                "/dev:/dev",
                # Mount the input dir as a volume.
                "-v",
                f"{tempdir}:/in",
                # Mount the archive dir as a volume.
                "-v",
                f"{archive_dir}:/archive_dir",
                # Mount the output dir as a volume.
                "-v",
                f"{result_path}:/out",
                # Mount the keyset checkout as a volume.
                "-v",
                f"{keys_dir}:/keys",
            ],
            [
                # Args that are passed in to the entrypoint.
                "-i",
                "/in/proto.bin",
                "--archive-dir",
                "/archive_dir",
                "-o",
                "/out",
                "-p",
                "out_proto.bin",
            ],
        )
    # The container writes the resulting artifact metadata proto into the
    # mounted output dir; read it back for the caller.
    output = signing_pb2.BuildTargetSignedArtifacts()
    with open(os.path.join(result_path, "out_proto.bin"), "rb") as f:
        output.ParseFromString(f.read())
    return output
@dataclasses.dataclass
class PushImageArguments:
    """Arguments for a pushimage invocation; see get_cli_args()."""

    image_dir: str
    build_target: build_target_lib.BuildTarget
    # See pushimage._SUPPORTED_IMAGE_TYPES.
    sign_types: Iterable[str] = ()
    dryrun: bool = False
    channels: Iterable[str] = ()
    destination_bucket: Optional[str] = None
    yes: bool = False

    @property
    def profile(self):
        """The build target's profile."""
        return self.build_target.profile

    def get_cli_args(self, urls_file: Optional[Path] = None) -> List[str]:
        """Get the pushimage CLI args."""
        args = [self.image_dir, "--board", self.build_target.name]
        profile = self.profile
        if profile and profile != "base":
            args += ["--profile", profile]
        if self.sign_types:
            args += ["--sign-types", *self.sign_types]
        if self.dryrun:
            args += ["--dry-run"]
        if self.channels:
            args += ["--channels", " ".join(self.channels)]
        if self.destination_bucket:
            args += ["--dest-bucket", self.destination_bucket]
        if self.yes:
            args += ["--yes"]
        if urls_file:
            args += ["--instruction-urls-file", urls_file]
        return args
def run_push_image(args: PushImageArguments) -> Dict[str, List[str]]:
    """Execute the pushimage script.

    Named as such to allow the pushimage script to later be refactored into
    the service, but because the pushimage script uses vpython, explicitly
    running the script instead of just calling the implementation may be
    necessary for some time.
    """
    with osutils.TempDir() as tmpdir:
        # pushimage writes the instruction URLs here for us to read back.
        urls_file = Path(tmpdir) / "urls.json"
        try:
            cros_build_lib.run(
                [
                    constants.CHROMITE_BIN_DIR / "pushimage",
                    *args.get_cli_args(urls_file),
                ]
            )
        except cros_build_lib.RunCommandError as e:
            raise PushImageError(f"Error running pushimage: {e}") from e
        return json.loads(osutils.ReadFile(urls_file))