| # Copyright 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Collection of tests to run on the rootfs of a built image.""" |
| |
| from __future__ import print_function |
| |
| import cStringIO |
| import collections |
| import itertools |
| import magic |
| import mimetypes |
| import os |
| import re |
| import stat |
| import unittest |
| |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_logging as logging |
| from chromite.lib import filetype |
| from chromite.lib import osutils |
| from chromite.lib import perf_uploader |
| |
| |
# Suffix of the files that image tests write their performance values to.
PERF_EXTENSION = '.perf'
# Names of the symlinks that point at the mounted partitions (rootfs and
# stateful, respectively).
ROOT_A = 'dir-ROOT-A'
STATEFUL = 'dir-STATE'
| |
| |
def IsPerfFile(file_name):
  """Tell whether |file_name| carries the perf-values file extension.

  Args:
    file_name: A file name or path to inspect.

  Returns:
    True if |file_name| ends with PERF_EXTENSION, False otherwise.
  """
  has_perf_suffix = file_name.endswith(PERF_EXTENSION)
  return has_perf_suffix
| |
| |
class _BoardAndDirectoryMixin(object):
  """Mixin that stores the board name and result directory of an image test.

  Both values are None until the corresponding setter has been called.
  """

  # Class-level defaults; the setters shadow them with instance attributes.
  _result_dir = None
  _board = None

  def SetResultDir(self, result_dir):
    """Store |result_dir| on this object as _result_dir."""
    self._result_dir = result_dir

  def SetBoard(self, board):
    """Store |board| on this object as _board."""
    self._board = board
| |
| |
class ImageTestCase(unittest.TestCase, _BoardAndDirectoryMixin):
  """Base unittest.TestCase with helpers shared by all image tests.

  Do not inherit from this class directly; derive from ForgivingImageTestCase
  or NonForgivingImageTestCase instead.

  Test methods MUST be named with a "Test" prefix (e.g.: TestLinkage,
  TestDiskSpace) rather than "test", otherwise the test runner will not pick
  them up.

  Tests run inside the chroot, as root. DO NOT modify any mounted partitions.

  The current working directory is prepared so that the "ROOT_A" and
  "STATEFUL" constants refer to the mounted partitions, which are mounted
  readonly:

    current working directory
      + ROOT_A
        + /
          + bin
          + etc
          + usr
          ...
      + STATEFUL
        + var_overlay
        ...
  """

  def IsForgiving(self):
    """Tell the test runner whether this test is forgiving.

    The test runner sorts tests into two buckets, forgiving and non-forgiving.
    Results of forgiving tests DO NOT affect the result of the test runner;
    results of non-forgiving tests do. The result of every individual test is
    still output in both cases.

    Raises:
      NotImplementedError: Always; subclasses must override this method.
    """
    raise NotImplementedError()

  def _GeneratePerfFileName(self):
    """Build the path of the perf file that belongs to this test class.

    The file lives in the result directory and is named:

      image_test.<test_class><PERF_EXTENSION>

    e.g.:

      image_test.DiskSpaceTest.perf

    Returns:
      The perf file name joined onto the result directory.
    """
    base_name = 'image_test.%s%s' % (self.__class__.__name__, PERF_EXTENSION)
    return os.path.join(self._result_dir, base_name)

  @staticmethod
  def GetTestName(file_name):
    """Extract the test name from a perf |file_name|.

    Args:
      file_name: A path to the perf file as generated by _GeneratePerfFileName.

    Returns:
      The base name of |file_name| with everything from the last '.' onwards
      removed, i.e. the qualified test name.

    Raises:
      ValueError: If the base name of |file_name| contains no '.'.
    """
    base_name = os.path.basename(file_name)
    return base_name[:base_name.rindex('.')]

  def OutputPerfValue(self, description, value, units,
                      higher_is_better=True, graph=None):
    """Record a perf value in this test's perf file.

    Nothing is recorded (only a warning is logged) when no result directory
    has been set.

    Args:
      description: A string description of the value such as "partition-0". A
        special description "ref" is taken as the reference.
      value: A float value.
      units: A string describing the unit of measurement such as "KB", "meter".
      higher_is_better: A boolean indicating if higher value means better
        performance.
      graph: A string name of the graph this value will be plotted on. When
        omitted, the name of the running test method is used.
    """
    if not self._result_dir:
      logging.warning('Result directory is not set. Ignore OutputPerfValue.')
      return

    graph_name = self._testMethodName if graph is None else graph
    perf_uploader.OutputPerfValue(self._GeneratePerfFileName(), description,
                                  value, units, higher_is_better, graph_name)
| |
| |
class ForgivingImageTestCase(ImageTestCase):
  """Base class to inherit from when writing a forgiving image test."""

  def IsForgiving(self):
    """Report this test as forgiving (always True)."""
    return True
| |
| |
class NonForgivingImageTestCase(ImageTestCase):
  """Base class to inherit from when writing a non-forgiving image test."""

  def IsForgiving(self):
    """Report this test as non-forgiving (always False)."""
    return False
| |
| |
class ImageTestSuite(unittest.TestSuite, _BoardAndDirectoryMixin):
  """Wrap around unittest.TestSuite to pass more info to the actual tests."""

  def GetTests(self):
    """Return the list of tests (or nested suites) held by this suite."""
    return self._tests

  def run(self, result, debug=False):
    """Hand the board and result directory to every test, then run them.

    Args:
      result: The unittest result object to collect outcomes in.
      debug: Forwarded unchanged to unittest.TestSuite.run().

    Returns:
      Whatever unittest.TestSuite.run() returns.
    """
    for t in self._tests:
      t.SetResultDir(self._result_dir)
      t.SetBoard(self._board)
    # Forward |debug| as well; dropping it made TestSuite.debug(), which calls
    # run(result, True), silently fall back to the non-debug code path.
    return super(ImageTestSuite, self).run(result, debug=debug)
| |
| |
class ImageTestRunner(unittest.TextTestRunner, _BoardAndDirectoryMixin):
  """A unittest.TextTestRunner that forwards board and result dir to |test|."""

  def run(self, test):
    """Configure |test| with this runner's settings and run it.

    Args:
      test: The test or suite to run; it must provide SetBoard() and
        SetResultDir().

    Returns:
      Whatever unittest.TextTestRunner.run() returns.
    """
    test.SetBoard(self._board)
    test.SetResultDir(self._result_dir)
    return super(ImageTestRunner, self).run(test)
| |
| |
| ##################### |
| # Here go the tests |
| ##################### |
| |
| |
class LocaltimeTest(NonForgivingImageTestCase):
  """Check that /etc/localtime is a symlink to /var/lib/timezone/localtime.

  This doubles as an example of an image test: the image is already mounted
  and the rootfs is reached through the ROOT_A constant.
  """

  def TestLocaltimeIsSymlink(self):
    # etc/localtime inside the rootfs must be a symlink.
    is_link = os.path.islink(os.path.join(ROOT_A, 'etc', 'localtime'))
    self.assertTrue(is_link)

  def TestLocaltimeLinkIsCorrect(self):
    # The symlink must point at the expected absolute target.
    link_target = os.readlink(os.path.join(ROOT_A, 'etc', 'localtime'))
    self.assertEqual('/var/lib/timezone/localtime', link_target)
| |
| |
def _GuessMimeType(magic_obj, file_name):
  """Determine the mime type of |file_name| from its extension or content.

  The extension-based guess wins whenever there is one, which reduces noise;
  the file content is only inspected as a fallback.

  Args:
    magic_obj: A loaded magic instance; its file() method is used for the
      content-based fallback.
    file_name: A path to the file.

  Returns:
    The mime type of |file_name|.
  """
  by_extension = mimetypes.guess_type(file_name)[0]
  return by_extension or magic_obj.file(file_name)
| |
| |
class BlacklistTest(NonForgivingImageTestCase):
  """Verify that rootfs does not contain blacklisted items."""

  def TestBlacklistedDirectories(self):
    # Directories that must not exist in the rootfs.
    dirs = [os.path.join(ROOT_A, 'usr', 'share', 'locale')]
    for d in dirs:
      self.assertFalse(os.path.isdir(d), 'Directory %s is blacklisted.' % d)

  def TestBlacklistedFileTypes(self):
    """Fail if there are files of prohibited types (such as C++ source code).

    The whitelist has higher precedence than the blacklist: a file whose path
    matches a whitelisted pattern is accepted regardless of its mime type.
    """
    # Matched against the guessed mime type of each file.
    blacklisted_patterns = [re.compile(x) for x in [
        r'text/x-c\+\+',
        r'text/x-c',
    ]]
    # Matched against the full path of each file.
    whitelisted_patterns = [re.compile(x) for x in [
        r'.*/braille/.*',
        r'.*/brltty/.*',
        r'.*/etc/sudoers$',
        r'.*/dump_vpd_log$',
        r'.*\.conf$',
        r'.*/libnl/classid$',
        r'.*/locale/',
        r'.*/X11/xkb/',
        r'.*/chromeos-assets/',
        r'.*/udev/rules.d/',
        r'.*/firmware/ar3k/.*pst$',
        r'.*/etc/services',
        r'.*/usr/share/dev-install/portage',
    ]]

    failures = []

    magic_obj = magic.open(magic.MAGIC_MIME_TYPE)
    try:
      magic_obj.load()
      for root, _, file_names in os.walk(ROOT_A):
        for file_name in file_names:
          full_name = os.path.join(root, file_name)
          # Only regular files are inspected; symlinks are skipped.
          if os.path.islink(full_name) or not os.path.isfile(full_name):
            continue

          mime_type = _GuessMimeType(magic_obj, full_name)
          if (any(x.match(mime_type) for x in blacklisted_patterns) and not
              any(x.match(full_name) for x in whitelisted_patterns)):
            failures.append('File %s has blacklisted type %s.' %
                            (full_name, mime_type))
    finally:
      # Release the magic handle even if the walk above raises.
      magic_obj.close()

    self.assertFalse(failures, '\n'.join(failures))

  def TestValidInterpreter(self):
    """Fail if a script's interpreter is not found, or not executable.

    A script interpreter is anything after the #! sign, up to the end of line
    or the first space.
    """
    # Any of the user/group/other execute permission bits (octal 111).
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    failures = []

    for root, _, file_names in os.walk(ROOT_A):
      for file_name in file_names:
        full_name = os.path.join(root, file_name)
        file_stat = os.lstat(full_name)
        # Only executable regular files can be scripts of interest.
        if (not stat.S_ISREG(file_stat.st_mode) or
            (file_stat.st_mode & exec_bits) == 0):
          continue

        with open(full_name, 'rb') as f:
          if f.read(2) != '#!':
            continue
          line = '#!' + f.readline().strip()

        try:
          # Ignore arguments to the interpreter.
          interp, _ = filetype.SplitShebang(line)
        except ValueError:
          failures.append('File %s has an invalid interpreter path: "%s".' %
                          (full_name, line))
          # There is no interpreter to check for this file; without this the
          # code below would use an unbound or stale |interp|.
          continue

        # Absolute path to the interpreter.
        interp = os.path.join(ROOT_A, interp.lstrip('/'))
        # Interpreter could be a symlink. Resolve it.
        interp = osutils.ResolveSymlink(interp, ROOT_A)
        if not os.path.isfile(interp):
          failures.append('File %s uses non-existing interpreter %s.' %
                          (full_name, interp))
        elif (os.stat(interp).st_mode & exec_bits) == 0:
          failures.append('Interpreter %s is not executable.' % interp)

    self.assertFalse(failures, '\n'.join(failures))
| |
| |
class LinkageTest(NonForgivingImageTestCase):
  """Verify that all binaries and libraries have proper linkage."""

  def setUp(self):
    # Bind-mount the stateful partition's var_overlay over the rootfs' var
    # directory for the duration of each test.
    overlay_dir = os.path.join(STATEFUL, 'var_overlay')
    var_dir = os.path.join(ROOT_A, 'var')
    osutils.MountDir(overlay_dir, var_dir, mount_opts=('bind', ))

  def tearDown(self):
    # Undo the bind mount made in setUp.
    osutils.UmountDir(os.path.join(ROOT_A, 'var'), cleanup=False)

  def _IsPackageMerged(self, package_name):
    """Ask the board's portageq whether |package_name| is in the rootfs.

    Args:
      package_name: The package atom passed to `portageq has_version`.

    Returns:
      True if the portageq-<board> command exits with status 0.
    """
    portageq = 'portageq-%s' % self._board
    result = cros_build_lib.RunCommand(
        [portageq, 'has_version', ROOT_A, package_name], error_code_ok=True)
    return result.returncode == 0

  def TestLinkage(self):
    """Find main executable binaries and check their linkage."""
    relative_binaries = ['boot/vmlinuz', 'bin/sed']

    # Binaries that are only checked when their package is merged.
    optional_binaries = (
        ('chromeos-base/chromeos-login', 'sbin/session_manager'),
        ('x11-base/xorg-server', 'usr/bin/Xorg'),
    )
    for package, binary in optional_binaries:
      if self._IsPackageMerged(package):
        relative_binaries.append(binary)

    # When chrome is built with USE="pgo_generate", rootfs chrome is actually a
    # symlink to a real binary which is in the stateful partition. So we do not
    # check for a valid chrome binary in that case.
    built_for_pgo = self._IsPackageMerged(
        'chromeos-base/chromeos-chrome[pgo_generate]')
    if (not built_for_pgo and
        self._IsPackageMerged('chromeos-base/chromeos-chrome')):
      relative_binaries.append('opt/google/chrome/chrome')

    binaries = [os.path.join(ROOT_A, x) for x in relative_binaries]

    # Collect every file whose full path contains '.so'.
    libraries = []
    for root, _, files in os.walk(ROOT_A):
      paths = (os.path.join(root, name) for name in files)
      libraries.extend(path for path in paths if '.so' in path)

    # lddtree is only available in the chroot but this image_test module
    # is imported by test_stages outside of the chroot too. So we dynamically
    # import lddtree here, where it actually is used.
    import lddtree
    ldpaths = lddtree.LoadLdpaths(ROOT_A)
    for path in itertools.chain(binaries, libraries):
      # |path| could be a symlink; follow it, treating absolute link targets
      # as relative to ROOT_A.
      while os.path.islink(path):
        link = os.readlink(path)
        if link.startswith('/'):
          base, link = ROOT_A, link[1:]
        else:
          base = os.path.dirname(path)
        path = os.path.join(base, link)

      try:
        lddtree.ParseELF(path, root=ROOT_A, ldpaths=ldpaths)
      except lddtree.exceptions.ELFError:
        # Files that cannot be parsed as ELF are skipped.
        continue
      except IOError as e:
        self.fail('Fail linkage test for %s: %s' % (path, e))
| |
| |
class FileSystemMetaDataTest(ForgivingImageTestCase):
  """Gather file system statistics such as free inodes and free blocks."""

  def TestStats(self):
    """Collect inodes and blocks usage."""
    # Find the loopback device that was mounted to ROOT_A: it is the source of
    # the first mount point whose destination is the target of the ROOT_A link.
    root_path = os.path.abspath(os.readlink(ROOT_A))
    loop_device = next(
        (mtab.source for mtab in osutils.IterateMountPoints()
         if mtab.destination == root_path),
        None)
    self.assertTrue(loop_device, 'Cannot find loopback device for ROOT_A.')

    # Gather file system stats with tune2fs, whose output looks like this:
    #
    #   tune2fs 1.42 (29-Nov-2011)
    #   Filesystem volume name:   ROOT-A
    #   Last mounted on:          <not available>
    #   Filesystem UUID:          <none>
    #   Filesystem magic number:  0xEF53
    #   Filesystem revision #:    1 (dynamic)
    #   ...
    #
    # Lines without a colon (such as the version banner) are skipped; all
    # others are split at the first colon into a key and a value.
    ret = cros_build_lib.SudoRunCommand(['tune2fs', '-l', loop_device],
                                        capture_output=True,
                                        extra_env={'LC_ALL': 'C'})
    fs_stat = {}
    for line in ret.output.splitlines():
      key, colon, field = line.partition(':')
      if colon:
        fs_stat[key] = field

    stat_keys = ('Free inodes', 'Free blocks', 'Inode count', 'Block count',
                 'Block size')
    free_inodes, free_blocks, inode_count, block_count, block_size = [
        int(fs_stat[key]) for key in stat_keys]

    # Apparent size: the sum of st_size over every file found in the rootfs.
    sum_file_size = 0
    for root, _, filenames in os.walk(ROOT_A):
      sum_file_size += sum(os.lstat(os.path.join(root, name)).st_size
                           for name in filenames)

    # Whatever the used blocks hold beyond the apparent file sizes.
    used_bytes = (block_count - free_blocks) * block_size
    metadata_size = used_bytes - sum_file_size

    free_inodes_percent = free_inodes * 100.0 / inode_count
    free_blocks_percent = free_blocks * 100.0 / block_count

    self.OutputPerfValue('free_inodes_over_inode_count', free_inodes_percent,
                         'percent', graph='free_over_used_ratio')
    self.OutputPerfValue('free_blocks_over_block_count', free_blocks_percent,
                         'percent', graph='free_over_used_ratio')
    self.OutputPerfValue('apparent_size', sum_file_size, 'bytes',
                         higher_is_better=False, graph='filesystem_stats')
    self.OutputPerfValue('metadata_size', metadata_size, 'bytes',
                         higher_is_better=False, graph='filesystem_stats')
| |
| |
class SymbolsTest(NonForgivingImageTestCase):
  """Tests related to symbols in ELF files."""

  def setUp(self):
    # Cache of already parsed files: file name --> 2-tuple (import, export).
    self._known_symtabs = {}

  def _GetSymbols(self, file_name):
    """Return the imported and exported symbols of ELF file |file_name|.

    Results are cached per file name in self._known_symtabs.

    Args:
      file_name: Path to the file to parse.

    Returns:
      A 2-tuple (import, export) of sets of strings (symbol names).

    Raises:
      ValueError: If |file_name| is not an ELF file.
    """
    try:
      return self._known_symtabs[file_name]
    except KeyError:
      pass

    # The whole file is read into a cStringIO buffer to obviate fseek/fread
    # time in pyelftools.
    stream = cStringIO.StringIO(osutils.ReadFile(file_name))

    # pyelftools is not available during initial bootstrap, crbug.com/341152.
    from chromite.lib import parseelf
    from elftools.elf import elffile
    from elftools.common import exceptions

    try:
      elf = elffile.ELFFile(stream)
    except exceptions.ELFError:
      raise ValueError('%s is not an ELF file.' % file_name)

    imported, exported_table = parseelf.ParseELFSymbols(elf)
    symtab = (imported, set(exported_table.keys()))

    self._known_symtabs[file_name] = symtab
    return symtab

  def TestImportedSymbolsAreAvailable(self):
    """Ensure all ELF files' imported symbols are available in ROOT-A.

    Imported and exported symbols are collected from every ELF file on the
    system. The test fails if any file imports a symbol that no file exports,
    i.e. if the imported symbols are not a subset of the exported ones.

    This test DOES NOT simulate ELF loading. "TestLinkage" does that with
    `lddtree`.
    """
    # Import tables of files, keyed by file names.
    importeds = collections.defaultdict(set)
    # Union of the symbols exported by all files.
    exported = set()

    for root, _, filenames in os.walk(ROOT_A):
      for filename in filenames:
        full_name = os.path.join(root, filename)
        is_regular = (not os.path.islink(full_name) and
                      os.path.isfile(full_name))
        if not is_regular:
          continue

        try:
          imp, exp = self._GetSymbols(full_name)
        except (ValueError, IOError):
          # Not an ELF file, or it could not be read; nothing to record.
          continue

        importeds[full_name] = imp
        exported.update(exp)

    unsatisfied = ((full_name, imported - exported)
                   for full_name, imported in importeds.iteritems())
    failures = ['File %s contains unsatisfied symbols: %r' %
                (full_name, missing)
                for full_name, missing in unsatisfied if missing]
    self.assertFalse(failures, '\n'.join(failures))