blob: 502af6219582f2adea5948ae2c8317263b2348f4 [file] [log] [blame]
# Copyright 2016 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# TODO(b/316978014) Remove this file when the test is no longer in any
# PVS plan
import logging
import re
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
from autotest_lib.server.cros.servo import pd_console
class firmware_PDProtocol(FirmwareTest):
"""
Servo based USB PD protocol test.
A charger must be connected to the DUT for this test.
This test checks that when an appropriate zinger charger is connected that
the PD is properly negotiated in dev mode and when booted from a test image
through recovery that the PD is not negotiated.
Example:
PD Successfully negotiated
- ServoV4 in SRC_READY or SNK_READY state
PD not negotiated
- ServoV4 in SRC_DISCOVERY or SNK_DISCOVERY state
"""
version = 1
NEEDS_SERVO_USB = True
PD_NOT_SUPPORTED_PATTERN = 'INVALID_COMMAND'
def initialize(self, host, cmdline_args, ec_wp=None):
"""Initialize the test"""
super(firmware_PDProtocol, self).initialize(host, cmdline_args)
self._setup_ec_write_protect(ec_wp)
self.check_if_pd_supported()
self.switcher.setup_mode('dev')
# The USB disk is used for recovery. But this test wants a fine-grained
# control, i.e. swapping the role just before booting into recovery,
# not swapping here. So set used_for_recovery=False.
self.setup_usbkey(usbkey=True, host=False, used_for_recovery=False)
self.original_dev_boot_usb = self.faft_client.system.get_dev_boot_usb()
logging.info('Original dev_boot_usb value: %s',
str(self.original_dev_boot_usb))
self.hw_wp = self.servo.get('fw_wp_state')
self.sw_wp = self.faft_client.ec.get_write_protect_status()
logging.info('hw_wp=%s, sw_wp=%s', self.hw_wp, self.sw_wp)
def cleanup(self):
"""Cleanup the test"""
if hasattr(self, 'original_dev_boot_usb'):
self.ensure_dev_internal_boot(self.original_dev_boot_usb)
self._restore_ec_write_protect()
if (self.servo.has_control('cold_reset_select')
and hasattr(self, 'cold_reset_select')):
self.servo.set('cold_reset_select', self.cold_reset_select)
super(firmware_PDProtocol, self).cleanup()
def _setup_ec_write_protect(self, ec_wp):
"""Setup for EC write-protection.
It makes sure the EC in the requested write-protection state. If not, it
flips the state. Flipping the write-protection requires DUT reboot.
@param ec_wp: True to request EC write-protected; False to request EC
not write-protected; None to do nothing.
"""
if ec_wp is None:
return
# The default c2d2 cold_reset will reboot GSC, and this breaks the
# hardware write protect. If available, set cold_reset to
# gsc_ecrst_pulse. This should always be available on c2d2 platforms.
if (self.servo.has_control('cold_reset_select')
and self.servo.has_control('gsc_ecrst_pulse')):
self.cold_reset_select = self.servo.get('cold_reset_select')
self.servo.set('cold_reset_select', 'gsc_ecrst_pulse')
self._old_wpsw_cur = self.checkers.crossystem_checker(
{'wpsw_cur': '1'}, suppress_logging=True)
if not self.faft_config.ap_access_ec_flash:
raise error.TestNAError(
"Cannot change EC write-protect for this device")
logging.info(
'The test required EC is %swrite-protected. Reboot '
'and flip the state.', '' if ec_wp else 'not ')
self.switcher.mode_aware_reboot(
'custom', lambda: self.set_ec_write_protect_and_reboot(ec_wp))
wpsw_cur = '1' if ec_wp else '0'
self.check_state((self.checkers.crossystem_checker, {
'wpsw_cur': wpsw_cur
}))
def _restore_ec_write_protect(self):
"""Restore the original EC write-protection."""
if (not hasattr(self,
'_old_wpsw_cur')) or (self._old_wpsw_cur is None):
return
if not self.checkers.crossystem_checker(
{'wpsw_cur': '1' if self._old_wpsw_cur else '0'},
suppress_logging=True):
logging.info('Restore original EC write protection and reboot.')
self.switcher.mode_aware_reboot(
'custom', lambda: self.set_ec_write_protect_and_reboot(
self._old_wpsw_cur))
self.check_state((self.checkers.crossystem_checker, {
'wpsw_cur': '1' if self._old_wpsw_cur else '0'
}))
def check_if_pd_supported(self):
""" Checks if the DUT responds to ectool usbpdpower and skips the test
if it isn't supported on the device.
"""
output = self.run_command('ectool usbpdpower')
if (not output or
self.check_ec_output(output, self.PD_NOT_SUPPORTED_PATTERN)):
raise error.TestNAError("PD not supported skipping test.")
def boot_to_recovery(self):
"""Boot device into recovery mode."""
logging.info('Reboot into Recovery...')
self.switcher.reboot_to_mode(to_mode='rec')
self.check_state((self.checkers.crossystem_checker,
{'mainfw_type': 'recovery'}))
def run_command(self, command):
"""Runs the specified command and returns the output
as a list of strings.
@param command: The command to run on the DUT
@return A list of strings of the command output
"""
logging.info('Command to run: %s', command)
output = self.faft_client.system.run_shell_command_get_output(command)
logging.info('Command output: %s', output)
return output
def check_ec_output(self, output, pattern):
"""Checks if any line in the output matches the given pattern.
@param output: A list of strings containg the output to search
@param pattern: The regex to search the output for
@return True upon first match found or False
"""
logging.info('Checking %s for %s.', output, pattern)
for line in output:
if bool(re.search(pattern, line)):
return True
return False
def run_once(self, host):
"""Main test logic"""
# TODO(b/35573842): Refactor to use PDPortPartner to probe the port
self.pdtester_port = 1 if 'servo_v4' in self.pdtester.servo_type else 0
self.pdtester_pd_utils = pd_console.create_pd_console_utils(
self.pdtester)
self.ensure_dev_internal_boot(self.original_dev_boot_usb)
# Check servo_v4 is negotiated
if self.pdtester_pd_utils.is_disconnected(self.pdtester_port):
raise error.TestNAError('PD not connected')
# TODO(b:152148025): Directly set role as pdsnkdts might fail the
# PD communication. In short term, we could use PR SWAP instead, and
# should also fix the TCPM for handling SRCDTS -> SNKDTS case.
if host.has_battery():
self.set_servo_v4_role_to_snk(pd_comm=True)
self.boot_to_recovery()
# Check PD is not negotiated
# We allow the chromebox/chromebase, to enable the PD in the
# recovery mode.
if (host.get_board_type() != 'CHROMEBOX'
and host.get_board_type() != 'CHROMEBASE'
and not self.pdtester_pd_utils.is_snk_discovery_state(
self.pdtester_port)):
raise error.TestFail('Expect PD to be disabled, WP (HW/SW) %s/%s' %
(self.hw_wp, self.sw_wp))
# Check WP status. Only both SW/HW WP on should pass the test.
if (not self.sw_wp) or ('off' in self.hw_wp):
raise error.TestFail(
'Expect HW and SW WP on, got hw_wp=%s, sw_wp=%s' %
(self.hw_wp, self.sw_wp))