blob: 6e4b7a7d457b2b66b546d77ec1b53dff21747618 [file] [log] [blame]
# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, string, time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import crash_detector
from autotest_lib.client.common_lib.cros import power_cycle_usb_util
from autotest_lib.server import autotest, test
_WAIT_DELAY = 10
class platform_CFM_USBPeripheralStress(test.test):
"""USB Peripheral Stress Test - HotPlug Stress & Reboot Stress"""
version = 1
def action_login(self):
"""Login i.e. runs using client test
@exception TestFail failed to login within timeout.
"""
self.autotest_client.run_test(self.client_autotest,
exit_without_logout=True)
def hot_plug_check_peripheral(self):
"""Power cycles USB port on which peripheral is connected."""
for peripheral_name, vid_pid in self.peripheral_dict.iteritems():
self.peripheral_name = peripheral_name
self.vid, self.pid = vid_pid
try:
power_cycle_usb_util.power_cycle_usb_vidpid(self.host,
self.board, self.vid, self.pid)
time.sleep(_WAIT_DELAY)
except KeyError:
raise error.TestFail('Couldn\'t find target device: '
'vid:pid {}:{}'.format(self.vid, self.pid))
self.check_usb_enumeration(peripheral_name=self.peripheral_name,
vendor_id=self.vid)
def check_usb_enumeration(self, peripheral_name, vendor_id):
"""Gets usb device type info from lsusb output based on vendor id to
check if usb device is enumerated.
@peripheral_name: USB Peripheral
@vendor_id: Device vendor id.
"""
try:
cmd = 'lsusb -v -d ' + vendor_id + ': | grep iProduct'
cmd_out = self.host.run(cmd, ignore_status=False).stdout.strip()
if cmd_out is not None:
# cmd_out[8:] trims the first 8 characters from the returned
# string which is "iProduct" and displays peripheral name.
logging.info('USB Peripheral {} enumerated successfully'
.format(string.strip(cmd_out[8:])))
except Exception as e:
raise error.TestFail('Couldn\'t find Plugged {} with: vid:'
'{} '.format(peripheral_name, vendor_id))
def upload_crash_count(self, count):
"""Uploads crash count based on length of crash_files list."""
self.output_perf_value(description='number_of_crashes',
value=int(count),
units='count', higher_is_better=False)
def cleanup(self):
"""Reboot host"""
self.host.reboot()
def run_once(self, host, client_autotest, repeat, action_sequence,
peripheral_dict, crash_check=False):
"""Main function to run autotest.
@param host: Host object representing the DUT.
@client_autotest: Client side test to run on host for login action.
@repeat: Number of times "actions" to repeat for stress testing.
@action_sequence: Sequence of actions to perform during test run.
@peripheral_dict: Contains USB peripheral's vid, pid under test.
@crash_check: True to check for crashes, False otherwise.
"""
self.host = host
self.board = host.get_board().split(':')[1]
if not self.board == 'guado':
logging.info("Skipping test, this test only runs on Guado")
return
self.client_autotest = client_autotest
# Login on host, so that Chrome sees all USB Peripherals.
self.autotest_client = autotest.Autotest(self.host)
self.crash_check = crash_check
self.login_status = False
self.peripheral_dict = peripheral_dict
self.action_login()
self.login_status = True
self.action_step = None
action_sequence = action_sequence.upper()
actions = action_sequence.split(',')
if crash_check:
self.detect_crash = crash_detector.CrashDetector(self.host)
self.detect_crash.remove_crash_files()
# Unplug, plug, check usb peripheral and leave plugged.
self.hot_plug_check_peripheral()
for iteration in xrange(1, repeat + 1):
step = 0
for action in actions:
step += 1
action = action.strip()
self.action_step = 'STEP %d.%d. %s' % (iteration, step, action)
logging.info(self.action_step)
if action.startswith('LOGIN'):
if self.login_status:
logging.debug('Skipping login. Already logged in.')
continue
else:
self.action_login()
self.login_status = True
for peripheral_name, vid_pid in self.peripheral_dict.iteritems():
self.peripheral_name = peripheral_name
self.vid, self.pid = vid_pid
self.check_usb_enumeration(self.peripheral_name,
vendor_id=self.vid)
elif action == 'REBOOT':
self.host.reboot()
time.sleep(_WAIT_DELAY)
self.login_status = False
elif action == 'HOTPLUG':
self.hot_plug_check_peripheral()
else:
logging.info('WRONG ACTION: %s .', self.action_step)
# Uploads crash count that occurred during test run.
self.upload_crash_count(len(self.detect_crash.get_crash_files()))