| # Lint as: python2, python3 |
| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import re |
| import logging |
| import six |
| from six.moves import range |
| import time |
| |
| from autotest_lib.client.common_lib import error |
| |
| |
| class PDConsoleUtils(object): |
| """Base clase for all PD console utils |
| |
| This class provides a set of APIs for expected Type C PD required actions |
| in TypeC FAFT tests. The base class is specific for Type C console access. |
| |
| """ |
| def __init__(self, console): |
| """Console can be either usbpd, ec, or pdtester UART |
| |
| This object with then be used by the class which creates |
| the PDConsoleUtils class to send/receive commands to UART |
| """ |
| # save console for UART access functions |
| self.console = console |
| |
| def send_pd_command(self, cmd): |
| """Send command to PD console UART |
| |
| @param cmd: pd command string |
| """ |
| self.console.send_command(cmd) |
| |
| def send_pd_command_get_output(self, cmd, regexp, debug_on=True): |
| """Send command to PD console, wait for response |
| |
| @param cmd: pd command string |
| @param regexp: regular expression for desired output |
| """ |
| # Enable PD console debug mode to show control messages |
| if debug_on: |
| self.enable_pd_console_debug() |
| output = self.console.send_command_get_output(cmd, regexp) |
| if debug_on: |
| self.disable_pd_console_debug() |
| return output |
| |
| def send_pd_command_get_reply_msg(self, cmd): |
| """Send PD protocol msg, get PD control msg reply |
| |
| The PD console debug mode is enabled prior to sending |
| a pd protocol message. This allows the |
| control message reply to be extracted. The debug mode |
| is disabled prior to exiting. |
| |
| @param cmd: pd command to issue to the UART console |
| |
| @returns: PD control header message |
| """ |
| m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)\W']) |
| ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK |
| return ctrl_msg |
| |
| def verify_pd_console(self): |
| """Verify that PD commands exist on UART console |
| |
| Send 'help' command to UART console |
| |
| @returns: True if 'pd' is found, False if not |
| """ |
| |
| l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)']) |
| if l[0][1] == 'pd': |
| return True |
| else: |
| return False |
| |
| def get_pd_version(self): |
| """Get the version of the PD stack |
| |
| @returns: version of PD stack, one of (1, 2) |
| """ |
| # Match a number or an error ("Wrong number of params") |
| matches = self.console.send_command_get_output('pd version', |
| [r'\s+(\d+|Wrong.*)']) |
| if matches: |
| result = matches[0][1] |
| if result[0].isdigit(): |
| return int(result) |
| return 1 |
| |
| def execute_pd_state_cmd(self, port): |
| """Get PD state for specified channel |
| |
| pd 0/1 state command gives produces 5 fields. The full response |
| line is captured and then parsed to extract each field to fill |
| the dict containing port, polarity, role, pd_state, and flags. |
| |
| @param port: Type C PD port 0 or 1 |
| |
| @returns: A dict with the 5 fields listed above |
| @raises: TestFail if any field not found |
| """ |
| cmd = 'pd' |
| subcmd = 'state' |
| pd_cmd = cmd +" " + str(port) + " " + subcmd |
| time.sleep(self.CURRENT_STATE_PROBE_DELAY) |
| # Two FW versions for this command, get full line. |
| m = self.send_pd_command_get_output(pd_cmd, ['(Port.*) - (Role:.*)\n'], |
| debug_on=False) |
| |
| # Extract desired values from result string |
| state_result = {} |
| pd_state_dict = self.PD_STATE_DICT |
| |
| for key, regexp in six.iteritems(pd_state_dict): |
| value = re.search(regexp, m[0][0]) |
| if value: |
| state_result[key] = value.group(1) |
| else: |
| raise error.TestFail('pd %d state: %r value not found' % |
| (port, key)) |
| |
| return state_result |
| |
| def get_pd_state(self, port): |
| """Get the current PD state |
| |
| """ |
| raise NotImplementedError( |
| 'should be implemented in derived class') |
| |
| def get_pd_port(self, port): |
| """Get the current PD port |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['port'] |
| |
| def get_pd_role(self, port): |
| """Get the current PD power role (source or sink) |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['role'] |
| |
| def get_pd_flags(self, port): |
| """Get the current PD flags |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['flags'] |
| |
| def get_pd_dualrole(self, port): |
| """Get the current PD dualrole setting |
| |
| @param port: Type C PD port 0/1 |
| @returns: current PD dualrole setting, one of (on, off, snk, src) |
| """ |
| |
| if self.per_port_dualrole_setting is True: |
| cmd = 'pd %d dualrole' % port |
| elif self.per_port_dualrole_setting is False: |
| cmd = 'pd dualrole' |
| else: |
| try: |
| self.per_port_dualrole_setting = True |
| return self.get_pd_dualrole(port) |
| except: |
| self.per_port_dualrole_setting = False |
| return self.get_pd_dualrole(port) |
| |
| dualrole_values = self.DUALROLE_VALUES |
| |
| m = self.send_pd_command_get_output( |
| cmd, ['dual-role toggling:\s+([\w ]+)[\r\n]'], debug_on=False) |
| # Find the index according to the output of "pd dualrole" command |
| dual_index = self.DUALROLE_CMD_RESULTS.index(m[0][1]) |
| # Map to a string which is the output of this method |
| return dualrole_values[dual_index] |
| |
| def set_pd_dualrole(self, port, value): |
| """Set pd dualrole |
| |
| """ |
| raise NotImplementedError( |
| 'should be implemented in derived class') |
| |
| def query_pd_connection(self): |
| """Determine if PD connection is present |
| |
| Try the 'pd 0/1 state' command and see if it's in either |
| expected state of a connection. Record the port number |
| that has an active connection |
| |
| @returns: dict with params port, connect, and state |
| """ |
| status = {} |
| port = 0; |
| status['connect'] = False |
| status['port'] = port |
| state = self.get_pd_state(port) |
| # Check port 0 first |
| |
| if self.is_pd_connected(port): |
| status['connect'] = True |
| status['role'] = state |
| else: |
| port = 1 |
| status['port'] = port |
| state = self.get_pd_state(port) |
| logging.info('CHECK PORT 1: %s', state) |
| # Check port 1 |
| if self.is_pd_connected(port): |
| status['connect'] = True |
| status['role'] = state |
| |
| return status |
| |
| def swap_power_role(self, port): |
| """Attempt a power role swap |
| |
| This method attempts to execute a power role swap. A check |
| is made to ensure that dualrole mode is enabled and that |
| a PD contract is currently established. If both checks pass, |
| then the power role swap command is issued. After a delay, |
| if a PD contract is established and the current state does |
| not equal the starting state, then it was successful. |
| |
| @param port: pd port number |
| |
| @returns: True if power swap is successful, False otherwise. |
| """ |
| # Get starting state |
| if self.is_pd_dual_role_enabled(port) == False: |
| logging.info('Dualrole Mode not enabled!') |
| return False |
| if self.is_pd_connected(port) == False: |
| logging.info('PD contract not established!') |
| return False |
| current_pr = self.get_pd_state(port) |
| swap_cmd = 'pd %d swap power' % port |
| self.send_pd_command(swap_cmd) |
| time.sleep(self.CONNECT_TIME) |
| new_pr = self.get_pd_state(port) |
| logging.info('Power swap: %s -> %s', current_pr, new_pr) |
| if self.is_pd_connected(port) == False: |
| return False |
| return bool(current_pr != new_pr) |
| |
| def disable_pd_console_debug(self): |
| """Turn off PD console debug |
| |
| """ |
| cmd = 'pd dump 0' |
| self.send_pd_command(cmd) |
| |
| def enable_pd_console_debug(self): |
| """Enable PD console debug level 1 |
| |
| """ |
| cmd = 'pd dump 2' |
| self.send_pd_command(cmd) |
| |
| def is_pd_flag_set(self, port, key): |
| """Test a bit in PD protocol state flags |
| |
| The flag word contains various PD protocol state information. |
| This method allows for a specific flag to be tested. |
| |
| @param port: Port which has the active PD connection |
| @param key: dict key to retrieve the flag bit mapping |
| |
| @returns True if the bit to be tested is set |
| """ |
| pd_flags = self.get_pd_flags(port) |
| return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16)) |
| |
| def is_pd_connected(self, port): |
| """Check if a PD connection is active |
| |
| @param port: port to be used for pd console commands |
| |
| @returns True if port is in connected state |
| """ |
| return self.is_src_connected(port) or self.is_snk_connected(port) |
| |
| def is_pd_dual_role_enabled(self, port): |
| """Check if a PD device is in dualrole mode |
| |
| @param port: Type C PD port 0/1 |
| |
| @returns True is dualrole mode is active, false otherwise |
| """ |
| drp = self.get_pd_dualrole(port) |
| return drp == 'on' |
| |
| def is_src_connected(self, port, state=None): |
| """Checks if the port is connected as a source |
| |
| @param port: Type C PD port 0/1 |
| @param state: the state to check (None to get current state) |
| |
| @returns True if connected as SRC, False otherwise |
| """ |
| if state is None: |
| state = self.get_pd_state(port) |
| return state in self.get_src_connect_states() |
| |
| def is_snk_connected(self, port, state=None): |
| """Checks if the port is connected as a sink |
| |
| @param port: Type C PD port 0/1 |
| @param state: the state to check (None to get current state) |
| |
| @returns True if connected as SNK, False otherwise |
| """ |
| if state is None: |
| state = self.get_pd_state(port) |
| return state in self.get_snk_connect_states() |
| |
| def is_disconnected(self, port, state=None): |
| """Checks if the port is disconnected |
| |
| @param port: Type C PD port 0/1 |
| @param state: the state to check (None to get current state) |
| |
| @return True if disconnected |
| """ |
| if state is None: |
| state = self.get_pd_state(port) |
| return state in self.get_disconnected_states() |
| |
| def get_src_connect_states(self): |
| """Returns the name of the SRC state |
| |
| """ |
| raise NotImplementedError( |
| 'should be implemented in derived class') |
| |
| def get_snk_connect_states(self): |
| """Returns the name of the SNK state |
| |
| """ |
| raise NotImplementedError( |
| 'should be implemented in derived class') |
| |
| def get_disconnected_states(self): |
| """Returns the names of the disconnected states |
| |
| """ |
| return self.DISCONNECTED_STATES |
| |
| def is_snk_discovery_state(self, port): |
| """Returns true if in snk discovery state, else false |
| |
| @param port: Type C PD port 0/1 |
| |
| @return: True if in SNK Discovery state |
| """ |
| raise NotImplementedError( |
| 'should be implemented in derived class') |
| |
| class TCPMv1ConsoleUtils(PDConsoleUtils): |
| """ Provides a set of methods common to USB PD TCPMv1 FAFT tests |
| |
| Each instance of this class is associated with a particular |
| servo UART console. USB PD tests will typically use the console |
| command 'pd' and its subcommands to control/monitor Type C PD |
| connections. The servo object used for UART operations is |
| passed in and stored when this object is created. |
| |
| """ |
| SRC_CONNECT = ('SRC_READY',) |
| SNK_CONNECT = ('SNK_READY',) |
| SRC_DISC = 'SRC_DISCONNECTED' |
| SNK_DISC = 'SNK_DISCONNECTED' |
| SNK_DISCOVERY = 'SNK_DISCOVERY' |
| DRP_AUTO_TOGGLE = 'DRP_AUTO_TOGGLE' |
| DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE) |
| |
| PD_MAX_PORTS = 2 |
| CONNECT_TIME = 4 |
| |
| CURRENT_STATE_PROBE_DELAY = 2 |
| DUALROLE_QUERY_DELAY = 1 |
| # Dualrole input/output values of methods in this class. |
| DUALROLE_VALUES = ['on', 'off', 'snk', 'src'] |
| # Strings passing to the console command "pd dualrole" |
| DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source'] |
| # Strings returned from the console command "pd dualrole" |
| DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source'] |
| |
| # Some old firmware uses a single dualrole setting for all ports; while |
| # some new firmware uses a per port dualrole setting. This flag will be |
| # initialized to True or False. |
| # TODO: Remove this flag when the old setting phases out |
| per_port_dualrole_setting = None |
| |
| # Dictionary for 'pd 0/1 state' parsing |
| PD_STATE_DICT = { |
| 'port': 'Port\s+([\w]+)', |
| 'role': 'Role:\s+([\w]+-[\w]+)', |
| 'pd_state': 'State:\s+([\w()]+)', |
| 'flags': 'Flags:\s+([\w]+)', |
| 'polarity': '(CC\d)' |
| } |
| |
| # Regex to match PD state name; work for both old and new formats |
| RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?" |
| # Copied from ec repo: common/usb_pd_protocol.c |
| PD_STATE_NAMES = [ |
| "DISABLED", # index: 0 |
| "SUSPENDED", |
| "SNK_DISCONNECTED", |
| "SNK_DISCONNECTED_DEBOUNCE", |
| "SNK_HARD_RESET_RECOVER", |
| "SNK_DISCOVERY", # index: 5 |
| "SNK_REQUESTED", |
| "SNK_TRANSITION", |
| "SNK_READY", |
| "SNK_SWAP_INIT", |
| "SNK_SWAP_SNK_DISABLE", # index: 10 |
| "SNK_SWAP_SRC_DISABLE", |
| "SNK_SWAP_STANDBY", |
| "SNK_SWAP_COMPLETE", |
| "SRC_DISCONNECTED", |
| "SRC_DISCONNECTED_DEBOUNCE", # index: 15 |
| "SRC_HARD_RESET_RECOVER", |
| "SRC_STARTUP", |
| "SRC_DISCOVERY", |
| "SRC_NEGOCIATE", |
| "SRC_ACCEPTED", # index: 20 |
| "SRC_POWERED", |
| "SRC_TRANSITION", |
| "SRC_READY", |
| "SRC_GET_SNK_CAP", |
| "DR_SWAP", # index: 25 |
| "SRC_SWAP_INIT", |
| "SRC_SWAP_SNK_DISABLE", |
| "SRC_SWAP_SRC_DISABLE", |
| "SRC_SWAP_STANDBY", |
| "VCONN_SWAP_SEND", # index: 30 |
| "VCONN_SWAP_INIT", |
| "VCONN_SWAP_READY", |
| "SOFT_RESET", |
| "HARD_RESET_SEND", |
| "HARD_RESET_EXECUTE", # index: 35 |
| "BIST_RX", |
| "BIST_TX", |
| "DRP_AUTO_TOGGLE", |
| ] |
| |
| # Dictionary for PD control message types |
| PD_CONTROL_MSG_MASK = 0x1f |
| PD_CONTROL_MSG_DICT = { |
| 'GoodCRC': 1, |
| 'GotoMin': 2, |
| 'Accept': 3, |
| 'Reject': 4, |
| 'Ping': 5, |
| 'PS_RDY': 6, |
| 'Get_Source_Cap': 7, |
| 'Get_Sink_Cap': 8, |
| 'DR_Swap': 9, |
| 'PR_Swap': 10, |
| 'VCONN_Swap': 11, |
| 'Wait': 12, |
| 'Soft_Reset': 13 |
| } |
| |
| # Dictionary for PD firmware state flags |
| PD_STATE_FLAGS_DICT = { |
| 'power_swap': 1 << 1, |
| 'data_swap': 1 << 2, |
| 'data_swap_active': 1 << 3, |
| 'vconn_on': 1 << 12 |
| } |
| |
| def _normalize_pd_state(self, state): |
| """Normalize the PD state name which handles both old and new formats. |
| |
| The old format is like: "SNK_READY" |
| The new format is like: "8()" if debug_level == 0, or |
| "8(SNK_READY)" if debug_level > 0 |
| |
| This method will convert the new format to the old one. |
| |
| @param state: The raw PD state text |
| |
| @returns: The normalized PD state name |
| @raises: TestFail if unexpected PD state format |
| """ |
| m = re.match(self.RE_PD_STATE, state) |
| if m and any(m.groups()): |
| state_index, state_name = m.groups() |
| if state_index is None: |
| # The old format: return the name |
| return state_name |
| # The new format: map the index to a name |
| mapped_name = self.PD_STATE_NAMES[int(state_index)] |
| if state_name is not None: |
| assert mapped_name == state_name |
| return mapped_name |
| else: |
| raise error.TestFail('Unexpected PD state format: %s' % state) |
| |
| def get_pd_state(self, port): |
| """Get the current PD state |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| |
| pd_dict = self.execute_pd_state_cmd(port) |
| return self._normalize_pd_state(pd_dict['pd_state']) |
| |
| def set_pd_dualrole(self, port, value): |
| """Set pd dualrole |
| |
| It can be set to either: |
| 1. on |
| 2. off |
| 3. snk (force sink mode) |
| 4. src (force source mode) |
| After setting, the current value is read to confirm that it |
| was set properly. |
| |
| @param port: Type C PD port 0/1 |
| @param value: One of the 4 options listed |
| """ |
| dualrole_values = self.DUALROLE_VALUES |
| # If the dualrole setting is not initialized, call the get method to |
| # initialize it. |
| if self.per_port_dualrole_setting is None: |
| self.get_pd_dualrole(port) |
| |
| # Get string required for console command |
| dual_index = dualrole_values.index(value) |
| # Create console command |
| if self.per_port_dualrole_setting is True: |
| cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index]) |
| elif self.per_port_dualrole_setting is False: |
| cmd = 'pd dualrole %s' % (self.DUALROLE_CMD_ARGS[dual_index]) |
| else: |
| raise error.TestFail("dualrole error") |
| |
| self.console.send_command(cmd) |
| time.sleep(self.DUALROLE_QUERY_DELAY) |
| # Get current setting to verify that command was successful |
| dual = self.get_pd_dualrole(port) |
| # If it doesn't match, then raise error |
| if dual != value: |
| raise error.TestFail("dualrole error: " + value + " != " + dual) |
| |
| def get_src_connect_states(self): |
| """Returns the name of the SRC state |
| |
| """ |
| return self.SRC_CONNECT |
| |
| def get_snk_connect_states(self): |
| """Returns the name of the SRC state |
| |
| """ |
| return self.SNK_CONNECT |
| |
| def is_snk_discovery_state(self, port): |
| """Returns true if in snk discovery state, else false |
| |
| @param port: Type C PD port 0/1 |
| |
| @return: True if in SNK Discovery state |
| """ |
| state = self.get_pd_state(port) |
| return state == self.SNK_DISCOVERY |
| |
| class TCPMv2ConsoleUtils(PDConsoleUtils): |
| """ Provides a set of methods common to USB PD TCPMv1 FAFT tests |
| |
| Each instance of this class is associated with a particular |
| servo UART console. USB PD tests will typically use the console |
| command 'pd' and its subcommands to control/monitor Type C PD |
| connections. The servo object used for UART operations is |
| passed in and stored when this object is created. |
| |
| """ |
| SRC_CONNECT = ('Attached.SRC', 'UnorientedDebugAccessory.SRC') |
| SNK_CONNECT = ('Attached.SNK', 'DebugAccessory.SNK') |
| SRC_DISC = 'Unattached.SRC' |
| SNK_DISC = 'Unattached.SNK' |
| DRP_AUTO_TOGGLE = 'DRPAutoToggle' |
| LOW_POWER_MODE = 'LowPowerMode' |
| SNK_DISCOVERY = 'PE_SNK_DISCOVERY' |
| DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE, LOW_POWER_MODE) |
| |
| PD_MAX_PORTS = 2 |
| CONNECT_TIME = 4 |
| |
| CURRENT_STATE_PROBE_DELAY = 2 |
| DUALROLE_QUERY_DELAY = 1 |
| # Dualrole input/output values of methods in this class. |
| DUALROLE_VALUES = ['on', 'off', 'sink', 'source'] |
| # Strings passing to the console command "pd dualrole" |
| DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source'] |
| # Strings returned from the console command "pd dualrole" |
| DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source'] |
| |
| # Some old firmware uses a single dualrole setting for all ports; while |
| # some new firmware uses a per port dualrole setting. This flag will be |
| # initialized to True or False. |
| # TODO: Remove this flag when the old setting phases out |
| per_port_dualrole_setting = None |
| |
| # Dictionary for 'pd 0/1 state' parsing |
| PD_STATE_DICT = { |
| 'port': 'Port\s+([\w]+)', |
| 'role': 'Role:\s+([\w]+-[\w]+)', |
| 'pd_state': 'TC State:\s+([\w().]+)', |
| 'flags': 'Flags:\s+([\w]+)', |
| 'pe_state': 'PE State:\s+(\w*)', |
| 'polarity': '(CC\d)' |
| } |
| |
| # Regex to match PD state name; work for both old and new formats |
| RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?" |
| |
| # Dictionary for PD control message types |
| PD_CONTROL_MSG_MASK = 0x1f |
| PD_CONTROL_MSG_DICT = { |
| 'GoodCRC': 1, |
| 'GotoMin': 2, |
| 'Accept': 3, |
| 'Reject': 4, |
| 'Ping': 5, |
| 'PS_RDY': 6, |
| 'Get_Source_Cap': 7, |
| 'Get_Sink_Cap': 8, |
| 'DR_Swap': 9, |
| 'PR_Swap': 10, |
| 'VCONN_Swap': 11, |
| 'Wait': 12, |
| 'Soft_Reset': 13 |
| } |
| |
| # Dictionary for PD firmware state flags |
| PD_STATE_FLAGS_DICT = { |
| 'power_swap': 1 << 1, |
| 'data_swap': 1 << 2, |
| 'data_swap_active': 1 << 3, |
| 'vconn_on': 1 << 12 |
| } |
| |
| def get_pe_state(self, port): |
| """Get the current Policy Engine state |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pe state |
| """ |
| |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['pe_state'] |
| |
| def get_pd_state(self, port): |
| """Get the current PD state |
| |
| @param port: Type C PD port 0/1 |
| @returns: current pd state |
| """ |
| |
| pd_dict = self.execute_pd_state_cmd(port) |
| return pd_dict['pd_state'] |
| |
| def set_pd_dualrole(self, port, value): |
| """Set pd dualrole |
| |
| It can be set to either: |
| 1. on |
| 2. off |
| 3. snk (force sink mode) |
| 4. src (force source mode) |
| After setting, the current value is read to confirm that it |
| was set properly. |
| |
| @param port: Type C PD port 0/1 |
| @param value: One of the 4 options listed |
| """ |
| dualrole_values = self.DUALROLE_VALUES |
| |
| if value == 'src': |
| value = 'source' |
| elif value == 'snk': |
| value = 'sink' |
| |
| # Get string required for console command |
| dual_index = dualrole_values.index(value) |
| # Create console command |
| cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index]) |
| self.console.send_command(cmd) |
| time.sleep(self.DUALROLE_QUERY_DELAY) |
| # Get current setting to verify that command was successful |
| dual = self.get_pd_dualrole(port) |
| # If it doesn't match, then raise error |
| if dual != value: |
| raise error.TestFail("dualrole error: " + value + " != " + dual) |
| |
| def get_src_connect_states(self): |
| """Returns the name of the SRC states |
| |
| @returns: List of connected source state names |
| """ |
| return self.SRC_CONNECT |
| |
| def get_snk_connect_states(self): |
| """Returns the name of the SRC states |
| |
| @returns: List of connected sink state names |
| """ |
| return self.SNK_CONNECT |
| |
| def is_snk_discovery_state(self, port): |
| """Returns true if in snk discovery state, else false |
| |
| @param port: Type C PD port 0/1 |
| |
| @return: True if in SNK Discovery state |
| """ |
| state = self.get_pe_state(port) |
| return state == self.SNK_DISCOVERY |
| |
| class PDConnectionUtils(PDConsoleUtils): |
| """Provides a set of methods common to USB PD FAFT tests |
| |
| This class is used for PD utility methods that require access |
| to both PDTester and DUT PD consoles. |
| |
| """ |
| |
| def __init__(self, dut_console, pdtester_console): |
| """ |
| @param dut_console: PD console object for DUT |
| @param pdtester_console: PD console object for PDTester |
| """ |
| # save console for DUT PD UART access functions |
| self.dut_console = dut_console |
| # save console for PDTester UART access functions |
| self.pdtester_console = pdtester_console |
| |
| def _verify_pdtester_connection(self, port): |
| """Verify DUT to PDTester PD connection |
| |
| This method checks for a PDTester PD connection for the |
| given port by first verifying if a PD connection is present. |
| If found, then it uses a PDTester feature to force a PD disconnect. |
| If the port is no longer in the connected state, and following |
| a delay, is found to be back in the connected state, then |
| a DUT pd to PDTester connection is verified. |
| |
| @param port: DUT pd port to test |
| |
| @returns True if DUT to PDTester pd connection is verified |
| """ |
| DISCONNECT_CHECK_TIME = 2 |
| DISCONNECT_TIME_SEC = 10 |
| # pdtester console command to force PD disconnect |
| disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000) |
| # Only check for PDTester if DUT has active PD connection |
| if self.dut_console.is_pd_connected(port): |
| # Attempt to force PD disconnection |
| self.pdtester_console.send_pd_command(disc_cmd) |
| time.sleep(DISCONNECT_CHECK_TIME) |
| # Verify that DUT PD port is no longer connected |
| if self.dut_console.is_pd_connected(port) == False: |
| # Wait for disconnect timer and give time to reconnect |
| time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) |
| if self.dut_console.is_pd_connected(port): |
| logging.info('PDTester connection verified on port %d', |
| port) |
| return True |
| else: |
| # Could have disconnected other port, allow it to reconnect |
| # before exiting. |
| time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) |
| return False |
| |
| def find_dut_to_pdtester_connection(self): |
| """Find the PD port which is connected to PDTester |
| |
| @returns DUT pd port number if found, None otherwise |
| """ |
| for port in range(self.dut_console.PD_MAX_PORTS): |
| # Check for DUT to PDTester connection on port |
| if self._verify_pdtester_connection(port): |
| # PDTester PD connection found so exit |
| return port |
| return None |
| |
| def create_pd_console_utils(console): |
| """Factory that detects the proper PDConsole Utils to use for DUT |
| |
| @param console: DUT PD console |
| |
| @returns: An instance of TCPMv1ConsoleUtils or TCPMv2ConsoleUtils |
| """ |
| pd_console_utils = { |
| 1: TCPMv1ConsoleUtils, |
| 2: TCPMv2ConsoleUtils, |
| } |
| |
| version = PDConsoleUtils(console).get_pd_version() |
| logging.debug('%s is TCPM v%s', console, version) |
| cls = pd_console_utils[version] |
| return cls(console) |