blob: 549a6d4dd8e667bee90b637e77b0830c667a3474 [file] [log] [blame]
# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module containing kernel handler class used by SAFT."""
import hashlib
import os
import re
# Base name of the scratch file the kernel partition is dumped into; the
# handler turns it into a full path with os_if.state_dir_file().
TMP_FILE_NAME = 'kernel_header_dump'

# Kinds of kernel modification understood by KernelHandler._modify_kernel():
# alter the first byte of the blob, change the version and re-sign, or
# re-sign only.
(KERNEL_BODY_MOD,
 KERNEL_VERSION_MOD,
 KERNEL_RESIGN_MOD) = range(1, 4)
class KernelHandlerError(Exception):
    """Error raised by KernelHandler for kernel-handling failures."""
class KernelHandler(object):
    """An object to provide ChromeOS kernel related actions.

    Mostly it allows to corrupt and restore a particular kernel partition
    (designated by the partition name, A or B).

    init() has to be called before the other public methods: it is what
    fills in the partition map, the dump file name and the default signing
    key directory.

    @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface
    """

    # This value is used to alter contents of a byte in the appropriate kernel
    # image. First added to corrupt the image, then subtracted to restore the
    # image.
    DELTA = 1

    # The maximum kernel size in MB.
    KERNEL_SIZE_MB = 16

    # Number of bytes read from the start of a kernel partition when looking
    # for version information. 16 K should be enough to include headers and
    # keys.
    HEADER_READ_SIZE = 0x4000

    # Matches the label of a kernel partition in `cgpt show` output and
    # captures the section letter.
    _KERNEL_LABEL_RE = re.compile('KERN-([AB])')

    def __init__(self, os_if):
        """Store the OS interface; real setup is deferred to init().

        @param os_if: The OS interface object used for all disk, file and
                shell access.
        """
        self.os_if = os_if
        self.dump_file_name = None
        self.partition_map = {}
        self.root_dev = None
        # Set for real by init(); declared here so that every attribute the
        # class uses exists from construction on.
        self.dev_key_path = None
        self.initialized = False

    def _get_version(self, device):
        """Get version of the kernel hosted on the passed in partition."""
        data = self.os_if.read_partition(device, self.HEADER_READ_SIZE)
        return self.os_if.retrieve_body_version(data)

    def _get_datakey_version(self, device):
        """Get datakey version of kernel hosted on the passed in partition."""
        data = self.os_if.read_partition(device, self.HEADER_READ_SIZE)
        return self.os_if.retrieve_datakey_version(data)

    def _get_partition_map(self, internal_disk=True):
        """Scan `cgpt show <device>` output to find kernel devices.

        Every output line carrying a KERN-A or KERN-B label adds (or
        replaces) an entry in self.partition_map, keyed by 'A' or 'B', which
        holds the partition 'device', its kernel 'version' and its
        'datakey_version'.

        @param internal_disk: If True, scan the disk that
                os_if.get_internal_disk() reports for the current root
                partition; otherwise scan self.root_dev.
        """
        if internal_disk:
            target_device = self.os_if.get_internal_disk(
                    self.os_if.get_root_part())
        else:
            target_device = self.root_dev

        disk_map = self.os_if.run_shell_command_get_output(
                'cgpt show %s' % target_device)

        for line in disk_map:
            matched_line = self._KERNEL_LABEL_RE.search(line)
            if not matched_line:
                continue
            label = matched_line.group(1)
            # The third whitespace-separated field of a matching line is
            # used as the partition number (presumably cgpt's
            # "start size part contents" layout - verify against cgpt).
            device = self.os_if.join_part(target_device, line.split()[2])
            self.partition_map[label] = {
                    'device': device,
                    'version': self._get_version(device),
                    'datakey_version': self._get_datakey_version(device),
            }

    def dump_kernel(self, section, kernel_path):
        """Dump the specified kernel to a file.

        At most KERNEL_SIZE_MB megabytes are copied (a single dd block).

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image.
        """
        dev = self.partition_map[section.upper()]['device']
        cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path,
                                                 self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(cmd)

    def write_kernel(self, section, kernel_path):
        """Write a kernel image to the specified section.

        At most KERNEL_SIZE_MB megabytes are copied (a single dd block).

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image to write.
        """
        dev = self.partition_map[section.upper()]['device']
        dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev,
                                                    self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(dd_cmd, modifies_device=True)

    @staticmethod
    def _shift_first_byte(blob, delta):
        """Return a copy of blob with delta added (mod 256) to its first byte.

        Works on byte strings (Python 2 str, Python 3 bytes/bytearray) as
        well as on Python 3 text strings, so it does not matter which of
        them the OS interface hands back for a file's contents.

        @param blob: Non-empty file contents.
        @param delta: Signed amount to add to the first byte.
        """
        if isinstance(blob, (bytes, bytearray)):
            patched = bytearray(blob)
            patched[0] = (patched[0] + delta) % 0x100
            return bytes(patched)
        return chr((ord(blob[0]) + delta) % 0x100) + blob[1:]

    def _modify_kernel(self,
                       section,
                       delta,
                       modification_type=KERNEL_BODY_MOD,
                       key_path=None):
        """Modify kernel image on a disk partition.

        The partition is first dumped to self.dump_file_name, the dump is
        transformed according to modification_type, and the result is
        written back to the partition.

        This method supports three types of kernel modification.

        KERNEL_BODY_MOD just adds the value of delta to the first byte of
        the kernel blob. This might leave the kernel corrupted (as required
        by the test).

        KERNEL_VERSION_MOD will use 'delta' as the new version number, it
        will put it in the kernel header, and then will resign the kernel
        blob with the key found in self.dev_key_path.

        KERNEL_RESIGN_MOD will resign the kernel with keys in argument
        key_path. If key_path is None or is not a directory, dev_key_path
        is used as the resign key directory. 'delta' is ignored.

        Any other modification_type is silently ignored (the dump is still
        taken, but nothing is written back).

        @param section: The kernel to modify. May be A or B.
        @param delta: Byte offset or new version, depending on the type.
        @param modification_type: One of the KERNEL_*_MOD constants.
        @param key_path: Optional directory with resign keys.
        @raise KernelHandlerError: If the dump to be corrupted is empty.
        """
        self.dump_kernel(section, self.dump_file_name)

        if modification_type == KERNEL_BODY_MOD:
            kernel_to_write = self.dump_file_name
            # Only this mode needs the dump contents in memory.
            blob = self.os_if.read_file(kernel_to_write)
            if not blob:
                raise KernelHandlerError(
                        'Kernel dump %s is empty' % kernel_to_write)
            self.os_if.write_file(kernel_to_write,
                                  self._shift_first_byte(blob, delta))
        elif modification_type == KERNEL_VERSION_MOD:
            new_version = delta
            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s --version %d '
                    '--signprivate %s --oldblob %s' %
                    (kernel_to_write, new_version,
                     os.path.join(self.dev_key_path,
                                  'kernel_data_key.vbprivk'),
                     self.dump_file_name))
        elif modification_type == KERNEL_RESIGN_MOD:
            if key_path and self.os_if.is_dir(key_path):
                resign_key_path = key_path
            else:
                resign_key_path = self.dev_key_path
            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s '
                    '--signprivate %s --oldblob %s --keyblock %s' %
                    (kernel_to_write,
                     os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
                     self.dump_file_name,
                     os.path.join(resign_key_path, 'kernel.keyblock')))
        else:
            return  # Unsupported mode, ignore.

        self.write_kernel(section, kernel_to_write)

    def corrupt_kernel(self, section):
        """Corrupt a kernel section (add DELTA to the first byte)."""
        self._modify_kernel(section.upper(), self.DELTA)

    def restore_kernel(self, section):
        """Restore the previously corrupted kernel (subtract DELTA)."""
        self._modify_kernel(section.upper(), -self.DELTA)

    def get_version(self, section):
        """Return version read from this section blob's header.

        The value is the one cached in the partition map when it was built.
        """
        return self.partition_map[section.upper()]['version']

    def get_datakey_version(self, section):
        """Return datakey version read from this section blob's header.

        The value is the one cached in the partition map when it was built.
        """
        return self.partition_map[section.upper()]['datakey_version']

    def get_sha(self, section):
        """Return the SHA1 hash (hex digest) of the section blob."""
        s = hashlib.sha1()
        dev = self.partition_map[section.upper()]['device']
        s.update(self.os_if.read_file(dev))
        return s.hexdigest()

    def set_version(self, section, version):
        """Set version of this kernel blob and re-sign it.

        @param section: The kernel to modify. May be A or B.
        @param version: The new, non-negative version number.
        @raise KernelHandlerError: If version is negative.
        """
        if version < 0:
            raise KernelHandlerError('Bad version value %d' % version)
        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)

    def resign_kernel(self, section, key_path=None):
        """Resign kernel with original kernel version and keys in key_path.

        @param section: The kernel to resign. May be A or B.
        @param key_path: Directory holding the keys; when it is None or not
                a directory, the directory given to init() is used.
        """
        self._modify_kernel(section.upper(), self.get_version(section),
                            KERNEL_RESIGN_MOD, key_path)

    def init(self, dev_key_path='.', internal_disk=True):
        """Initialize the kernel handler object.

        Records the key directory and root device, picks the dump file
        location and builds the kernel partition map.

        @param dev_key_path: Directory holding the default signing keys.
        @param internal_disk: Passed on to _get_partition_map() to select
                which disk is scanned for kernel partitions.
        """
        self.dev_key_path = dev_key_path
        self.root_dev = self.os_if.get_root_dev()
        self.dump_file_name = self.os_if.state_dir_file(TMP_FILE_NAME)
        self._get_partition_map(internal_disk)
        self.initialized = True