| # Lint as: python2, python3 |
| # Copyright 2018 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Helper class for managing charging the DUT with Servo v4.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import logging |
| from six.moves import range |
| import time |
| |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib.cros import retry |
| |
| # Base delay time in seconds for Servo role change and PD negotiation. |
| _DELAY_SEC = 0.1 |
| # Total delay time in minutes for Servo role change and PD negotiation. |
| _TIMEOUT_MIN = 0.3 |
| # Exponential backoff for Servo role change and PD negotiation. |
| _BACKOFF = 2 |
| # Number of attempts to recover Servo v4. |
| _RETRYS = 3 |
| # Seconds to wait after resetting the role on a recovery attempt |
| # before trying to set it to the intended role again. |
| _RECOVERY_WAIT_SEC = 1 |
| # Delay to wait before polling whether the role as been changed successfully. |
| _ROLE_SETTLING_DELAY_SEC = 1 |
| # Timeout in minutes to attempt checking AC information over ssh. |
| # Ethernet connection through the v4 flickers on role change. The usb |
| # adapter needs to reenumerate and the DUT reconnect before information can be |
| # queried. This delay has proven sufficient to overcome this in the current |
| # implementation. |
| _ETH_REENUMERATE_TIMEOUT_MIN = 1 |
| |
| |
| def _invert_role(role): |
| """Helper to invert the role. |
| |
| @param role: role to invert |
| |
| @returns: |
| 'src' if |role| is 'snk' |
| 'snk' if |role| is 'src' |
| """ |
| return 'src' if role == 'snk' else 'snk' |
| |
| class ServoV4ChargeManager(object): |
| """A helper class for managing charging the DUT with Servo v4.""" |
| |
| def __init__(self, host, servo): |
| """Check for correct Servo setup. |
| |
| Make sure that Servo is v4 and can manage charging. Make sure that DUT |
| responds to Servo charging commands. Restore Servo v4 power role after |
| sanity check. |
| |
| @param host: CrosHost object representing the DUT or None. |
| If host is None, then the is_ac_connected check on the |
| host object is skipped. |
| @param servo: a proxy for servod. |
| """ |
| super(ServoV4ChargeManager, self).__init__() |
| self._host = host |
| self._servo = servo |
| if not self._servo.supports_built_in_pd_control(): |
| raise error.TestNAError('Servo setup does not support PD control. ' |
| 'Check logs for details.') |
| |
| self._original_role = self._servo.get('servo_v4_role') |
| if self._original_role == 'snk': |
| self.start_charging() |
| self.stop_charging() |
| elif self._original_role == 'src': |
| self.stop_charging() |
| self.start_charging() |
| else: |
| raise error.TestNAError('Unrecognized Servo v4 power role: %s.' % |
| self._original_role) |
| |
| # TODO(b/129882930): once both sides are stable, remove the _retry_wrapper |
| # wrappers as they aren't needed anymore. The current motivation for the |
| # retry loop in the autotest framework is to have a 'stable' library i.e. |
| # retries but also a mechanism and and easy to remove bridge once the bug |
| # is fixed, and we don't require the bandaid anymore. |
| |
| def _retry_wrapper(self, role, verify): |
| """Try up to |_RETRYS| times to set the v4 to |role|. |
| |
| @param role: string 'src' or 'snk'. If 'src' connect DUT to AC power; if |
| 'snk' disconnect DUT from AC power. |
| @param verify: bool to verify that charging started/stopped. |
| |
| @returns: number of retries needed for success |
| """ |
| for retry in range(_RETRYS): |
| try: |
| self._change_role(role, verify) |
| return retry |
| except error.TestError as e: |
| if retry < _RETRYS - 1: |
| # Ensure this retry loop and logging isn't run on the |
| # last iteration. |
| logging.warning('Failed to set to %s %d times. %s ' |
| 'Trying to cycle through %s to ' |
| 'recover.', role, retry + 1, str(e), |
| _invert_role(role)) |
| # Cycle through the other state before retrying. Do not |
| # verify as this is strictly a recovery mechanism - sleep |
| # instead. |
| self._change_role(_invert_role(role), verify=False) |
| time.sleep(_RECOVERY_WAIT_SEC) |
| logging.error('Giving up on %s.', role) |
| raise e |
| |
| def stop_charging(self, verify=True): |
| """Cut off AC power supply to DUT with Servo. |
| |
| @param verify: whether to verify that charging stopped. |
| |
| @returns: number of retries needed for success |
| """ |
| return self._retry_wrapper('snk', verify) |
| |
| def start_charging(self, verify=True): |
| """Connect AC power supply to DUT with Servo. |
| |
| @param verify: whether to verify that charging started. |
| |
| @returns: number of retries needed for success |
| """ |
| return self._retry_wrapper('src', verify) |
| |
| def restore_original_setting(self, verify=True): |
| """Restore Servo to original charging setting. |
| |
| @param verify: whether to verify that original role was restored. |
| """ |
| self._retry_wrapper(self._original_role, verify) |
| |
| def _change_role(self, role, verify=True): |
| """Change Servo PD role and check if DUT responded accordingly. |
| |
| @param role: string 'src' or 'snk'. If 'src' connect DUT to AC power; if |
| 'snk' disconnect DUT from AC power. |
| @param verify: bool to verify that charging started/stopped. |
| |
| @raises error.TestError: if the role did not change successfully. |
| """ |
| self._servo.set_nocheck('servo_v4_role', role) |
| # Sometimes the role reverts quickly. Add a short delay to let the new |
| # role stabilize. |
| time.sleep(_ROLE_SETTLING_DELAY_SEC) |
| |
| if not verify: |
| return |
| |
| @retry.retry(error.TestError, timeout_min=_TIMEOUT_MIN, |
| delay_sec=_DELAY_SEC, backoff=_BACKOFF) |
| def check_servo_role(role): |
| """Check if servo role is as expected, if not, retry.""" |
| if self._servo.get('servo_v4_role') != role: |
| raise error.TestError('Servo v4 failed to set its PD role to ' |
| '%s.' % role) |
| check_servo_role(role) |
| |
| connected = True if role == 'src' else False |
| |
| @retry.retry(error.TestError, timeout_min=_TIMEOUT_MIN, |
| delay_sec=_DELAY_SEC, backoff=_BACKOFF) |
| def check_ac_connected(connected): |
| """Check if the EC believes an AC charger is connected.""" |
| if not self._servo.has_control('charger_connected'): |
| # TODO(coconutruben): remove this check once labs have the |
| # latest hdctools with the required control. |
| logging.warn('Could not verify %r control as the ' |
| 'control is not available on servod.', |
| 'charger_connected') |
| return |
| ec_opinion = self._servo.get('charger_connected') |
| if ec_opinion != connected: |
| str_lookup = {True: 'connected', False: 'disconnected'} |
| msg = ('EC thinks charger is %s but it should be %s.' |
| % (str_lookup[ec_opinion], |
| str_lookup[connected])) |
| raise error.TestError(msg) |
| |
| check_ac_connected(connected) |
| |
| @retry.retry(error.TestError, timeout_min=_ETH_REENUMERATE_TIMEOUT_MIN, |
| delay_sec=_DELAY_SEC, backoff=_BACKOFF) |
| def check_host_ac(connected): |
| """Check if DUT AC power is as expected, if not, retry.""" |
| if self._host.is_ac_connected() != connected: |
| intent = 'connect' if connected else 'disconnect' |
| raise error.TestError('DUT failed to %s AC power.'% intent) |
| |
| if self._host and self._host.is_up_fast(): |
| # If the DUT has been charging in S3/S5/G3, cannot verify. |
| check_host_ac(connected) |