blob: d4c1b16e0bfd77e17ac7459380794e0213eaca0c [file] [log] [blame]
# Copyright 2022 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Holds helper functions to run fuzzers on hardware devices."""
import contextlib
import logging
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from chromite.lib import build_target_lib
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import portage_util
from chromite.lib import remote_access
# Location of the cros_generate_sysroot entry point under chromite's bin
# directory; it is the command run to produce the sysroot tarball.
_CROS_GENERATE_SYSROOT = constants.CHROMITE_BIN_DIR / "cros_generate_sysroot"
# TODO(b/255365294): Remove the duplication between cros_fuzz and the
# on-device code.
# Name of the libFuzzer option that bounds the total fuzzing time.
_MAX_TOTAL_TIME_OPTION_NAME = "max_total_time"
# Value supplied for that option when the caller does not set one
# (libFuzzer interprets -max_total_time in seconds).
_MAX_TOTAL_TIME_DEFAULT_VALUE = 30
class Error(Exception):
    """Root of the exception hierarchy raised by on_device_fuzz helpers.

    Catch this type to handle any error defined by this module.
    """
class SetupError(Error):
    """Raised when a command or check needed to set up fuzzing fails."""
def create_sysroot_tarball(
    packages: Iterable[str],
    board: str,
    output_path: Path,
    board_build_dir: Optional[Path] = None,
) -> Path:
    """Create a sysroot tarball to install on device.

    Args:
        packages: Packages we want to include in the sysroot.
            Does not implicitly check for virtual/implicit-system.
            Any iterable is accepted, including one-shot iterators.
        board: Board to build sysroot tarball for.
        output_path: Output path for the tarball. Should
            be a file path, not a directory.
        board_build_dir: Path to the board build dir
            to check for installed packages. Defaults to
            the default sysroot path for the board ("/build/<board>").

    Returns:
        The output_path of the tarball.

    Raises:
        SetupError if the setup checks or the tarball generation fail.
    """
    # Materialize once: `packages` is consumed both by the setup checks and
    # by the command line below, and a one-shot iterator would be empty on
    # the second pass, producing an empty --package= argument.
    package_list = list(packages)

    if not board_build_dir:
        board_build_dir = build_target_lib.get_default_sysroot_path(board)
    sysroot_tarball_setup_checks(package_list, board_build_dir)

    package_str = " ".join(package_list)
    sdk_cmd = [
        _CROS_GENERATE_SYSROOT,
        f"--out-file={output_path.name}",
        f"--out-dir={output_path.parent}",
        f"--board={board}",
        f"--package={package_str}",
    ]
    try:
        # NOTE(review): `env=` is passed through unchanged. Confirm whether
        # cros_build_lib.run treats it as a complete replacement environment
        # rather than an addition to the inherited one.
        cros_build_lib.run(sdk_cmd, check=True, env={"USE": "asan fuzzer"})
    except cros_build_lib.RunCommandError as e:
        raise SetupError(
            f"Failed to generate sysroot tarball for board {board}"
        ) from e
    return output_path
def sysroot_tarball_setup_checks(
    packages: Iterable[str], board_build_dir: Path
):
    """Verify that every requested package can be bundled into a sysroot.

    Args:
        packages: Packages we want to include in the sysroot.
            virtual/implicit-system is not checked implicitly.
        board_build_dir: Board build dir in which the packages are
            expected to be installed.

    Raises:
        SetupError if any package is reported as not installed. Returns
        None when all packages are present.
    """
    missing = _check_necessary_installed(packages, board_build_dir)
    if not missing:
        return

    # Log the offending entries before failing so the user can see which
    # atoms need to be installed.
    logging.error(
        "Not all necessary packages are installed in %s:\n%s",
        board_build_dir,
        missing,
    )
    raise SetupError("Not all necessary packages installed.")
def _check_necessary_installed(
    atoms: Iterable[str], board_build_dir: Path
) -> List[Tuple[str, Optional[str]]]:
    """Helper for checking the packages are installed.

    Args:
        atoms: Package atoms to look up.
        board_build_dir: Sysroot/build dir used both for the portageq match
            and for the installed-package database.

    Returns:
        A list of (atom, cpvr) tuples, one per atom that is not installed.
        cpvr is None when the atom could not be matched to any package at
        all; otherwise it is the cpvr of the matched package that is missing
        from the installed-package database. Empty when everything is
        installed.
    """
    db = portage_util.PortageDB(board_build_dir)
    not_installed: List[Tuple[str, Optional[str]]] = []
    for atom in atoms:
        package_info = portage_util.PortageqMatch(
            atom, sysroot=board_build_dir
        )
        if not package_info:
            # No package matched the atom, so there is no cpvr to report.
            not_installed.append((atom, None))
            continue
        if not db.GetInstalledPackage(
            package_info.category, package_info.pvr
        ):
            not_installed.append((atom, package_info.cpvr))
    return not_installed
def create_dut_sysroot(
    device: remote_access.ChromiumOSDevice,
    sysroot_tarball: Path,
    sysroot_device_path: Path,
):
    """Unpack a sysroot tarball into a directory on a device.

    The tarball is copied into the parent directory of the sysroot and then
    extracted into the sysroot directory itself.

    Args:
        device: Device to create the sysroot on.
        sysroot_tarball: Path on the build system to the sysroot tarball
            to unpack.
        sysroot_device_path: Path ON DEVICE where the sysroot will live.
            Should be on an executable partition, like in /usr/local.
            Will be created if it does not exist. Should be absolute.
    """
    device.mkdir(sysroot_device_path)

    # Stage the tarball next to (not inside) the sysroot directory.
    staging_dir = sysroot_device_path.parent
    device.CopyToDevice(
        src=str(sysroot_tarball), dest=str(staging_dir), mode="scp"
    )

    remote_tarball = staging_dir / sysroot_tarball.name
    device.run(
        ["tar", "-xf", str(remote_tarball), "-C", str(sysroot_device_path)]
    )
def add_default_fuzz_time(libfuzzer_options: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the options with a default max fuzz time filled in.

    A value already present in libfuzzer_options takes precedence over the
    default. The input dict is not modified.
    """
    options_with_default = {
        _MAX_TOTAL_TIME_OPTION_NAME: _MAX_TOTAL_TIME_DEFAULT_VALUE
    }
    # Caller-provided entries override the default seeded above.
    options_with_default.update(libfuzzer_options)
    return options_with_default
def _setup_envs(cmd: List[str]) -> List[str]:
    """Return cmd followed by ASAN/MSAN/UBSAN *_OPTIONS assignments.

    The settings are emitted as NAME=value words appended to the command
    (e.g. after "/usr/bin/env") rather than set in a process environment,
    because the command is run as a string in the chroot. The input list is
    not modified.
    """
    option_pairs = (("log_path", "stderr"), ("detect_odr_violation", "0"))
    joined_options = ":".join("=".join(pair) for pair in option_pairs)

    env_assignments = []
    for prefix in ("ASAN", "MSAN", "UBSAN"):
        env_assignments.append(f"{prefix}_OPTIONS={joined_options}")
    return cmd + env_assignments
@contextlib.contextmanager
def _sysroot_mount_context(
    device: remote_access.ChromiumOSDevice, sysroot_device_path: Path
):
    """Mount necessary system directories into the device sysroot.

    /proc and /dev are mounted under the sysroot on entry. On exit they are
    lazily unmounted in reverse order (dev, then proc). Each unmount is
    registered as soon as its mount succeeds, so a failure while mounting
    the second directory, or while unmounting the first, still results in
    an attempt to unmount everything that was actually mounted.

    Args:
        device: Device which to run fuzzers on.
        sysroot_device_path: Location on device where the sysroot is located.
            Cannot be the root dir.

    Raises:
        ValueError: If sysroot_device_path is the root dir. Raised when the
            context is entered.
    """
    if sysroot_device_path == Path("/"):
        raise ValueError("sysroot_device_path should never be the root dir")

    def _mount(flags: List[str], dirname: str):
        mount_target = str(sysroot_device_path / dirname)
        device.mkdir(mount_target)
        device.run(["mount"] + flags + [f"/{dirname}", mount_target])

    def _umount(dirname: str):
        mount_target = str(sysroot_device_path / dirname)
        # Lazily umount, because the mounts may be busy.
        device.run(["umount", "-l", mount_target])

    # ExitStack runs callbacks in LIFO order and keeps running the remaining
    # ones even if one of them raises, so no mount is leaked.
    with contextlib.ExitStack() as cleanup:
        _mount(["-t", "proc"], "proc")
        cleanup.callback(_umount, "proc")
        _mount(["--rbind"], "dev")
        cleanup.callback(_umount, "dev")
        yield
def run_fuzzer_executable(
    device: remote_access.ChromiumOSDevice,
    sysroot_device_path: Path,
    sysroot_fuzzer_path: Path,
    libfuzzer_options: Optional[Dict[str, Any]] = None,
):
    """Run a fuzzer inside an already prepared sysroot on a device.

    The fuzzer is started through `chroot <sysroot> /usr/bin/env ...` with
    the sanitizer option variables and the libfuzzer options on the command
    line, while /proc and /dev are mounted into the sysroot.

    Args:
        device: Device to run fuzzer on.
        sysroot_device_path: Location of sysroot on the device.
        sysroot_fuzzer_path: Location of the fuzzer, as seen from within
            the sysroot on device. Usually something like
            Path("/usr/libexec/fuzzers/my_fuzzer").
        libfuzzer_options: Key-value pairs to pass to the fuzzer invocation.
            A default maximum total time is added when not provided.

    Raises:
        FileNotFoundError: When the fuzzer on device does not exist.
    """
    fuzzer_options = add_default_fuzz_time(libfuzzer_options or {})

    # Going through /usr/bin/env lets us pass both environment variables
    # and arguments to the fuzzer in a single chroot command.
    fuzzer_argv = _setup_envs(["/usr/bin/env"])
    fuzzer_argv.append(str(sysroot_fuzzer_path))
    fuzzer_argv.extend(
        f"-{name}={value}" for name, value in fuzzer_options.items()
    )
    chroot_argv = ["chroot", str(sysroot_device_path), *fuzzer_argv]

    # Path of the fuzzer as seen from outside the chroot on the device.
    fuzzer_on_device = sysroot_device_path / sysroot_fuzzer_path.relative_to(
        "/"
    )
    if not device.IfFileExists(fuzzer_on_device):
        logging.error(
            "Could not find fuzzer on device at %s. Maybe not installed?",
            fuzzer_on_device,
        )
        raise FileNotFoundError(f"Fuzzer {sysroot_fuzzer_path} not found")

    with _sysroot_mount_context(device, sysroot_device_path):
        logging.info("Running %s now on device...", sysroot_fuzzer_path)
        # Output is deliberately not captured: fuzzers can run indefinitely.
        # Polling a process and buffering its output would be an
        # alternative, but leaving the output uncaptured already does what
        # we need.
        device.run(chroot_argv, capture_output=False)