Add basic Plankton DP mux test

USB Type-C muxes DP and USB3 signals. This test switches the mux between
DP and USB3 and checks that a USB3 device is detected.

BUG=chrome-os-partner:33362
TEST=manual
  test_that --fast -b host_board dut_ip firmware_TypeCProbeUSB3

Change-Id: I875b6084556ed237f86313b15a7980efa51706e0
Reviewed-on: https://chromium-review.googlesource.com/257946
Reviewed-by: Wai-Hong Tam <waihong@chromium.org>
Commit-Queue: Pin-chih Lin <johnylin@chromium.org>
Tested-by: Pin-chih Lin <johnylin@chromium.org>
diff --git a/server/cros/servo/plankton.py b/server/cros/servo/plankton.py
index 8154f34..0caa646 100644
--- a/server/cros/servo/plankton.py
+++ b/server/cros/servo/plankton.py
@@ -31,6 +31,7 @@
     USBC_COMMAND_DELAY = 0.5
     # Plankton USBC commands.
     USBC_ROLE = 'usbc_role'
+    USBC_MUX = 'usbc_mux'
     RE_USBC_ROLE_VOLTAGE = re.compile(r'src(\d+)v')
     USBC_CHARGING_VOLTAGES = {
         0: 'sink',
@@ -148,3 +149,16 @@
                                          (self.USBC_PD_STATES[state],
                                           self.POLL_STATE_SECS)),
             timeout=self.POLL_STATE_SECS)
+
+
+    def set_usbc_mux(self, mux):
+        """Sets Plankton usbc_mux.
+
+        @param mux: Mux state name to set, either 'dp' or 'usb'.
+        """
+        if mux not in ['dp', 'usb']:
+            raise PlanktonError('Invalid mux name: %s, '
+                                'should be either \'dp\' or \'usb\'.' % mux)
+        self.set(self.USBC_MUX, mux)
+        time.sleep(self.USBC_COMMAND_DELAY)
+
diff --git a/server/site_tests/firmware_TypeCProbeUSB3/control b/server/site_tests/firmware_TypeCProbeUSB3/control
new file mode 100644
index 0000000..b941fed
--- /dev/null
+++ b/server/site_tests/firmware_TypeCProbeUSB3/control
@@ -0,0 +1,28 @@
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from autotest_lib.server import utils
+
+AUTHOR = "chromeos-plankton"
+NAME = "firmware_TypeCProbeUSB3"
+PURPOSE = "Remotely controlled USB type C super speed device probing test."
+CRITERIA = "This test will fail if DUT can't probe USB3 device on type C port."
+SUITE = "plankton_basic"
+TIME = "FAST"
+TEST_CATEGORY = "Functional"
+TEST_TYPE = "server"
+DEPENDENCIES = "plankton"
+
+DOC = """
+This test remotely switches type C port to USB3 device mode. It fails if
+DUT can't probe the USB3 super speed device on Plankton.
+"""
+
+args_dict = utils.args_to_dict(args)
+
+def run(machine):
+    host = hosts.create_host(machine)
+    job.run_test("firmware_TypeCProbeUSB3", host=host, args_dict=args_dict)
+
+parallel_simple(run, machines)
diff --git a/server/site_tests/firmware_TypeCProbeUSB3/firmware_TypeCProbeUSB3.py b/server/site_tests/firmware_TypeCProbeUSB3/firmware_TypeCProbeUSB3.py
new file mode 100644
index 0000000..3dec3d8
--- /dev/null
+++ b/server/site_tests/firmware_TypeCProbeUSB3/firmware_TypeCProbeUSB3.py
@@ -0,0 +1,82 @@
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""This is a USB type C USB3 probing test using Plankton board."""
+
+import logging
+import math
+import re
+
+from autotest_lib.client.bin import utils
+from autotest_lib.client.common_lib import error
+from autotest_lib.server import test
+from autotest_lib.server.cros.servo import plankton
+
+class firmware_TypeCProbeUSB3(test.test):
+    """USB type C USB3 probing test."""
+    version = 1
+
+    RE_SUPERSPEED_DEVICE = r' Port \d+: .+, 5000M$'
+    CHARGING_VOLTAGE = 12
+    SINK = 0
+    POLL_USB3_SECS = 10
+
+
+    def get_usb_devices_and_buses(self):
+        """Gets the set of output lines of 'lsusb -t' run on the host."""
+        result = self._host.run_short('lsusb -t')
+        return set(result.stdout.splitlines())
+
+
+    def is_superspeed(self, lsusb_line):
+        """Checks if an lsusb line is a port entry ending in 5000M."""
+        return re.search(self.RE_SUPERSPEED_DEVICE, lsusb_line)
+
+
+    def run_once(self, host, args_dict):
+        """Checks DUT USB3 device.
+
+        @raise TestFail: If USB3 can't be found when switching to USB mode.
+        """
+        self._host = host
+        plankton_host = plankton.Plankton(args_dict)
+
+        # Enumerate USB devices when charging.
+        plankton_host.charge(self.CHARGING_VOLTAGE)
+        devices_charging = self.get_usb_devices_and_buses()
+        logging.info('number of devices while charging: %d',
+                     len(devices_charging))
+        # Enumerate USB devices when switching to DP.
+        plankton_host.set_usbc_mux('dp')
+        plankton_host.charge(self.SINK)
+        plankton_host.poll_pd_state('sink')
+        devices_dp = self.get_usb_devices_and_buses()
+        plankton_host.charge(self.CHARGING_VOLTAGE)
+        logging.info('number of devices when switch to dp: %d',
+                     len(devices_dp))
+        # Enumerate USB devices when switching to USB3
+        plankton_host.set_usbc_mux('usb')
+        plankton_host.charge(self.SINK)
+        plankton_host.poll_pd_state('sink')
+        # It takes a short period for the DUT to detect the USB3.0 device.
+        utils.poll_for_condition(
+            lambda: len(self.get_usb_devices_and_buses()) > len(devices_dp),
+            exception=error.TestFail(
+                    'Can\'t find new device when switching to USB '
+                    'after %d seconds' % self.POLL_USB3_SECS),
+            timeout=self.POLL_USB3_SECS)
+        devices_usb = self.get_usb_devices_and_buses()
+        plankton_host.charge(self.CHARGING_VOLTAGE)
+        logging.info('number of devices when switch to usb: %d',
+                     len(devices_usb))
+
+        # For Samus USB2.0 signal power swap while usbc_role changes due to
+        # lack of UFP. Port numbers in the lsusb output may change since the
+        # device is disconnected for a short period.
+        if len(devices_dp) != len(devices_charging):
+            raise error.TestFail(
+                    'Number of USB devices changed on switching to DP')
+
+        if not any(map(self.is_superspeed, devices_usb - devices_dp)):
+            raise error.TestFail('Can\'t find new USB3 device')