blob: f121aad7ea78b1441c6002877079070f16e6f049 [file] [log] [blame]
# Copyright 2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utilities for manipulating ChromeOS images."""
import errno
import glob
import json
import logging
import os
from pathlib import Path
import re
import stat
from typing import Dict, List, NamedTuple, Optional, Set, Tuple, Union
from chromite.lib import cgpt
from chromite.lib import chromeos_version
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import git
from chromite.lib import install_mask
from chromite.lib import osutils
from chromite.lib import portage_util
from chromite.lib import retry_util
from chromite.lib import signing
from chromite.lib import timeout_util
from chromite.utils import c_blkpg
from chromite.utils import c_loop
# Map of security check name -> whether the check takes a baseline config
# file.  The key is the |check| argument to SecurityTestConfig.RunCheck
# (i.e. the ensure_|check|.sh script name); the value is its |pass_config|
# argument (whether an ensure_|check|.config baseline is passed along).
_SECURITY_CHECKS = {
    "no_nonrelease_files": True,
    "sane_lsb-release": True,
    "secure_kernelparams": True,
    "not_ASAN": False,
}

# Extra USE flags applied when building the factory shim image
# (see GetBuildImageEnvvars).
_FACTORY_SHIM_USE_FLAGS = "fbconsole vtconsole factory_shim_ramfs i2cdev vfat"
class Error(Exception):
    """Base class for all errors raised by the image_lib module."""
class LoopbackError(Error):
    """Raised when setting up a loopback device fails."""
def _DumpPartitionInfo() -> None:
    """Dump loop-device related state to the log for debugging.

    Best effort only: both commands run with check=False so failures are
    logged rather than raised.
    """
    # Show which processes are holding loop partition nodes open.
    partition_nodes = list(Path("/dev").glob("loop*p*"))
    cros_build_lib.run(
        ["fuser", "-mv", *partition_nodes],
        check=False,
        log_output=True,
        encoding="utf-8",
    )
    # Show all current loop device attachments.
    cros_build_lib.run(
        ["losetup", "-a"], check=False, log_output=True, encoding="utf-8"
    )
class LoopbackPartitions:
    """Loopback mount a file and provide access to its partitions.

    This class can be used as a context manager with the "with" statement, or
    individual instances of it can be created which will clean themselves up
    when garbage collected or when explicitly closed, ala the tempfile module.

    In either case, the same arguments should be passed to init.
    """

    def __init__(
        self,
        path,
        destination=None,
        part_ids=None,
        mount_opts=("ro",),
        delete: bool = True,
    ):
        """Initialize.

        Args:
            path: Path to the backing file.
            destination: Base path to mount partitions. If not specified, then
                calling Mount() will create a temporary directory and use it.
            part_ids: Mount these partitions at context manager entry. This is
                only used during initialization of the context manager.
            mount_opts: Use these mount_opts for mounting |part_ids|. This is
                only used during initialization of the context manager.
            delete: Whether to automatically tear down the loopback device.
        """
        self.path = path
        self.destination = destination
        # Loop device node (e.g. "/dev/loop3"); populated by Attach().
        self.dev = None
        self.part_ids = part_ids
        self.mount_opts = mount_opts
        self.delete = delete
        # Partition number -> device node path (e.g. 2 -> "/dev/loop3p2");
        # populated by Attach().
        self.parts = {}
        # True when Mount() created |destination| itself (a TempDir), so
        # close() knows to forget it.
        self._destination_created = False
        self._gpt_table = {}
        # Set of _gpt_table elements currently mounted.
        self._mounted = set()
        # Set of dirs that need to be removed in close().
        self._to_be_rmdir = set()
        # Set of symlinks created.
        self._symlinks = set()
        self._InitGpt()

    def _InitGpt(self):
        """Initialize the GPT info.

        This is a separate function for test mocking purposes.
        """
        self._gpt_table = GetImageDiskPartitionInfo(self.path)

    @classmethod
    def attach_image(cls, path: Union[str, os.PathLike]) -> str:
        """Attach |path| disk image and return the loopback path.

        Must be run as root (losetup/blkpg require it).
        """
        cros_build_lib.AssertRootUser()

        # Sync the image file before we mount it as loop device.
        osutils.sync_storage(path, filesystem=True)

        # Mount the image in the first available loop device.
        cmd = ["losetup", "--show", "-f", path]
        ret = cros_build_lib.dbg_run(
            cmd,
            capture_output=True,
            encoding="utf-8",
        )
        dev = ret.stdout.strip()

        # Delete existing partitions.
        try:
            cls._DeletePartitions(dev)
            # Add missing partitions.
            gpt_table = GetImageDiskPartitionInfo(path)
            cls._AddPartitions(dev, gpt_table)
        except:
            # If we crash, free the loopback device so we don't leak it.
            c_loop.detach(dev)
            raise
        return dev

    def Attach(self):
        """Initialize the loopback device.

        This is a separate function for test mocking purposes.
        """
        try:
            if osutils.IsRootUser():
                self.dev = self.attach_image(self.path)
            else:
                # Not root: delegate to the cros_losetup helper via sudo,
                # which prints the resulting device path as JSON.
                result = cros_build_lib.sudo_run(
                    [
                        constants.CHROMITE_SCRIPTS_DIR / "cros_losetup",
                        "attach",
                        self.path,
                    ],
                    debug_level=logging.DEBUG,
                    stdout=True,
                )
                data = json.loads(result.stdout)
                self.dev = data["path"]
            # Discover the per-partition device nodes (/dev/loopNpM).
            part_devs = glob.glob(self.dev + "p*")
            if not part_devs:
                logging.warning(
                    "Didn't find partition devices nodes for %s.", self.path
                )
                return
            for part in part_devs:
                number = int(re.search(r"p(\d+)$", part).group(1))
                self.parts[number] = part
        except:
            # Tear down anything partially set up, then re-raise.
            self.close()
            raise

    @staticmethod
    def _CheckNodeIsLoopback(path: Union[str, os.PathLike]) -> Tuple[int, int]:
        """Verify |path| is a loopback device node.

        Returns:
            (major, minor) device numbers of the node.

        Raises:
            ValueError: if |path| is not a block device, or not a loop device
                (Linux loop devices have major number 7).
        """
        st = os.stat(path)
        if not stat.S_ISBLK(st.st_mode):
            raise ValueError(f"{path}: path is not a block device")
        # If we ever want to extend the API to taking a file as a reference,
        # be aware of diff between st_dev & st_rdev.
        major = os.major(st.st_rdev)
        minor = os.minor(st.st_rdev)
        if major != 7:
            raise ValueError(
                f"{path}: expecting loop device with major 7, "
                f"not {major}:{minor}"
            )
        return (major, minor)

    def DeletePartitions(self):
        """Clear out existing registered partitions."""
        # NOTE(review): this passes self.path (the backing file) to
        # _DeletePartitions, which requires a loop device node (it calls
        # _CheckNodeIsLoopback).  self.dev looks like the intended argument;
        # confirm against callers before relying on this method.
        self._DeletePartitions(self.path)

    @classmethod
    def _DeletePartitions(cls, path: Union[str, os.PathLike]):
        """Clear out existing registered partitions.

        Args:
            path: A loop device node (e.g. "/dev/loop3").
        """
        major, minor = cls._CheckNodeIsLoopback(path)

        def _partition_del_retry(e):
            # Retry predicate: only EBUSY is considered transient; dump loop
            # device state to help diagnose who is holding the partition.
            if isinstance(e, OSError) and e.errno == errno.EBUSY:
                logging.warning("Deleting partition returned EBUSY.")
                _DumpPartitionInfo()
                return True
            return False

        # Check the partitions the kernel knows of.
        logging.debug("%s: Clearing registered partitions", path)
        sysfs_dev = Path(f"/sys/dev/block/{major}:{minor}")
        expecting = []
        with osutils.OpenContext(path) as fd:
            for part_dir in sysfs_dev.glob(f"loop{minor}p*"):
                try:
                    part_id = (
                        (part_dir / "partition")
                        .read_text(encoding="utf-8")
                        .strip()
                    )
                except FileNotFoundError:
                    # If the partition file doesn't exist, then this subdir
                    # isn't a partition we have to remove.
                    continue
                logging.debug("Removing partition %s", part_id)
                part_id = int(part_id)
                try:
                    # There is a possibility we might get EBUSY (b/273697462)
                    # error when deleting partitions. So retry in that case.
                    retry_util.GenericRetry(
                        _partition_del_retry,
                        3,
                        c_blkpg.delete_partition,
                        fd,
                        part_id,
                        sleep=1,
                    )
                    expecting.append(part_id)
                except OSError as e:
                    logging.warning(
                        "deleting partition %s (part_id=%s) failed: %s",
                        path,
                        part_id,
                        e,
                    )
                    if e.errno == errno.EBUSY:
                        _DumpPartitionInfo()
        # Wait for the nodes to be cleaned up from /dev.
        # NB: |path| is rebound here, shadowing the function argument.
        for part_id in expecting:
            path = cls.ConstructPartitionDevName(minor, part_id)
            try:
                timeout_util.WaitForReturnTrue(
                    lambda: not path.exists(), 3, period=0.1
                )
            except timeout_util.TimeoutError:
                logging.warning(
                    "%s: timeout waiting for device node to be cleaned up", path
                )

    def AddPartitions(self):
        """Update registered partitions using parsed GPT."""
        # NOTE(review): as with DeletePartitions, this passes self.path where
        # _AddPartitions requires a loop device node; self.dev looks intended.
        self._AddPartitions(self.path, self._gpt_table)

    @classmethod
    def _AddPartitions(cls, path: Union[str, os.PathLike], gpt_table):
        """Update registered partitions using parsed GPT.

        Args:
            path: A loop device node (e.g. "/dev/loop3").
            gpt_table: List of PartitionInfo to register with the kernel.
        """
        major, minor = cls._CheckNodeIsLoopback(path)
        # Check the partitions the kernel knows of.
        logging.debug("%s: Registering partitions", path)
        sysfs_dev = Path(f"/sys/dev/block/{major}:{minor}")
        expecting = []
        with osutils.OpenContext(path) as fd:
            for part in gpt_table:
                sys_part = sysfs_dev / f"loop{minor}p{part.number}"
                if sys_part.exists():
                    logging.debug(
                        "partition %s already exists; skipping", part.number
                    )
                    continue
                try:
                    c_blkpg.add_partition(
                        fd, part.number, part.start, part.size
                    )
                    expecting.append(part.number)
                except OSError as e:
                    logging.warning(
                        "adding partition %s (part_id=%s) failed: %s",
                        path,
                        part.number,
                        e,
                    )
        # Wait for the nodes to appear in /dev.
        # NB: |path| is rebound here, shadowing the function argument.
        for part_id in expecting:
            path = cls.ConstructPartitionDevName(minor, part_id)
            try:
                timeout_util.WaitForReturnTrue(path.exists, 3, period=0.1)
            except timeout_util.TimeoutError:
                logging.warning(
                    "%s: timeout waiting for device node to show up", path
                )

    @staticmethod
    def ConstructPartitionDevName(
        loopnum: Union[str, int], part_id: Union[str, int]
    ) -> Path:
        """Return the loopback device for a partition.

        Args:
            loopnum: The loopback device number.
            part_id: Partition number.
        """
        return Path("/dev") / f"loop{loopnum}p{part_id}"

    def GetPartitionDevName(self, part_id: Union[str, int]):
        """Return the loopback device for a partition.

        Args:
            part_id: partition name (str) or number (int)

        Returns:
            String with name of loopback device (e.g. '/dev/loop3p2'). If there
            are multiple partitions that match part_id, then the first one from
            the partition table is returned.
        """
        part_info = self.GetPartitionInfo(part_id)
        return "%sp%d" % (self.dev, part_info.number)

    def GetPartitionInfo(self, part_id: Union[str, int]):
        """Return the partition info for the given partition ID.

        Args:
            part_id: partition name (str) or number (int)

        Returns:
            A PartitionInfo object representing the given partition ID. If there
            are multiple partitions that match part_id, then the first one from
            the partition table is returned.

        Raises:
            KeyError: if no partition matches |part_id|.
        """
        for part in self._gpt_table:
            if part_id in (part.name, part.number):
                return part
        raise KeyError(repr(part_id))

    def _GetMountPointAndSymlink(self, part):
        """Return tuple of mount point and symlink for a given PartitionInfo.

        Args:
            part: A PartitionInfo object.

        Returns:
            (mount_point, symlink) tuple: the "dir-<number>" mount point and
            the "dir-<name>" symlink that points at it.
        """
        dest_number = os.path.join(self.destination, "dir-%d" % part.number)
        dest_label = os.path.join(self.destination, "dir-%s" % part.name)
        return (dest_number, dest_label)

    def Mount(self, part_ids, mount_opts=("ro",)):
        """Mount the given part_ids in subdirectories of the given destination.

        Args:
            part_ids: list of partition names (str) or numbers (int)
            mount_opts: list of mount options to be applied for these
                partitions.

        Returns:
            List of mountpoint paths.

        Raises:
            KeyError: if any part_id matches no partition.
        """
        ret = []
        for part_id in part_ids:
            for part in self._gpt_table:
                if part_id in (part.name, part.number):
                    ret.append(self._Mount(part, mount_opts))
                    break
            else:
                raise KeyError(repr(part_id))
        return ret

    def Unmount(self, part_ids):
        """Unmount the given part_ids previously mounted via Mount().

        Args:
            part_ids: list of partition names (str) or numbers (int).

        Raises:
            KeyError: if any part_id matches no partition.
        """
        for part_id in part_ids:
            for part in self._gpt_table:
                if part_id in (part.name, part.number):
                    self._Unmount(part)
                    break
            else:
                raise KeyError(repr(part_id))

    def Mounted(self) -> Dict[str, os.PathLike]:
        """Returns information for mounted partitions.

        Returns:
            A dictionary of partition_names:mount_path.
        """
        return {
            x.name: self._GetMountPointAndSymlink(x)[0] for x in self._mounted
        }

    def _IsExt2(self, part_id, offset=0):
        """Is the given partition an ext2 file system?"""
        dev = self.GetPartitionDevName(part_id)
        return IsExt2Image(dev, offset=offset)

    def EnableRwMount(self, part_id, offset=0):
        """Enable RW mounts of the specified partition."""
        dev = self.GetPartitionDevName(part_id)
        if not self._IsExt2(part_id, offset):
            logging.error(
                "EnableRwMount called on non-ext2 fs: %s %s", part_id, offset
            )
            return
        # 0x464 is the s_feature_ro_compat field of the ext superblock
        # (superblock at +0x400, ro_compat at +0x64); +3 targets its high
        # byte.  Writing 0x00 clears the unknown-feature bits so the kernel
        # allows a read-write mount.
        ro_compat_ofs = offset + 0x464 + 3
        logging.info("Enabling RW mount writing 0x00 to %d", ro_compat_ofs)
        # We shouldn't need the sync here, but we sometimes see flakes with some
        # kernels where it looks like the metadata written isn't seen when we
        # try to mount later on. Adding a sync for 1 byte shouldn't be too bad.
        cros_build_lib.sudo_run(
            [
                "dd",
                "of=%s" % dev,
                "seek=%d" % ro_compat_ofs,
                "conv=notrunc,fsync",
                "count=1",
                "bs=1",
            ],
            input=b"\0",
            debug_level=logging.DEBUG,
            stderr=True,
        )

    def DisableRwMount(self, part_id, offset=0):
        """Disable RW mounts of the specified partition."""
        dev = self.GetPartitionDevName(part_id)
        if not self._IsExt2(part_id, offset):
            logging.error(
                "DisableRwMount called on non-ext2 fs: %s %s", part_id, offset
            )
            return
        # Inverse of EnableRwMount: writing 0xff sets unknown ro_compat
        # feature bits, which makes the kernel refuse read-write mounts.
        ro_compat_ofs = offset + 0x464 + 3
        logging.info("Disabling RW mount writing 0xff to %d", ro_compat_ofs)
        # We shouldn't need the sync here, but we sometimes see flakes with some
        # kernels where it looks like the metadata written isn't seen when we
        # try to mount later on. Adding a sync for 1 byte shouldn't be too bad.
        cros_build_lib.sudo_run(
            [
                "dd",
                "of=%s" % dev,
                "seek=%d" % ro_compat_ofs,
                "conv=notrunc,fsync",
                "count=1",
                "bs=1",
            ],
            input=b"\xff",
            debug_level=logging.DEBUG,
            stderr=True,
        )

    def _Mount(self, part, mount_opts):
        """Mount |part| and return its "dir-<number>" mount point."""
        if not self.destination:
            # Lazily create the destination; remember we own it for cleanup.
            self.destination = osutils.TempDir().tempdir
            self._destination_created = True
        dest_number, dest_label = self._GetMountPointAndSymlink(part)
        if part in self._mounted and "remount" not in mount_opts:
            # Already mounted and not asked to remount; nothing to do.
            return dest_number
        osutils.MountDir(
            self.GetPartitionDevName(part.number),
            dest_number,
            makedirs=True,
            skip_mtab=False,
            sudo=True,
            mount_opts=mount_opts,
        )
        self._mounted.add(part)
        # Provide a by-label alias ("dir-<name>" -> "dir-<number>").
        osutils.SafeSymlink(os.path.basename(dest_number), dest_label)
        self._symlinks.add(dest_label)
        return dest_number

    def _Unmount(self, part):
        """Unmount a partition that was mounted by _Mount."""
        dest_number, _ = self._GetMountPointAndSymlink(part)
        # Due to crosbug/358933, the RmDir call might fail. So we skip the
        # cleanup.
        osutils.UmountDir(dest_number, cleanup=False)
        self._mounted.remove(part)
        # Defer directory removal to close().
        self._to_be_rmdir.add(dest_number)

    @classmethod
    def detach_loopback(cls, path: Union[str, os.PathLike]) -> bool:
        """Detach |path| loopback device.

        Must be run as root.  Returns True (kept for caller compatibility).
        """
        cros_build_lib.AssertRootUser()

        cls._DeletePartitions(path)
        logging.debug("%s: Detaching loop device", path)
        try:
            c_loop.detach(path)
        except OSError as e:
            # If it's already detached, there's nothing to do.
            if e.errno == errno.ENXIO:
                logging.debug("%s: Device already detached", path)
            else:
                raise
        return True

    def close(self):
        """Unmount partitions, remove created paths, detach the loop device.

        Safe to call multiple times; a no-op once self.dev is cleared.
        """
        if self.dev:
            for part in list(self._mounted):
                self._Unmount(part)
            # We still need to remove some directories, since _Unmount did not.
            for link in self._symlinks:
                osutils.SafeUnlink(link)
            self._symlinks = set()
            for path in self._to_be_rmdir:
                retry_util.RetryException(
                    cros_build_lib.RunCommandError,
                    60,
                    osutils.RmDir,
                    path,
                    sudo=True,
                    sleep=1,
                )
            self._to_be_rmdir = set()
            if osutils.IsRootUser():
                self.detach_loopback(self.dev)
            else:
                # Not root: delegate detaching to the cros_losetup helper.
                cros_build_lib.sudo_run(
                    [
                        constants.CHROMITE_SCRIPTS_DIR / "cros_losetup",
                        "detach",
                        self.dev,
                    ],
                    debug_level=logging.DEBUG,
                )
            self.dev = None
            self.parts = {}
            self._gpt_table = None
        if self._destination_created:
            self.destination = None
            self._destination_created = False

    def __enter__(self):
        self.Attach()
        if self.part_ids:
            self.Mount(self.part_ids, self.mount_opts)
        return self

    def __exit__(self, exc_type, exc, tb):
        if self.delete:
            self.close()

    def __del__(self):
        # NOTE(review): if __init__ raised before self.delete was assigned,
        # this would raise AttributeError during GC — benign but noisy.
        if self.delete:
            self.close()
def WriteLsbRelease(sysroot, fields):
    """Writes out the /etc/lsb-release file into the given sysroot.

    Args:
        sysroot: The sysroot to write the lsb-release file to.
        fields: A dictionary of all the fields and values to write.
    """
    path = os.path.join(sysroot, constants.LSB_RELEASE_PATH.lstrip("/"))
    content = "\n".join(f"{k}={v}" for k, v in fields.items()) + "\n"
    if os.path.exists(path):
        # The file has already been pre-populated with some fields. Since
        # osutils.WriteFile(..) doesn't support appending with sudo, read in
        # the existing content and prepend it to the new content to write.
        # TODO(stevefung): Remove this appending, once all writing to the
        # /etc/lsb-release file has been removed from ebuilds and consolidated
        # to the build tools.
        content = osutils.ReadFile(path) + content
    osutils.WriteFile(path, content, mode="w", makedirs=True, sudo=True)
    # Restore the SELinux label on the freshly (re)written file.
    cros_build_lib.sudo_run(
        [
            "setfattr",
            "-n",
            "security.selinux",
            "-v",
            "u:object_r:cros_conf_file:s0",
            path,
        ]
    )
# TODO(b/265885353): update to use path_util or chroot_lib.
def GetLatestImageLink(
    board: str, force_chroot: bool = False, pointer: Optional[str] = None
):
    """Get the path for the `latest` image symlink for the given board.

    Args:
        board: The name of the board.
        force_chroot: Get the path as if we are inside the chroot, whether we
            actually are.
        pointer: Symlink name for image dir.

    Returns:
        str - The `latest` image symlink path.
    """
    if force_chroot:
        base = constants.CHROOT_SOURCE_ROOT
    else:
        base = constants.SOURCE_ROOT
    return os.path.join(base, "src/build/images", board, pointer or "latest")
class ImageDoesNotExistError(Error):
    """Raised when the provided or implied image path does not exist."""
class SecurityConfigDirectoryError(Error):
    """Raised when the SecurityTestConfig directory does not exist."""
class SecurityTestArgumentError(Error):
    """Raised when a SecurityTest argument is invalid."""
class VbootCheckoutError(Error):
    """Raised when checking out the stable vboot source fails."""
def SecurityTest(
    board: Optional[str] = None,
    image: Optional[str] = None,
    baselines: Optional[str] = None,
    vboot_hash: Optional[str] = None,
):
    """Image security tests.

    Args:
        board: The board whose image should be tested. Used when |image| is not
            provided or is a basename. Defaults to the default board.
        image: The path to an image that should be tested, or the basename of
            the desired image in the |board|'s build directory.
        baselines: The path to a directory containing the baseline configs.
        vboot_hash: The commit hash to checkout for the vboot_reference clone.

    Returns:
        bool - True on success, False on failure.

    Raises:
        SecurityTestArgumentError: when one or more arguments are not valid.
        VbootCheckoutError: when the vboot_reference repository cannot be cloned
            or the |vboot_hash| cannot be checked out.
    """
    if not cros_build_lib.IsInsideChroot():
        # Re-invoke ourselves inside the chroot, forwarding the arguments.
        cmd = ["security_test_image"]
        if board:
            cmd += ["--board", board]
        if image:
            cmd += ["--image", image]
        if baselines:
            cmd += ["--baselines", baselines]
        if vboot_hash:
            cmd += ["--vboot-hash", vboot_hash]
        result = cros_build_lib.run(cmd, enter_chroot=True, check=False)
        return not result.returncode

    try:
        image = BuildImagePath(board, image)
    except ImageDoesNotExistError as e:
        # Chain the original error so the root cause stays visible (B904).
        raise SecurityTestArgumentError(str(e)) from e
    logging.info("Using %s", image)

    if not baselines:
        baselines = signing.SECURITY_BASELINES_DIR
        if not os.path.exists(baselines):
            if not os.path.exists(signing.CROS_SIGNING_BASE_DIR):
                # Public checkouts have no signing baselines; best effort.
                logging.warning(
                    "Skipping security tests with public manifest."
                )
                return True
            raise SecurityTestArgumentError(
                f"Could not locate security baselines from {baselines} "
                "with private manifest."
            )
    logging.info("Loading baselines from %s", baselines)

    if not vboot_hash:
        vboot_hash = signing.GetDefaultVbootStableHash()
        if not vboot_hash:
            raise SecurityTestArgumentError(
                "Could not detect vboot_stable_hash in %s."
                % signing.CROS_SIGNING_CONFIG
            )
    logging.info("Using vboot_reference.git rev %s", vboot_hash)

    with osutils.TempDir() as tempdir:
        config = SecurityTestConfig(image, baselines, vboot_hash, tempdir)
        # Each RunCheck returns True on pass; count the failures.
        failures = sum(
            not config.RunCheck(check, with_config)
            for check, with_config in _SECURITY_CHECKS.items()
        )
    if failures:
        logging.error("%s tests failed", failures)
    else:
        logging.info("All tests passed.")
    return not failures
def BuildImagePath(board: str, image: str):
    """Build a fully qualified path to the image.

    Args:
        board: The name of the board whose image is being tested when an image
            path is not specified.
        image: The path to an image (in which case |image| is simply returned)
            or the basename of the image file to use. When |image| is a
            basename, the |board| build directory is always used to find it.
    """
    # Prefer an image path if provided.
    if image and os.sep in image:
        if not os.path.exists(image):
            raise ImageDoesNotExistError(
                "The provided image does not exist: %s" % image
            )
        return image

    # We have no image or a basename only, so we need the board to build out
    # the full path to an image file.
    board = board or cros_build_lib.GetDefaultBoard()
    if not board:
        if image:
            raise ImageDoesNotExistError(
                "|image| must be a full path or used with |board|."
            )
        raise ImageDoesNotExistError(
            "Either |image| or |board| must be provided."
        )

    # Build out the full path using the board's build path.
    basename = image or "recovery_image.bin"
    full_path = os.path.join(GetLatestImageLink(board), basename)
    if not os.path.exists(full_path):
        raise ImageDoesNotExistError("Image does not exist: %s" % full_path)
    return full_path
class SecurityTestConfig:
    """Hold configurations and do related setup."""

    # Local vboot_reference checkout used as the clone source/reference.
    _VBOOT_SRC = os.path.join(
        constants.SOURCE_ROOT, "src/platform/vboot_reference/.git"
    )
    # Location of the ensure_*.sh check scripts within that repo.
    _VBOOT_CHECKS_REL_DIR = "scripts/image_signing"

    def __init__(
        self, image: str, baselines: str, vboot_hash: str, directory: str
    ):
        """SecurityTest run configuration.

        Args:
            image: Path to an image.
            baselines: Path to the security baselines.
            vboot_hash: Commit hash for the vboot_reference.
            directory: The directory to use for the vboot_reference checkout.
                Usually a temporary directory.
        """
        self.image = image
        self.baselines = baselines
        self.vboot_hash = vboot_hash
        self.directory = directory
        self._repo_dir = os.path.join(self.directory, "vboot_source")
        self._checks_dir = os.path.join(
            self._repo_dir, self._VBOOT_CHECKS_REL_DIR
        )
        # Clone/checkout is done lazily on the first RunCheck call.
        self._checked_out = False

    def RunCheck(self, check: str, pass_config: bool) -> bool:
        """Run the given check.

        Args:
            check: A config.vboot_dir/ensure_|check|.sh check name.
            pass_config: Whether the check has a corresponding
                `ensure_|check|.config` file to pass.

        Returns:
            True on success, False on failure.

        Raises:
            SecurityConfigDirectoryError: if the directory does not exist.
            VbootCheckoutError: if the vboot reference repo could not be cloned
                or the vboot_hash could not be checked out.
        """
        self._VbootCheckout()

        cmd = [
            os.path.join(self._checks_dir, "ensure_%s.sh" % check),
            self.image,
        ]
        if pass_config:
            cmd.append(os.path.join(self.baselines, "ensure_%s.config" % check))
        try:
            self._RunCommand(cmd)
        except cros_build_lib.RunCommandError as e:
            logging.error("%s test failed: %s", check, e)
            return False
        else:
            return True

    def _VbootCheckout(self):
        """Clone the vboot reference repo and checkout the vboot stable hash."""
        if not os.path.exists(self.directory):
            raise SecurityConfigDirectoryError("The directory does not exist.")

        if not self._checked_out:
            try:
                git.Clone(
                    self._repo_dir, self._VBOOT_SRC, reference=self._VBOOT_SRC
                )
            except cros_build_lib.RunCommandError as e:
                # Chain the underlying error for debuggability (B904).
                raise VbootCheckoutError(
                    "Failed cloning repo from %s: %s" % (self._VBOOT_SRC, e)
                ) from e
            try:
                cros_build_lib.run(
                    ["git", "checkout", "-q", self.vboot_hash],
                    cwd=self._repo_dir,
                )
            except cros_build_lib.RunCommandError as e:
                raise VbootCheckoutError(
                    "Failed checking out %s from %s: %s"
                    % (self.vboot_hash, self._VBOOT_SRC, e)
                ) from e
            self._checked_out = True

    def _RunCommand(self, cmd, *args, **kwargs):
        """Run a command with the signing bin directory in PATH."""
        # BUG FIX: the original did
        #   kwargs["extra_env"] = extra_env.update(...)
        # but dict.update() returns None, so extra_env was always None and
        # the signing PATH was never injected.  Merge explicitly instead,
        # letting any caller-supplied extra_env entries win.
        extra_env = {
            "PATH": "%s:%s" % (signing.CROS_SIGNING_BIN_DIR, os.environ["PATH"])
        }
        extra_env.update(kwargs.get("extra_env") or {})
        kwargs["extra_env"] = extra_env
        return cros_build_lib.run(cmd, *args, **kwargs)
class PartitionInfo(NamedTuple):
    """A single GPT partition entry.

    Produced by GetImageDiskPartitionInfo / _ParseParted; consumed by
    LoopbackPartitions for registering and mounting partitions.
    """

    # The partition number. Must be within the range [1,256] (Linux limit).
    # NB: The number has no relationship to the order on disk. The first
    # partition on the disk (i.e. the one with the smallest start) can have
    # any partition number.
    number: int
    # The offset of the start of the partition, in bytes.
    start: int
    # The size of the partition, in bytes.
    size: int
    # Filesystem type, if known. e.g. ext2 ext4 fat16
    file_system: str = ""
    # Partition label/name. May not exceed 36 Unicode characters.
    name: str = ""
def _ParseParted(lines):
    """Returns partition information from `parted print` output.

    Args:
        lines: Output lines from `parted -m ... unit B print`.

    Returns:
        A list of PartitionInfo, in the order parted reported them.
    """
    ret = []
    # Sample output (partition #, start, end, size, file system, name, flags):
    #   /foo/chromiumos_qemu_image.bin:3360MB:file:512:512:gpt:;
    #   11:0.03MB:8.42MB:8.39MB::RWFW:;
    #   6:8.42MB:8.42MB:0.00MB::KERN-C:;
    #   7:8.42MB:8.42MB:0.00MB::ROOT-C:;
    #   9:8.42MB:8.42MB:0.00MB::reserved:;
    #   10:8.42MB:8.42MB:0.00MB::reserved:;
    #   2:10.5MB:27.3MB:16.8MB::KERN-A:;
    #   4:27.3MB:44.0MB:16.8MB::KERN-B:;
    #   8:44.0MB:60.8MB:16.8MB:ext4:OEM:;
    #   12:128MB:145MB:16.8MB:fat16:EFI-SYSTEM:boot;
    #   5:145MB:2292MB:2147MB::ROOT-B:;
    #   3:2292MB:4440MB:2147MB:ext2:ROOT-A:;
    #   1:4440MB:7661MB:3221MB:ext4:STATE:;
    pattern = re.compile(r"(([^:]*:){6}[^:]*);")
    for line in lines:
        match = pattern.match(line)
        if not match:
            continue
        values = match.group(1).split(":")
        # Kick out the end field.
        values.pop(2)
        # zip() pairs only as many values as PartitionInfo has fields, so the
        # trailing flags field is dropped here.  (The original also popped the
        # flags entry from |values| *after* building the dict, which had no
        # effect — removed as dead code.)
        d = dict(zip(PartitionInfo._fields, values))
        # Disregard any non-numeric partition number (e.g. the file path).
        if not d["number"].isdigit():
            continue
        d["number"] = int(d["number"])
        # Strip the trailing unit suffix ("B") and convert to int bytes.
        for key in ("start", "size"):
            d[key] = int(d[key][:-1])
        ret.append(PartitionInfo(**d))
    return ret
def GetImageDiskPartitionInfo(image_path):
    """Returns the disk partition table of an image.

    Args:
        image_path: Path to the image file.

    Returns:
        A list of PartitionInfo items.
    """
    if not cros_build_lib.IsInsideChroot():
        # Outside chroot, use `parted`. Parted 3.2 and earlier has a bug where
        # it will complain that partitions are overlapping even when they are
        # not. It does this in a specific case: when inserting a one-sector
        # partition into a layout where that partition is snug in between two
        # other partitions that have smaller partition numbers. With
        # disk_layout_v2.json, this happens when inserting partition 10, KERN-A,
        # since the blank padding before it was removed.
        # Work around this by telling parted to ignore this "failure"
        # interactively.
        # Yes, the three dashes are correct, and yes, it _is_ weird.
        # TODO(build): Change -m to --json once Parted 3.5 (released Apr 2022)
        # is available "everywhere". That probably means once our baseline
        # Ubuntu LTS supports it.
        cmd = [
            "parted",
            "---pretend-input-tty",
            "-m",
            image_path,
            "unit",
            "B",
            "print",
        ]
        # The 'I' input tells parted to ignore its supposed concern about
        # overlapping partitions. Cgpt simply ignores the input.
        result = cros_build_lib.dbg_run(
            cmd,
            extra_env={"PATH": "/sbin:%s" % os.environ["PATH"], "LC_ALL": "C"},
            capture_output=True,
            encoding="utf-8",
            input=b"I",
        )
        return _ParseParted(result.stdout.splitlines())

    # Inside the chroot we can read the GPT directly via cgpt.
    disk = cgpt.Disk.FromImage(image_path)
    # cgpt reports 512-byte sectors; convert start/size to bytes.
    return [
        PartitionInfo(
            number=p.part_num,
            start=p.start * 512,
            size=p.size * 512,
            name=p.label,
        )
        for p in disk.partitions.values()
    ]
def GetImagesToBuild(image_types: List[str]) -> Set[str]:
    """Construct the images to build from the image type.

    Args:
        image_types: list of image types.

    Returns:
        A list of image name to build.

    Raises:
        ValueError: if an invalid image type is given as input or if factory
            shim image is requested along with any other image type.
    """
    image_names = set()
    for image_type in image_types:
        if image_type not in constants.IMAGE_TYPE_TO_NAME:
            raise ValueError(f"Invalid image type : {image_type}")
        image_names.add(constants.IMAGE_TYPE_TO_NAME[image_type])

    # The factory shim must be built on its own.
    if len(image_names) > 1 and constants.FACTORY_IMAGE_BIN in image_names:
        raise ValueError(
            f"Can't build {constants.FACTORY_IMAGE_BIN} with any other image."
        )
    return image_names
def GetBuildImageEnvvars(
    image_names: Set[str],
    board: str,
    version_info: Optional[chromeos_version.VersionInfo] = None,
    build_dir: Optional[Union[str, os.PathLike]] = None,
    output_dir: Optional[Union[str, os.PathLike]] = None,
    env_var_init: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Get the environment variables required to build the given images.

    Args:
        image_names: The list of images to build.
        board: The board for which the images will be built.
        version_info: ChromeOS version information that needs to be populated.
        build_dir: Directory in which to compose the image.
        output_dir: Directory in which to place image result.
        env_var_init: Initial environment variables to use.

    Returns:
        A dictionary of environment variables.
    """
    env = env_var_init or {}
    env["INSTALL_MASK"] = "\n".join(install_mask.DEFAULT)
    env["PRISTINE_IMAGE_NAME"] = constants.BASE_IMAGE_BIN
    env["BASE_PACKAGE"] = "virtual/target-os"

    # Factory shim overrides the mask, image name, package, and USE flags.
    if constants.FACTORY_IMAGE_BIN in image_names:
        env["INSTALL_MASK"] = "\n".join(install_mask.FACTORY_SHIM)
        env["USE"] = (
            env.get("USE", "") + " " + _FACTORY_SHIM_USE_FLAGS
        ).strip()
        env["PRISTINE_IMAGE_NAME"] = constants.FACTORY_IMAGE_BIN
        env["BASE_PACKAGE"] = "virtual/target-os-factory-shim"

    # Mask systemd directories if this is not a systemd image.
    if "systemd" not in portage_util.GetBoardUseFlags(board):
        env["INSTALL_MASK"] += "\n" + "\n".join(install_mask.SYSTEMD)

    if version_info:
        env["CHROME_BRANCH"] = version_info.chrome_branch
        env["CHROMEOS_BUILD"] = version_info.build_number
        env["CHROMEOS_BRANCH"] = version_info.branch_build_number
        env["CHROMEOS_PATCH"] = version_info.patch_number
        env["CHROMEOS_VERSION_STRING"] = version_info.VersionString()

    # TODO(rchandrasekar): Remove 'BUILD_DIR' and 'OUTPUT_DIR' env variables
    # after image creation is moved out of build_image.sh script.
    if build_dir:
        env["BUILD_DIR"] = str(build_dir)
    if output_dir:
        env["OUTPUT_DIR"] = str(output_dir)
    return env
def CreateBuildDir(
    build_root: Union[str, os.PathLike],
    output_root: Union[str, os.PathLike],
    chrome_branch: str,
    version: str,
    board: str,
    symlink: str,
    replace: bool = False,
    build_attempt: Optional[int] = None,
    output_suffix: Optional[str] = None,
) -> Tuple[Path, Path, Path]:
    """Create the build directory based on input arguments.

    Args:
        build_root: Directory in which to compose the image.
        output_root: Directory in which to place the image result.
        chrome_branch: Chrome branch number to use.
        version: The version string to use for the output directory.
        board: The board for which the image is generated.
        symlink: The output directory symlink to be created.
        replace: Whether to remove and replace the existing directory.
        build_attempt: build attempt count to append to directory name.
        output_suffix: Any user given output suffix to append to directory name.

    Returns:
        A tuple of build directory, output directory and symlink directory.

    Raises:
        FileExistsError when the output build directory already exists.
    """
    image_dir = f"R{chrome_branch}-{version}"
    if build_attempt:
        image_dir += f"-a{build_attempt}"
    if output_suffix:
        image_dir += f"-{output_suffix}"

    board_dir = Path(board) / image_dir
    build_dir = Path(build_root) / board_dir
    output_dir = Path(output_root) / board_dir
    symlink_dir = Path(output_root) / board / symlink

    if replace and build_dir.exists():
        osutils.RmDir(build_dir, sudo=True)
    if build_dir.exists():
        logging.error("Directory %s already exists.", build_dir)
        logging.error(
            "Use --build_attempt option to specify an unused attempt."
        )
        logging.error(
            "Or use --replace if you want to overwrite this directory."
        )
        # Fix: the original passed a literal "%s" that was never interpolated.
        # Use the OSError (errno, strerror, filename) convention instead so
        # the offending path appears in the message and e.filename.
        raise FileExistsError(
            errno.EEXIST, "Unwilling to overwrite", str(build_dir)
        )

    osutils.SafeMakedirs(build_dir)
    osutils.SafeMakedirs(output_dir)
    osutils.SafeSymlink(image_dir, symlink_dir)
    # Fix: return a tuple to match the declared Tuple[Path, Path, Path]
    # return type (the original returned a list).
    return (build_dir, output_dir, symlink_dir)
def IsSquashfsImage(path):
    """Returns true if |path| is a squashfs filesystem."""
    # Squashfs superblock magic ("hsqs" little-endian).
    magic = b"\x68\x73\x71\x73"
    logging.debug("Checking if image is squashfs: %s", path)
    # Read the magic number in the file's superblock.
    header = osutils.ReadFile(path, mode="rb", size=len(magic), sudo=True)
    return header == magic
def IsExt2Image(path, offset=0):
    """Returns true if |path| is an ext2/ext3/ext4 filesystem."""
    # ext superblock magic (s_magic), located 0x438 bytes into the partition.
    magic = b"\x53\xef"
    sb_offset = 0x438
    logging.debug("Checking if image is ext2/3/4: %s", path)
    # Read the magic number in the file's superblock.
    header = osutils.ReadFile(
        path, mode="rb", seek=offset + sb_offset, size=len(magic), sudo=True
    )
    return header == magic