blob: bef256e608efaa5779096379bb680cd043e63dd1 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Code to provide functions for FAFT tests.
These can be exposed via an XML-RPC server running on the DUT.
"""
import functools, os, shutil, tempfile
import common
from autotest_lib.client.cros.faft.utils import (cgpt_state,
cgpt_handler,
chromeos_interface,
firmware_updater,
flashrom_handler,
kernel_handler,
rootfs_handler,
saft_flashrom_util,
tpm_handler,
)
def allow_multiple_section_input(image_operator):
    """Decorate a method so it accepts one section or a sequence of them.

    The decorated method is invoked once per section, in the given order.
    Whatever the decorated method returns is discarded; the wrapper itself
    always returns None.

    @param image_operator: Method accepting one section as its argument.
    @return: A method accepting either one section or a list/tuple of
             sections.
    """
    @functools.wraps(image_operator)
    def wrapper(self, section):
        """Wrapper method to support multiple sections.

        @param section: A list (or tuple) of sections, or just a section.
        """
        # isinstance, unlike an exact type() comparison, also recognizes
        # subclasses of list and tuple. A string is not a list/tuple, so it
        # is always handled as one single section.
        if isinstance(section, (tuple, list)):
            sections = section
        else:
            sections = (section,)
        for sec in sections:
            image_operator(self, sec)
    return wrapper
class LazyFlashromHandlerProxy:
    """Stand-in for a FlashromHandler that is only built on first use.

    The constructor arguments are remembered and handed to
    FlashromHandler.init() the first time an attribute that the proxy does
    not define itself is looked up.
    """

    # Class-level defaults: because these names always resolve through the
    # normal lookup, they never reach __getattr__.
    _loaded = False
    _obj = None

    def __init__(self, *args, **kargs):
        """Remember the arguments for the deferred FlashromHandler.init().

        @param args: Positional arguments forwarded to init().
        @param kargs: Keyword arguments forwarded to init().
        """
        self._args, self._kargs = args, kargs

    def _load(self):
        """Create the real handler, initialize it and load a new image."""
        handler = self._obj = flashrom_handler.FlashromHandler()
        handler.init(*self._args, **self._kargs)
        handler.new_image()
        self._loaded = True

    def __getattr__(self, name):
        """Forward an unknown attribute lookup to the real handler.

        @param name: Name of the attribute being looked up.
        @return: The attribute of the underlying handler.
        """
        if self._loaded:
            return getattr(self._obj, name)
        # First access (or first access after reload()): build the handler.
        self._load()
        return getattr(self._obj, name)

    def reload(self):
        """Mark the handler stale so the next access builds a fresh one."""
        self._loaded = False
class RPCFunctions(object):
    """A class which aggregates some useful functions for firmware testing.

    This class can be exposed via a XMLRPC server such that its functions can
    be accessed remotely. Method naming should fit the naming rule
    '_[categories]_[method_name]' where categories contains system, ec, bios,
    kernel, cgpt, tpm, updater, etc. Methods should be called by
    'FAFTClient.[categories].[method_name]', because _dispatch will rename
    this name to '_[categories]_[method_name]'.

    Attributes:
        _chromeos_interface: An object to encapsulate OS services functions.
        _bios_handler: An object to automate BIOS flashrom testing.
        _ec_handler: An object to automate EC flashrom testing. It is None
                     when the 'mosys ec info' command fails at construction.
        _ec_image: An object to automate EC image for autest. It is None
                   until _bios_setup_EC_image() is called.
        _kernel_handler: An object to provide kernel related actions.
        _log_file: Path of the log file.
        _tpm_handler: An object to control TPM device.
        _cgpt_handler: A CgptHandler object.
        _cgpt_state: A CgptState object.
        _rootfs_handler: A RootfsHandler object.
        _updater: An object to update firmware.
        _temp_path: Path of a temp directory.
        _keys_path: Path of a directory, keys/, in temp directory.
        _work_path: Path of a directory, work/, in temp directory.
    """

    def __init__(self):
        """Initialize the data attributes of this class."""
        # TODO(waihong): Move the explicit object.init() methods to the
        # objects' constructors (ChromeOSInterface, FlashromHandler,
        # KernelHandler, and TpmHandler).
        self._chromeos_interface = chromeos_interface.ChromeOSInterface(False)
        # We keep the state of FAFT test in a permanent directory over reboots.
        state_dir = '/var/tmp/faft'
        self._log_file = os.path.join(state_dir, 'faft_client.log')
        self._chromeos_interface.init(state_dir, log_file=self._log_file)
        os.chdir(state_dir)

        self._bios_handler = LazyFlashromHandlerProxy(
                saft_flashrom_util,
                self._chromeos_interface,
                None,
                '/usr/share/vboot/devkeys',
                'bios')

        self._ec_handler = None
        # os.system() returns 0 when the command succeeds; only then is an
        # EC handler created.
        if not os.system("mosys ec info"):
            self._ec_handler = LazyFlashromHandlerProxy(
                    saft_flashrom_util,
                    self._chromeos_interface,
                    'ec_root_key.vpubk',
                    '/usr/share/vboot/devkeys',
                    'ec')
        self._ec_image = None

        self._kernel_handler = kernel_handler.KernelHandler()
        # TODO(waihong): The dev_key_path is a new argument. We do that in
        # order not to break the old image and still be able to run.
        try:
            self._kernel_handler.init(self._chromeos_interface,
                                      dev_key_path='/usr/share/vboot/devkeys',
                                      internal_disk=True)
        except TypeError:
            # Copy the key to the current working directory.
            shutil.copy('/usr/share/vboot/devkeys/kernel_data_key.vbprivk', '.')
            self._kernel_handler.init(self._chromeos_interface,
                                      internal_disk=True)

        self._tpm_handler = tpm_handler.TpmHandler()
        self._tpm_handler.init(self._chromeos_interface)

        self._cgpt_handler = cgpt_handler.CgptHandler(self._chromeos_interface)
        self._cgpt_state = cgpt_state.CgptState(
                'SHORT', self._chromeos_interface, self._system_get_root_dev(),
                self._cgpt_handler)

        self._rootfs_handler = rootfs_handler.RootfsHandler()
        self._rootfs_handler.init(self._chromeos_interface)

        self._updater = firmware_updater.FirmwareUpdater(self._chromeos_interface)

        # Initialize temporary directory path
        self._temp_path = '/var/tmp/faft/autest'
        self._keys_path = os.path.join(self._temp_path, 'keys')
        self._work_path = os.path.join(self._temp_path, 'work')

    def _dispatch(self, method, params):
        """This _dispatch method handles string conversion especially.

        Since we turn off allow_dotted_names option. So any string conversion,
        like str(FAFTClient.method), i.e. FAFTClient.method.__str__, failed
        via XML RPC call.

        @param method: The requested method name, e.g. 'system.is_available'.
                       A trailing '.__str__' requests the string form of the
                       method instead of calling it.
        @param params: A sequence of positional arguments for the method.
        @return: The return value of the method, or str(method) for a
                 '.__str__' request.
        @raise Exception: If the method does not exist on this object.
        """
        is_str = method.endswith('.__str__')
        if is_str:
            method = method.rsplit('.', 1)[0]

        categories = ('system', 'bios', 'ec', 'kernel',
                      'tpm', 'cgpt', 'updater', 'rootfs')
        # Split once. A bare category name without a dot (e.g. 'system') is
        # looked up as-is, so it ends in the "not supported" error below
        # rather than in an IndexError.
        category, dot, name = method.partition('.')
        if dot and category in categories:
            attr_name = '_%s_%s' % (category, name)
        else:
            attr_name = method

        try:
            func = getattr(self, attr_name)
        except AttributeError:
            raise Exception('method "%s" is not supported' % method)

        if is_str:
            return str(func)
        else:
            self._chromeos_interface.log('Dispatching method %s with args %s' %
                                         (str(func), str(params)))
            return func(*params)

    def _system_is_available(self):
        """Function for polling the RPC server availability.

        @return: Always True.
        """
        return True

    def _system_dump_log(self, remove_log=False):
        """Dump the log file.

        @param remove_log: Remove the log file after dump.
        @return: String of the log file content.
        """
        # Close the file before it is possibly removed below.
        with open(self._log_file) as log_file:
            log = log_file.read()
        if remove_log:
            os.remove(self._log_file)
        return log

    def _system_run_shell_command(self, command):
        """Run shell command.

        @param command: A shell command to be run.
        """
        self._chromeos_interface.run_shell_command(command)

    def _system_run_shell_command_get_output(self, command):
        """Run shell command and get its console output.

        @param command: A shell command to be run.
        @return: A list of strings stripped of the newline characters.
        """
        return self._chromeos_interface.run_shell_command_get_output(command)

    def _system_software_reboot(self):
        """Request software reboot."""
        self._chromeos_interface.run_shell_command('reboot')

    def _system_get_platform_name(self):
        """Get the platform name of the current system.

        @return: A string of the platform name.
        @raise Exception: If the command failed or printed nothing.
        """
        # 'mosys platform name' sometimes fails. Let's get the verbose output.
        lines = self._chromeos_interface.run_shell_command_get_output(
                '(mosys -vvv platform name 2>&1) || echo Failed')
        # An empty output is a failure too; without this check lines[-1]
        # would raise an IndexError.
        if not lines or lines[-1].strip() == 'Failed':
            raise Exception('Failed getting platform name: ' + '\n'.join(lines))
        return lines[-1]

    def _system_get_crossystem_value(self, key):
        """Get crossystem value of the requested key.

        @param key: A crossystem key.
        @return: A string of the requested crossystem value.
        """
        return self._chromeos_interface.run_shell_command_get_output(
                'crossystem %s' % key)[0]

    def _system_get_root_dev(self):
        """Get the name of root device without partition number.

        @return: A string of the root device without partition number.
        """
        return self._chromeos_interface.get_root_dev()

    def _system_get_root_part(self):
        """Get the name of root device with partition number.

        @return: A string of the root device with partition number.
        """
        return self._chromeos_interface.get_root_part()

    def _system_set_try_fw_b(self):
        """Set 'Try Firmware B' flag in crossystem."""
        self._chromeos_interface.cs.fwb_tries = 1

    def _system_set_fw_try_next(self, next):
        """Set fw_try_next to A or B.

        @param next: The value to assign to fw_try_next.
        """
        self._chromeos_interface.cs.fw_try_next = next

    def _system_get_fw_vboot2(self):
        """Get fw_vboot2.

        @return: True if fw_vboot2 reads as '1'. False otherwise, including
                 when reading it raises ChromeOSInterfaceError.
        """
        try:
            return self._chromeos_interface.cs.fw_vboot2 == '1'
        except chromeos_interface.ChromeOSInterfaceError:
            return False

    def _system_request_recovery_boot(self):
        """Request running in recovery mode on the restart."""
        self._chromeos_interface.cs.request_recovery()

    def _system_get_dev_boot_usb(self):
        """Get dev_boot_usb value which controls developer mode boot from USB.

        @return: True if enable, False if disable.
        """
        return self._chromeos_interface.cs.dev_boot_usb == '1'

    def _system_set_dev_boot_usb(self, value):
        """Set dev_boot_usb value which controls developer mode boot from USB.

        @param value: True to enable, False to disable.
        """
        self._chromeos_interface.cs.dev_boot_usb = 1 if value else 0

    def _system_is_removable_device_boot(self):
        """Check the current boot device is removable.

        @return: True: if a removable device boots.
                 False: if a non-removable device boots.
        """
        root_part = self._chromeos_interface.get_root_part()
        return self._chromeos_interface.is_removable_device(root_part)

    def _system_create_temp_dir(self, prefix='backup_'):
        """Create a temporary directory and return the path.

        @param prefix: Prefix of the name of the new directory.
        @return: Path of the created directory.
        """
        return tempfile.mkdtemp(prefix=prefix)

    def _bios_reload(self):
        """Reload the firmware image that may be changed."""
        self._bios_handler.reload()

    def _bios_get_gbb_flags(self):
        """Get the GBB flags.

        @return: An integer of the GBB flags.
        """
        return self._bios_handler.get_gbb_flags()

    def _bios_get_preamble_flags(self, section):
        """Get the preamble flags of a firmware section.

        @param section: A firmware section, either 'a' or 'b'.
        @return: An integer of the preamble flags.
        """
        return self._bios_handler.get_section_flags(section)

    def _bios_set_preamble_flags(self, section, flags):
        """Set the preamble flags of a firmware section.

        @param section: A firmware section, either 'a' or 'b'.
        @param flags: An integer of preamble flags.
        """
        # The section is rewritten with its current version, so only the
        # flags change.
        version = self._bios_get_version(section)
        self._bios_handler.set_section_version(section, version, flags,
                                               write_through=True)

    def _bios_get_body_sha(self, section):
        """Get SHA1 hash of BIOS RW firmware section.

        @param section: A firmware section, either 'a' or 'b'.
        @return: The hash as returned by the BIOS handler.
        """
        return self._bios_handler.get_section_sha(section)

    def _bios_get_sig_sha(self, section):
        """Get SHA1 hash of firmware vblock in section.

        @param section: A firmware section, either 'a' or 'b'.
        @return: The hash as returned by the BIOS handler.
        """
        return self._bios_handler.get_section_sig_sha(section)

    @allow_multiple_section_input
    def _bios_corrupt_sig(self, section):
        """Corrupt the requested firmware section signature.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.corrupt_firmware(section)

    @allow_multiple_section_input
    def _bios_restore_sig(self, section):
        """Restore the previously corrupted firmware section signature.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.restore_firmware(section)

    @allow_multiple_section_input
    def _bios_corrupt_body(self, section):
        """Corrupt the requested firmware section body.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.corrupt_firmware_body(section)

    @allow_multiple_section_input
    def _bios_restore_body(self, section):
        """Restore the previously corrupted firmware section body.

        @param section: A firmware section, either 'a' or 'b'.
        """
        self._bios_handler.restore_firmware_body(section)

    def __bios_modify_version(self, section, delta):
        """Modify firmware version for the requested section, by adding delta.

        The passed in delta, a positive or a negative number, is added to the
        original firmware version.

        @param section: A firmware section, either 'a' or 'b'.
        @param delta: The number added to the original version.
        """
        original_version = self._bios_get_version(section)
        new_version = original_version + delta
        flags = self._bios_handler.get_section_flags(section)
        self._chromeos_interface.log(
                'Setting firmware section %s version from %d to %d' % (
                section, original_version, new_version))
        self._bios_handler.set_section_version(section, new_version, flags,
                                               write_through=True)

    @allow_multiple_section_input
    def _bios_move_version_backward(self, section):
        """Decrement firmware version for the requested section."""
        self.__bios_modify_version(section, -1)

    @allow_multiple_section_input
    def _bios_move_version_forward(self, section):
        """Increase firmware version for the requested section."""
        self.__bios_modify_version(section, 1)

    def _bios_get_version(self, section):
        """Retrieve firmware version of a section."""
        return self._bios_handler.get_section_version(section)

    def _bios_get_datakey_version(self, section):
        """Return firmware data key version."""
        return self._bios_handler.get_section_datakey_version(section)

    def _bios_get_kernel_subkey_version(self, section):
        """Return kernel subkey version."""
        return self._bios_handler.get_section_kernel_subkey_version(section)

    def _bios_setup_EC_image(self, ec_path):
        """Setup the new EC image for later update.

        @param ec_path: The path of the EC image to be updated.
        """
        self._ec_image = flashrom_handler.FlashromHandler()
        self._ec_image.init(saft_flashrom_util,
                            self._chromeos_interface,
                            'ec_root_key.vpubk',
                            '/usr/share/vboot/devkeys',
                            'ec')
        self._ec_image.new_image(ec_path)

    def _bios_get_EC_image_sha(self):
        """Get SHA1 hash of RW firmware section of the EC autest image."""
        return self._ec_image.get_section_sha('rw')

    def _bios_update_EC_from_image(self, section, flags):
        """Update EC via software sync design.

        It copies the RW section from the EC image, which is loaded by calling
        bios_setup_EC_image(), to the EC area of the specified RW section on
        the current AP firmware.

        @param section: A firmware section on current BIOS, either 'a' or 'b'.
        @param flags: An integer of preamble flags.
        """
        blob = self._ec_image.get_section_body('rw')
        self._bios_handler.set_section_ecbin(section, blob,
                                             write_through=True)
        self._bios_set_preamble_flags(section, flags)

    def _bios_dump_whole(self, bios_path):
        """Dump the current BIOS firmware to a file, specified by bios_path.

        @param bios_path: The path of the BIOS image to be written.
        """
        self._bios_handler.dump_whole(bios_path)

    def _bios_dump_rw(self, dir_path):
        """Dump the current BIOS firmware RW to dir_path.

        VBOOTA, VBOOTB, FVMAIN, FVMAINB need to be dumped.

        @param dir_path: The path of directory which contains files to be
                         written.
        @raise Exception: If dir_path is not an existing directory.
        """
        if not os.path.isdir(dir_path):
            raise Exception("%s doesn't exist" % dir_path)

        # Fetch every blob before the first file is written.
        blobs = (
                ('VBOOTA', self._bios_handler.get_section_sig('a')),
                ('VBOOTB', self._bios_handler.get_section_sig('b')),
                ('FVMAIN', self._bios_handler.get_section_body('a')),
                ('FVMAINB', self._bios_handler.get_section_body('b')),
        )
        for file_name, blob in blobs:
            # Binary mode: the blobs are raw firmware data, and
            # _bios_write_rw() reads them back with 'rb'.
            with open(os.path.join(dir_path, file_name), 'wb') as blob_file:
                blob_file.write(blob)

    def _bios_write_whole(self, bios_path):
        """Write the firmware from bios_path to the current system.

        @param bios_path: The path of the source BIOS image.
        """
        self._bios_handler.new_image(bios_path)
        self._bios_handler.write_whole()

    def _bios_write_rw(self, dir_path):
        """Write the firmware RW from dir_path to the current system.

        VBOOTA, VBOOTB, FVMAIN, FVMAINB need to be written.

        @param dir_path: The path of directory which contains the source files.
        @raise Exception: If any of the four source files is missing.
        """
        file_names = ('VBOOTA', 'VBOOTB', 'FVMAIN', 'FVMAINB')
        paths = dict((file_name, os.path.join(dir_path, file_name))
                     for file_name in file_names)
        if not all(os.path.exists(paths[file_name])
                   for file_name in file_names):
            raise Exception("Source firmware file(s) doesn't exist.")

        # Read every file before the first section is written.
        blobs = {}
        for file_name in file_names:
            with open(paths[file_name], 'rb') as blob_file:
                blobs[file_name] = blob_file.read()

        self._bios_handler.set_section_sig('a', blobs['VBOOTA'],
                                           write_through=True)
        self._bios_handler.set_section_sig('b', blobs['VBOOTB'],
                                           write_through=True)
        self._bios_handler.set_section_body('a', blobs['FVMAIN'],
                                            write_through=True)
        self._bios_handler.set_section_body('b', blobs['FVMAINB'],
                                            write_through=True)

    def _ec_get_version(self):
        """Get EC version via mosys.

        @return: A string of the EC version.
        """
        return self._chromeos_interface.run_shell_command_get_output(
                'mosys ec info | sed "s/.*| //"')[0]

    def _ec_get_firmware_sha(self):
        """Get SHA1 hash of EC RW firmware section."""
        return self._ec_handler.get_section_sha('rw')

    @allow_multiple_section_input
    def _ec_corrupt_sig(self, section):
        """Corrupt the requested EC section signature.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.corrupt_firmware(section, corrupt_all=True)

    @allow_multiple_section_input
    def _ec_restore_sig(self, section):
        """Restore the previously corrupted EC section signature.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.restore_firmware(section, restore_all=True)

    @allow_multiple_section_input
    def _ec_corrupt_body(self, section):
        """Corrupt the requested EC section body.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.corrupt_firmware_body(section, corrupt_all=True)

    @allow_multiple_section_input
    def _ec_restore_body(self, section):
        """Restore the previously corrupted EC section body.

        @param section: An EC section, either 'a' or 'b'.
        """
        self._ec_handler.restore_firmware_body(section, restore_all=True)

    def _ec_dump_firmware(self, ec_path):
        """Dump the current EC firmware to a file, specified by ec_path.

        @param ec_path: The path of the EC image to be written.
        """
        self._ec_handler.dump_whole(ec_path)

    def _ec_set_write_protect(self, enable):
        """Enable write protect of the EC flash chip.

        @param enable: True if activating EC write protect. Otherwise, False.
        """
        if enable:
            self._ec_handler.enable_write_protect()
        else:
            self._ec_handler.disable_write_protect()

    @allow_multiple_section_input
    def _kernel_corrupt_sig(self, section):
        """Corrupt the requested kernel section.

        @param section: A kernel section, either 'a' or 'b'.
        """
        self._kernel_handler.corrupt_kernel(section)

    @allow_multiple_section_input
    def _kernel_restore_sig(self, section):
        """Restore the requested kernel section (previously corrupted).

        @param section: A kernel section, either 'a' or 'b'.
        """
        self._kernel_handler.restore_kernel(section)

    def __kernel_modify_version(self, section, delta):
        """Modify kernel version for the requested section, by adding delta.

        The passed in delta, a positive or a negative number, is added to the
        original kernel version.

        @param section: A kernel section, either 'a' or 'b'.
        @param delta: The number added to the original version.
        """
        original_version = self._kernel_handler.get_version(section)
        new_version = original_version + delta
        self._chromeos_interface.log(
                'Setting kernel section %s version from %d to %d' % (
                section, original_version, new_version))
        self._kernel_handler.set_version(section, new_version)

    @allow_multiple_section_input
    def _kernel_move_version_backward(self, section):
        """Decrement kernel version for the requested section."""
        self.__kernel_modify_version(section, -1)

    @allow_multiple_section_input
    def _kernel_move_version_forward(self, section):
        """Increase kernel version for the requested section."""
        self.__kernel_modify_version(section, 1)

    def _kernel_get_version(self, section):
        """Return kernel version."""
        return self._kernel_handler.get_version(section)

    def _kernel_get_datakey_version(self, section):
        """Return kernel datakey version."""
        return self._kernel_handler.get_datakey_version(section)

    def _kernel_diff_a_b(self):
        """Compare kernel A with B.

        @return: True: if kernel A is different with B.
                 False: if kernel A is the same as B.
        """
        rootdev = self._chromeos_interface.get_root_dev()
        kernel_a = self._chromeos_interface.join_part(rootdev, '2')
        kernel_b = self._chromeos_interface.join_part(rootdev, '4')

        # The signature (some kind of hash) for the kernel body is stored in
        # the beginning. So compare the first 64KB (including header, preamble,
        # and signature) should be enough to check them identical.
        header_a = self._chromeos_interface.read_partition(kernel_a, 0x10000)
        header_b = self._chromeos_interface.read_partition(kernel_b, 0x10000)

        return header_a != header_b

    def _kernel_resign_with_keys(self, section, key_path=None):
        """Resign kernel with temporary key.

        @param section: The kernel section to resign.
        @param key_path: Passed on to the kernel handler's resign_kernel().
        """
        self._kernel_handler.resign_kernel(section, key_path)

    def _kernel_dump(self, section, kernel_path):
        """Dump the specified kernel to a file.

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image to be written.
        """
        self._kernel_handler.dump_kernel(section, kernel_path)

    def _kernel_write(self, section, kernel_path):
        """Write a kernel image to the specified section.

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image.
        """
        self._kernel_handler.write_kernel(section, kernel_path)

    def _kernel_get_sha(self, section):
        """Return the SHA1 hash of the specified kernel section."""
        return self._kernel_handler.get_sha(section)

    def _tpm_get_firmware_version(self):
        """Retrieve tpm firmware body version."""
        return self._tpm_handler.get_fw_version()

    def _tpm_get_firmware_datakey_version(self):
        """Retrieve tpm firmware data key version."""
        # NOTE(review): a "data key version" getter calling
        # get_fw_body_version() looks odd; confirm against TpmHandler that
        # this is the intended call.
        return self._tpm_handler.get_fw_body_version()

    def _cgpt_run_test_loop(self):
        """Run the CgptState test loop. The test logic is handled in the client.

        @return: 0: there are more cgpt tests to execute.
                 1: no more CgptState test, finished.
        """
        return self._cgpt_state.test_loop()

    def _cgpt_set_test_step(self, step):
        """Set the CgptState test step.

        @param step: A test step number.
        """
        self._cgpt_state.set_step(step)

    def _cgpt_get_test_step(self):
        """Get the CgptState test step.

        @return: A test step number.
        """
        return self._cgpt_state.get_step()

    def _cgpt_get_attributes(self):
        """Get kernel attributes.

        @return: A dict with keys 'A' and 'B' holding what the cgpt handler
                 reports for partitions KERN-A and KERN-B.
        """
        rootdev = self._system_get_root_dev()
        self._cgpt_handler.read_device_info(rootdev)
        return {'A': self._cgpt_handler.get_partition(rootdev, 'KERN-A'),
                'B': self._cgpt_handler.get_partition(rootdev, 'KERN-B')}

    def _cgpt_set_attributes(self, attributes):
        """Set kernel attributes.

        @param attributes: A dict which may have keys 'A' and/or 'B', each
                           mapping to a dict of attributes. Only 'priority',
                           'tries' and 'successful' are applied; other
                           attribute names are ignored.
        """
        rootdev = self._system_get_root_dev()
        allowed = ['priority', 'tries', 'successful']
        for p in ('A', 'B'):
            if p not in attributes:
                continue
            attr = dict((k, attributes[p][k])
                        for k in allowed if k in attributes[p])
            # Nothing to set for this partition when no allowed key is given.
            if attr:
                self._cgpt_handler.set_partition(rootdev, 'KERN-%s' % p, attr)

    def _updater_setup(self, shellball=None):
        """Setup the updater.

        @param shellball: Path of provided shellball. Use default shellball
                          if None.
        """
        self._updater.setup(self._chromeos_interface, shellball)

    def _updater_cleanup(self):
        """Ask the updater to clean up its temp directory."""
        self._updater.cleanup_temp_dir()

    def _updater_get_fwid(self):
        """Retrieve shellball's fwid.

        This method should be called after updater_setup.

        @return: Shellball's fwid.
        """
        return self._updater.retrieve_fwid()

    def _updater_resign_firmware(self, version):
        """Resign firmware with version.

        @param version: new version number.
        """
        self._updater.resign_firmware(version)

    def _updater_repack_shellball(self, append):
        """Repack shellball with new fwid.

        @param append: use for new fwid naming.
        """
        self._updater.repack_shellball(append)

    def _updater_run_autoupdate(self, append):
        """Run chromeos-firmwareupdate with autoupdate mode.

        @param append: Passed to the updater as updater_append.
        """
        options = ['--noupdate_ec', '--nocheck_rw_compatible']
        self._updater.run_firmwareupdate(mode='autoupdate',
                                         updater_append=append,
                                         options=options)

    def _updater_run_factory_install(self):
        """Run chromeos-firmwareupdate with factory_install mode."""
        options = ['--noupdate_ec']
        self._updater.run_firmwareupdate(mode='factory_install',
                                         options=options)

    def _updater_run_bootok(self, append):
        """Run chromeos-firmwareupdate with bootok mode.

        @param append: Passed to the updater as updater_append.
        """
        self._updater.run_firmwareupdate(mode='bootok',
                                         updater_append=append)

    def _updater_run_recovery(self):
        """Run chromeos-firmwareupdate with recovery mode."""
        options = ['--noupdate_ec', '--nocheck_rw_compatible']
        self._updater.run_firmwareupdate(mode='recovery',
                                         options=options)

    def _updater_get_temp_path(self):
        """Get updater's temp directory path."""
        return self._updater.get_temp_path()

    def _updater_get_keys_path(self):
        """Get updater's keys directory path."""
        return self._updater.get_keys_path()

    def _updater_get_work_path(self):
        """Get updater's work directory path."""
        return self._updater.get_work_path()

    def _rootfs_verify_rootfs(self, section):
        """Verifies the integrity of the root FS.

        @param section: The rootfs to verify. May be A or B.
        @return: The result reported by the rootfs handler.
        """
        return self._rootfs_handler.verify_rootfs(section)

    def cleanup(self):
        """Cleanup for the RPC server. Currently nothing."""
        pass