| # -*- coding: utf-8 -*- |
| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Utilities for manipulating ChromeOS images.""" |
| |
| from __future__ import print_function |
| |
| import glob |
| import os |
| import re |
| |
| from chromite.lib import constants |
| from chromite.lib import cros_build_lib |
| from chromite.lib import cros_logging as logging |
| from chromite.lib import git |
| from chromite.lib import osutils |
| from chromite.lib import signing |
| |
| |
| # security_check: pass_config mapping. |
| _SECURITY_CHECKS = { |
| 'no_nonrelease_files': True, |
| 'sane_lsb-release': True, |
| 'secure_kernelparams': True, |
| 'not_ASAN': False, |
| } |
| |
| |
| class Error(Exception): |
| """Base image_lib error class.""" |
| |
| |
| class LoopbackError(Error): |
| """An exception raised when something went wrong setting up a loopback""" |
| |
| |
| class LoopbackPartitions(object): |
| """Loopback mount a file and provide access to its partitions. |
| |
| This class can be used as a context manager with the "with" statement, or |
| individual instances of it can be created which will clean themselves up |
| when garbage collected or when explicitly closed, ala the tempfile module. |
| |
| In either case, the same arguments should be passed to init. |
| |
| Args: |
| path: Path to the backing file. |
| losetup: Path to the losetup utility. The default uses the system PATH. |
| """ |
| def __init__(self, path, util_path=None): |
| self._util_path = util_path |
| self.path = path |
| self.dev = None |
| self.parts = {} |
| |
| try: |
| cmd = ['losetup', '--show', '-f', self.path] |
| ret = self._au_safe_sudo(cmd, print_cmd=False, capture_output=True) |
| self.dev = ret.output.strip() |
| cmd = ['partx', '-d', self.dev] |
| self._au_safe_sudo(cmd, quiet=True, error_code_ok=True) |
| cmd = ['partx', '-a', self.dev] |
| self._au_safe_sudo(cmd, print_cmd=False) |
| |
| self.parts = {} |
| part_devs = glob.glob(self.dev + 'p*') |
| if not part_devs: |
| logging.Warning('Didn\'t find partition devices nodes for %s.', |
| self.path) |
| return |
| |
| for part in part_devs: |
| number = int(re.search(r'p(\d+)$', part).group(1)) |
| self.parts[number] = part |
| |
| except: |
| self.close() |
| raise |
| |
| def _au_safe_sudo(self, cmd, **kwargs): |
| """Run a command using sudo in a way that respects the util_path""" |
| newcmd = osutils.Which(cmd[0], path=self._util_path) |
| if newcmd: |
| cmd = [newcmd] + cmd[1:] |
| return cros_build_lib.SudoRunCommand(cmd, **kwargs) |
| |
| |
| def close(self): |
| if self.dev: |
| cmd = ['partx', '-d', self.dev] |
| self._au_safe_sudo(cmd, quiet=True, error_code_ok=True) |
| self._au_safe_sudo(['losetup', '--detach', self.dev]) |
| self.dev = None |
| self.parts = {} |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc, tb): |
| self.close() |
| |
| def __del__(self): |
| self.close() |
| |
| |
| def WriteLsbRelease(sysroot, fields): |
| """Writes out the /etc/lsb-release file into the given sysroot. |
| |
| Args: |
| sysroot: The sysroot to write the lsb-release file to. |
| fields: A dictionary of all the fields and values to write. |
| """ |
| content = '\n'.join('%s=%s' % (k, v) for k, v in fields.items()) + '\n' |
| |
| path = os.path.join(sysroot, constants.LSB_RELEASE_PATH.lstrip('/')) |
| |
| if os.path.exists(path): |
| # The file has already been pre-populated with some fields. Since |
| # osutils.WriteFile(..) doesn't support appending with sudo, read in the |
| # content and prepend it to the new content to write. |
| # TODO(stevefung): Remove this appending, once all writing to the |
| # /etc/lsb-release file has been removed from ebuilds and consolidated |
| # to the buid tools. |
| content = osutils.ReadFile(path) + content |
| |
| osutils.WriteFile(path, content, mode='w', makedirs=True, sudo=True) |
| |
| |
| def GetLatestImageLink(board): |
| """Get the path for the `latest` image symlink for the given board.""" |
| return os.path.join(constants.SOURCE_ROOT, 'src/build/images', board, |
| 'latest') |
| |
| |
| class ImageDoesNotExistError(Error): |
| """When the provided or implied image path does not exist.""" |
| |
| |
| class SecurityConfigDirectoryError(Error): |
| """The SecurityTestConfig directory does not exist.""" |
| |
| |
| class SecurityTestArgumentError(Error): |
| """Invalid SecurityTest argument error.""" |
| |
| |
| class VbootCheckoutError(Error): |
| """Error checking out the stable vboot source.""" |
| |
| |
| def SecurityTest(board=None, image=None, baselines=None, vboot_hash=None): |
| """Image security tests. |
| |
| Args: |
| board: str - The board whose image should be tested. Used when |image| is |
| not provided or is a basename. Defaults to the default board. |
| image: str - The path to an image that should be tested, or the basename of |
| the desired image in the |board|'s build directory. |
| baselines: str - The path to a directory containing the baseline configs. |
| vboot_hash: str - The commit hash to checkout for the vboot_reference clone. |
| |
| Returns: |
| bool - True on success, False on failure. |
| |
| Raises: |
| SecurityTestArgumentError when one or more arguments are not valid. |
| VbootCheckoutError when the vboot_reference repository cannot be cloned or |
| the |vboot_hash| cannot be checked out. |
| """ |
| if not cros_build_lib.IsInsideChroot(): |
| cmd = ['security_test_image'] |
| if board: |
| cmd += ['--board', board] |
| if image: |
| cmd += ['--image', image] |
| if baselines: |
| cmd += ['--baselines', baselines] |
| if vboot_hash: |
| cmd += ['--vboot-hash', vboot_hash] |
| result = cros_build_lib.RunCommand(cmd, enter_chroot=True, |
| error_code_ok=True) |
| return not result.returncode |
| else: |
| try: |
| image = BuildImagePath(board, image) |
| except ImageDoesNotExistError as e: |
| raise SecurityTestArgumentError(e.message) |
| logging.info('Using %s', image) |
| |
| if not baselines: |
| baselines = signing.SECURITY_BASELINES_DIR |
| if not os.path.exists(baselines): |
| if not os.path.exists(signing.CROS_SIGNING_BASE_DIR): |
| logging.warning('Skipping security tests with public manifest.') |
| return True |
| else: |
| raise SecurityTestArgumentError( |
| 'Could not locate security baselines from %s with private ' |
| 'manifest.' % baselines) |
| logging.info('Loading baselines from %s', baselines) |
| |
| if not vboot_hash: |
| vboot_hash = signing.GetDefaultVbootStableHash() |
| if not vboot_hash: |
| raise SecurityTestArgumentError( |
| 'Could not detect vboot_stable_hash in %s.' |
| % signing.CROS_SIGNING_CONFIG) |
| logging.info('Using vboot_reference.git rev %s', vboot_hash) |
| |
| with osutils.TempDir() as tempdir: |
| config = SecurityTestConfig(image, baselines, vboot_hash, tempdir) |
| failures = sum(config.RunCheck(check, with_config) |
| for check, with_config in _SECURITY_CHECKS.iteritems()) |
| |
| if failures: |
| logging.error('%s tests failed', failures) |
| else: |
| logging.info('All tests passed.') |
| |
| return not failures |
| |
| |
| def BuildImagePath(board, image): |
| """Build a fully qualified path to the image. |
| |
| Args: |
| board: str - The name of the board whose image is being tested when an |
| image path is not specified. |
| image: str - The path to an image (in which case |image| is simply returned) |
| or the basename of the image file to use. When |image| is a basename, the |
| |board| build directory is always used to find it. |
| """ |
| # Prefer an image path if provided. |
| if image and os.sep in image: |
| if os.path.exists(image): |
| return image |
| else: |
| raise ImageDoesNotExistError( |
| 'The provided image does not exist: %s' % image) |
| |
| # We have no image or a basename only, so we need the board to build out the |
| # full path to an image file. |
| if not board: |
| board = cros_build_lib.GetDefaultBoard() |
| |
| if not board: |
| if image: |
| raise ImageDoesNotExistError( |
| '|image| must be a full path or used with |board|.') |
| else: |
| raise ImageDoesNotExistError( |
| 'Either |image| or |board| must be provided.') |
| |
| # Build out the full path using the board's build path. |
| image_file = image or 'recovery_image.bin' |
| image = os.path.join(GetLatestImageLink(board), image_file) |
| |
| if not os.path.exists(image): |
| raise ImageDoesNotExistError('Image does not exist: %s' % image) |
| |
| return image |
| |
| |
| class SecurityTestConfig(object): |
| """Hold configurations and do related setup.""" |
| |
| _VBOOT_SRC = os.path.join(constants.SOURCE_ROOT, |
| 'src/platform/vboot_reference/.git') |
| _VBOOT_CHECKS_REL_DIR = 'scripts/image_signing' |
| |
| def __init__(self, image, baselines, vboot_hash, directory): |
| """SecurityTest run configuration. |
| |
| Args: |
| image: str - Path to an image. |
| baselines: str - Path to the security baselines. |
| vboot_hash: str - Commit hash for the vboot_reference. |
| directory: str - The directory to use for the vboot_reference checkout. |
| Usually a temporary directory. |
| """ |
| self.image = image |
| self.baselines = baselines |
| self.vboot_hash = vboot_hash |
| self.directory = directory |
| self._repo_dir = os.path.join(self.directory, 'vboot_source') |
| self._checks_dir = os.path.join(self._repo_dir, self._VBOOT_CHECKS_REL_DIR) |
| self._checked_out = False |
| |
| def RunCheck(self, check, pass_config): |
| """Run the given check. |
| |
| Args: |
| check: str - A config.vboot_dir/ensure_|check|.sh check name. |
| pass_config: bool - Whether the check has a corresponding |
| `ensure_|check|.config` file to pass. |
| |
| Returns: |
| bool - True on success, False on failure. |
| |
| Raises: |
| SecurityConfigDirectoryError if the directory does not exist. |
| VbootCheckoutError if the vboot reference repo could not be cloned or the |
| vboot_hash could not be checked out. |
| """ |
| self._VbootCheckout() |
| |
| cmd = [os.path.join(self._checks_dir, 'ensure_%s.sh' % check), self.image] |
| if pass_config: |
| cmd.append(os.path.join(self.baselines, 'ensure_%s.config' % check)) |
| |
| try: |
| self._RunCommand(cmd) |
| except cros_build_lib.RunCommandError as e: |
| logging.error('%s test failed: %s', check, e.message) |
| return False |
| else: |
| return True |
| |
| def _VbootCheckout(self): |
| """Clone the vboot reference repo and checkout the vboot stable hash.""" |
| if not os.path.exists(self.directory): |
| raise SecurityConfigDirectoryError('The directory does not exist.') |
| |
| if not self._checked_out: |
| try: |
| git.Clone(self._repo_dir, self._VBOOT_SRC, reference=self._VBOOT_SRC) |
| except cros_build_lib.RunCommandError as e: |
| raise VbootCheckoutError('Failed cloning repo from %s: %s' % |
| (self._VBOOT_SRC, e.message)) |
| try: |
| cros_build_lib.RunCommand(['git', 'checkout', '-q', self.vboot_hash], |
| cwd=self._repo_dir) |
| except cros_build_lib.RunCommandError as e: |
| raise VbootCheckoutError('Failed checking out %s from %s: %s' % |
| (self.vboot_hash, self._VBOOT_SRC, e.message)) |
| self._checked_out = True |
| |
| def _RunCommand(self, cmd, *args, **kwargs): |
| """Run a command ensuring the signing bin directory is included in PATH.""" |
| extra_env = {'PATH': '%s:%s' % (signing.CROS_SIGNING_BIN_DIR, |
| os.environ['PATH'])} |
| kwargs['extra_env'] = extra_env.update(kwargs.get('extra_env', {})) |
| return cros_build_lib.RunCommand(cmd, *args, **kwargs) |