blob: 1881d85e1d785c8a04a446d327ded2fa626fabf9 [file] [log] [blame]
# Copyright 2014 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Collection of tests to run on the rootfs of a built image.
This module should only be imported inside the chroot.
"""
import collections
import errno
import fnmatch
import glob
import io
import itertools
import logging
import mimetypes
import os
from pathlib import Path
import re
import stat
from typing import NamedTuple
import unittest
from chromite.third_party import lddtree
from chromite.third_party.pyelftools.elftools.common import exceptions
from chromite.third_party.pyelftools.elftools.elf import elffile
import magic # pylint: disable=import-error
# pylint: disable=ungrouped-imports
from chromite.cros.test import usergroup_baseline
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import image_test_lib
from chromite.lib import osutils
from chromite.lib import parseelf
from chromite.lib import portage_util
from chromite.utils.parser import shebang
class LocaltimeTest(image_test_lib.ImageTestCase):
"""Verify that /etc/localtime is a symlink to /var/lib/timezone/localtime.
This is an example of an image test. The image is already mounted. The
test can access rootfs via ROOT_A constant.
"""
def TestLocaltimeIsSymlink(self) -> None:
localtime_path = os.path.join(image_test_lib.ROOT_A, "etc", "localtime")
self.assertTrue(os.path.islink(localtime_path))
def TestLocaltimeLinkIsCorrect(self) -> None:
localtime_path = os.path.join(image_test_lib.ROOT_A, "etc", "localtime")
self.assertEqual(
"/var/lib/timezone/localtime", os.readlink(localtime_path)
)
def _GuessMimeType(magic_obj, file_name):
"""Guess a file's mimetype base on its extension and content.
File extension is favored over file content to reduce noise.
Args:
magic_obj: A loaded magic instance.
file_name: A path to the file.
Returns:
A mime type of |file_name|.
"""
mime_type, _ = mimetypes.guess_type(file_name)
if not mime_type:
mime_type = magic_obj.file(file_name)
return mime_type
class BlockedTest(image_test_lib.ImageTestCase):
"""Verify that rootfs does not contain blocked items."""
BLOCKED_PACKAGES = (
"app-text/iso-codes",
"dev-java/icedtea",
"dev-java/icedtea-bin",
"dev-java/openjdk-bin",
"dev-lang/perl",
"dev-lang/python",
"dev-lang/tcl",
"media-sound/pulseaudio",
"sys-apps/mosys",
"x11-libs/libxklavier",
)
BLOCKED_FILES = (
"/usr/bin/java",
"/usr/bin/javac",
"/usr/bin/perl",
"/usr/bin/python",
"/usr/bin/tclsh",
"/usr/share/kdump/boot/kdump-image",
)
BLOCKED_DIRS = ("/usr/share/locale",)
def TestBlockedDirectories(self) -> None:
for path in self.BLOCKED_DIRS:
full_path = os.path.join(image_test_lib.ROOT_A, path.lstrip(os.sep))
self.assertFalse(
os.path.isdir(full_path), "Directory %s is not allowed." % path
)
def TestBlockedFileTypes(self) -> None:
"""Fail if there are files of prohibited types (e.g. C++ source code).
The allow list has higher precedence than the block list.
"""
blocked_patterns = [
re.compile(x)
for x in [
r"^text/x-c\+\+$",
r"^text/x-c$",
]
]
allowed_patterns = [
re.compile(x)
for x in [
r".*/braille/.*",
r".*/brltty/.*",
r".*/etc/sudoers$",
r".*/dump_vpd_log$",
r".*\.conf$",
r".*/libnl/classid$",
r".*/locale/",
r".*/X11/xkb/",
r".*/chromeos-assets/",
r".*/udev/rules.d/",
r".*/firmware/ar3k/.*pst$",
r".*/etc/services",
# Python reads this file at runtime to look up install features.
r".*/usr/include/python[\d\.]*/pyconfig.h$",
r".*/usr/lib/node_modules/.*",
r".*/usr/share/dev-install/portage",
r".*/opt/pita/qml",
]
]
failures = []
magic_obj = magic.open(magic.MAGIC_MIME_TYPE)
magic_obj.load()
for root, _, file_names in os.walk(image_test_lib.ROOT_A):
for file_name in file_names:
full_name = os.path.join(root, file_name)
if os.path.islink(full_name) or not os.path.isfile(full_name):
continue
mime_type = _GuessMimeType(magic_obj, full_name)
if any(
x.match(mime_type) for x in blocked_patterns
) and not any(x.match(full_name) for x in allowed_patterns):
failures.append(
"File %s type %s is not allowed."
% (full_name, mime_type)
)
magic_obj.close()
self.assertFalse(failures, "\n".join(failures))
def TestBlockedPackages(self) -> None:
"""Fail if any blocked packages are installed."""
for package in self.BLOCKED_PACKAGES:
self.assertFalse(
portage_util.PortageqHasVersion(
package, sysroot=image_test_lib.ROOT_A
)
)
def TestBlockedFiles(self) -> None:
"""Fail if any blocked files exist."""
for path in self.BLOCKED_FILES:
full_path = os.path.join(image_test_lib.ROOT_A, path.lstrip(os.sep))
self.assertFalse(
os.path.exists(full_path),
"Path exists but should not: %s" % full_path,
)
def TestValidInterpreter(self) -> None:
"""Fail if a script's interpreter is not found, or not executable.
A script interpreter is anything after the #! sign, up to the end of
line or the first space.
"""
failures = []
for root, _, file_names in os.walk(image_test_lib.ROOT_A):
for file_name in file_names:
full_name = os.path.join(root, file_name)
file_stat = os.lstat(full_name)
if (
not stat.S_ISREG(file_stat.st_mode)
or (file_stat.st_mode & 0o0111) == 0
):
continue
with open(full_name, "rb") as f:
if f.read(2) != "#!":
continue
line = "#!" + f.readline().strip()
try:
# Ignore arguments to the interpreter.
interp = shebang.parse(line).command
except ValueError:
failures.append(
'File %s has an invalid interpreter path: "%s".'
% (full_name, line)
)
# Absolute path to the interpreter.
interp = os.path.join(image_test_lib.ROOT_A, interp.lstrip("/"))
# Interpreter could be a symlink. Resolve it.
interp = osutils.ResolveSymlinkInRoot(
interp, image_test_lib.ROOT_A
)
if not os.path.isfile(interp):
failures.append(
"File %s uses non-existing interpreter %s."
% (full_name, interp)
)
elif (os.stat(interp).st_mode & 0o111) == 0:
failures.append(
"Interpreter %s is not executable." % interp
)
self.assertFalse(failures, "\n".join(failures))
class LinkageTest(image_test_lib.ImageTestCase):
"""Verify that all binaries and libraries have proper linkage."""
def setUp(self) -> None:
osutils.MountDir(
os.path.join(image_test_lib.STATEFUL, "var_overlay"),
os.path.join(image_test_lib.ROOT_A, "var"),
mount_opts=("bind",),
)
def tearDown(self) -> None:
osutils.UmountDir(
os.path.join(image_test_lib.ROOT_A, "var"),
cleanup=False,
)
def _IsPackageMerged(self, package_name):
has_version = portage_util.PortageqHasVersion(
package_name, sysroot=os.path.abspath(image_test_lib.ROOT_A)
)
if has_version:
logging.info("Package is available: %s", package_name)
else:
logging.info("Package is not available: %s", package_name)
return has_version
def TestLinkage(self) -> None:
"""Find main executable binaries and check their linkage."""
binaries = [
"bin/sed",
]
if self._IsPackageMerged("chromeos-base/chromeos-login"):
binaries.append("sbin/session_manager")
if self._IsPackageMerged("x11-base/xorg-server"):
binaries.append("usr/bin/Xorg")
# When chrome is built with USE="pgo_generate", rootfs chrome is
# actually a symlink to a real binary which is in the stateful
# partition. So we do not check for a valid chrome binary in that case.
if not self._IsPackageMerged(
"chromeos-base/chromeos-chrome[pgo_generate]"
):
if self._IsPackageMerged(
"chromeos-base/chromeos-chrome[app_shell]"
):
binaries.append("opt/google/chrome/app_shell")
elif self._IsPackageMerged("chromeos-base/chromeos-chrome"):
binaries.append("opt/google/chrome/chrome")
if self._IsPackageMerged("net-print/hplip"):
binaries.append("usr/libexec/cups/filter/hpcups")
binaries.append("usr/libexec/cups/filter/hpps")
binaries = [os.path.join(image_test_lib.ROOT_A, x) for x in binaries]
# Grab all .so files
libraries = []
for root, _, files in os.walk(image_test_lib.ROOT_A):
for name in files:
filename = os.path.join(root, name)
if ".so" in filename:
libraries.append(filename)
ldpaths = lddtree.LoadLdpaths(image_test_lib.ROOT_A)
failures = []
for to_test in itertools.chain(binaries, libraries):
# to_test could be a symlink, we need to resolve it relative to
# ROOT_A.
while os.path.islink(to_test):
link = os.readlink(to_test)
if link.startswith("/"):
to_test = os.path.join(image_test_lib.ROOT_A, link[1:])
else:
to_test = os.path.join(os.path.dirname(to_test), link)
try:
elf = lddtree.ParseELF(
to_test, root=image_test_lib.ROOT_A, ldpaths=ldpaths
)
if os.path.basename(to_test) in [
# Deps mounted from squashfs at runtime.
"libcros_camera.so",
# Deps mounted from squashfs at runtime.
"intel-ipu6.so",
# Deps mounted from squashfs at runtime.
"camera.qcom.core.so",
# libasound_module_ctl_ipaudio.so dep outside normal search
# paths.
"libasound_module_pcm_ipaudio.so",
# libfwupdutil.so dep outside normal search paths.
"libfwupdutil.so",
# libfwupdengine.so dep outside normal search paths.
"libfwupdengine.so",
]:
continue
for lib in elf["needed"]:
if not lib in elf["libs"] or not elf["libs"][lib]["path"]:
failures.append(
"Fail linkage test for /%s: unresolved library %s"
% (
os.path.relpath(
to_test, start=image_test_lib.ROOT_A
),
lib,
)
)
except lddtree.exceptions.ELFError:
continue
except IOError as e:
self.fail("Fail linkage test for %s: %s" % (to_test, e))
if failures:
self.fail(str(failures))
@unittest.expectedFailure
class FileSystemMetaDataTest(image_test_lib.ImageTestCase):
"""A test class to gather file system stats such as free inodes, blocks."""
def TestStats(self) -> None:
"""Collect inodes and blocks usage."""
# Find the loopback device that was mounted to ROOT_A.
loop_device = None
root_path = os.path.abspath(os.readlink(image_test_lib.ROOT_A))
for mtab in osutils.IterateMountPoints():
if mtab.destination == root_path:
loop_device = mtab.source
break
self.assertTrue(loop_device, "Cannot find loopback device for ROOT_A.")
# Gather file system stats with tune2fs.
cmd = ["tune2fs", "-l", loop_device]
# tune2fs produces output like this:
#
# tune2fs 1.42 (29-Nov-2011)
# Filesystem volume name: ROOT-A
# Last mounted on: <not available>
# Filesystem UUID: <none>
# Filesystem magic number: 0xEF53
# Filesystem revision #: 1 (dynamic)
# ...
#
# So we need to ignore the first line.
ret = cros_build_lib.sudo_run(
cmd, capture_output=True, extra_env={"LC_ALL": "C"}
)
fs_stat = dict(
line.split(":", 1)
for line in ret.stdout.splitlines()
if ":" in line
)
free_inodes = int(fs_stat["Free inodes"])
free_blocks = int(fs_stat["Free blocks"])
inode_count = int(fs_stat["Inode count"])
block_count = int(fs_stat["Block count"])
block_size = int(fs_stat["Block size"])
sum_file_size = 0
for root, _, filenames in os.walk(image_test_lib.ROOT_A):
for file_name in filenames:
full_name = os.path.join(root, file_name)
file_stat = os.lstat(full_name)
sum_file_size += file_stat.st_size
metadata_size = (block_count - free_blocks) * block_size - sum_file_size
self.OutputPerfValue(
"free_inodes_over_inode_count",
free_inodes * 100 / inode_count,
"percent",
graph="free_over_used_ratio",
)
self.OutputPerfValue(
"free_blocks_over_block_count",
free_blocks * 100 / block_count,
"percent",
graph="free_over_used_ratio",
)
self.OutputPerfValue(
"apparent_size",
sum_file_size,
"bytes",
higher_is_better=False,
graph="filesystem_stats",
)
self.OutputPerfValue(
"metadata_size",
metadata_size,
"bytes",
higher_is_better=False,
graph="filesystem_stats",
)
class SymbolsTest(image_test_lib.ImageTestCase):
"""Tests related to symbols in ELF files."""
def setUp(self) -> None:
# Mapping of file name --> 2-tuple (import, export).
self._known_symtabs = {}
def _GetSymbols(self, file_name):
"""Return a 2-tuple (import, export) of an ELF file |file_name|.
Import and export in the returned tuple are sets of strings (symbol
names).
"""
if file_name in self._known_symtabs:
return self._known_symtabs[file_name]
# We use BytesIO here to obviate fseek/fread time in pyelftools.
stream = io.BytesIO(osutils.ReadFile(file_name, mode="rb"))
try:
elf = elffile.ELFFile(stream)
except exceptions.ELFError:
raise ValueError("%s is not an ELF file." % file_name)
try:
imp, exp = parseelf.ParseELFSymbols(elf)
except exceptions.ELFError as e:
self.fail(f"{file_name}: Unable to parse ELF symbols: {e}")
self._known_symtabs[file_name] = imp, exp
return imp, exp
def TestImportedSymbolsAreAvailable(self) -> None:
"""Ensure all ELF files' imported symbols are available in ROOT-A.
In this test, we find all imported symbols and exported symbols from all
ELF files on the system. This test will fail if the set of imported
symbols is not a subset of exported symbols.
This test DOES NOT simulate ELF loading. "TestLinkage" does that with
`lddtree`.
"""
# Import tables of files, keyed by file names.
importeds = collections.defaultdict(set)
# All exported symbols.
exported = set()
# Allow firmware binaries which are mostly provided by various
# vendors, some in proprietary format. This is OK because the files are
# not executable on the main CPU, so we treat them as blobs that we load
# into external hardware/devices. This is ensured by PermissionTest.
# TestNoExecutableInFirmwareFolder.
permitted_patterns = (
os.path.join("dir-ROOT-A", "lib", "firmware", "*"),
# Jetstream firmware package.
os.path.join("dir-ROOT-A", "usr", "share", "fastrpc", "*"),
)
for root, _, filenames in os.walk(image_test_lib.ROOT_A):
for filename in filenames:
full_name = os.path.join(root, filename)
if os.path.islink(full_name) or not os.path.isfile(full_name):
continue
if any(
fnmatch.fnmatch(full_name, x) for x in permitted_patterns
):
continue
try:
imp, exp = self._GetSymbols(full_name)
except (ValueError, IOError):
continue
else:
importeds[full_name] = imp
exported.update(exp)
# TODO(toolchain): Remove the old libthread_db-1.0.so entry once glibc
# is upgraded past 2.34
known_unsatisfieds = {
"libthread_db-1.0.so": {
b"ps_pdwrite",
b"ps_pdread",
b"ps_lgetfpregs",
b"ps_lsetregs",
b"ps_lgetregs",
b"ps_lsetfpregs",
b"ps_pglobal_lookup",
b"ps_getpid",
},
"libthread_db.so.1": {
b"ps_pdwrite",
b"ps_pdread",
b"ps_lgetfpregs",
b"ps_lsetregs",
b"ps_lgetregs",
b"ps_lsetfpregs",
b"ps_pglobal_lookup",
b"ps_getpid",
},
}
excluded_files = set(
[
# These libraries are built against Android NDK's libc and have
# several imports that will appear to be unsatisfied.
"libmojo_core_arc32.so",
"libmojo_core_arc64.so",
# The camera shared libraries these libraries need are mounted
# at runtime.
"libcros_camera.so",
"camera_hal/intel-ipu6.so",
"camera.qcom.core.so",
"camera_hal/usb.so",
# In glibc 2.35, ldconfig is a static PIE executable with
# dynamic sections which confuses the image test.
# Ignore any missing symbols in it (b/244512686).
"sbin/ldconfig",
]
)
failures = []
for full_name, imported in importeds.items():
parts = full_name.split("/")
file_name = parts[-1]
dir_file_name = "/".join(parts[-2:])
if file_name in excluded_files or dir_file_name in excluded_files:
continue
missing = (
imported - exported - known_unsatisfieds.get(file_name, set())
)
if missing:
failures.append(
"File %s contains unsatisfied symbols: %r"
% (full_name, missing)
)
self.assertFalse(failures, "\n".join(failures))
class UserGroupTest(image_test_lib.ImageTestCase):
"""Tests users and groups in /etc/passwd and /etc/group."""
@staticmethod
def _validate_passwd(entry):
"""Check users that are not in the baseline.
The user ID should match the group ID, and the user's home directory
and shell should be invalid.
"""
uid = entry.uid
gid = entry.gid
if uid != gid:
logging.error(
'New user "%s" has uid %d and different gid %d',
entry.user,
uid,
gid,
)
return False
if entry.home != "/dev/null":
logging.error(
'Expected /dev/null for new user "%s" home dir, got "%s"',
entry.user,
entry.home,
)
return False
if entry.shell != "/bin/false":
logging.error(
'Expected /bin/false for new user "%s" shell, got "%s"',
entry.user,
entry.shell,
)
return False
return True
@staticmethod
def _validate_group(entry):
"""Check groups that are not in the baseline.
Allow groups that have no users and groups with only the matching user.
"""
group_name = entry.group
users = entry.users
# Groups with no users and groups with only the matching user are OK.
if not users or users == {group_name}:
return True
logging.error('New group "%s" has users "%s"', group_name, users)
return False
@staticmethod
def _match_passwd(expected, actual):
"""Match password, uid, gid, home, and shell."""
matched = True
if expected.encpasswd != actual.encpasswd:
matched = False
logging.error(
'Expected encrypted password "%s" for user "%s", got "%s".',
expected.encpasswd,
expected.user,
actual.encpasswd,
)
if expected.uid != actual.uid:
matched = False
logging.error(
'Expected uid %d for user "%s", got %d.',
expected.uid,
expected.user,
actual.uid,
)
if expected.gid != actual.gid:
matched = False
logging.error(
'Expected gid %d for user "%s", got %d.',
expected.gid,
expected.user,
actual.gid,
)
if isinstance(expected.home, set):
valid_home = actual.home in expected.home
else:
valid_home = actual.home == expected.home
if not valid_home:
matched = False
logging.error(
'Expected home "%s" for user "%s", got "%s".',
expected.home,
expected.user,
actual.home,
)
if isinstance(expected.shell, set):
valid_shell = actual.shell in expected.shell
else:
valid_shell = actual.shell == expected.shell
if not valid_shell:
matched = False
logging.error(
'Expected shell "%s" for user "%s", got "%s".',
expected.shell,
expected.user,
actual.shell,
)
return matched
@staticmethod
def _match_group(expected, actual):
"""Match password, gid, and members."""
matched = True
if expected.encpasswd != actual.encpasswd:
matched = False
logging.error(
'Expected encrypted password "%s" for group "%s", got "%s".',
expected.encpasswd,
expected.group,
actual.encpasswd,
)
if expected.gid != actual.gid:
matched = False
logging.error(
'Expected gid %d for group "%s", got %d.',
expected.gid,
expected.group,
actual.gid,
)
# Ignore self-membership for now.
if (expected.users - {expected.group}) != (
actual.users - {expected.group}
):
matched = False
logging.error(
'Expected members "%s" for group "%s", got "%s".',
expected.users,
expected.group,
actual.users,
)
return matched
def _LoadPath(self, path):
"""Load the given passwd/group file.
Args:
path: Path to the file.
Returns:
A dict of passwd/group entries indexed by account name.
"""
d = {}
for line in osutils.ReadFile(path).splitlines():
fields = line.split(":")
if len(fields) == 7:
# wpa:!:219:219::/dev/null:/bin/false
entry = usergroup_baseline.UserEntry(
user=fields[0],
encpasswd=fields[1],
uid=int(fields[2]),
gid=int(fields[3]),
home=fields[5],
shell=fields[6],
)
d[entry.user] = entry
elif len(fields) == 4:
# tty:!:5:power,brltty
users = set()
if fields[3]:
users = set(fields[3].split(","))
entry = usergroup_baseline.GroupEntry(
group=fields[0],
encpasswd=fields[1],
gid=int(fields[2]),
users=users,
)
d[entry.group] = entry
else:
raise ValueError('Invalid baseline format "%s"' % line)
return d
def _LoadBaseline(self, basename):
"""Loads the passwd or group baseline."""
d = None
if "passwd" in basename:
d = usergroup_baseline.USER_BASELINE.copy()
# Per-board baseline.
if (
self._board
and self._board in usergroup_baseline.USER_BOARD_BASELINES
):
d.update(usergroup_baseline.USER_BOARD_BASELINES[self._board])
elif "group" in basename:
d = usergroup_baseline.GROUP_BASELINE.copy()
# Per-board baseline.
if (
self._board
and self._board in usergroup_baseline.GROUP_BOARD_BASELINES
):
d.update(usergroup_baseline.GROUP_BOARD_BASELINES[self._board])
else:
raise ValueError('Invalid basename "%s"' % basename)
return d
def _CheckFile(self, basename) -> None:
"""Validates the passwd or group file."""
match_func = getattr(self, "_match_%s" % basename)
validate_func = getattr(self, "_validate_%s" % basename)
expected_entries = self._LoadBaseline(basename)
actual_entries = self._LoadPath(
os.path.join(image_test_lib.ROOT_A, "etc", basename)
)
success = True
for entry, details in actual_entries.items():
if entry not in expected_entries:
is_valid = validate_func(details)
if not is_valid:
logging.error(
'Unexpected %s entry for "%s".', basename, entry
)
success = success and is_valid
continue
expected = expected_entries[entry]
match_res = match_func(expected, details)
success = success and match_res
missing = set(expected_entries.keys()) - set(actual_entries.keys())
for m in missing:
logging.info('Ignoring missing %s entry for "%s".', basename, m)
self.assertTrue(success)
def TestUsers(self) -> None:
"""Enforces known user IDs."""
self._CheckFile("passwd")
def TestGroups(self) -> None:
"""Enforces known group IDs."""
self._CheckFile("group")
class CroshTest(image_test_lib.ImageTestCase):
"""Check crosh code."""
# Base directory for crosh code.
CROSH_DIR = "usr/share/crosh"
def TestUnknownModules(self) -> None:
"""Only permit known crosh modules on the system."""
# Do *not* add modules to this list until they've been reviewed by
# security or someone in the crosh/OWNERS list. Insecure code here can
# easily cause compromise of CrOS system security in verified mode. It
# has happened.
ALLOWED = {
"dev.d": {"50-crosh.sh"},
"extra.d": set(),
"removable.d": {"50-crosh.sh"},
}
base_path = os.path.join(image_test_lib.ROOT_A, self.CROSH_DIR)
for mod_dir, good_modules in ALLOWED.items():
mod_path = os.path.join(base_path, mod_dir)
if not os.path.exists(mod_path):
continue
found_modules = set(os.listdir(mod_path))
unknown_modules = found_modules - good_modules
self.assertEqual(set(), unknown_modules)
class SymlinkTest(image_test_lib.ImageTestCase):
"""Verify symlinks in the rootfs."""
# These are an allow list only. We don't require any of these to actually
# be symlinks. But if they are, they have to point to these targets.
#
# The key is the symlink and the value is the symlink target.
# Both accept fnmatch style expressions (i.e. globs).
_ACCEPTABLE_LINKS = {
# Allow any /etc path to point to any /run path.
"/etc/*": {"/run/*"},
"/etc/localtime": {"/var/lib/timezone/localtime"},
"/etc/machine-id": {"/var/lib/dbus/machine-id"},
"/etc/mtab": {"/proc/mounts"},
# Some boards don't set this up properly. It's not a big deal.
"/usr/libexec/editor": {"/usr/bin/*"},
# These are hacks to make dev images and `dev_install` work. Normally
# /usr/local isn't mounted or populated, so it's not too big a deal to
# let these things always point there.
"/etc/env.d/*": {"/usr/local/etc/env.d/*"},
"/usr/bin/python*": {
"/usr/local/usr/bin/python*",
"/usr/local/bin/python*",
},
"/usr/lib/portage": {
"/usr/local/usr/lib/portage",
"/usr/local/lib/portage",
},
"/usr/lib/python-exec": {
"/usr/local/usr/lib/python-exec",
"/usr/local/lib/python-exec",
},
"/usr/lib/python*": {
"/usr/local/usr/lib/python*",
"/usr/local/lib/python*",
},
"/usr/lib64/python*": {
"/usr/local/usr/lib64/python*",
"/usr/local/lib64/python*",
},
"/usr/lib/debug": {"/usr/local/usr/lib/debug"},
# Used by `file` and libmagic.so when the package is in /usr/local.
"/usr/share/misc/magic.mgc": {"/usr/local/share/misc/magic.mgc"},
"/usr/share/portage": {"/usr/local/share/portage"},
# Needed for the ARC++/ARCVM dual build. For test images only.
"/opt/google/vms/android": {"/usr/local/vms/android"},
# TODO(b/150806692): Cleanup this library symlink.
# Allow /opt/pita/lib path to point to any /run path. For PluginVM DLC.
"/opt/pita/lib": {"/run/*"},
}
@classmethod
def _SymlinkTargetAllowed(cls, source, target):
"""See whether |source| points to an acceptable |target|."""
# Scan the allow list.
for allow_source, allow_targets in cls._ACCEPTABLE_LINKS.items():
if fnmatch.fnmatch(source, allow_source) and any(
fnmatch.fnmatch(target, x) for x in allow_targets
):
return True
# Reject everything else.
return False
def TestCheckSymlinkTargets(self) -> None:
"""Make sure the targets of all symlinks are 'valid'."""
failures = []
for root, _, files in os.walk(image_test_lib.ROOT_A):
for name in files:
full_path = os.path.join(root, name)
try:
target = os.readlink(full_path)
except OSError as e:
# If it's not a symlink, ignore it.
if e.errno == errno.EINVAL:
continue
raise
# Ignore symlinks to just basenames.
if "/" not in target:
continue
# Resolve the link target relative to the rootfs.
resolved_target = osutils.ResolveSymlinkInRoot(
full_path, image_test_lib.ROOT_A
)
normed_target = os.path.normpath(resolved_target)
# If the target exists, it's fine.
if os.path.exists(normed_target):
continue
# Now check the allow list.
source = "/" + os.path.relpath(full_path, image_test_lib.ROOT_A)
if not self._SymlinkTargetAllowed(source, target):
failures.append((source, target))
for source, target in failures:
logging.error("Insecure symlink: %s -> %s", source, target)
self.assertEqual(0, len(failures))
class PermissionTest(image_test_lib.ImageTestCase):
"""Verify file permissions."""
def TestNoExecutableInFirmwareFolder(self) -> None:
"""Ensure all files in ROOT-A/lib/firmware are not executable.
Files under ROOT-A/lib/firmware will be allowed in
"TestImportedSymbolsAreAvailable".
"""
firmware_path = os.path.join(image_test_lib.ROOT_A, "lib", "firmware")
success = True
for root, _, filenames in os.walk(firmware_path):
for filename in filenames:
full_name = os.path.join(root, filename)
# We check symlinks in SymlinkTest, so no need to recheck here.
if os.path.islink(full_name) or not os.path.isfile(full_name):
continue
st = os.stat(full_name)
if st.st_mode & 0o111:
success = False
logging.error(
"Executable file not allowed in /lib/firmware: %s.",
filename,
)
self.assertTrue(success)
class IntelWifiTest(image_test_lib.ImageTestCase):
"""Verify installation of iwlwifi driver and firmware.
Intel WiFi chips need a kernel module and a firmware file. Test that they're
installed correctly, in particular that there's no version mismatch between
the two or that the firmware file for a particular chip is missing entirely.
"""
def _FindKernelVersion(self):
"""Detect the version of the kernel used by the image."""
module_top = os.path.join(image_test_lib.ROOT_A, "lib", "modules")
if not os.path.isdir(module_top):
logging.error('Path "%s" is not a directory.', module_top)
return None
kernels = os.listdir(module_top)
if len(kernels) != 1:
logging.error(
"Image has %d kernel versions, expected 1.", len(kernels)
)
logging.error("Found kernel versions: %s", ", ".join(kernels))
return None
return kernels[0]
def _FindDriverSupportedFirmware(self, kernel):
"""List all the firmware files supported by the driver.
The iwlwifi driver has the path of the various firmware versions that it
supports built in. The list of firmware versions is available through
the 'modinfo' command.
Args:
kernel: A string containing the kernel version.
Returns:
A list of strings containing the names of all the firmware files
that can be loaded by the iwlwifi driver.
"""
# The iwlwifi module lists the firmware files that it can load.
# Typical output of the 'modinfo' command:
# iwlwifi-7265-17.ucode
# iwlwifi-7265D-29.ucode
# iwlwifi-8000C-36.ucode
# iwlwifi-8265-36.ucode
# iwlwifi-9000-pu-b0-jf-b0-46.ucode
# iwlwifi-9260-th-b0-jf-b0-46.ucode
try:
cmd = [
"modinfo",
"-F",
"firmware",
"-b",
image_test_lib.ROOT_A,
"-k",
kernel,
"iwlwifi",
]
modinfo = cros_build_lib.run(
cmd, print_cmd=False, capture_output=True, encoding="utf-8"
)
except cros_build_lib.RunCommandError as e:
# It's not necessarily an error to have enabled the firmware but not
# the iwlwifi driver (e.g. bringup) -> log a warning, not an error.
logging.warning("Could not query iwlwifi driver.")
logging.warning(
'"%s" returned code %d.', " ".join(cmd), e.returncode
)
logging.warning("stdout: %s", e.stdout)
logging.warning("stderr: %s", e.stderr)
return []
return modinfo.stdout.splitlines()
def _GetLinuxFirmwareIwlwifiFlags(self):
"""Extract 'iwlwifi-*' flags from LINUX_FIRMWARE."""
linux_firmware = portage_util.PortageqEnvvar(
"LINUX_FIRMWARE", board=self._board, allow_undefined=True
)
if not linux_firmware:
logging.info("Board %s doesn't use LINUX_FIRMWARE.", self._board)
return []
# Look for flags 'iwlwifi-all', 'iwlwifi-9260', 'iwlwifi-QuZ', etc.
flags = [x for x in linux_firmware.split() if x.startswith("iwlwifi-")]
if not flags:
logging.info("Board %s doesn't support iwlwifi.", self._board)
return []
logging.info("Expecting the following WiFi chips: %s", ", ".join(flags))
return flags
def _GetIwlwifiFirmwareFiles(self):
"""List all the iwlwifi-* files in /lib/firmware."""
pathname = os.path.join(
image_test_lib.ROOT_A, "lib", "firmware", "iwlwifi-*"
)
return [os.path.basename(x) for x in glob.glob(pathname)]
def TestIwlwifiFirmwareAndKernelMatch(self) -> None:
"""Ensure that the firmware files are supported by the kernel.
The iwlwifi firmware files expected by the driver must be present in
/lib/firmware. This will also ensure that there's no version mismatch
between the driver and the firmware.
"""
iwlwifi_flags = self._GetLinuxFirmwareIwlwifiFlags()
if not iwlwifi_flags:
self.skipTest("Could not find iwlwifi flags.")
if "iwlwifi-all" in iwlwifi_flags:
self.skipTest("All firmware files have been installed.")
# Find the kernel version of the image, necessary to call 'modinfo'
# later.
kernel = self._FindKernelVersion()
if kernel is None:
self.skipTest("Failed to detect the kernel version.")
modinfo_files = self._FindDriverSupportedFirmware(kernel)
if not modinfo_files:
self.skipTest("Could not find iwlwifi module.")
iwlwifi_files = self._GetIwlwifiFirmwareFiles()
# We have at least one iwlwifi-* flag listed in LINUX_FIRMWARE, ensure
# that at least one firmware file is present.
self.assertTrue(iwlwifi_files, "No iwlwifi firmware file installed.")
# Ensure that for every iwlwifi-* flag listed in LINUX_FIRMWARE, the
# driver has at least one corresponding firmware file listed, and at
# least one of the firmware files is present on the rootfs.
for flag in iwlwifi_flags:
supported_fw = {x for x in modinfo_files if x.startswith(flag)}
available_fw = {x for x in iwlwifi_files if x.startswith(flag)}
logging.info(
'The driver supports the following "%s" files: %s',
flag,
", ".join(supported_fw),
)
logging.info(
'The rootfs provides the following "%s" files: %s',
flag,
", ".join(available_fw),
)
self.assertTrue(
supported_fw & available_fw,
"Driver/firmware mismatch for %s" % flag,
)
class DBusServiceTest(image_test_lib.ImageTestCase):
"""Verify installed D-Bus service file contents."""
def TestDelegationToUpstart(self) -> None:
"""Check D-Bus service files for delegation to Upstart.
crbug.com/1025914: To prevent D-Bus activated services from running
indefinitely, each D-Bus activated service file should have an
associated Upstart job that manages the lifecycle of the service.
The Exec clause can either start with "Exec=/sbin/start(whitespace)"
(delegate to upstart) or should be "Exec=/sbin/false" (D-Bus service
activations disabled).
"""
DBUS_HEADER_RE = re.compile(r"^\[D-BUS Service]$", re.MULTILINE)
EXEC_CLAUSE_RE = re.compile(
r"^Exec=(/sbin/start\s|/bin/false)", re.MULTILINE
)
dbus_service_path_spec = (
"%s/usr/share/dbus-1/*services/*.service" % image_test_lib.ROOT_A
)
success = True
for filename in glob.iglob(dbus_service_path_spec):
file_contents = osutils.ReadFile(filename)
if DBUS_HEADER_RE.search(
file_contents
) and not EXEC_CLAUSE_RE.search(file_contents):
success = False
logging.error(
"%s: Add an Upstart script to manage D-Bus activated "
"service lifecycle: see crbug.com/1025914.",
filename,
)
self.assertTrue(success)
class SafesetidEntry(NamedTuple):
"""An entry in a Safesetid allowlist file."""
conf: str
source_uid: int
target_uid: int
class SafesetidTest(image_test_lib.ImageTestCase):
"""Safesetid related image tests."""
_success = True
def _parse(self, conf):
"""Parse a safesetid allowlist.txt file and yield each entry."""
for line in conf.read_text("utf-8").splitlines():
line = line.strip().split("#", 1)[0]
if not line:
continue
parts = line.split(":")
if (
len(parts) != 2
or not parts[0].isnumeric()
or not parts[1].isnumeric()
):
logging.error("%s: has invalid line '%s'", conf, line)
self._success = False
continue
yield SafesetidEntry(conf.name, int(parts[0]), int(parts[1]))
def _iter_allowlist_txt(self):
"""Yield every safesetid rule in the rootfs."""
root = Path(image_test_lib.ROOT_A)
config_path = (
root
/ "usr"
/ "share"
/ "cros"
/ "startup"
/ "process_management_policies"
)
for conf in config_path.glob("*.txt"):
yield from self._parse(conf)
def TestSafesetidConfig(self) -> None:
"""Check for Safesetid config problems
* The same rule should not exist more than once or setting the policy
will fail.
* There should not be transitions to unconstrained users. Every
destination user should have at least one rule to make sure it is not
able to transition to any other user (such as root).
"""
sources = set()
targets = set()
rules = {}
for entry in self._iter_allowlist_txt():
key = (entry.source_uid, entry.target_uid)
sources.add(entry.source_uid)
targets.add(entry.target_uid)
if key in rules:
other = rules[key].conf
logging.error(
"%s and %s: both have '%s'", other, entry.conf, key
)
self._success = False
else:
rules[key] = entry
unconstrained_uids = targets.difference(sources)
if unconstrained_uids:
logging.error(
"UIDs that are only targets are unconstrained: '%s'",
unconstrained_uids,
)
self._success = False
self.assertTrue(self._success)
class TmpfilesdEntry(NamedTuple):
"""An entry in a tmpfiles.d file."""
config: str
type: str
path: str
mode: str
user: str
group: str
age: str
argument: str
class TmpfilesdTest(image_test_lib.ImageTestCase):
"""Verify tmpfiles.d configuration settings."""
def _parse(self, conf):
"""Parse a tmpfiles.d file and yield each entry."""
for line in conf.read_text("utf-8").splitlines():
line = line.strip().split("#", 1)[0]
if not line:
continue
line = line.split()
yield TmpfilesdEntry(conf, *(line + (["-"] * 5))[0:7])
def _iter_tmpfiles_d(self):
"""Yield every tmpfiles.d entry in the rootfs."""
root = Path(image_test_lib.ROOT_A)
etc_tmpfiles_d = root / "etc" / "tmpfiles.d"
usr_tmpfiles_d = root / "usr" / "lib" / "tmpfiles.d"
for tmpfiles_d in (etc_tmpfiles_d, usr_tmpfiles_d):
for conf in tmpfiles_d.glob("*.conf"):
yield from self._parse(conf)
def TestAccounts(self) -> None:
"""Make sure every user & group actually exist.
If the accounts don't exist at runtime, tmpfiles.d likes to blow up.
Numeric entries are allowed to support ARC++ shared mounts.
"""
root = Path(image_test_lib.ROOT_A)
etc_passwd = root / "etc" / "passwd"
etc_group = root / "etc" / "group"
valid_users = set(
x.split(":", 1)[0]
for x in etc_passwd.read_text("utf-8").splitlines()
)
valid_users.add("-")
valid_groups = set(
x.split(":", 1)[0]
for x in etc_group.read_text("utf-8").splitlines()
)
valid_groups.add("-")
success = True
for entry in self._iter_tmpfiles_d():
if not entry.user.isnumeric() and entry.user not in valid_users:
logging.error("%s: unknown user", entry)
success = False
if not entry.group.isnumeric() and entry.group not in valid_groups:
logging.error("%s: unknown group", entry)
success = False
self.assertTrue(success)
class FactoryScriptTest(image_test_lib.ImageTestCase):
"""Verifies the image can be loaded by the factory scripts.
Some factory scripts parse files in the image. This test aims to detect if
there's any format change in the image that breaks the factory scripts.
Please contact
https://chromium.googlesource.com/chromiumos/platform/factory/+/main/DIR_METADATA
or
chromeos-factoy-eng@google.com
if this test fails in CQ.
"""
FINALIZE_BUNDLE = os.path.join(
constants.SOURCE_ROOT, "src/platform/factory/bin/finalize_bundle"
)
def TestFinalizeBundle_ExtractFirmwareInfo(self) -> None:
root = Path(image_test_lib.ROOT_A)
# Skip the test for:
# 1. The project is too old that doesn't have cros-config
# 2. The project is too new that its firmware is not ready yet
if self._board and not portage_util.PortageqHasVersion(
"chromeos-base/chromeos-config", self._board
):
logging.info(
"Board %s doesn't have chromeos-config. Skip the test.",
self._board,
)
return
fw_update = root / "usr/sbin/chromeos-firmwareupdate"
if not fw_update.exists():
logging.info(
"The image doesn't have firmware updater. Skip the test."
)
return
cmd = [
self.FINALIZE_BUNDLE,
"fake_manifest.yaml",
"--extract-firmware-info",
root,
]
cros_build_lib.run(cmd)