firmware_PDVBusRequest: Refactor the test to use pd_device class

This patch refactors firmware_PDVBusRequest test.
Use pd_device class instead of pd_consle because it verifies the PD
connection correctly.

BUG=b:35573842
TEST=Run firmware_PDVBusRequest test. Make sure test works properly.

Change-Id: I45520ba8a28e13e1b333ca7e679677bafd11bdcb
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/2266472
Reviewed-by: Patryk Duda <pdk@semihalf.com>
Reviewed-by: Wai-Hong Tam <waihong@google.com>
Tested-by: Dawid Niedźwiecki <dn@semihalf.com>
Commit-Queue: Wai-Hong Tam <waihong@google.com>
diff --git a/server/site_tests/firmware_PDVbusRequest/firmware_PDVbusRequest.py b/server/site_tests/firmware_PDVbusRequest/firmware_PDVbusRequest.py
index 8ffad56..fcaa1e7 100644
--- a/server/site_tests/firmware_PDVbusRequest/firmware_PDVbusRequest.py
+++ b/server/site_tests/firmware_PDVbusRequest/firmware_PDVbusRequest.py
@@ -8,7 +8,7 @@
 
 from autotest_lib.client.common_lib import error
 from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
-from autotest_lib.server.cros.servo import pd_console
+from autotest_lib.server.cros.servo import pd_device
 
 
 class firmware_PDVbusRequest(FirmwareTest):
@@ -87,21 +87,24 @@
         """Exectue VBUS request test.
 
         """
-        # TODO(b/35573842): Refactor to use PDPortPartner to probe the port
-        self.pdtester_port = 1 if 'servo_v4' in self.pdtester.servo_type else 0
+        consoles = [self.usbpd, self.pdtester]
+        port_partner = pd_device.PDPortPartner(consoles)
 
-        # create objects for pd utilities
-        pd_dut_utils = pd_console.create_pd_console_utils(self.usbpd)
-        pd_pdtester_utils = pd_console.create_pd_console_utils(self.pdtester)
+        # Identify a valid test port pair
+        port_pair = port_partner.identify_pd_devices()
+        if not port_pair:
+            raise error.TestFail('No PD connection found!')
 
-        # Make sure PD support exists in the UART console
-        if pd_dut_utils.verify_pd_console() == False:
-            raise error.TestFail("pd command not present on console!")
+        for port in port_pair:
+            if port.is_pdtester:
+                self.pdtester_port = port
+            else:
+                self.dut_port = port
 
-        # Type C connection (PD contract) should exist at this point
-        dut_state = pd_dut_utils.query_pd_connection()
-        logging.info('DUT PD connection state: %r', dut_state)
-        if dut_state['connect'] == False:
+        dut_connect_state = self.dut_port.get_pd_state()
+        logging.info('Initial DUT connect state = %s', dut_connect_state)
+
+        if not self.dut_port.is_connected(dut_connect_state):
             raise error.TestFail("pd connection not found")
 
         dut_voltage_limit = self.faft_config.usbc_input_voltage_limit
@@ -126,12 +129,12 @@
             # Wait for new PD contract to be established
             time.sleep(self.PD_SETTLE_DELAY)
             # Get current PDTester PD state
-            pdtester_state = pd_pdtester_utils.get_pd_state(self.pdtester_port)
+            pdtester_state = self.pdtester_port.get_pd_state()
             # If PDTester is in SNK mode and the DUT is in S0, the DUT should
             # source VBUS = USBC_SINK_VOLTAGE. If PDTester is in SNK mode, and
             # the DUT is not in S0, the DUT shouldn't source VBUS, which means
             # VBUS = 0.
-            if pdtester_state == pd_pdtester_utils.SNK_CONNECT:
+            if self.pdtester_port.is_snk(pdtester_state):
                 expected_vbus_voltage = (self.USBC_SINK_VOLTAGE
                         if self.get_power_state() == 'S0' else 0)
                 ok_to_fail = False
@@ -158,11 +161,11 @@
 
         # The DUT must be in SNK mode for the pd <port> dev <voltage>
         # command to have an effect.
-        if not pd_dut_utils.is_snk_connected(dut_state['port']):
+        if not self.dut_port.is_snk():
             # DUT needs to be in SINK Mode, attempt to force change
-            pd_dut_utils.set_pd_dualrole(dut_state['port'], 'snk')
+            self.dut_port.drp_set('snk')
             time.sleep(self.PD_SETTLE_DELAY)
-            if not pd_dut_utils.is_snk_connected(dut_state['port']):
+            if not self.dut_port.is_snk():
                 raise error.TestFail("DUT not able to connect in SINK mode")
 
         logging.info('Start of DUT initiated tests')
@@ -177,8 +180,8 @@
                              'update hdctools and servo_v4 firmware', v)
                 continue
             # Build 'pd <port> dev <voltage> command
-            cmd = 'pd %d dev %d' % (dut_state['port'], v)
-            pd_dut_utils.send_pd_command(cmd)
+            cmd = 'pd %d dev %d' % (self.dut_port.port, v)
+            self.dut_port.utils.send_pd_command(cmd)
             time.sleep(self.PD_SETTLE_DELAY)
             result, result_str = self._compare_vbus(v, ok_to_fail=is_override)
             logging.info('%s, %s', result_str, result)
@@ -187,11 +190,11 @@
 
         # Make sure DUT is set back to its max voltage so DUT will accept all
         # options
-        cmd = 'pd %d dev %d' % (dut_state['port'], dut_voltage_limit)
-        pd_dut_utils.send_pd_command(cmd)
+        cmd = 'pd %d dev %d' % (self.dut_port.port, dut_voltage_limit)
+        self.dut_port.utils.send_pd_command(cmd)
         time.sleep(self.PD_SETTLE_DELAY)
         # The next group of tests need DUT to connect in SNK and SRC modes
-        pd_dut_utils.set_pd_dualrole(dut_state['port'], 'on')
+        self.dut_port.drp_set('on')
 
         if dut_failures:
             logging.error('DUT voltage request failures')