blob: 438c4c854232a34239b6776a4522829140a0286f [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import pprint
import StringIO
import subprocess
import time
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
from autotest_lib.server.cros import filesystem_util, gsutil_wrapper
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
class Cr50Test(FirmwareTest):
"""Base class that sets up helper objects/functions for cr50 tests."""
version = 1
RESPONSE_TIMEOUT = 180
GS_PRIVATE = 'gs://chromeos-localmirror-private/distfiles/*/'
GS_PUBLIC = 'gs://chromeos-localmirror/distfiles/'
CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin%s'
CR50_PROD_FILE = 'cr50.r0.0.10.w%s%s.tbz2'
CR50_TOT_VER_FILE = 'tot/LATEST'
CR50_TOT_FILE = 'tot/cr50.bin.%s.%s'
NONE = 0
# Saved the original device state during init.
INITIAL_IMAGE_STATE = 1 << 0
# Saved the original image, the device image, and the debug image. These
# images are needed to be able to restore the original image and board id.
IMAGES = 1 << 1
PP_SHORT_INTERVAL = 3
# Cr50 may have flash operation errors during the test. Here's an example
# of one error message.
# do_flash_op:245 errors 20 fsh_pe_control 40720004
# The stuff after the ':' may change, but all flash operation errors
# contain do_flash_op. do_flash_op is only ever printed if there is an
# error during the flash operation. Just search for do_flash_op to simplify
# the search string and make it applicable to all flash op errors.
CR50_FLASH_OP_ERROR_MSG = 'do_flash_op'
def initialize(self, host, cmdline_args, full_args,
restore_cr50_state=False, cr50_dev_path='', provision_update=False):
self._saved_state = self.NONE
self._raise_error_on_mismatch = not restore_cr50_state
self._provision_update = provision_update
self.tot_test_run = full_args.get('tot_test_run', '').lower() == 'true'
super(Cr50Test, self).initialize(host, cmdline_args)
if not hasattr(self, 'cr50'):
raise error.TestNAError('Test can only be run on devices with '
'access to the Cr50 console')
logging.info('Test Args: %r', full_args)
self.can_set_ccd_level = (not self.servo.running_through_ccd() or
self.cr50.testlab_is_on())
self.original_ccd_level = self.cr50.get_ccd_level()
self.original_ccd_settings = self.cr50.get_cap_dict(
info=self.cr50.CAP_SETTING)
self.host = host
tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
# Clear the FWMP, so it can't disable CCD.
self.clear_fwmp()
if self.can_set_ccd_level:
# Lock cr50 so the console will be restricted
self.cr50.set_ccd_level('lock')
elif self.original_ccd_level != 'lock':
raise error.TestNAError('Lock the console before running cr50 test')
self._save_original_state()
# Verify cr50 is still running the correct version
cr50_qual_version = full_args.get('cr50_qual_version', '').strip()
if cr50_qual_version:
_, running_rw, running_bid = self.get_saved_cr50_original_version()
expected_rw, expected_bid_sym = cr50_qual_version.split('/')
expected_bid = cr50_utils.GetBoardIdInfoString(expected_bid_sym,
symbolic=False)
logging.debug('Running %s %s Expect %s %s', running_rw, running_bid,
expected_rw, expected_bid)
if running_rw != expected_rw or expected_bid != running_bid:
raise error.TestError('Not running %s' % cr50_qual_version)
# We successfully saved the device state
self._saved_state |= self.INITIAL_IMAGE_STATE
try:
self._save_node_locked_dev_image(cr50_dev_path)
self._save_original_images(full_args.get('release_path', ''))
# We successfully saved the device images
self._saved_state |= self.IMAGES
except error.TestFail as e:
if restore_cr50_state:
if 'Could not find' in str(e):
raise error.TestNAError('Need DBG image to run test')
raise
except:
if restore_cr50_state:
raise
def after_run_once(self):
"""Log which iteration just ran"""
logging.info('successfully ran iteration %d', self.iteration)
self._try_to_bring_dut_up()
def _save_node_locked_dev_image(self, cr50_dev_path):
"""Save or download the node locked dev image.
@param cr50_dev_path: The path to the node locked cr50 image.
"""
# TODO(mruthven): remove once we get the new RO DBG images.
raise error.TestFail('Could not find DBG image for new RO')
if os.path.isfile(cr50_dev_path):
self._node_locked_cr50_image = cr50_dev_path
else:
devid = self.servo.get('cr50_devid')
self._node_locked_cr50_image = self.download_cr50_debug_image(
devid)[0]
def _save_original_images(self, release_path):
"""Use the saved state to find all of the device images.
This will download running cr50 image and the device image.
@param release_path: The release path given by test args
"""
# Copy the prod and prepvt images from the DUT
_, prod_rw, prod_bid = self._original_image_state['device_prod_ver']
filename = 'prod_device_image_' + prod_rw
self._device_prod_image = os.path.join(self.resultsdir, filename)
self.host.get_file(cr50_utils.CR50_PROD, self._device_prod_image)
if cr50_utils.HasPrepvtImage(self.host):
_, prepvt_rw, prepvt_bid = (
self._original_image_state['device_prepvt_ver'])
filename = 'prepvt_device_image_' + prepvt_rw
self._device_prepvt_image = os.path.join(self.resultsdir,
filename)
self.host.get_file(cr50_utils.CR50_PREPVT,
self._device_prepvt_image)
prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid)
else:
self._device_prepvt_image = None
prepvt_rw = None
prepvt_bid = None
if os.path.isfile(release_path):
self._original_cr50_image = release_path
logging.info('using supplied image')
return
if self.tot_test_run:
self._original_cr50_image = self.download_cr50_tot_image()
return
# If the running cr50 image version matches the image on the DUT use
# the DUT image as the original image. If the versions don't match
# download the image from google storage
_, running_rw, running_bid = self.get_saved_cr50_original_version()
# Make sure prod_bid and running_bid are in the same format
prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid)
running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
if running_rw == prod_rw and running_bid == prod_bid:
logging.info('Using device cr50 prod image %s %s', prod_rw,
prod_bid)
self._original_cr50_image = self._device_prod_image
elif running_rw == prepvt_rw and running_bid == prepvt_bid:
logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
prepvt_bid)
self._original_cr50_image = self._device_prepvt_image
else:
logging.info('Downloading cr50 image %s %s', running_rw,
running_bid)
self._original_cr50_image = self.download_cr50_release_image(
running_rw, running_bid)[0]
def _save_original_state(self):
"""Save the cr50 related state.
Save the device's current cr50 version, cr50 board id, rlz, and image
at /opt/google/cr50/firmware/cr50.bin.prod. These will be used to
restore the state during cleanup.
"""
self._original_image_state = self.get_image_and_bid_state()
def get_saved_cr50_original_version(self):
"""Return (ro ver, rw ver, bid)."""
if ('running_ver' not in self._original_image_state or 'cr50_image_bid'
not in self._original_image_state):
raise error.TestError('No record of original cr50 image version')
return (self._original_image_state['running_ver'][0],
self._original_image_state['running_ver'][1],
self._original_image_state['cr50_image_bid'])
def get_saved_cr50_original_path(self):
"""Return the local path for the original cr50 image."""
if not hasattr(self, '_original_cr50_image'):
raise error.TestError('No record of original image')
return self._original_cr50_image
def has_saved_cr50_dev_path(self):
"""Returns true if we saved the node locked debug image."""
return hasattr(self, '_node_locked_cr50_image')
def get_saved_cr50_dev_path(self):
"""Return the local path for the cr50 dev image."""
if not self.has_saved_cr50_dev_path():
raise error.TestError('No record of debug image')
return self._node_locked_cr50_image
def _restore_original_image(self, chip_bid, chip_flags):
"""Restore the cr50 image and erase the state.
Make 3 attempts to update to the original image. Use a rollback from
the DBG image to erase the state that can only be erased by a DBG image.
Set the chip board id during rollback
@param chip_bid: the integer representation of chip board id or None if
the board id should be erased
@param chip_flags: the integer representation of chip board id flags or
None if the board id should be erased
"""
for i in range(3):
try:
# Update to the node-locked DBG image so we can erase all of
# the state we are trying to reset
self.cr50_update(self._node_locked_cr50_image)
# Rollback to the original cr50 image.
self.cr50_update(self._original_cr50_image, rollback=True,
chip_bid=chip_bid, chip_flags=chip_flags)
break
except Exception, e:
logging.warning('Failed to restore original image attempt %d: '
'%r', i, e)
def _restore_original_image_and_board_id(self):
"""Restore the original cr50 related device state."""
if not (self._saved_state & self.IMAGES):
logging.warning('Did not save the original images. Cannot restore '
'state')
return
# Remove the prepvt image if the test installed one.
if (not self._original_image_state['has_prepvt'] and
cr50_utils.HasPrepvtImage(self.host)):
self.host.run('rm %s' % cr50_utils.CR50_PREPVT)
# If rootfs verification has been disabled, copy the cr50 device image
# back onto the DUT.
if filesystem_util.is_rootfs_writable(self.host):
cr50_utils.InstallImage(self.host, self._device_prod_image,
cr50_utils.CR50_PROD)
# Install the prepvt image if there was one.
if self._device_prepvt_image:
cr50_utils.InstallImage(self.host, self._device_prepvt_image,
cr50_utils.CR50_PREPVT)
chip_bid_info = self._original_image_state['chip_bid']
bid_is_erased = chip_bid_info == cr50_utils.ERASED_CHIP_BID
chip_bid = None if bid_is_erased else chip_bid_info[0]
chip_flags = None if bid_is_erased else chip_bid_info[2]
# Update to the original image and erase the board id
self._restore_original_image(chip_bid, chip_flags)
# Set the RLZ code
cr50_utils.SetRLZ(self.host, self._original_image_state['rlz'])
# Verify everything is still the same
mismatch = self._check_original_image_state()
if mismatch:
raise error.TestError('Could not restore state: %s' % mismatch)
logging.info('Successfully restored the original cr50 state')
def get_image_and_bid_state(self):
"""Get a dict with the current device cr50 information.
The state dict will include the platform brand, rlz code, chip board id,
the running cr50 image version, the running cr50 image board id, and the
device cr50 image version.
"""
state = {}
state['mosys platform brand'] = self.host.run('mosys platform brand',
ignore_status=True).stdout.strip()
state['device_prod_ver'] = cr50_utils.GetBinVersion(self.host,
cr50_utils.CR50_PROD)
state['has_prepvt'] = cr50_utils.HasPrepvtImage(self.host)
if state['has_prepvt']:
state['device_prepvt_ver'] = cr50_utils.GetBinVersion(self.host,
cr50_utils.CR50_PREPVT)
else:
state['device_prepvt_ver'] = None
state['rlz'] = cr50_utils.GetRLZ(self.host)
state['chip_bid'] = cr50_utils.GetChipBoardId(self.host)
state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid']
state['running_ver'] = cr50_utils.GetRunningVersion(self.host)
state['cr50_image_bid'] = self.cr50.get_active_board_id_str()
logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
return state
def _check_original_image_state(self):
"""Compare the current cr50 state to the original state.
@return: A dictionary with the state that is wrong as the key and the
new and old state as the value
"""
if not (self._saved_state & self.INITIAL_IMAGE_STATE):
logging.warning('Did not save the original state. Cannot verify it '
'matches')
return
# Make sure the /var/cache/cr50* state is up to date.
cr50_utils.ClearUpdateStateAndReboot(self.host)
mismatch = {}
new_state = self.get_image_and_bid_state()
for k, new_val in new_state.iteritems():
original_val = self._original_image_state[k]
if new_val != original_val:
mismatch[k] = 'old: %s, new: %s' % (original_val, new_val)
if mismatch:
logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
else:
logging.info('The device is in the original state')
return mismatch
def _reset_ccd_settings(self):
"""Reset the ccd lock and capability states."""
if not self.cr50.ccd_is_reset():
# Try to open cr50 and enable testlab mode if it isn't enabled.
try:
self.fast_open(True)
except:
# Even if we can't open cr50, do our best to reset the rest of
# the system state. Log a warning here.
logging.warning('Unable to Open cr50', exc_info=True)
self.cr50.send_command('ccd reset')
if not self.cr50.ccd_is_reset():
raise error.TestFail('Could not reset ccd')
current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING)
if self.original_ccd_settings != current_settings:
if not self.can_set_ccd_level:
raise error.TestError("CCD state has changed, but we can't "
"restore it")
self.fast_open(True)
self.cr50.set_caps(self.original_ccd_settings)
# First try using testlab open to open the device
if self.original_ccd_level == 'open':
self.fast_open(True)
elif self.original_ccd_level != self.cr50.get_ccd_level():
self.cr50.set_ccd_level(self.original_ccd_level)
def cleanup(self):
"""Attempt to cleanup the cr50 state. Then run firmware cleanup"""
try:
self._restore_cr50_state()
finally:
super(Cr50Test, self).cleanup()
# Check the logs captured during firmware_test cleanup for cr50 errors.
self._get_cr50_stats_from_uart_capture()
def _get_cr50_stats_from_uart_capture(self):
"""Check cr50 uart output for errors.
Use the logs captured during firmware_test cleanup to check for cr50
errors. Flash operation issues aren't obvious unless you check the logs.
All flash op errors print do_flash_op and it isn't printed during normal
operation. Open the cr50 uart file and count the number of times this is
printed. Log the number of errors.
"""
if not hasattr(self, 'cr50_uart_file'):
logging.info('There is not a cr50 uart file')
return
flash_error_count = 0
with open(self.cr50_uart_file, 'r') as f:
for line in f:
if self.CR50_FLASH_OP_ERROR_MSG in line:
flash_error_count += 1
# Log any flash operation errors.
logging.info('do_flash_op count: %d', flash_error_count)
def _try_to_bring_dut_up(self):
"""Try to quickly get the dut in a pingable state"""
logging.info('checking dut state')
self.servo.set_nocheck('cold_reset', 'off')
self.servo.set_nocheck('warm_reset', 'off')
time.sleep(self.cr50.SHORT_WAIT)
if not self.cr50.ap_is_on():
logging.info('Pressing power button to turn on AP')
self.servo.power_short_press()
end_time = time.time() + self.RESPONSE_TIMEOUT
while not self.host.ping_wait_up(
self.faft_config.delay_reboot_to_ping * 2):
if time.time() > end_time:
logging.warn('DUT is unresponsive after trying to bring it up')
return
self.servo.get_power_state_controller().reset()
logging.info('DUT did not respond. Resetting it.')
def _restore_cr50_state(self):
"""Restore cr50 state, so the device can be used for further testing"""
state_mismatch = self._check_original_image_state()
if state_mismatch and not self._provision_update:
self._restore_original_image_and_board_id()
if self._raise_error_on_mismatch:
raise error.TestError('Unexpected state mismatch during '
'cleanup %s' % state_mismatch)
# Reset the password as the first thing in cleanup. It is important that
# if some other part of cleanup fails, the password has at least been
# reset.
self.cr50.send_command('ccd testlab open')
self.cr50.send_command('rddkeepalive disable')
self.cr50.send_command('ccd reset')
self.cr50.send_command('wp follow_batt_pres atboot')
# Reboot cr50 if the console is accessible. This will reset most state.
if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]:
self.cr50.reboot()
# Reenable servo v4 CCD
self.cr50.ccd_enable()
# reboot to normal mode if the device is in dev mode.
self.enter_mode_after_checking_tpm_state('normal')
tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
self.clear_fwmp()
# Restore the ccd privilege level
self._reset_ccd_settings()
def find_cr50_gs_image(self, gsurl):
"""Find the cr50 gs image name.
@param gsurl: the cr50 image location
@return: a list of the gsutil bucket, filename or None if the file
can't be found
"""
try:
return utils.gs_ls(gsurl)[0].rsplit('/', 1)
except error.CmdError:
logging.info('%s does not exist', gsurl)
return None
def _extract_cr50_image(self, archive, fn):
"""Extract the filename from the given archive
Aargs:
archive: the archive location on the host
fn: the file to extract
Returns:
The location of the extracted file
"""
remote_dir = os.path.dirname(archive)
result = self.host.run('tar xfv %s -C %s' % (archive, remote_dir))
for line in result.stdout.splitlines():
if os.path.basename(line) == fn:
return os.path.join(remote_dir, line)
raise error.TestFail('%s was not extracted from %s' % (fn , archive))
def download_cr50_gs_file(self, gsurl, extract_fn):
"""Download and extract the file at gsurl.
@param gsurl: The gs url for the cr50 image
@param extract_fn: The name of the file to extract from the cr50 image
tarball. Don't extract anything if extract_fn is None.
@return: a tuple (local path, host path)
"""
file_info = self.find_cr50_gs_image(gsurl)
if not file_info:
raise error.TestFail('Could not find %s' % gsurl)
bucket, fn = file_info
remote_temp_dir = '/tmp/'
src = os.path.join(remote_temp_dir, fn)
dest = os.path.join(self.resultsdir, fn)
# Copy the image to the dut
gsutil_wrapper.copy_private_bucket(host=self.host,
bucket=bucket,
filename=fn,
destination=remote_temp_dir)
if extract_fn:
src = self._extract_cr50_image(src, extract_fn)
logging.info('extracted %s', src)
# Remove .tbz2 from the local path.
dest = os.path.splitext(dest)[0]
self.host.get_file(src, dest)
return dest, src
def download_cr50_gs_image(self, gsurl, extract_fn, image_bid):
"""Get the image from gs and save it in the autotest dir.
@param gsurl: The gs url for the cr50 image
@param extract_fn: The name of the file to extract from the cr50 image
tarball. Don't extract anything if extract_fn is None.
@param image_bid: the image symbolic board id
@return: A tuple with the local path and version
"""
dest, src = self.download_cr50_gs_file(gsurl, extract_fn)
ver = cr50_utils.GetBinVersion(self.host, src)
# Compare the image board id to the downloaded image to make sure we got
# the right file
downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
if image_bid and image_bid != downloaded_bid:
raise error.TestError('Could not download image with matching '
'board id wanted %s got %s' % (image_bid,
downloaded_bid))
return dest, ver
def download_cr50_debug_image(self, devid, image_bid=''):
"""download the cr50 debug file.
Get the file with the matching devid and image board id info
@param devid: the cr50_devid string '${DEVID0} ${DEVID1}'
@param image_bid: the image board id info string or list
@return: A tuple with the debug image local path and version
"""
bid_ext = ''
# Add the image bid string to the filename
if image_bid:
image_bid = cr50_utils.GetBoardIdInfoString(image_bid,
symbolic=True)
bid_ext = '.' + image_bid.replace(':', '_')
gsurl = os.path.join(self.GS_PRIVATE,
(self.CR50_DEBUG_FILE % (devid.replace(' ', '_'), bid_ext)))
return self.download_cr50_gs_image(gsurl, None, image_bid)
def download_cr50_tot_image(self):
"""download the cr50 TOT image.
@return: the local path to the TOT image.
"""
# Get the TOT version information
ver_file = os.path.join(self.GS_PRIVATE, self.CR50_TOT_VER_FILE)
dut_ver_file = self.download_cr50_gs_file(ver_file, False)[1]
tot_ver = self.host.run('cat %s' % dut_ver_file).stdout.strip()
# Download the TOT image for the current board using the devid and TOT
# version information.
devid_ext = self.servo.get('cr50_devid').replace(' ', '_')
tot_file = self.CR50_TOT_FILE % (tot_ver, devid_ext)
gsurl = os.path.join(self.GS_PRIVATE, tot_file)
logging.info('using release image %s', tot_file)
return self.download_cr50_gs_image(gsurl, False, None)[0]
def _find_release_image_gsurl(self, fn):
"""Find the gs url for the release image"""
for gsbucket in [self.GS_PUBLIC, self.GS_PRIVATE]:
gsurl = os.path.join(gsbucket, fn)
if self.find_cr50_gs_image(gsurl):
return gsurl
raise error.TestFail('%s is not on google storage' % fn)
def download_cr50_release_image(self, image_rw, image_bid=''):
"""download the cr50 release file.
Get the file with the matching version and image board id info
@param image_rw: the rw version string
@param image_bid: the image board id info string or list
@return: A tuple with the release image local path and version
"""
bid_ext = ''
# Add the image bid string to the gsurl
if image_bid:
image_bid = cr50_utils.GetBoardIdInfoString(image_bid,
symbolic=True)
bid_ext = '_' + image_bid.replace(':', '_')
release_fn = self.CR50_PROD_FILE % (image_rw, bid_ext)
gsurl = self._find_release_image_gsurl(release_fn)
# Release images can be found using the rw version
# Download the image
dest, ver = self.download_cr50_gs_image(gsurl, 'cr50.bin.prod',
image_bid)
# Compare the rw version and board id info to make sure the right image
# was found
if image_rw != ver[1]:
raise error.TestError('Could not download image with matching '
'rw version')
return dest, ver
def _cr50_verify_update(self, expected_rw, expect_rollback):
"""Verify the expected version is running on cr50.
@param expected_rw: The RW version string we expect to be running
@param expect_rollback: True if cr50 should have rolled back during the
update
@raise TestFail: if there is any unexpected update state
"""
errors = []
running_rw = self.cr50.get_version()
if expected_rw != running_rw:
errors.append('running %s not %s' % (running_rw, expected_rw))
if expect_rollback != self.cr50.rolledback():
errors.append('%srollback detected' %
'no ' if expect_rollback else '')
if len(errors):
raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
logging.info('RUNNING %s after %s', expected_rw,
'rollback' if expect_rollback else 'update')
def _cr50_run_update(self, path):
"""Install the image at path onto cr50.
@param path: the location of the image to update to
@return: the rw version of the image
"""
tmp_dest = '/tmp/' + os.path.basename(path)
dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest)
cr50_utils.GSCTool(self.host, ['-a', dest])
return image_ver[1]
def cr50_update(self, path, rollback=False, erase_nvmem=False,
expect_rollback=False, chip_bid=None, chip_flags=None):
"""Attempt to update to the given image.
If rollback is True, we assume that cr50 is already running an image
that can rollback.
@param path: the location of the update image
@param rollback: True if we need to force cr50 to rollback to update to
the given image
@param erase_nvmem: True if we need to erase nvmem during rollback
@param expect_rollback: True if cr50 should rollback on its own
@param chip_bid: the integer representation of chip board id or None if
the board id should be erased during rollback
@param chip_flags: the integer representation of chip board id flags or
None if the board id should be erased during rollback
@raise TestFail: if the update failed
"""
original_rw = self.cr50.get_version()
# Cr50 is going to reject an update if it hasn't been up for more than
# 60 seconds. Wait until that passes before trying to run the update.
self.cr50.wait_until_update_is_allowed()
image_rw = self._cr50_run_update(path)
# Running the update may cause cr50 to reboot. Wait for that before
# sending more commands. The reboot should happen quickly. Wait a
# maximum of 10 seconds.
self.cr50.wait_for_reboot(timeout=10)
if erase_nvmem and rollback:
self.cr50.erase_nvmem()
if rollback:
self.cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags)
expected_rw = original_rw if expect_rollback else image_rw
# If we expect a rollback, the version should remain unchanged
self._cr50_verify_update(expected_rw, rollback or expect_rollback)
def ccd_open_from_ap(self):
"""Start the open process and press the power button."""
# Opening CCD requires power button presses. If those presses would
# power off the AP and prevent CCD open from completing, ignore them.
if self.faft_config.ec_forwards_short_pp_press:
self.stop_powerd()
self._ccd_open_last_len = 0
self._ccd_open_stdout = StringIO.StringIO()
ccd_open_cmd = utils.sh_escape('gsctool -a -o')
full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
ccd_open_cmd)
# Start running the Cr50 Open process in the background.
self._ccd_open_job = utils.BgJob(full_ssh_cmd,
nickname='ccd_open',
stdout_tee=self._ccd_open_stdout,
stderr_tee=utils.TEE_TO_LOGS)
if self._ccd_open_job == None:
raise error.TestFail('could not start ccd open')
try:
# Wait for the first gsctool power button prompt before starting the
# open process.
logging.info(self._get_ccd_open_output())
# Cr50 starts out by requesting 5 quick presses then 4 longer
# power button presses. Run the quick presses without looking at the
# command output, because getting the output can take some time. For
# the presses that require a 1 minute wait check the output between
# presses, so we can catch errors
#
# run quick presses for 30 seconds. It may take a couple of seconds
# for open to start. 10 seconds should be enough. 30 is just used
# because it will definitely be enough, and this process takes 300
# seconds, so doing quick presses for 30 seconds won't matter.
end_time = time.time() + 30
while time.time() < end_time:
self.servo.power_short_press()
logging.info('short int power button press')
time.sleep(self.PP_SHORT_INTERVAL)
# Poll the output and press the power button for the longer presses.
utils.wait_for_value(self._check_open_and_press_power_button,
expected_value=True, timeout_sec=self.cr50.PP_LONG)
except Exception, e:
logging.info(e)
raise
finally:
self._close_ccd_open_job()
self._try_to_bring_dut_up()
logging.info(self.cr50.get_ccd_info())
def _check_open_and_press_power_button(self):
"""Check stdout and press the power button if prompted.
@return: True if the process is still running.
"""
logging.info(self._get_ccd_open_output())
self.servo.power_short_press()
logging.info('long int power button press')
# Give cr50 some time to complete the open process. After the last
# power button press cr50 erases nvmem and resets the dut before setting
# the state to open. Wait a bit so we don't check the ccd state in the
# middle of this reset process. Power button requests happen once a
# minute, so waiting 10 seconds isn't a big deal.
time.sleep(10)
return (self._ccd_open_job.sp.poll() is not None or 'Open' in
self.cr50.get_ccd_info()['State'])
def _get_ccd_open_output(self):
"""Read the new output."""
self._ccd_open_job.process_output()
self._ccd_open_stdout.seek(self._ccd_open_last_len)
output = self._ccd_open_stdout.read()
self._ccd_open_last_len = self._ccd_open_stdout.len
return output
def _close_ccd_open_job(self):
"""Terminate the process and check the results."""
exit_status = utils.nuke_subprocess(self._ccd_open_job.sp)
stdout = self._ccd_open_stdout.getvalue().strip()
delattr(self, '_ccd_open_job')
if stdout:
logging.info('stdout of ccd open:\n%s', stdout)
if exit_status:
logging.info('exit status: %d', exit_status)
if 'Error' in stdout:
raise error.TestFail('ccd open Error %s' %
stdout.split('Error')[-1])
if self.cr50.OPEN != self.cr50.get_ccd_level():
raise error.TestFail('unable to open cr50: %s' % stdout)
else:
logging.info('Opened Cr50')
def fast_open(self, enable_testlab=False):
"""Try to use testlab open. If that fails, do regular ap open.
@param enable_testlab: If True, enable testlab mode after cr50 is open.
"""
if not self.faft_config.has_powerbutton:
logging.warning('No power button', exc_info=True)
enable_testlab = False
# Try to use testlab open first, so we don't have to wait for the
# physical presence check.
self.cr50.send_command('ccd testlab open')
if self.cr50.get_ccd_level() == 'open':
return
if self.servo.has_control('chassis_open'):
self.servo.set('chassis_open','yes')
# Use the console to open cr50 without entering dev mode if possible. It
# takes longer and relies on more systems to enter dev mode and ssh into
# the AP. Skip the steps that aren't required.
if not self.cr50.get_cap('OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]:
self.enter_mode_after_checking_tpm_state('dev')
if self.cr50.get_cap('OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]:
self.cr50.set_ccd_level(self.cr50.OPEN)
else:
self.ccd_open_from_ap()
if self.servo.has_control('chassis_open'):
self.servo.set('chassis_open','no')
if enable_testlab:
self.cr50.set_ccd_testlab('on')
# Make sure the device is in normal mode. After opening cr50, the TPM
# should be cleared and the device should automatically reset to normal
# mode. Just check to be consistent. It's possible capabilitiy settings
# are set to skip wiping the TPM.
self.enter_mode_after_checking_tpm_state('normal')
def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error):
"""Run a gsctool command and input the password
@param password: The cr50 password string
@param cmd: The gsctool command
@param name: The name to give the job
@param expect_error: True if the command should fail
"""
set_pwd_cmd = utils.sh_escape(cmd)
full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
set_pwd_cmd)
stdout = StringIO.StringIO()
# Start running the gsctool Command in the background.
gsctool_job = utils.BgJob(full_ssh_command,
nickname='%s_with_password' % name,
stdout_tee=stdout,
stderr_tee=utils.TEE_TO_LOGS,
stdin=subprocess.PIPE)
if gsctool_job == None:
raise error.TestFail('could not start gsctool command %r', cmd)
try:
# Wait for enter prompt
gsctool_job.process_output()
logging.info(stdout.getvalue().strip())
# Enter the password
gsctool_job.sp.stdin.write(password + '\n')
# Wait for re-enter prompt
gsctool_job.process_output()
logging.info(stdout.getvalue().strip())
# Re-enter the password
gsctool_job.sp.stdin.write(password + '\n')
time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT)
gsctool_job.process_output()
finally:
exit_status = utils.nuke_subprocess(gsctool_job.sp)
output = stdout.getvalue().strip()
logging.info('%s stdout: %s', name, output)
logging.info('%s exit status: %s', name, exit_status)
if exit_status:
message = ('gsctool %s failed using %r: %s %s' %
(name, password, exit_status, output))
if expect_error:
logging.info(message)
else:
raise error.TestFail(message)
elif expect_error:
raise error.TestFail('%s with %r did not fail when expected' %
(name, password))
def set_ccd_password(self, password, expect_error=False):
"""Set the ccd password"""
# Testlab mode can't be enabled if there is no power button, so we
# shouldn't allow setting the password.
if not self.faft_config.has_powerbutton:
raise error.TestError('No power button')
# If for some reason the test sets a password and is interrupted before
# we can clear it, we want testlab mode to be enabled, so it's possible
# to clear the password without knowing it.
if not self.cr50.testlab_is_on():
raise error.TestError('Will not set password unless testlab mode '
'is enabled.')
self.run_gsctool_cmd_with_password(
password, 'gsctool -a -P', 'set_password', expect_error)
def ccd_unlock_from_ap(self, password=None, expect_error=False):
"""Unlock cr50"""
if not password:
self.host.run('gsctool -a -U')
return
self.run_gsctool_cmd_with_password(
password, 'gsctool -a -U', 'unlock', expect_error)
def enter_mode_after_checking_tpm_state(self, mode):
"""Reboot to mode if cr50 doesn't already match the state"""
# If the device is already in the correct mode, don't do anything
if (mode == 'dev') == self.cr50.in_dev_mode():
logging.info('already in %r mode', mode)
return
self.switcher.reboot_to_mode(to_mode=mode)
if (mode == 'dev') != self.cr50.in_dev_mode():
raise error.TestError('Unable to enter %r mode' % mode)
def tpm_is_responsive(self):
"""Check TPM responsiveness by running tpm_version."""
result = self.host.run('tpm_version', ignore_status=True)
logging.debug(result.stdout.strip())
return not result.exit_status