| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import os |
| import pprint |
| import StringIO |
| import subprocess |
| import time |
| |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error, utils |
| from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils |
| from autotest_lib.server.cros import debugd_dev_tools, gsutil_wrapper |
| from autotest_lib.server.cros.faft.firmware_test import FirmwareTest |
| |
| |
class Cr50Test(FirmwareTest):
    """Common base for cr50 tests. Provides shared helpers and state."""
    version = 1

    # Google storage directory and filename patterns used to find cr50 images.
    # The directory is formatted with the image type ('release', 'debug' or
    # '*'), the debug file with the devid and the prod file with the RW version.
    CR50_GS_URL = 'gs://chromeos-localmirror-private/distfiles/chromeos-cr50-%s/'
    CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin'
    CR50_PROD_FILE = 'cr50.%s.bin.prod'

    # Bit flags stored in _saved_state.
    NONE = 0
    # The original device state was captured during init.
    INITIAL_STATE = 1 << 0
    # The original image, the device image, and the debug image were saved.
    # All of them are required to restore the original image and board id.
    IMAGES = 1 << 1

    # Seconds to sleep between the short power button presses.
    PP_SHORT_INTERVAL = 3

    # Exit status and message of the cryptohome get FWMP command that are
    # treated as the FWMP being cleared.
    CLEARED_FWMP_EXIT_STATUS = 1
    CLEARED_FWMP_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
                              '_INVALID')
| |
def initialize(self, host, cmdline_args, full_args,
               restore_cr50_state=False, cr50_dev_path='',
               provision_update=False):
    """Initialize the test and save the original cr50 state and images.

    Args:
        host: the DUT host. Passed to the parent initialize and saved as
                self.host.
        cmdline_args: the command line args passed to the parent initialize.
        full_args: dict of test args. 'ccd_lockout' and 'release_path' are
                read from it.
        restore_cr50_state: True if the test relies on being able to
                restore the cr50 state. If True, a failure to save the
                images is raised. If False, that failure is only logged and
                a state mismatch found during cleanup raises an error.
        cr50_dev_path: the local path of the node locked cr50 debug image.
                The image is downloaded if this is not a file.
        provision_update: True if cleanup should not restore the original
                state after it finds a mismatch.

    Raises:
        TestNAError if the test has no access to the Cr50 console or if
        the console is not locked and the test cannot lock it.
    """
    self._saved_state = self.NONE
    self._raise_error_on_mismatch = not restore_cr50_state
    self._provision_update = provision_update
    super(Cr50Test, self).initialize(host, cmdline_args)

    if not hasattr(self, 'cr50'):
        raise error.TestNAError('Test can only be run on devices with '
                                'access to the Cr50 console')

    logging.info('Test Args: %r', full_args)
    self.ccd_lockout = full_args.get('ccd_lockout', '').lower() == 'true'
    logging.info('ccd is%s locked out', '' if self.ccd_lockout else ' not')

    self.can_set_ccd_level = (not self.cr50.using_ccd() or
                              self.cr50.testlab_is_on()) and not self.ccd_lockout
    self.original_ccd_level = self.cr50.get_ccd_level()
    self.original_ccd_settings = self.cr50.get_cap_dict(
            info=self.cr50.CAP_SETTING)

    self.host = host
    tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
    # Clear the FWMP, so it can't disable CCD.
    self.clear_fwmp()

    if self.can_set_ccd_level:
        # Lock cr50 so the console will be restricted
        self.cr50.set_ccd_level('lock')
    elif self.original_ccd_level != 'lock':
        raise error.TestNAError('Lock the console before running cr50 test')

    self._save_original_state()
    # We successfully saved the device state
    self._saved_state |= self.INITIAL_STATE
    try:
        self._save_node_locked_dev_image(cr50_dev_path)
        self._save_original_images(full_args.get('release_path', ''))
        # We successfully saved the device images
        self._saved_state |= self.IMAGES
    except Exception as e:
        if restore_cr50_state:
            raise
        # The images are only needed to restore the state. Tests that don't
        # rely on restoring it can still run, but leave a record of why the
        # IMAGES flag was not set.
        logging.warning('Unable to save the cr50 images. The cr50 state '
                        'cannot be restored during cleanup: %r', e)
| |
| |
def after_run_once(self):
    """Write the number of the iteration that just finished to the log."""
    finished_iteration = self.iteration
    logging.info('successfully ran iteration %d', finished_iteration)
| |
| |
def _save_node_locked_dev_image(self, cr50_dev_path):
    """Record the location of the node locked dev image.

    The given path is used if it is an existing file. Otherwise the debug
    image for the devid reported by servo is downloaded.

    Args:
        cr50_dev_path: The path to the node locked cr50 image.
    """
    if not os.path.isfile(cr50_dev_path):
        devid = self.servo.get('cr50_devid')
        image = self.download_cr50_debug_image(devid)[0]
    else:
        image = cr50_dev_path
    self._node_locked_cr50_image = image
| |
| |
def _save_original_images(self, release_path):
    """Save local copies of the images needed to restore the device.

    The prod image, and the prepvt image if the DUT has one, are copied from
    the DUT into the results directory. The original running image is then
    set to the first of these that applies: release_path if it is a file,
    the device prod image or the device prepvt image if its version and
    board id match the running image, or an image downloaded from google
    storage.

    Args:
        release_path: The release path given by test args
    """
    saved = self._original_state

    # Copy the prod image off of the DUT.
    _, prod_rw, prod_bid = saved['device_prod_ver']
    self._device_prod_image = os.path.join(self.resultsdir,
                                           'prod_device_image_' + prod_rw)
    self.host.get_file(cr50_utils.CR50_PROD, self._device_prod_image)

    # Copy the prepvt image too, if there is one.
    if not cr50_utils.HasPrepvtImage(self.host):
        self._device_prepvt_image = None
        prepvt_rw = None
        prepvt_bid = None
    else:
        _, prepvt_rw, prepvt_bid = saved['device_prepvt_ver']
        self._device_prepvt_image = os.path.join(
                self.resultsdir, 'prepvt_device_image_' + prepvt_rw)
        self.host.get_file(cr50_utils.CR50_PREPVT,
                           self._device_prepvt_image)
        prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid)

    # An image supplied through the test args always wins.
    if os.path.isfile(release_path):
        self._original_cr50_image = release_path
        logging.info('using supplied image')
        return

    # Otherwise reuse a device image when it is the one that is running and
    # fall back to downloading the running version from google storage.
    _, running_rw, running_bid = self.get_saved_cr50_original_version()

    # Convert the board ids to the same format before comparing them.
    prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid)
    running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
    running = (running_rw, running_bid)
    if running == (prod_rw, prod_bid):
        logging.info('Using device cr50 prod image %s %s', prod_rw,
                     prod_bid)
        original_image = self._device_prod_image
    elif running == (prepvt_rw, prepvt_bid):
        logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
                     prepvt_bid)
        original_image = self._device_prepvt_image
    else:
        logging.info('Downloading cr50 image %s %s', running_rw,
                     running_bid)
        original_image = self.download_cr50_release_image(
                running_rw, running_bid)[0]
    self._original_cr50_image = original_image
| |
| |
def _save_original_state(self):
    """Remember the cr50 state the device is in right now.

    The dict returned by get_cr50_device_state() is stored in
    _original_state. It is used to verify and restore the state during
    cleanup.
    """
    current_state = self.get_cr50_device_state()
    self._original_state = current_state
| |
| |
def get_saved_cr50_original_version(self):
    """Return the saved (ro ver, rw ver, bid) of the original image.

    Raises:
        TestError if the running version or image board id was not saved.
    """
    state = self._original_state
    if not ('running_ver' in state and 'cr50_image_bid' in state):
        raise error.TestError('No record of original cr50 image version')
    running_ver = state['running_ver']
    return (running_ver[0], running_ver[1], state['cr50_image_bid'])
| |
| |
def get_saved_cr50_original_path(self):
    """Return the local path of the saved original cr50 image.

    Raises:
        TestError if the original image path was never saved.
    """
    if hasattr(self, '_original_cr50_image'):
        return self._original_cr50_image
    raise error.TestError('No record of original image')
| |
| |
def has_saved_cr50_dev_path(self):
    """Return True if the node locked debug image path has been saved."""
    debug_image_saved = hasattr(self, '_node_locked_cr50_image')
    return debug_image_saved
| |
| |
def get_saved_cr50_dev_path(self):
    """Return the local path of the saved cr50 dev image.

    Raises:
        TestError if the debug image path was never saved.
    """
    if self.has_saved_cr50_dev_path():
        return self._node_locked_cr50_image
    raise error.TestError('No record of debug image')
| |
| |
def _restore_original_image(self, chip_bid, chip_flags):
    """Restore the cr50 image and erase the state.

    Make 3 attempts to update to the original image. Use a rollback from
    the DBG image to erase the state that can only be erased by a DBG image.
    Set the chip board id during rollback.

    Failures are logged and not raised. The caller is expected to verify
    the resulting state.

    Args:
        chip_bid: the integer representation of chip board id or None if the
                  board id should be erased
        chip_flags: the integer representation of chip board id flags or
                    None if the board id should be erased
    """
    attempts = 3
    for i in range(attempts):
        try:
            # Update to the node-locked DBG image so we can erase all of
            # the state we are trying to reset
            self.cr50_update(self._node_locked_cr50_image)

            # Rollback to the original cr50 image.
            self.cr50_update(self._original_cr50_image, rollback=True,
                             chip_bid=chip_bid, chip_flags=chip_flags)
            break
        except Exception as e:
            logging.warning('Failed to restore original image attempt %d: '
                            '%r', i, e)
    else:
        # Only reached when no attempt made it to the break.
        logging.error('Could not restore the original image after %d '
                      'attempts', attempts)
| |
| |
def rootfs_verification_disable(self):
    """Remove rootfs verification unless it is already removed."""
    if self._rootfs_verification_is_disabled():
        return
    logging.debug('Removing rootfs verification.')
    self.rootfs_tool.enable()
| |
| |
def _rootfs_verification_is_disabled(self):
    """Returns true if rootfs verification is disabled.

    As a side effect this clears the TPM owner and (re)creates
    self.rootfs_tool.
    """
    # Clear the TPM owner before trying to check rootfs verification
    tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
    self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool()
    self.rootfs_tool.initialize(self.host)
    # The tool removes rootfs verification, so rootfs_tool.is_enabled being
    # True means rootfs verification is disabled.
    return self.rootfs_tool.is_enabled()
| |
| |
def _restore_original_state(self):
    """Put the device back into the cr50 state saved during init.

    Does nothing, apart from logging a warning, if the images were not
    saved.

    Raises:
        TestError if the state still differs after the restore.
    """
    images_saved = self._saved_state & self.IMAGES
    if not images_saved:
        logging.warning('Did not save the original images. Cannot restore '
                        'state')
        return

    original = self._original_state

    # Remove the prepvt image if the test installed one.
    if (not original['has_prepvt'] and
        cr50_utils.HasPrepvtImage(self.host)):
        self.host.run('rm %s' % cr50_utils.CR50_PREPVT)

    # If rootfs verification has been disabled, copy the cr50 device images
    # back onto the DUT.
    if self._rootfs_verification_is_disabled():
        cr50_utils.InstallImage(self.host, self._device_prod_image,
                                cr50_utils.CR50_PROD)
        # Install the prepvt image if there was one.
        if self._device_prepvt_image:
            cr50_utils.InstallImage(self.host, self._device_prepvt_image,
                                    cr50_utils.CR50_PREPVT)

    # An erased board id is restored by passing None for both values.
    chip_bid_info = original['chip_bid']
    if chip_bid_info == cr50_utils.ERASED_CHIP_BID:
        chip_bid = None
        chip_flags = None
    else:
        chip_bid = chip_bid_info[0]
        chip_flags = chip_bid_info[2]
    # Update to the original image and set or erase the board id
    self._restore_original_image(chip_bid, chip_flags)

    # Set the RLZ code
    cr50_utils.SetRLZ(self.host, original['rlz'])

    # Verify everything is still the same
    mismatch = self._check_original_state()
    if mismatch:
        raise error.TestError('Could not restore state: %s' % mismatch)

    logging.info('Successfully restored the original cr50 state')
| |
| |
def get_cr50_device_state(self):
    """Collect the cr50 related information currently on the device.

    Returns:
        A dict with the mosys platform brand, the device prod image
        version, whether the device has a prepvt image and its version
        (None without one), the rlz code, the chip board id and its
        formatted string, the running cr50 version, and the board id
        string of the running image.
    """
    host = self.host
    brand = host.run('mosys platform brand',
                     ignore_status=True).stdout.strip()
    prod_ver = cr50_utils.GetBinVersion(host, cr50_utils.CR50_PROD)
    has_prepvt = cr50_utils.HasPrepvtImage(host)
    if has_prepvt:
        prepvt_ver = cr50_utils.GetBinVersion(host, cr50_utils.CR50_PREPVT)
    else:
        prepvt_ver = None
    rlz = cr50_utils.GetRLZ(host)
    chip_bid = cr50_utils.GetChipBoardId(host)
    chip_bid_str = '%08x:%08x:%08x' % chip_bid
    running_ver = cr50_utils.GetRunningVersion(host)
    image_bid = self.cr50.get_active_board_id_str()

    state = {}
    state['mosys platform brand'] = brand
    state['device_prod_ver'] = prod_ver
    state['has_prepvt'] = has_prepvt
    state['device_prepvt_ver'] = prepvt_ver
    state['rlz'] = rlz
    state['chip_bid'] = chip_bid
    state['chip_bid_str'] = chip_bid_str
    state['running_ver'] = running_ver
    state['cr50_image_bid'] = image_bid

    logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
    return state
| |
| |
def _check_original_state(self):
    """Check the current cr50 state against the state saved during init.

    Returns:
        A dictionary with the state that is wrong as the key and a string
        with the old and new state as the value. None if the original
        state was never saved.
    """
    state_saved = self._saved_state & self.INITIAL_STATE
    if not state_saved:
        logging.warning('Did not save the original state. Cannot verify it '
                        'matches')
        return
    # Make sure the /var/cache/cr50* state is up to date.
    cr50_utils.ClearUpdateStateAndReboot(self.host)

    mismatch = {}
    for key, current in self.get_cr50_device_state().items():
        expected = self._original_state[key]
        if current == expected:
            continue
        mismatch[key] = 'old: %s, new: %s' % (expected, current)

    if not mismatch:
        logging.info('The device is in the original state')
    else:
        logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
    return mismatch
| |
| |
def _reset_ccd_settings(self):
    """Restore the ccd password, capabilities and level saved during init.

    Raises:
        TestFail if the password could not be cleared.
        TestError if the capabilities changed and cannot be restored.
    """
    cr50 = self.cr50

    # Clear the password if one was set.
    if cr50.get_ccd_info()['Password'] != 'none':
        self.servo.set_nocheck('cr50_testlab', 'open')
        cr50.send_command('ccd reset')
        if cr50.get_ccd_info()['Password'] != 'none':
            raise error.TestFail('Could not clear password')

    # Restore the capabilities if the test changed them.
    current_settings = cr50.get_cap_dict(info=cr50.CAP_SETTING)
    if self.original_ccd_settings != current_settings:
        if not self.can_set_ccd_level:
            raise error.TestError("CCD state has changed, but we can't "
                                  "restore it")
        self.fast_open(True)
        cr50.set_caps(self.original_ccd_settings)

    # First try using testlab open to open the device
    wanted_level = self.original_ccd_level
    if cr50.testlab_is_on() and wanted_level == 'open':
        self.servo.set_nocheck('cr50_testlab', 'open')
    if wanted_level != cr50.get_ccd_level():
        cr50.set_ccd_level(wanted_level)
| |
| |
| |
def cleanup(self):
    """Make sure the device state is the same as the start of the test.

    The parent cleanup always runs, even if restoring the cr50 state fails.

    Raises:
        TestError if the state changed during the test and the test was
        run without restore_cr50_state.
    """
    try:
        # initialize() raises before it saves any state if the test has no
        # access to the Cr50 console. There is nothing to restore then.
        if not hasattr(self, 'cr50'):
            return

        # Reset the password as the first thing in cleanup. It is important
        # that if some other part of cleanup fails, the password has at
        # least been reset.
        self.cr50.send_command('ccd testlab open')
        self.cr50.send_command('ccd reset')

        # reboot to normal mode if the device is in dev mode.
        self.enter_mode_after_checking_tpm_state('normal')

        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.clear_fwmp()

        state_mismatch = self._check_original_state()
        if state_mismatch and not self._provision_update:
            self._restore_original_state()
            if self._raise_error_on_mismatch:
                raise error.TestError('Unexpected state mismatch during '
                                      'cleanup %s' % state_mismatch)

        # Restore the ccd privilege level
        if hasattr(self, 'original_ccd_level'):
            self._reset_ccd_settings()
    finally:
        super(Cr50Test, self).cleanup()
| |
| |
def find_cr50_gs_image(self, filename, image_type=None):
    """Look up the google storage location of a cr50 image.

    Args:
        filename: the cr50 filename to match to
        image_type: release or debug. If it is not specified we will search
                    both the release and debug directories
    Returns:
        a tuple of the gsutil bucket, filename. They are taken from the
        first result of the gs listing.
    """
    image_dir = self.CR50_GS_URL % (image_type or '*')
    pattern = os.path.join(image_dir, filename)
    first_match = utils.gs_ls(pattern)[0]
    bucket, gs_filename = first_match.rsplit('/', 1)
    return bucket, gs_filename
| |
| |
def download_cr50_gs_image(self, filename, image_bid='', bucket=None,
                           image_type=None):
    """Download an image from gs and copy it into the results directory.

    The image is first copied to /tmp/ on the DUT and then fetched from
    there.

    Args:
        filename: The cr50 image basename
        image_bid: the board id info list or string. It will be added to the
                   filename.
        bucket: The gs bucket name
        image_type: 'debug' or 'release'. This will be used to determine
                    the bucket if the bucket is not given.
    Returns:
        A tuple with the local path and version

    Raises:
        TestError if image_bid was given and the board id of the downloaded
        image does not match it.
    """
    # The board id string is part of the name of board id locked images.
    if image_bid:
        bid_str = cr50_utils.GetBoardIdInfoString(image_bid,
                                                  symbolic=True)
        filename = filename + '.' + bid_str.replace(':', '_')

    if not bucket:
        bucket, filename = self.find_cr50_gs_image(filename, image_type)

    remote_temp_dir = '/tmp/'
    remote_path = os.path.join(remote_temp_dir, filename)
    local_path = os.path.join(self.resultsdir, filename)

    # Copy the image to the dut
    gsutil_wrapper.copy_private_bucket(host=self.host,
                                       bucket=bucket,
                                       filename=filename,
                                       destination=remote_temp_dir)

    # Then bring it over to the results directory and read its version.
    self.host.get_file(remote_path, local_path)
    version = cr50_utils.GetBinVersion(self.host, remote_path)

    # Compare the image board id to the downloaded image to make sure we got
    # the right file
    downloaded_bid = cr50_utils.GetBoardIdInfoString(version[2],
                                                     symbolic=True)
    if image_bid and bid_str != downloaded_bid:
        raise error.TestError('Could not download image with matching '
                              'board id wanted %s got %s' % (bid_str,
                                                             downloaded_bid))
    return local_path, version
| |
| |
def download_cr50_debug_image(self, devid, image_bid=''):
    """Download the cr50 debug image for the given devid.

    Debug images are node locked, so the devid is part of the filename.

    Args:
        devid: the cr50_devid string '${DEVID0} ${DEVID1}'
        image_bid: the image board id info string or list
    Returns:
        A tuple with the debug image local path and version
    """
    node_locked_name = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'))
    local_path, version = self.download_cr50_gs_image(
            node_locked_name, image_bid=image_bid, image_type='debug')
    return local_path, version
| |
| |
def download_cr50_release_image(self, image_rw, image_bid=''):
    """Download the cr50 release image with the given rw version.

    Args:
        image_rw: the rw version string
        image_bid: the image board id info string or list
    Returns:
        A tuple with the release image local path and version

    Raises:
        TestError if the rw version of the downloaded image is not image_rw.
    """
    # Release images are named after their rw version.
    release_name = self.CR50_PROD_FILE % image_rw
    local_path, version = self.download_cr50_gs_image(
            release_name, image_bid=image_bid, image_type='release')

    # Make sure the image that was found really has the requested version.
    if version[1] != image_rw:
        raise error.TestError('Could not download image with matching '
                              'rw version')
    return local_path, version
| |
| |
def _cr50_verify_update(self, expected_rw, expect_rollback):
    """Verify the expected version is running on cr50.

    Args:
        expected_rw: The RW version string we expect to be running
        expect_rollback: True if cr50 should have rolled back during the
                         update

    Raises:
        TestFail if there is any unexpected update state
    """
    errors = []
    running_rw = self.cr50.get_version()
    if expected_rw != running_rw:
        errors.append('running %s not %s' % (running_rw, expected_rw))

    if expect_rollback != self.cr50.rolledback():
        # The parentheses are required. '%' binds tighter than the
        # conditional expression, so without them an unexpected rollback
        # produces an empty message.
        errors.append('%srollback detected' %
                      ('no ' if expect_rollback else ''))
    if errors:
        raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
    logging.info('RUNNING %s after %s', expected_rw,
                 'rollback' if expect_rollback else 'update')
| |
| |
def _cr50_run_update(self, path):
    """Copy the image at path to the DUT and update cr50 with it.

    Args:
        path: the location of the image to update to

    Returns:
        the rw version of the image
    """
    dut_path = '/tmp/%s' % os.path.basename(path)

    installed_path, image_ver = cr50_utils.InstallImage(self.host, path,
                                                        dut_path)
    cr50_utils.GSCTool(self.host, ['-a', installed_path])
    rw_version = image_ver[1]
    return rw_version
| |
| |
def cr50_update(self, path, rollback=False, erase_nvmem=False,
                expect_rollback=False, chip_bid=None, chip_flags=None):
    """Try to update cr50 to the given image and verify the result.

    If rollback is True, we assume that cr50 is already running an image
    that can rollback.

    Args:
        path: the location of the update image
        rollback: True if we need to force cr50 to rollback to update to
                  the given image
        erase_nvmem: True if we need to erase nvmem during rollback
        expect_rollback: True if cr50 should rollback on its own
        chip_bid: the integer representation of chip board id or None if the
                  board id should be erased during rollback
        chip_flags: the integer representation of chip board id flags or
                    None if the board id should be erased during rollback

    Raises:
        TestFail if the update failed
    """
    cr50 = self.cr50
    version_before_update = cr50.get_version()

    # Cr50 is going to reject an update if it hasn't been up for more than
    # 60 seconds. Wait until that passes before trying to run the update.
    cr50.wait_until_update_is_allowed()

    image_rw = self._cr50_run_update(path)

    # Running the update may cause cr50 to reboot. Wait for that before
    # sending more commands. The reboot should happen quickly. Wait a
    # maximum of 10 seconds.
    cr50.wait_for_reboot(10)

    if rollback:
        # nvmem is only erased as part of a forced rollback.
        if erase_nvmem:
            cr50.erase_nvmem()
        cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags)

    # If we expect a rollback, the version should remain unchanged
    if expect_rollback:
        expected_rw = version_before_update
    else:
        expected_rw = image_rw
    self._cr50_verify_update(expected_rw, rollback or expect_rollback)
| |
| |
def ccd_open_from_ap(self):
    """Start the open process and press the power button.

    'gsctool -a -o' is run on the DUT over ssh in the background while the
    power button is pressed through servo. The background job is always
    closed and checked by _close_ccd_open_job() before returning.

    Raises:
        TestFail if the open process could not be started.
    """
    self._ccd_open_last_len = 0

    self._ccd_open_stdout = StringIO.StringIO()

    ccd_open_cmd = utils.sh_escape('gsctool -a -o')
    full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
                                ccd_open_cmd)
    # Start running the Cr50 Open process in the background.
    self._ccd_open_job = utils.BgJob(full_ssh_cmd,
                                     nickname='ccd_open',
                                     stdout_tee=self._ccd_open_stdout,
                                     stderr_tee=utils.TEE_TO_LOGS)

    if self._ccd_open_job is None:
        raise error.TestFail('could not start ccd open')

    try:
        # Wait for the first gsctool power button prompt before starting the
        # open process.
        logging.info(self._get_ccd_open_output())
        # Cr50 starts out by requesting 5 quick presses then 4 longer
        # power button presses. Run the quick presses without looking at the
        # command output, because getting the output can take some time. For
        # the presses that require a 1 minute wait check the output between
        # presses, so we can catch errors
        #
        # run quick presses for 30 seconds. It may take a couple of seconds
        # for open to start. 10 seconds should be enough. 30 is just used
        # because it will definitely be enough, and this process takes 300
        # seconds, so doing quick presses for 30 seconds won't matter.
        end_time = time.time() + 30
        while time.time() < end_time:
            self.servo.power_short_press()
            logging.info('short int power button press')
            time.sleep(self.PP_SHORT_INTERVAL)
        # Poll the output and press the power button for the longer presses.
        utils.wait_for_value(self._check_open_and_press_power_button,
                             expected_value=True,
                             timeout_sec=self.cr50.PP_LONG)
    except Exception as e:
        logging.info(e)
        raise
    finally:
        self._close_ccd_open_job()
    logging.info(self.cr50.get_ccd_info())
| |
| |
def _check_open_and_press_power_button(self):
    """Log the new output, press the power button and check for completion.

    Returns:
        True if the background open process has exited or the ccd state
        contains 'Open'. False if the open process is still in progress.
    """
    logging.info(self._get_ccd_open_output())
    self.servo.power_short_press()
    logging.info('long int power button press')
    # Give cr50 some time to complete the open process. After the last
    # power button press cr50 erases nvmem and resets the dut before setting
    # the state to open. Wait a bit so we don't check the ccd state in the
    # middle of this reset process. Power button requests happen once a
    # minute, so waiting 10 seconds isn't a big deal.
    time.sleep(10)
    return (self._ccd_open_job.sp.poll() is not None or 'Open' in
            self.cr50.get_ccd_info()['State'])
| |
| |
def _get_ccd_open_output(self):
    """Return the open job output added since the previous call."""
    self._ccd_open_job.process_output()
    captured = self._ccd_open_stdout
    # Only read what was added after the position saved by the last call.
    captured.seek(self._ccd_open_last_len)
    new_output = captured.read()
    self._ccd_open_last_len = captured.len
    return new_output
| |
| |
def _close_ccd_open_job(self):
    """Stop the background open job and check how it went.

    Raises:
        TestFail if the job output contains 'Error' or if the ccd level is
        not open afterwards.
    """
    exit_status = utils.nuke_subprocess(self._ccd_open_job.sp)
    job_output = self._ccd_open_stdout.getvalue().strip()
    del self._ccd_open_job
    if job_output:
        logging.info('stdout of ccd open:\n%s', job_output)
    if exit_status:
        logging.info('exit status: %d', exit_status)
    if 'Error' in job_output:
        # Report the text that follows the last 'Error'.
        raise error.TestFail('ccd open Error %s' %
                             job_output.split('Error')[-1])
    if self.cr50.OPEN != self.cr50.get_ccd_level():
        raise error.TestFail('unable to open cr50: %s' % job_output)
    logging.info('Opened Cr50')
| |
| |
def fast_open(self, enable_testlab=False):
    """Open ccd with testlab open, falling back to the regular ap open.

    Args:
        enable_testlab: If True, enable testlab mode after cr50 is opened
                        with the regular ap open.
    """
    # Testlab open doesn't need the physical presence check, so it is tried
    # first.
    self.cr50.send_command('ccd testlab open')
    opened_with_testlab = self.cr50.get_ccd_level() == 'open'
    if opened_with_testlab:
        return

    # Do the regular open from the AP in dev mode.
    self.enter_mode_after_checking_tpm_state('dev')
    self.ccd_open_from_ap()
    self.enter_mode_after_checking_tpm_state('normal')
    if not enable_testlab:
        return
    self.cr50.set_ccd_testlab('on')
| |
| |
def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error):
    """Run a gsctool command and input the password

    The command is run on the DUT over ssh in the background and the
    password is written to its stdin twice, once for the enter prompt and
    once for the re-enter prompt.

    Args:
        password: The cr50 password string
        cmd: The gsctool command
        name: The name to give the job
        expect_error: True if the command should fail

    Raises:
        TestFail if the command could not be started, if it failed when it
        was expected to pass, or if it passed when it was expected to fail.
    """
    set_pwd_cmd = utils.sh_escape(cmd)
    full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
                                    set_pwd_cmd)
    stdout = StringIO.StringIO()
    # Start running the gsctool Command in the background.
    gsctool_job = utils.BgJob(full_ssh_command,
                              nickname='%s_with_password' % name,
                              stdout_tee=stdout,
                              stderr_tee=utils.TEE_TO_LOGS,
                              stdin=subprocess.PIPE)
    if gsctool_job is None:
        raise error.TestFail('could not start gsctool command %r' % cmd)

    try:
        # Wait for enter prompt
        gsctool_job.process_output()
        logging.info(stdout.getvalue().strip())
        # Enter the password
        gsctool_job.sp.stdin.write(password + '\n')

        # Wait for re-enter prompt
        gsctool_job.process_output()
        logging.info(stdout.getvalue().strip())
        # Re-enter the password
        gsctool_job.sp.stdin.write(password + '\n')
        time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT)
        gsctool_job.process_output()
    finally:
        # Always stop the job and check its result.
        exit_status = utils.nuke_subprocess(gsctool_job.sp)
        output = stdout.getvalue().strip()
        logging.info('%s stdout: %s', name, output)
        logging.info('%s exit status: %s', name, exit_status)
        if exit_status:
            message = ('gsctool %s failed using %r: %s %s' %
                       (name, password, exit_status, output))
            if expect_error:
                logging.info(message)
            else:
                raise error.TestFail(message)
        elif expect_error:
            raise error.TestFail('%s with %r did not fail when expected' %
                                 (name, password))
| |
| |
def set_ccd_password(self, password, expect_error=False):
    """Set the ccd password using gsctool.

    Args:
        password: the password string to set
        expect_error: True if setting the password should fail

    Raises:
        TestError if testlab mode is not enabled.
    """
    # If for some reason the test sets a password and is interrupted before
    # we can clear it, we want testlab mode to be enabled, so it's possible
    # to clear the password without knowing it.
    testlab_enabled = self.cr50.testlab_is_on()
    if not testlab_enabled:
        raise error.TestError('Will not set password unless testlab mode '
                              'is enabled.')
    self.run_gsctool_cmd_with_password(password, 'gsctool -a -P',
                                       'set_password', expect_error)
| |
| |
def ccd_unlock_from_ap(self, password=None, expect_error=False):
    """Unlock cr50 by running gsctool on the DUT.

    Args:
        password: the ccd password. If it is not given, the unlock command
                  is run without entering a password.
        expect_error: True if the unlock with a password should fail
    """
    if password:
        self.run_gsctool_cmd_with_password(password, 'gsctool -a -U',
                                           'unlock', expect_error)
    else:
        self.host.run('gsctool -a -U')
| |
| |
def enter_mode_after_checking_tpm_state(self, mode):
    """Reboot into mode unless cr50 reports the device is already in it.

    Args:
        mode: the mode to enter. 'dev' is dev mode; any other value is
              checked as not being in dev mode.

    Raises:
        TestError if cr50 does not report the wanted mode after the reboot.
    """
    want_dev = mode == 'dev'

    # If the device is already in the correct mode, don't do anything
    if self.cr50.in_dev_mode() == want_dev:
        logging.info('already in %r mode', mode)
        return

    self.switcher.reboot_to_mode(to_mode=mode)

    if self.cr50.in_dev_mode() != want_dev:
        raise error.TestError('Unable to enter %r mode' % mode)
| |
| |
def _fwmp_is_cleared(self):
    """Return True if the FWMP is cleared.

    The FWMP is treated as cleared when the output of the cryptohome get
    FWMP command contains CLEARED_FWMP_ERROR_MSG.

    Raises:
        TestError if the cryptohome command exits with a status other than
        0 or CLEARED_FWMP_EXIT_STATUS.
    """
    res = self.host.run('cryptohome '
                        '--action=get_firmware_management_parameters',
                        ignore_status=True)
    if res.exit_status and res.exit_status != self.CLEARED_FWMP_EXIT_STATUS:
        raise error.TestError('Could not run cryptohome command %r' % res)
    return self.CLEARED_FWMP_ERROR_MSG in res.stdout
| |
| |
def clear_fwmp(self):
    """Remove the FWMP unless it is already cleared.

    Ownership of the TPM is taken first if the TPM is not owned.
    """
    if self._fwmp_is_cleared():
        return

    run = self.host.run
    tpm_status = run('cryptohome --action=tpm_status').stdout
    logging.debug(tpm_status)
    tpm_is_owned = 'TPM Owned: true' in tpm_status
    if not tpm_is_owned:
        run('cryptohome --action=tpm_take_ownership')
        run('cryptohome --action=tpm_wait_ownership')
    run('cryptohome '
        '--action=remove_firmware_management_parameters')