| # -*- coding: utf-8 -*- |
| # Copyright 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Collection of tests to run on the rootfs of a built image. |
| |
| This module should only be imported inside the chroot. |
| """ |
| |
| from __future__ import division |
| from __future__ import print_function |
| |
| import collections |
| import errno |
| import fnmatch |
| import glob |
| import io |
| import itertools |
| import mimetypes |
| import os |
| import re |
| import stat |
| import unittest |
| |
| from elftools.elf import elffile |
| from elftools.common import exceptions |
| import lddtree |
| import magic # pylint: disable=import-error |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_logging as logging |
| from chromite.lib import filetype |
| from chromite.lib import image_test_lib |
| from chromite.lib import osutils |
| from chromite.lib import parseelf |
| from chromite.lib import portage_util |
| |
| from chromite.cros.test import usergroup_baseline |
| |
| |
| class LocaltimeTest(image_test_lib.ImageTestCase): |
| """Verify that /etc/localtime is a symlink to /var/lib/timezone/localtime. |
| |
| This is an example of an image test. The image is already mounted. The |
| test can access rootfs via ROOT_A constant. |
| """ |
| |
| def TestLocaltimeIsSymlink(self): |
| localtime_path = os.path.join(image_test_lib.ROOT_A, 'etc', 'localtime') |
| self.assertTrue(os.path.islink(localtime_path)) |
| |
| def TestLocaltimeLinkIsCorrect(self): |
| localtime_path = os.path.join(image_test_lib.ROOT_A, 'etc', 'localtime') |
| self.assertEqual('/var/lib/timezone/localtime', |
| os.readlink(localtime_path)) |
| |
| |
| def _GuessMimeType(magic_obj, file_name): |
| """Guess a file's mimetype base on its extension and content. |
| |
| File extension is favored over file content to reduce noise. |
| |
| Args: |
| magic_obj: A loaded magic instance. |
| file_name: A path to the file. |
| |
| Returns: |
| A mime type of |file_name|. |
| """ |
| mime_type, _ = mimetypes.guess_type(file_name) |
| if not mime_type: |
| mime_type = magic_obj.file(file_name) |
| return mime_type |
| |
| |
| class BlockedTest(image_test_lib.ImageTestCase): |
| """Verify that rootfs does not contain blocked items.""" |
| |
| BLOCKED_PACKAGES = ( |
| 'app-text/iso-codes', |
| 'dev-java/icedtea', |
| 'dev-java/icedtea6-bin', |
| 'dev-java/openjdk-bin', |
| 'dev-lang/perl', |
| 'dev-lang/python', |
| 'dev-lang/tcl', |
| 'media-sound/pulseaudio', |
| 'x11-libs/libxklavier', |
| ) |
| |
| BLOCKED_FILES = ( |
| '/usr/bin/java', |
| '/usr/bin/javac', |
| '/usr/bin/perl', |
| '/usr/bin/python', |
| '/usr/bin/tclsh', |
| ) |
| |
| BLOCKED_DIRS = ( |
| '/usr/share/locale', |
| ) |
| |
| def TestBlockedDirectories(self): |
| for path in self.BLOCKED_DIRS: |
| full_path = os.path.join(image_test_lib.ROOT_A, path.lstrip(os.sep)) |
| self.assertFalse(os.path.isdir(full_path), |
| 'Directory %s is not allowed.' % path) |
| |
| def TestBlockedFileTypes(self): |
| """Fail if there are files of prohibited types (such as C++ source code). |
| |
| The allow list has higher precedence than the block list. |
| """ |
| blocked_patterns = [re.compile(x) for x in [ |
| r'text/x-c\+\+', |
| r'text/x-c', |
| ]] |
| allowed_patterns = [re.compile(x) for x in [ |
| r'.*/braille/.*', |
| r'.*/brltty/.*', |
| r'.*/etc/sudoers$', |
| r'.*/dump_vpd_log$', |
| r'.*\.conf$', |
| r'.*/libnl/classid$', |
| r'.*/locale/', |
| r'.*/X11/xkb/', |
| r'.*/chromeos-assets/', |
| r'.*/udev/rules.d/', |
| r'.*/firmware/ar3k/.*pst$', |
| r'.*/etc/services', |
| # Python reads this file at runtime to look up install features. |
| r'.*/usr/include/python[\d\.]*/pyconfig.h$', |
| r'.*/usr/lib/node_modules/.*', |
| r'.*/usr/share/dev-install/portage', |
| r'.*/opt/pita/qml', |
| ]] |
| |
| failures = [] |
| |
| magic_obj = magic.open(magic.MAGIC_MIME_TYPE) |
| magic_obj.load() |
| for root, _, file_names in os.walk(image_test_lib.ROOT_A): |
| for file_name in file_names: |
| full_name = os.path.join(root, file_name) |
| if os.path.islink(full_name) or not os.path.isfile(full_name): |
| continue |
| |
| mime_type = _GuessMimeType(magic_obj, full_name) |
| if (any(x.match(mime_type) for x in blocked_patterns) and not |
| any(x.match(full_name) for x in allowed_patterns)): |
| failures.append('File %s type %s is not allowed.' % |
| (full_name, mime_type)) |
| magic_obj.close() |
| |
| self.assertFalse(failures, '\n'.join(failures)) |
| |
| def TestBlockedPackages(self): |
| """Fail if any blocked packages are installed.""" |
| for package in self.BLOCKED_PACKAGES: |
| self.assertFalse( |
| portage_util.PortageqHasVersion( |
| package, sysroot=image_test_lib.ROOT_A)) |
| |
| def TestBlockedFiles(self): |
| """Fail if any blocked files exist.""" |
| for path in self.BLOCKED_FILES: |
| full_path = os.path.join(image_test_lib.ROOT_A, path.lstrip(os.sep)) |
| # TODO(saklein) Convert ImageTestCase to extend cros_build_lib.unittest |
| # and change to an assertNotExists. Currently produces an error importing |
| # mox on at least some builders. |
| self.assertFalse(os.path.exists(full_path), |
| 'Path exists but should not: %s' % full_path) |
| |
| def TestValidInterpreter(self): |
| """Fail if a script's interpreter is not found, or not executable. |
| |
| A script interpreter is anything after the #! sign, up to the end of line |
| or the first space. |
| """ |
| failures = [] |
| |
| for root, _, file_names in os.walk(image_test_lib.ROOT_A): |
| for file_name in file_names: |
| full_name = os.path.join(root, file_name) |
| file_stat = os.lstat(full_name) |
| if (not stat.S_ISREG(file_stat.st_mode) or |
| (file_stat.st_mode & 0o0111) == 0): |
| continue |
| |
| with open(full_name, 'rb') as f: |
| if f.read(2) != '#!': |
| continue |
| line = '#!' + f.readline().strip() |
| |
| try: |
| # Ignore arguments to the interpreter. |
| interp, _ = filetype.SplitShebang(line) |
| except ValueError: |
| failures.append('File %s has an invalid interpreter path: "%s".' % |
| (full_name, line)) |
| |
| # Absolute path to the interpreter. |
| interp = os.path.join(image_test_lib.ROOT_A, interp.lstrip('/')) |
| # Interpreter could be a symlink. Resolve it. |
| interp = osutils.ResolveSymlinkInRoot(interp, image_test_lib.ROOT_A) |
| if not os.path.isfile(interp): |
| failures.append('File %s uses non-existing interpreter %s.' % |
| (full_name, interp)) |
| elif (os.stat(interp).st_mode & 0o111) == 0: |
| failures.append('Interpreter %s is not executable.' % interp) |
| |
| self.assertFalse(failures, '\n'.join(failures)) |
| |
| |
| class LinkageTest(image_test_lib.ImageTestCase): |
| """Verify that all binaries and libraries have proper linkage.""" |
| |
| def setUp(self): |
| osutils.MountDir( |
| os.path.join(image_test_lib.STATEFUL, 'var_overlay'), |
| os.path.join(image_test_lib.ROOT_A, 'var'), |
| mount_opts=('bind', ), |
| ) |
| |
| def tearDown(self): |
| osutils.UmountDir( |
| os.path.join(image_test_lib.ROOT_A, 'var'), |
| cleanup=False, |
| ) |
| |
| def _IsPackageMerged(self, package_name): |
| has_version = portage_util.PortageqHasVersion( |
| package_name, sysroot=image_test_lib.ROOT_A) |
| if has_version: |
| logging.info('Package is available: %s', package_name) |
| else: |
| logging.info('Package is not available: %s', package_name) |
| return has_version |
| |
| def TestLinkage(self): |
| """Find main executable binaries and check their linkage.""" |
| binaries = [ |
| 'bin/sed', |
| ] |
| |
| if self._IsPackageMerged('chromeos-base/chromeos-login'): |
| binaries.append('sbin/session_manager') |
| |
| if self._IsPackageMerged('x11-base/xorg-server'): |
| binaries.append('usr/bin/Xorg') |
| |
| # When chrome is built with USE="pgo_generate", rootfs chrome is actually a |
| # symlink to a real binary which is in the stateful partition. So we do not |
| # check for a valid chrome binary in that case. |
| if not self._IsPackageMerged('chromeos-base/chromeos-chrome[pgo_generate]'): |
| if self._IsPackageMerged('chromeos-base/chromeos-chrome[app_shell]'): |
| binaries.append('opt/google/chrome/app_shell') |
| elif self._IsPackageMerged('chromeos-base/chromeos-chrome'): |
| binaries.append('opt/google/chrome/chrome') |
| |
| binaries = [os.path.join(image_test_lib.ROOT_A, x) for x in binaries] |
| |
| # Grab all .so files |
| libraries = [] |
| for root, _, files in os.walk(image_test_lib.ROOT_A): |
| for name in files: |
| filename = os.path.join(root, name) |
| if '.so' in filename: |
| libraries.append(filename) |
| |
| ldpaths = lddtree.LoadLdpaths(image_test_lib.ROOT_A) |
| for to_test in itertools.chain(binaries, libraries): |
| # to_test could be a symlink, we need to resolve it relative to ROOT_A. |
| while os.path.islink(to_test): |
| link = os.readlink(to_test) |
| if link.startswith('/'): |
| to_test = os.path.join(image_test_lib.ROOT_A, link[1:]) |
| else: |
| to_test = os.path.join(os.path.dirname(to_test), link) |
| try: |
| lddtree.ParseELF(to_test, root=image_test_lib.ROOT_A, ldpaths=ldpaths) |
| except lddtree.exceptions.ELFError: |
| continue |
| except IOError as e: |
| self.fail('Fail linkage test for %s: %s' % (to_test, e)) |
| |
| |
| @unittest.expectedFailure |
| class FileSystemMetaDataTest(image_test_lib.ImageTestCase): |
| """A test class to gather file system stats such as free inodes, blocks.""" |
| |
| def TestStats(self): |
| """Collect inodes and blocks usage.""" |
| # Find the loopback device that was mounted to ROOT_A. |
| loop_device = None |
| root_path = os.path.abspath(os.readlink(image_test_lib.ROOT_A)) |
| for mtab in osutils.IterateMountPoints(): |
| if mtab.destination == root_path: |
| loop_device = mtab.source |
| break |
| self.assertTrue(loop_device, 'Cannot find loopback device for ROOT_A.') |
| |
| # Gather file system stats with tune2fs. |
| cmd = [ |
| 'tune2fs', |
| '-l', |
| loop_device |
| ] |
| # tune2fs produces output like this: |
| # |
| # tune2fs 1.42 (29-Nov-2011) |
| # Filesystem volume name: ROOT-A |
| # Last mounted on: <not available> |
| # Filesystem UUID: <none> |
| # Filesystem magic number: 0xEF53 |
| # Filesystem revision #: 1 (dynamic) |
| # ... |
| # |
| # So we need to ignore the first line. |
| ret = cros_build_lib.sudo_run(cmd, capture_output=True, |
| extra_env={'LC_ALL': 'C'}) |
| fs_stat = dict(line.split(':', 1) for line in ret.output.splitlines() |
| if ':' in line) |
| free_inodes = int(fs_stat['Free inodes']) |
| free_blocks = int(fs_stat['Free blocks']) |
| inode_count = int(fs_stat['Inode count']) |
| block_count = int(fs_stat['Block count']) |
| block_size = int(fs_stat['Block size']) |
| |
| sum_file_size = 0 |
| for root, _, filenames in os.walk(image_test_lib.ROOT_A): |
| for file_name in filenames: |
| full_name = os.path.join(root, file_name) |
| file_stat = os.lstat(full_name) |
| sum_file_size += file_stat.st_size |
| |
| metadata_size = (block_count - free_blocks) * block_size - sum_file_size |
| |
| self.OutputPerfValue('free_inodes_over_inode_count', |
| free_inodes * 100 / inode_count, 'percent', |
| graph='free_over_used_ratio') |
| self.OutputPerfValue('free_blocks_over_block_count', |
| free_blocks * 100 / block_count, 'percent', |
| graph='free_over_used_ratio') |
| self.OutputPerfValue('apparent_size', sum_file_size, 'bytes', |
| higher_is_better=False, graph='filesystem_stats') |
| self.OutputPerfValue('metadata_size', metadata_size, 'bytes', |
| higher_is_better=False, graph='filesystem_stats') |
| |
| |
| class SymbolsTest(image_test_lib.ImageTestCase): |
| """Tests related to symbols in ELF files.""" |
| |
| def setUp(self): |
| # Mapping of file name --> 2-tuple (import, export). |
| self._known_symtabs = {} |
| |
| def _GetSymbols(self, file_name): |
| """Return a 2-tuple (import, export) of an ELF file |file_name|. |
| |
| Import and export in the returned tuple are sets of strings (symbol names). |
| """ |
| if file_name in self._known_symtabs: |
| return self._known_symtabs[file_name] |
| |
| # We use BytesIO here to obviate fseek/fread time in pyelftools. |
| stream = io.BytesIO(osutils.ReadFile(file_name, mode='rb')) |
| |
| try: |
| elf = elffile.ELFFile(stream) |
| except exceptions.ELFError: |
| raise ValueError('%s is not an ELF file.' % file_name) |
| |
| imp, exp = parseelf.ParseELFSymbols(elf) |
| self._known_symtabs[file_name] = imp, exp |
| return imp, exp |
| |
| def TestImportedSymbolsAreAvailable(self): |
| """Ensure all ELF files' imported symbols are available in ROOT-A. |
| |
| In this test, we find all imported symbols and exported symbols from all |
| ELF files on the system. This test will fail if the set of imported symbols |
| is not a subset of exported symbols. |
| |
| This test DOES NOT simulate ELF loading. "TestLinkage" does that with |
| `lddtree`. |
| """ |
| # Import tables of files, keyed by file names. |
| importeds = collections.defaultdict(set) |
| # All exported symbols. |
| exported = set() |
| |
| # Allow firmware binaries which are mostly provided by various |
| # vendors, some in proprietary format. This is OK because the files are |
| # not executable on the main CPU, so we treat them as blobs that we load |
| # into external hardware/devices. This is ensured by PermissionTest. |
| # TestNoExecutableInFirmwareFolder. |
| permitted_patterns = ( |
| os.path.join('dir-ROOT-A', 'lib', 'firmware', '*'), |
| |
| # Jetstream firmware package. |
| os.path.join('dir-ROOT-A', 'usr', 'share', 'fastrpc', '*'), |
| ) |
| |
| for root, _, filenames in os.walk(image_test_lib.ROOT_A): |
| for filename in filenames: |
| full_name = os.path.join(root, filename) |
| if os.path.islink(full_name) or not os.path.isfile(full_name): |
| continue |
| |
| if any(fnmatch.fnmatch(full_name, x) for x in permitted_patterns): |
| continue |
| |
| try: |
| imp, exp = self._GetSymbols(full_name) |
| except (ValueError, IOError): |
| continue |
| else: |
| importeds[full_name] = imp |
| exported.update(exp) |
| |
| known_unsatisfieds = { |
| 'libthread_db-1.0.so': { |
| b'ps_pdwrite', b'ps_pdread', b'ps_lgetfpregs', b'ps_lsetregs', |
| b'ps_lgetregs', b'ps_lsetfpregs', b'ps_pglobal_lookup', |
| b'ps_getpid', |
| }, |
| } |
| |
| excluded_files = set([ |
| # These libraries are built against Android NDK's libc and have several |
| # imports that will appear to be unsatisfied. |
| 'libmojo_core_arc32.so', |
| 'libmojo_core_arc64.so', |
| ]) |
| |
| failures = [] |
| for full_name, imported in importeds.items(): |
| file_name = os.path.basename(full_name) |
| if file_name in excluded_files: |
| continue |
| missing = imported - exported - known_unsatisfieds.get(file_name, set()) |
| if missing: |
| failures.append('File %s contains unsatisfied symbols: %r' % |
| (full_name, missing)) |
| self.assertFalse(failures, '\n'.join(failures)) |
| |
| |
| class UserGroupTest(image_test_lib.ImageTestCase): |
| """Tests users and groups in /etc/passwd and /etc/group.""" |
| |
| @staticmethod |
| def _validate_passwd(entry): |
| """Check users that are not in the baseline. |
| |
| The user ID should match the group ID, and the user's home directory |
| and shell should be invalid. |
| """ |
| uid = entry.uid |
| gid = entry.gid |
| |
| if uid != gid: |
| logging.error('New user "%s" has uid %d and different gid %d', |
| entry.user, uid, gid) |
| return False |
| |
| if entry.home != '/dev/null': |
| logging.error('Expected /dev/null for new user "%s" home dir, got "%s"', |
| entry.user, entry.home) |
| return False |
| |
| if entry.shell != '/bin/false': |
| logging.error('Expected /bin/false for new user "%s" shell, got "%s"', |
| entry.user, entry.shell) |
| return False |
| |
| return True |
| |
| @staticmethod |
| def _validate_group(entry): |
| """Check groups that are not in the baseline. |
| |
| Allow groups that have no users and groups with only the matching user. |
| """ |
| group_name = entry.group |
| users = entry.users |
| |
| # Groups with no users and groups with only the matching user are OK. |
| if not users or users == {group_name}: |
| return True |
| |
| logging.error('New group "%s" has users "%s"', group_name, users) |
| return False |
| |
| @staticmethod |
| def _match_passwd(expected, actual): |
| """Match password, uid, gid, home, and shell.""" |
| matched = True |
| |
| if expected.encpasswd != actual.encpasswd: |
| matched = False |
| logging.error('Expected encrypted password "%s" for user "%s", got "%s".', |
| expected.encpasswd, expected.user, actual.encpasswd) |
| |
| if expected.uid != actual.uid: |
| matched = False |
| logging.error('Expected uid %d for user "%s", got %d.', |
| expected.uid, expected.user, actual.uid) |
| |
| if expected.gid != actual.gid: |
| matched = False |
| logging.error('Expected gid %d for user "%s", got %d.', |
| expected.gid, expected.user, actual.gid) |
| |
| if isinstance(expected.home, set): |
| valid_home = actual.home in expected.home |
| else: |
| valid_home = actual.home == expected.home |
| if not valid_home: |
| matched = False |
| logging.error('Expected home "%s" for user "%s", got "%s".', |
| expected.home, expected.user, actual.home) |
| |
| if isinstance(expected.shell, set): |
| valid_shell = actual.shell in expected.shell |
| else: |
| valid_shell = actual.shell == expected.shell |
| if not valid_shell: |
| matched = False |
| logging.error('Expected shell "%s" for user "%s", got "%s".', |
| expected.shell, expected.user, actual.shell) |
| |
| return matched |
| |
| @staticmethod |
| def _match_group(expected, actual): |
| """Match password, gid, and members.""" |
| matched = True |
| |
| if expected.encpasswd != actual.encpasswd: |
| matched = False |
| logging.error( |
| 'Expected encrypted password "%s" for group "%s", got "%s".', |
| expected.encpasswd, expected.group, actual.encpasswd) |
| |
| if expected.gid != actual.gid: |
| matched = False |
| logging.error('Expected gid %d for group "%s", got %d.', |
| expected.gid, expected.group, actual.gid) |
| |
| if expected.users != actual.users: |
| matched = False |
| logging.error('Expected members "%s" for group "%s", got "%s".', |
| expected.users, expected.group, actual.users) |
| |
| return matched |
| |
| def _LoadPath(self, path): |
| """Load the given passwd/group file. |
| |
| Args: |
| path: Path to the file. |
| |
| Returns: |
| A dict of passwd/group entries indexed by account name. |
| """ |
| d = {} |
| for line in osutils.ReadFile(path).splitlines(): |
| fields = line.split(':') |
| if len(fields) == 7: |
| # wpa:!:219:219::/dev/null:/bin/false |
| entry = usergroup_baseline.UserEntry(user=fields[0], |
| encpasswd=fields[1], |
| uid=int(fields[2]), |
| gid=int(fields[3]), |
| home=fields[5], |
| shell=fields[6]) |
| d[entry.user] = entry |
| elif len(fields) == 4: |
| # tty:!:5:power,brltty |
| users = set() |
| if fields[3]: |
| users = set(fields[3].split(',')) |
| entry = usergroup_baseline.GroupEntry(group=fields[0], |
| encpasswd=fields[1], |
| gid=int(fields[2]), |
| users=users) |
| d[entry.group] = entry |
| else: |
| raise ValueError('Invalid baseline format "%s"' % line) |
| |
| return d |
| |
| def _LoadBaseline(self, basename): |
| """Loads the passwd or group baseline.""" |
| d = None |
| if 'passwd' in basename: |
| d = usergroup_baseline.USER_BASELINE.copy() |
| |
| # Per-board baseline. |
| if self._board and self._board in usergroup_baseline.USER_BOARD_BASELINES: |
| d.update(usergroup_baseline.USER_BOARD_BASELINES[self._board]) |
| elif 'group' in basename: |
| d = usergroup_baseline.GROUP_BASELINE.copy() |
| |
| # Per-board baseline. |
| if (self._board and |
| self._board in usergroup_baseline.GROUP_BOARD_BASELINES): |
| d.update(usergroup_baseline.GROUP_BOARD_BASELINES[self._board]) |
| else: |
| raise ValueError('Invalid basename "%s"' % basename) |
| |
| return d |
| |
| def _CheckFile(self, basename): |
| """Validates the passwd or group file.""" |
| match_func = getattr(self, '_match_%s' % basename) |
| validate_func = getattr(self, '_validate_%s' % basename) |
| |
| expected_entries = self._LoadBaseline(basename) |
| actual_entries = self._LoadPath(os.path.join(image_test_lib.ROOT_A, |
| 'etc', |
| basename)) |
| |
| success = True |
| for entry, details in actual_entries.items(): |
| if entry not in expected_entries: |
| is_valid = validate_func(details) |
| if not is_valid: |
| logging.error('Unexpected %s entry for "%s".', basename, entry) |
| |
| success = success and is_valid |
| continue |
| |
| expected = expected_entries[entry] |
| match_res = match_func(expected, details) |
| success = success and match_res |
| |
| missing = set(expected_entries.keys()) - set(actual_entries.keys()) |
| for m in missing: |
| logging.info('Ignoring missing %s entry for "%s".', basename, m) |
| |
| self.assertTrue(success) |
| |
| def TestUsers(self): |
| """Enforces known user IDs.""" |
| self._CheckFile('passwd') |
| |
| def TestGroups(self): |
| """Enforces known group IDs.""" |
| self._CheckFile('group') |
| |
| |
| class CroshTest(image_test_lib.ImageTestCase): |
| """Check crosh code.""" |
| |
| # Base directory for crosh code. |
| CROSH_DIR = 'usr/share/crosh' |
| |
| def TestUnknownModules(self): |
| """Only permit known crosh modules on the system.""" |
| # Do *not* add modules to this list until they've been reviewed by security |
| # or someone in the crosh/OWNERS list. Insecure code here can easily cause |
| # compromise of CrOS system security in verified mode. It has happened. |
| ALLOWED = { |
| 'dev.d': {'50-crosh.sh'}, |
| 'extra.d': set(), |
| 'removable.d': {'50-crosh.sh'}, |
| } |
| |
| base_path = os.path.join(image_test_lib.ROOT_A, self.CROSH_DIR) |
| for mod_dir, good_modules in ALLOWED.items(): |
| mod_path = os.path.join(base_path, mod_dir) |
| if not os.path.exists(mod_path): |
| continue |
| |
| found_modules = set(os.listdir(mod_path)) |
| unknown_modules = found_modules - good_modules |
| self.assertEqual(set(), unknown_modules) |
| |
| |
| class SymlinkTest(image_test_lib.ImageTestCase): |
| """Verify symlinks in the rootfs.""" |
| |
| # These are an allow list only. We don't require any of these to actually |
| # be symlinks. But if they are, they have to point to these targets. |
| # |
| # The key is the symlink and the value is the symlink target. |
| # Both accept fnmatch style expressions (i.e. globs). |
| _ACCEPTABLE_LINKS = { |
| # Allow any /etc path to point to any /run path. |
| '/etc/*': {'/run/*'}, |
| |
| '/etc/localtime': {'/var/lib/timezone/localtime'}, |
| '/etc/machine-id': {'/var/lib/dbus/machine-id'}, |
| '/etc/mtab': {'/proc/mounts'}, |
| |
| # The kip board has a broken/dangling symlink. Allow it until we can |
| # rewrite the code. Or kip goes EOL. |
| '/lib/firmware/elan_i2c.bin': {'/opt/google/touch/firmware/*'}, |
| |
| # Some boards don't set this up properly. It's not a big deal. |
| '/usr/libexec/editor': {'/usr/bin/*'}, |
| |
| # These are hacks to make dev images and `dev_install` work. Normally |
| # /usr/local isn't mounted or populated, so it's not too big a deal to |
| # let these things always point there. |
| '/etc/env.d/*': {'/usr/local/etc/env.d/*'}, |
| '/usr/bin/python*': { |
| '/usr/local/usr/bin/python*', |
| '/usr/local/bin/python*', |
| }, |
| '/usr/lib/portage': { |
| '/usr/local/usr/lib/portage', |
| '/usr/local/lib/portage', |
| }, |
| '/usr/lib/python-exec': { |
| '/usr/local/usr/lib/python-exec', |
| '/usr/local/lib/python-exec', |
| }, |
| '/usr/lib/debug': {'/usr/local/usr/lib/debug'}, |
| # Used by `file` and libmagic.so when the package is in /usr/local. |
| '/usr/share/misc/magic.mgc': {'/usr/local/share/misc/magic.mgc'}, |
| '/usr/share/portage': {'/usr/local/share/portage'}, |
| # Needed for the ARC++/ARCVM dual build. For test images only. |
| '/opt/google/vms/android': {'/usr/local/vms/android'}, |
| # TODO(b/150806692): Clenaup this library symlink. |
| # Allow /opt/pita/lib path to point to any /run path. For PluginVM DLC. |
| '/opt/pita/lib': {'/run/*'}, |
| } |
| |
| @classmethod |
| def _SymlinkTargetAllowed(cls, source, target): |
| """See whether |source| points to an acceptable |target|.""" |
| # Scan the allow list. |
| for allow_source, allow_targets in cls._ACCEPTABLE_LINKS.items(): |
| if (fnmatch.fnmatch(source, allow_source) and |
| any(fnmatch.fnmatch(target, x) for x in allow_targets)): |
| return True |
| |
| # Reject everything else. |
| return False |
| |
| def TestCheckSymlinkTargets(self): |
| """Make sure the targets of all symlinks are 'valid'.""" |
| failures = [] |
| for root, _, files in os.walk(image_test_lib.ROOT_A): |
| for name in files: |
| full_path = os.path.join(root, name) |
| try: |
| target = os.readlink(full_path) |
| except OSError as e: |
| # If it's not a symlink, ignore it. |
| if e.errno == errno.EINVAL: |
| continue |
| raise |
| |
| # Ignore symlinks to just basenames. |
| if '/' not in target: |
| continue |
| |
| # Resolve the link target relative to the rootfs. |
| resolved_target = osutils.ResolveSymlinkInRoot(full_path, |
| image_test_lib.ROOT_A) |
| normed_target = os.path.normpath(resolved_target) |
| |
| # If the target exists, it's fine. |
| if os.path.exists(normed_target): |
| continue |
| |
| # Now check the allow list. |
| source = '/' + os.path.relpath(full_path, image_test_lib.ROOT_A) |
| if not self._SymlinkTargetAllowed(source, target): |
| failures.append((source, target)) |
| |
| for (source, target) in failures: |
| logging.error('Insecure symlink: %s -> %s', source, target) |
| self.assertEqual(0, len(failures)) |
| |
| |
| class PermissionTest(image_test_lib.ImageTestCase): |
| """Verify file permissions.""" |
| |
| def TestNoExecutableInFirmwareFolder(self): |
| """Ensure all files in ROOT-A/lib/firmware are not executable. |
| |
| Files under ROOT-A/lib/firmware will be allowed in |
| "TestImportedSymbolsAreAvailable". |
| """ |
| firmware_path = os.path.join(image_test_lib.ROOT_A, 'lib', 'firmware') |
| |
| success = True |
| for root, _, filenames in os.walk(firmware_path): |
| for filename in filenames: |
| full_name = os.path.join(root, filename) |
| # We check symlinks in SymlinkTest, so no need to recheck here. |
| if os.path.islink(full_name) or not os.path.isfile(full_name): |
| continue |
| |
| st = os.stat(full_name) |
| if st.st_mode & 0o111: |
| success = False |
| logging.error('Executable file not allowed in /lib/firmware: %s.', |
| filename) |
| |
| self.assertTrue(success) |
| |
| |
| class IntelWifiTest(image_test_lib.ImageTestCase): |
| """Verify installation of iwlwifi driver and firmware. |
| |
| Intel WiFi chips need a kernel module and a firmware file. Test that they're |
| installed correctly, in particular that there's no version mismatch between |
| the two or that the firmware file for a particular chip is missing entirely. |
| """ |
| |
| def _FindKernelVersion(self): |
| """Detect the version of the kernel used by the image.""" |
| module_top = os.path.join(image_test_lib.ROOT_A, 'lib', 'modules') |
| if not os.path.isdir(module_top): |
| logging.error('Path "%s" is not a directory.', module_top) |
| return None |
| |
| kernels = os.listdir(module_top) |
| if len(kernels) != 1: |
| logging.error('Image has %d kernel versions, expected 1.', len(kernels)) |
| logging.error('Found kernel versions: %s', ', '.join(kernels)) |
| return None |
| |
| return kernels[0] |
| |
| def _FindDriverSupportedFirmware(self, kernel): |
| """List all the firmware files supported by the driver. |
| |
| The iwlwifi driver has the path of the various firmware versions that it |
| supports built in. The list of firmware versions is available through the |
| 'modinfo' command. |
| |
| Args: |
| kernel: A string containing the kernel version. |
| |
| Returns: |
| A list of strings containing the names of all the firmware files that can |
| be loaded by the iwlwifi driver. |
| """ |
| # The iwlwifi module lists the firmware files that it can load. |
| # Typical output of the 'modinfo' command: |
| # iwlwifi-7265-17.ucode |
| # iwlwifi-7265D-29.ucode |
| # iwlwifi-8000C-36.ucode |
| # iwlwifi-8265-36.ucode |
| # iwlwifi-9000-pu-b0-jf-b0-46.ucode |
| # iwlwifi-9260-th-b0-jf-b0-46.ucode |
| try: |
| cmd = ['modinfo', '-F', 'firmware', '-b', image_test_lib.ROOT_A, |
| '-k', kernel, 'iwlwifi'] |
| modinfo = cros_build_lib.run(cmd, print_cmd=False, capture_output=True, |
| encoding='utf-8') |
| except cros_build_lib.RunCommandError as e: |
| # It's not necessarily an error to have enabled the firmware but not the |
| # iwlwifi driver (e.g. bringup) -> log a warning, not an error. |
| logging.warning('Could not query iwlwifi driver.') |
| logging.warning('"%s" returned code %d.', ' '.join(cmd), e.returncode) |
| logging.warning('stdout: %s', e.stdout) |
| logging.warning('stderr: %s', e.stderr) |
| return [] |
| |
| return modinfo.output.splitlines() |
| |
| def _GetLinuxFirmwareIwlwifiFlags(self): |
| """Extract 'iwlwifi-*' flags from LINUX_FIRMWARE.""" |
| linux_firmware = portage_util.PortageqEnvvar('LINUX_FIRMWARE', |
| board=self._board, |
| allow_undefined=True) |
| if not linux_firmware: |
| logging.info("Board %s doesn't use LINUX_FIRMWARE.", self._board) |
| return [] |
| |
| # Look for flags 'iwlwifi-all', 'iwlwifi-9260', 'iwlwifi-QuZ', etc. |
| flags = [x for x in linux_firmware.split() if x.startswith('iwlwifi-')] |
| if not flags: |
| logging.info("Board %s doesn't support iwlwifi.", self._board) |
| return [] |
| |
| logging.info('Expecting the following WiFi chips: %s', ', '.join(flags)) |
| return flags |
| |
| def _GetIwlwifiFirmwareFiles(self): |
| """List all the iwlwifi-* files in /lib/firmware.""" |
| pathname = os.path.join(image_test_lib.ROOT_A, 'lib', 'firmware', |
| 'iwlwifi-*') |
| return [os.path.basename(x) for x in glob.glob(pathname)] |
| |
| def TestIwlwifiFirmwareAndKernelMatch(self): |
| """Ensure that the firmware files are supported by the kernel. |
| |
| The iwlwifi firmware files expected by the driver must be present in |
| /lib/firmware. This will also ensure that there's no version mismatch |
| between the driver and the firmware. |
| """ |
| iwlwifi_flags = self._GetLinuxFirmwareIwlwifiFlags() |
| if not iwlwifi_flags: |
| self.skipTest('Could not find iwlwifi flags.') |
| if 'iwlwifi-all' in iwlwifi_flags: |
| self.skipTest('All firmware files have been installed.') |
| |
| # Find the kernel version of the image, necessary to call 'modinfo' later. |
| kernel = self._FindKernelVersion() |
| if kernel is None: |
| self.skipTest('Failed to detect the kernel version.') |
| |
| modinfo_files = self._FindDriverSupportedFirmware(kernel) |
| if not modinfo_files: |
| self.skipTest('Could not find iwlwifi module.') |
| |
| iwlwifi_files = self._GetIwlwifiFirmwareFiles() |
| # We have at least one iwlwifi-* flag listed in LINUX_FIRMWARE, ensure that |
| # at least one firmware file is present. |
| self.assertTrue(iwlwifi_files, 'No iwlwifi firmware file installed.') |
| |
| # Ensure that for every iwlwifi-* flag listed in LINUX_FIRMWARE, the driver |
| # has at least one corresponding firmware file listed, and at least one of |
| # the firmware files is present on the rootfs. |
| for flag in iwlwifi_flags: |
| supported_fw = {x for x in modinfo_files if x.startswith(flag)} |
| available_fw = {x for x in iwlwifi_files if x.startswith(flag)} |
| logging.info('The driver supports the following "%s" files: %s', flag, |
| ', '.join(supported_fw)) |
| logging.info('The rootfs provides the following "%s" files: %s', flag, |
| ', '.join(available_fw)) |
| self.assertTrue(supported_fw & available_fw, |
| 'Driver/firmware mismatch for %s' % flag) |
| |
| class DBusServiceTest(image_test_lib.ImageTestCase): |
| """Verify installed D-Bus service file contents.""" |
| |
| def TestDelegationToUpstart(self): |
| """Check D-Bus service files for delegation to Upstart. |
| |
| crbug.com/1025914: To prevent D-Bus activated services from running |
| indefinitely, each D-Bus activated service file should have an associated |
| Upstart job that manages the lifecycle of the service. |
| |
| The Exec clause can either start with "Exec=/sbin/start(whitespace)" |
| (delegate to upstart) or should be "Exec=/sbin/false" (D-Bus service |
| activations disabled). |
| """ |
| DBUS_HEADER_RE = re.compile(r'^\[D-BUS Service]$', re.MULTILINE) |
| EXEC_CLAUSE_RE = re.compile(r'^Exec=(/sbin/start\s|/bin/false)', |
| re.MULTILINE) |
| |
| dbus_service_path_spec = ('%s/usr/share/dbus-1/*services/*.service' % |
| image_test_lib.ROOT_A) |
| success = True |
| |
| for filename in glob.iglob(dbus_service_path_spec): |
| file_contents = osutils.ReadFile(filename) |
| if (DBUS_HEADER_RE.search(file_contents) and |
| not EXEC_CLAUSE_RE.search(file_contents)): |
| success = False |
| logging.error('%s: Add an Upstart script to manage D-Bus activated ' |
| 'service lifecycle: see crbug.com/1025914.', filename) |
| |
| self.assertTrue(success) |