blob: ed79701c81f0921249a6fe7e1cea98b7821652db [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, re, threading, time
from autotest_lib.client.cros.crash_test import CrashTest
from autotest_lib.server import autotest, test
from autotest_lib.client.common_lib import error
_WAIT_DELAY = 10
_LONG_TIMEOUT = 200
_SUSPEND_RESUME_BOARDS = ['daisy', 'panther']
_LOGIN_FAILED = 'DEVICE COULD NOT LOGIN!'
_SUSPEND_FAILED = 'Failed to SUSPEND within timeout'
_RESUME_FAILED = 'Failed to RESUME within timeout'
_CRASH_PATHS = [CrashTest._SYSTEM_CRASH_DIR.replace("/crash",""),
CrashTest._FALLBACK_USER_CRASH_DIR.replace("/crash",""),
CrashTest._USER_CRASH_DIRS.replace("/crash","")]
class platform_ExternalUsbPeripherals(test.test):
"""Uses servo to repeatedly connect/remove USB devices during boot."""
version = 1
def getPluggedUsbDevices(self):
"""Determines the external USB devices plugged
@returns plugged_list: List of plugged usb devices names
"""
lsusb_output = self.host.run('lsusb').stdout.strip()
items = lsusb_output.split('\n')
plugged_list = []
unnamed_device_count = 1
for item in items:
columns = item.split(' ')
if len(columns) == 6 or len(' '.join(columns[6:]).strip()) == 0:
logging.debug('Unnamed device located, adding generic name.')
name = 'Unnamed device %d' % unnamed_device_count
unnamed_device_count += 1
else:
name = ' '.join(columns[6:]).strip()
#Avoid servo components
if not name.startswith('Standard Microsystems Corp'):
plugged_list.append(name)
return plugged_list
def set_hub_power(self, on=True):
"""Setting USB hub power status
@param on: To power on the servo-usb hub or not
"""
reset = 'off'
if not on:
reset = 'on'
self.host.servo.set('dut_hub1_rst1', reset)
self.pluged_status = on
def action_login(self):
"""Login i.e. runs running client test
@exception TestFail failed to login within timeout.
"""
self.autotest_client.run_test(self.client_autotest,
exit_without_logout=True)
def wait_to_disconnect(self, fail_msg,
suspend_timeout = _LONG_TIMEOUT):
"""Wait for DUT to suspend.
@param fail_msg: Failure message
@param suspend_timeout: Time in seconds to wait to disconnect
@exception TestFail if fail to disconnect in time
@returns time took to disconnect
"""
start_time = int(time.time())
if not self.host.ping_wait_down(timeout=suspend_timeout):
raise error.TestFail(fail_msg)
return int(time.time()) - start_time
def wait_to_come_up(self, fail_msg, resume_timeout):
"""Wait for DUT to resume.
@param fail_msg: Failure message
@param resume_timeout: Time in seconds to wait to come up
@exception TestFail if fail to come_up in time
@returns time took to come up
"""
start_time = int(time.time())
if not self.host.wait_up(timeout=resume_timeout):
raise error.TestFail(fail_msg)
return int(time.time()) - start_time
def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg):
"""Waits till command output is meta
@param cmd: executed command
@param check: string to be checked for in cmd output
@param timeout: max time in sec to wait for output
@param timeout_msg: timeout failure message
@returns True if check is found in command output; False otherwise
"""
start_time = int(time.time())
time_delta = 0
while True:
out = self.host.run(cmd, ignore_status=True).stdout.strip()
if len(re.findall(check, out)) > 0:
break
time_delta = int(time.time()) - start_time
if time_delta > timeout:
self.fail_reasons.append('%s - %d sec'
% (timeout_msg, timeout))
return False
time.sleep(0.5)
return True
def action_suspend(self):
"""Suspend i.e. close lid"""
self.host.servo.lid_close()
stime = self.wait_to_disconnect(_SUSPEND_FAILED)
self.suspend_status = True
logging.debug('--- Suspended in %d sec', stime)
def action_resume(self):
"""Resume i.e. open lid"""
self.host.servo.lid_open()
rtime = self.wait_to_come_up(_RESUME_FAILED, _LONG_TIMEOUT)
self.suspend_status = False
logging.debug('--- Resumed in %d sec', rtime)
def powerd_suspend_with_timeout(self, timeout):
"""Suspend the device with wakeup alarm
@param timeout: Wait time for the suspend wakealarm
"""
self.host.run('echo 0 > /sys/class/rtc/rtc0/wakealarm')
self.host.run('echo +%d > /sys/class/rtc/rtc0/wakealarm' % timeout)
self.host.run('powerd_dbus_suspend --delay=0 &')
def suspend_action_resume(self, action):
"""suspends and resumes through powerd_dbus_suspend in thread.
@param action: Action while suspended
"""
# Suspend and wait to be suspended
logging.info('--- SUSPENDING')
thread = threading.Thread(target = self.powerd_suspend_with_timeout,
args = (_LONG_TIMEOUT,))
thread.start()
self.wait_to_disconnect(_SUSPEND_FAILED)
# Execute action after suspending
do_while_suspended = re.findall(r'SUSPEND(\w*)RESUME', action)[0]
plugged_list = self.on_list
logging.info('--- %s-ing', do_while_suspended)
if do_while_suspended =='_UNPLUG_':
self.set_hub_power(False)
plugged_list = self.off_list
elif do_while_suspended =='_PLUG_':
self.set_hub_power(True)
# Press power key and resume ( and terminate thread)
logging.info('--- RESUMING')
self.host.servo.power_key(0.1)
self.wait_to_come_up(_RESUME_FAILED, _LONG_TIMEOUT)
if thread.is_alive():
raise error.TestFail('SUSPEND thread did not terminate!')
def crash_not_detected(self, crash_path):
"""Check for kernel, browser, process crashes
@param crash_path: Crash files path
@returns True if there were not crashes; False otherwise
"""
result = True
if str(self.host.run('ls %s' % crash_path,
ignore_status=True)).find('crash') != -1:
crash_out = self.host.run('ls %s/crash/' % crash_path).stdout
crash_files = crash_out.strip().split('\n')
for crash_file in crash_files:
if crash_file.find('.meta') != -1 and \
crash_file.find('kernel_warning') == -1:
self.fail_reasons.append('CRASH DETECTED in %s/crash: %s' %
(crash_path, crash_file))
result = False
return result
def check_plugged_usb_devices(self):
"""Checks the plugged peripherals match device list.
@returns True if expected USB peripherals are detected; False otherwise
"""
result = True
if self.pluged_status and self.usb_list != None:
# Check for mandatory USb devices passed by usb_list flag
for usb_name in self.usb_list:
found = self.wait_for_cmd_output(
'lsusb', usb_name, _WAIT_DELAY * 3,
'Not detecting %s' % usb_name)
result = result and found
if self.pluged_status:
dev_list = self.on_list
else:
dev_list = self.off_list
time.sleep(_WAIT_DELAY)
on_now = self.getPluggedUsbDevices()
if not len(set(dev_list).difference(set(on_now))) == 0:
self.fail_reasons.append('The list of connected peripherals '
'is wrong. --- Now: %s --- Should be: '
'%s' % (on_now, dev_list))
result = False
return result
def check_usb_peripherals_details(self):
"""Checks the effect from plugged in USB peripherals.
@returns True if command line output is matched successfuly; Else False
"""
usb_check_result = True
for cmd in self.usb_checks.keys():
out_match_list = self.usb_checks.get(cmd)
if cmd.startswith('loggedin:'):
if not self.login_status:
continue
cmd = cmd.replace('loggedin:','')
# Run the usb check command
for out_match in out_match_list:
match_result = self.wait_for_cmd_output(
cmd, out_match, _WAIT_DELAY * 3,
'USB CHECKS DETAILS failed at %s:' % cmd)
usb_check_result = usb_check_result and match_result
return usb_check_result
def check_status(self):
"""Performs checks after each action:
- for USB detected devices
- for generated crash files
- peripherals effect checks on cmd line
@returns True if all of the iteration checks pass; False otherwise.
"""
result = True
if not self.suspend_status:
# Detect the USB peripherals
result = self.check_plugged_usb_devices()
# Check for crash files
for crash_path in _CRASH_PATHS:
result = result and self.crash_not_detected(crash_path)
if self.pluged_status and (self.usb_checks != None):
# Check for plugged USB devices details
result = result and self.check_usb_peripherals_details()
return result
def change_suspend_resume(self, actions):
""" Modifying actions to suspend and resume
Changes suspend and resume actions done with lid_close
to suspend_resume done with powerd_dbus_suspend
"@param actions: the sequence of accions to perform
@returns The changed to suspend_resume action_sequence
"""
susp_resumes = re.findall(r'(SUSPEND,\w*,*RESUME)',actions)
for susp_resume in susp_resumes:
replace_with = susp_resume.replace(',', '_')
actions = actions.replace(susp_resume, replace_with, 1)
return actions
def remove_crash_data(self):
"""Delete crash meta files"""
for crash_path in _CRASH_PATHS:
if not self.crash_not_detected(crash_path):
self.host.run('rm -rf %s/crash' % crash_path,
ignore_status=True)
def run_once(self, host, client_autotest, action_sequence, repeat,
usb_list=None, usb_checks=None):
self.client_autotest = client_autotest
self.host = host
self.autotest_client = autotest.Autotest(self.host)
self.usb_list = usb_list
self.usb_checks = usb_checks
self.suspend_status = False
self.login_status = False
self.exit_without_logout = False
self.fail_reasons = []
self.host.servo.switch_usbkey('dut')
self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
time.sleep(_WAIT_DELAY)
# Collect USB peripherals when unplugged
self.set_hub_power(False)
time.sleep(_WAIT_DELAY)
self.off_list = self.getPluggedUsbDevices()
# Collect USB peripherals when plugged
self.set_hub_power(True)
time.sleep(_WAIT_DELAY * 2)
self.on_list = self.getPluggedUsbDevices()
diff_list = set(self.on_list).difference(set(self.off_list))
if len(diff_list) == 0:
# Fail if no devices detected after
raise error.TestError('No connected devices were detected. Make '
'sure the devices are connected to USB_KEY '
'and DUT_HUB1_USB on the servo board.')
logging.debug('Connected devices list: %s', diff_list)
board = host.get_board().split(':')[1]
action_sequence = action_sequence.upper()
if board in _SUSPEND_RESUME_BOARDS:
action_sequence = self.change_suspend_resume(action_sequence)
actions = action_sequence.split(',')
self.remove_crash_data()
for iteration in xrange(1, repeat + 1):
step = 0
for action in actions:
step += 1
action = action.strip()
action_step = '--- %d.%d. %s---' % (iteration, step, action)
logging.info(action_step)
if action == 'RESUME':
self.action_resume()
elif action == 'UNPLUG':
self.set_hub_power(False)
elif action == 'PLUG':
self.set_hub_power(True)
elif self.suspend_status == False:
if action.startswith('LOGIN'):
if self.login_status:
logging.debug('Skipping login. Already logged in.')
continue
else:
self.action_login()
self.login_status = True
elif action == 'REBOOT':
self.host.reboot()
time.sleep(_WAIT_DELAY * 3)
self.login_status = False
elif action == 'SUSPEND':
self.action_suspend()
elif re.match(r'SUSPEND\w*RESUME',action) is not None:
self.suspend_action_resume(action)
else:
raise error.TestError('--- WRONG ACTION: %s ---.' %
action_step)
if not self.check_status():
raise error.TestFail('Step %s failed with: %s' %
(action_step, str(self.fail_reasons)))