blob: 7a9f80d06bfc89ca413fa947b2e87ee207f20306 [file] [log] [blame] [edit]
# Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import glob
import logging
import re
import time
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.cros import ec as cros_ec, cros_logging
class usbpd_GFU(test.test):
"""Integration test for USB-PD Google Firmware Update (GFU).
Test should:
- interrogate what firmware's are available for each device and for each:
1. Use ectool's flashpd to write RW with that to mimic old hw
- Validate that kernel driver successfully updates to latest RW.
2. Erase RW and see update as well.
TODO:
3. Check that update is checked after S2R.
"""
version = 1
FW_PATH = '/lib/firmware/cros-pd'
# <device>_v<major>.<minor>.<build>-<commit SHA>
FW_NAME_RE = r'%s/(\w+)_v(\d+)\.(\d+)\.(\d+)-([0-9a-f]+).*' % (FW_PATH)
GOOGLE_VID = '0x18d1'
MAX_UPDATE_SECS = 80
FW_UP_DNAME = 'cros_ec_pd_update'
# TODO(tbroch) This will be change once cros_ec_pd_update is abstracted from
# ACPI driver. Will need to fix this once it happens.
FW_UP_DISABLE_PATH = '/sys/devices/LNXSYSTM:00/device:00/PNP0A08:00/device:1e/PNP0C09:00/GOOG0003:00/disable'
# TODO(tbroch) find better way to build this or we'll have to edit test for
# each new PD peripheral.
DEV_MAJOR = dict(zinger=1, minimuffin=2, dingdong=3, hoho=4)
def _index_firmware_avail(self):
"""Index the various USB-PD firmwares in the rootfs.
TODO(crosbug.com/434522) This method will need reworked after we've come
up with a better method for firmware release.
@returns: dictionary of firmwares (key == name, value == list of
firmware paths)
"""
fw_dict = collections.defaultdict(list)
for fw in glob.glob('%s/*_v[1-9].*.bin' % (self.FW_PATH)):
mat = re.match(self.FW_NAME_RE, fw)
if not mat:
continue
name = mat.group(1)
fw_dict[name].append(fw)
return fw_dict
def _is_gfu(self, port):
"""Is it in GFU?
@param port: EC_USBPD object for port.
@returns: True if GFU enterd, False otherwise.
"""
return port.is_amode_supported(self.GOOGLE_VID)
def _is_in_rw(self, port):
"""Is PD device in RW firmware?
@param port: EC_USBPD object for port.
@returns: True if in RW, False otherwise.
"""
flash_info = port.get_flash_info()
logging.debug('flash_info = %s', flash_info)
return flash_info['image_status'] == 'RW'
def _set_kernel_fw_update(self, disable=0):
"""Disable the FW update driver.
@param disable: 1 for disable, 0 for enable.
"""
utils.write_one_line(self.FW_UP_DISABLE_PATH, disable)
if not disable:
# Allow kernel driver time quiesce
time.sleep(2)
def _modify_rw(self, port, rw=None, tries=3):
"""Modify RW of USB-PD device in <port>.
@param port: EC_USBPD object for port.
@param rw: Path to RW FW to write using ectool. If None then uses
/dev/null to invalidate the RW.
@param tries: Number of tries to update RW via flashpd
@returns: True if success, False otherwise.
"""
timeout = self.MAX_UPDATE_SECS
if not rw:
rw = '/dev/null'
tries = 1
self._set_kernel_fw_update(disable=1)
while (tries):
try:
# Note in flashpd <dev_major> <port> <file> the dev_major is
# unnecessary in all cases so its just been set to 0
port.ec_command('flashpd 0 %d %s' % (port.index, rw),
ignore_status=True, timeout=timeout)
except error.CmdTimeoutError:
# TODO(tbroch) could remove try/except if ec_command used run
# instead of system_output + ignore_timeout=True
tries -= 1
continue
if rw != '/dev/null' and not self._is_in_rw(port):
logging.warn('Port%d: not in RW after flashpd ... retrying',
port.index)
tries -= 1
else:
break
self._set_kernel_fw_update()
msg = self._reader.get_last_msg([r'%s.*is in RO' % port.index,
self.FW_UP_DNAME],
retries=5, sleep_seconds=2)
if not msg:
logging.warn('Port%d: Driver does NOT see dev in not in RO',
port.index)
return False
logging.info('Port%d: Driver sees device in RO', port.index)
return True
def _test_update(self, port, rw=None, tries=3):
"""Test RW update.
Method tests the kernel's RW update process by first modifying the
existing RW (either invalidating or rolling it back) via ectool. It
then querys the syslog to validate kernel sees the need for update and
is successful.
@param port: EC_USBPD object for port.
@param rw: path to RW firmware to write via ectool to test upgrade.
@param tries: integer number of attempts to write RW. Necessary as
update is not robust (design decision).
"""
if not tries:
raise error.TestError('Retries must be > 0')
if not self._is_in_rw(port):
raise error.TestError('Port%d: Device is not in RW' % port.index)
fw_up_re = r'%s.*Port%d FW update completed' % (self.FW_UP_DNAME,
port.index)
while tries:
self._reader.set_start_by_current()
rsp = self._modify_rw(port, rw)
if not rsp:
rsp_str = 'Port%d: RW modified with RW=%s failed' % \
(port.index, rw)
if tries:
logging.warn('%s ... retrying.', rsp_str)
tries -= 1
else:
raise error.TestError(rsp_str)
self._reader.set_start_by_current()
msg = self._reader.get_last_msg([fw_up_re],
retries=(self.MAX_UPDATE_SECS / 2),
sleep_seconds=2)
if not msg:
rsp_str = 'Port%d: driver did NOT update FW' % port.index
if tries:
logging.warn('%s ... retrying.', rsp_str)
tries -= 1
continue
else:
raise error.TestError(rsp_str)
logging.info('Port%d: Driver completed RW update', port.index)
# Allow adequate reboot time after RW write completes and device is
# rebooted.
time.sleep(3)
if not self._is_in_rw(port):
rsp_str = 'Port%d: Device is not in RW' % port.index
if tries:
logging.warn('%s ... retrying.', rsp_str)
tries -= 1
continue
else:
raise error.TestError(rsp_str)
break # success #
def _test_rw_rollback(self, port, fw_dict):
"""Test rolling back RW firmware.
@param port: EC_USBPD object for port.
@param fw_dict: dictionary of firmwares.
"""
self._set_kernel_fw_update()
# test old RW update
flash_info = port.get_flash_info()
for dev_name in fw_dict.keys():
if flash_info['dev_major'] == self.DEV_MAJOR[dev_name]:
for old_rw in sorted(fw_dict[dev_name], reverse=True)[1:]:
logging.info('Port%d: Rollback test %s to %s',
port.index, dev_name, old_rw)
self._test_update(port, rw=old_rw)
break
def _test_ro_only(self, port, ro_reps):
"""Test FW update on device with RO only.
@param port: EC_USBPD object for port.
@param ro_reps: Number of times to repeat test.
"""
# test update in RO ro_reps times
for i in xrange(ro_reps):
logging.info('RO Loop%d', i)
self._test_update(port)
def run_once(self, ro_reps=1):
fw_dict = self._index_firmware_avail()
self._usbpd = cros_ec.EC_USBPD()
self._reader = cros_logging.LogReader()
for port in self._usbpd.ports:
if not port.is_dfp():
continue
logging.info('Port%d: is a DFP', port.index)
if not self._is_gfu(port):
continue
logging.info('Port%d: supports GFU', port.index)
self._test_rw_rollback(port, fw_dict)
self._test_ro_only(port, ro_reps)
def cleanup(self):
self._set_kernel_fw_update()