| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """A module containing kernel handler class used by SAFT.""" |
| |
| import hashlib |
| import os |
| import re |
| |
| TMP_FILE_NAME = 'kernel_header_dump' |
| |
| # Types of kernel modifications. |
| KERNEL_BODY_MOD = 1 |
| KERNEL_VERSION_MOD = 2 |
| KERNEL_RESIGN_MOD = 3 |
| |
| |
| class KernelHandlerError(Exception): |
| """KernelHandler-specific exception.""" |
| pass |
| |
| |
| class KernelHandler(object): |
| """An object to provide ChromeOS kernel related actions. |
| |
| Mostly it allows to corrupt and restore a particular kernel partition |
| (designated by the partition name, A or B. |
| |
| @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface |
| """ |
| |
| # This value is used to alter contents of a byte in the appropriate kernel |
| # image. First added to corrupt the image, then subtracted to restore the |
| # image. |
| DELTA = 1 |
| |
| # The maximum kernel size in MB. |
| KERNEL_SIZE_MB = 16 |
| |
| def __init__(self, os_if): |
| self.os_if = os_if |
| self.dump_file_name = None |
| self.partition_map = {} |
| self.root_dev = None |
| self.initialized = False |
| |
| def _get_version(self, device): |
| """Get version of the kernel hosted on the passed in partition.""" |
| # 16 K should be enough to include headers and keys |
| data = self.os_if.read_partition(device, 0x4000) |
| return self.os_if.retrieve_body_version(data) |
| |
| def _get_datakey_version(self, device): |
| """Get datakey version of kernel hosted on the passed in partition.""" |
| # 16 K should be enought to include headers and keys |
| data = self.os_if.read_partition(device, 0x4000) |
| return self.os_if.retrieve_datakey_version(data) |
| |
| def _get_partition_map(self, internal_disk=True): |
| """Scan `cgpt show <device> output to find kernel devices. |
| |
| Args: |
| internal_disk - decide whether to use internal kernel disk. |
| """ |
| if internal_disk: |
| target_device = self.os_if.get_internal_disk( |
| self.os_if.get_root_part()) |
| else: |
| target_device = self.root_dev |
| |
| kernel_partitions = re.compile('KERN-([AB])') |
| disk_map = self.os_if.run_shell_command_get_output( |
| 'cgpt show %s' % target_device) |
| |
| for line in disk_map: |
| matched_line = kernel_partitions.search(line) |
| if not matched_line: |
| continue |
| label = matched_line.group(1) |
| part_info = {} |
| device = self.os_if.join_part(target_device, line.split()[2]) |
| part_info['device'] = device |
| part_info['version'] = self._get_version(device) |
| part_info['datakey_version'] = self._get_datakey_version(device) |
| self.partition_map[label] = part_info |
| |
| def dump_kernel(self, section, kernel_path): |
| """Dump the specified kernel to a file. |
| |
| @param section: The kernel to dump. May be A or B. |
| @param kernel_path: The path to the kernel image. |
| """ |
| dev = self.partition_map[section.upper()]['device'] |
| cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path, |
| self.KERNEL_SIZE_MB) |
| self.os_if.run_shell_command(cmd) |
| |
| def write_kernel(self, section, kernel_path): |
| """Write a kernel image to the specified section. |
| |
| @param section: The kernel to write. May be A or B. |
| @param kernel_path: The path to the kernel image to write. |
| """ |
| dev = self.partition_map[section.upper()]['device'] |
| dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev, |
| self.KERNEL_SIZE_MB) |
| self.os_if.run_shell_command(dd_cmd, modifies_device=True) |
| |
| def _modify_kernel(self, |
| section, |
| delta, |
| modification_type=KERNEL_BODY_MOD, |
| key_path=None): |
| """Modify kernel image on a disk partition. |
| |
| This method supports three types of kernel modification. KERNEL_BODY_MOD |
| just adds the value of delta to the first byte of the kernel blob. |
| This might leave the kernel corrupted (as required by the test). |
| |
| The second type, KERNEL_VERSION_MOD - will use 'delta' as the new |
| version number, it will put it in the kernel header, and then will |
| resign the kernel blob. |
| |
| The third type. KERNEL_RESIGN_MOD - will resign the kernel with keys in |
| argument key_path. If key_path is None, choose dev_key_path as resign |
| key directory. |
| """ |
| self.dump_kernel(section, self.dump_file_name) |
| data = list(self.os_if.read_file(self.dump_file_name)) |
| if modification_type == KERNEL_BODY_MOD: |
| data[0] = '%c' % ((ord(data[0]) + delta) % 0x100) |
| self.os_if.write_file(self.dump_file_name, ''.join(data)) |
| kernel_to_write = self.dump_file_name |
| elif modification_type == KERNEL_VERSION_MOD: |
| new_version = delta |
| kernel_to_write = self.dump_file_name + '.new' |
| self.os_if.run_shell_command( |
| 'vbutil_kernel --repack %s --version %d ' |
| '--signprivate %s --oldblob %s' % |
| (kernel_to_write, new_version, |
| os.path.join(self.dev_key_path, |
| 'kernel_data_key.vbprivk'), |
| self.dump_file_name)) |
| elif modification_type == KERNEL_RESIGN_MOD: |
| if key_path and self.os_if.is_dir(key_path): |
| resign_key_path = key_path |
| else: |
| resign_key_path = self.dev_key_path |
| |
| kernel_to_write = self.dump_file_name + '.new' |
| self.os_if.run_shell_command( |
| 'vbutil_kernel --repack %s ' |
| '--signprivate %s --oldblob %s --keyblock %s' % |
| (kernel_to_write, |
| os.path.join(resign_key_path, 'kernel_data_key.vbprivk'), |
| self.dump_file_name, |
| os.path.join(resign_key_path, 'kernel.keyblock'))) |
| else: |
| return # Unsupported mode, ignore. |
| self.write_kernel(section, kernel_to_write) |
| |
| def corrupt_kernel(self, section): |
| """Corrupt a kernel section (add DELTA to the first byte).""" |
| self._modify_kernel(section.upper(), self.DELTA) |
| |
| def restore_kernel(self, section): |
| """Restore the previously corrupted kernel.""" |
| self._modify_kernel(section.upper(), -self.DELTA) |
| |
| def get_version(self, section): |
| """Return version read from this section blob's header.""" |
| return self.partition_map[section.upper()]['version'] |
| |
| def get_datakey_version(self, section): |
| """Return datakey version read from this section blob's header.""" |
| return self.partition_map[section.upper()]['datakey_version'] |
| |
| def get_sha(self, section): |
| """Return the SHA1 hash of the section blob.""" |
| s = hashlib.sha1() |
| dev = self.partition_map[section.upper()]['device'] |
| s.update(self.os_if.read_file(dev)) |
| return s.hexdigest() |
| |
| def set_version(self, section, version): |
| """Set version of this kernel blob and re-sign it.""" |
| if version < 0: |
| raise KernelHandlerError('Bad version value %d' % version) |
| self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD) |
| |
| def resign_kernel(self, section, key_path=None): |
| """Resign kernel with original kernel version and keys in key_path.""" |
| self._modify_kernel(section.upper(), self.get_version(section), |
| KERNEL_RESIGN_MOD, key_path) |
| |
| def init(self, dev_key_path='.', internal_disk=True): |
| """Initialize the kernel handler object. |
| |
| Input argument is an OS interface object reference. |
| """ |
| self.dev_key_path = dev_key_path |
| self.root_dev = self.os_if.get_root_dev() |
| self.dump_file_name = self.os_if.state_dir_file(TMP_FILE_NAME) |
| self._get_partition_map(internal_disk) |
| self.initialized = True |