# Source blob: 37d3083bbec1bafe7775f5e5037ae70dff66bbe0
# Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import re
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import crash_detector
from autotest_lib.client.common_lib.cros import tpm_utils
from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
from autotest_lib.server import test
from autotest_lib.server.cros.multimedia import remote_facade_factory
# Seconds to sleep after each servo USB operation in run_once().
_SHORT_TIMEOUT = 5
# Constants for Hangouts UI peripherals names. Each entry is a tuple in the form
# of (<hangouts_name_regex>, <USB device spec>). We use regular expressions for
# the labels since these are not constant values, i.e. they depend on which USB
# port a device has been plugged in to.
# The regexes are applied with re.match(), so they only need to match the
# beginning of the name reported by Hangouts; entries are tried in list order.
# TODO(dtosic): Move this out to a separate util so other tests can reuse it.

# Camera labels; the parenthesised part is the escaped literal "(vid:pid)".
_HANGOUTS_UI_CAMERA_NAMES = [
(r'HD Pro Webcam C920 \(046d:082d\)', cfm_usb_devices.HD_PRO_WEBCAM_C920),
(r'Logitech Webcam C930e \(046d:0843\)',
cfm_usb_devices.LOGITECH_WEBCAM_C930E),
]
# Microphone labels.
_HANGOUTS_UI_MICROPHONE_NAMES = [
(r'Jabra SPEAK 410', cfm_usb_devices.JABRA_SPEAK_410),
# Some cameras also have integrated microphones.
# NOTE(review): the trailing "[0-9],[0-9]" presumably matches audio
# card,device indices that vary per port -- confirm.
(r'HD Pro Webcam C920: USB Audio:[0-9],[0-9]',
cfm_usb_devices.HD_PRO_WEBCAM_C920),
(r'Logitech Webcam C930e: USB Audio:[0-9],[0-9]',
cfm_usb_devices.LOGITECH_WEBCAM_C930E),
]
# Speaker labels.
_HANGOUTS_UI_SPEAKER_NAMES = [
(r'Jabra SPEAK 410', cfm_usb_devices.JABRA_SPEAK_410),
]
def _get_usb_spec_for_hangouts_device_name(device_name, name_to_spec_map):
    """
    Looks up the USB spec that belongs to a Hangouts UI device name.

    @param device_name: Device label as reported by Hangouts.
    @param name_to_spec_map: Iterable of (<name regex>, <USB spec>) pairs.
    @returns The spec paired with the first regex that matches the beginning
            of device_name, or None if no regex matches.
    """
    # The generator is lazy, so evaluation stops at the first matching entry.
    matching_specs = (spec for pattern, spec in name_to_spec_map
                      if re.match(pattern, device_name))
    return next(matching_specs, None)
class _Peripherals(object):
    """Groups peripheral names under the device type they were seen as."""

    def __init__(self):
        # One list of names (strings) per supported device type.
        self._dict = {
            'Microphone': [],
            'Speaker': [],
            'Camera': [],
        }

    def add_mic(self, spec):
        """Records spec as a microphone."""
        self._add_device('Microphone', spec)

    def add_speaker(self, spec):
        """Records spec as a speaker."""
        self._add_device('Speaker', spec)

    def add_camera(self, spec):
        """Records spec as a camera."""
        self._add_device('Camera', spec)

    def _add_device(self, device_type, spec):
        """Appends the string form of spec to the list for device_type."""
        names = self._dict[device_type]
        names.append(str(spec))

    def _get_by_type(self, device_type):
        """Returns the list of names recorded for device_type."""
        return self._dict[device_type]

    def get_diff(self, other_peripherals):
        """
        Returns what other_peripherals has that this object lacks.

        The comparison is one-directional: names present here but absent
        from other_peripherals are not reported.

        @param other_peripherals: The _Peripherals object to compare with.
        @returns Dict mapping device type to the set of names found only in
                other_peripherals; types without such names are left out.
        """
        only_in_other = {}
        for device_type, own_names in self._dict.items():
            theirs = set(other_peripherals._get_by_type(device_type))
            extra = theirs - set(own_names)
            if extra:
                only_in_other[device_type] = extra
        return only_in_other

    def __repr__(self):
        return repr(self._dict)
class enterprise_CFM_USBPeripheralHotplugDetect(test.test):
    """
    Uses servo to hotplug and detect USB peripherals on CrOS and hotrod.

    It compares attached audio/video peripheral names on CrOS against what
    Hangouts detects.
    """
    version = 1

    def _get_usb_device_types(self, vid_pid):
        """
        Returns a list of types (based on lsusb) for the specified vid:pid.

        The type is the third whitespace separated field of every
        'bInterfaceClass' and 'wTerminalType' line in the verbose lsusb
        output.

        @param vid_pid: The vid:pid of the device to query.
        @returns List of device types (string).
        """
        cmd = 'lsusb -v -d %s' % vid_pid
        cmd_out = self.client.run(cmd).stdout.strip().split('\n')
        details_list = []
        for line in cmd_out:
            if not any(phrase in line
                       for phrase in ('bInterfaceClass', 'wTerminalType')):
                continue
            fields = line.split()
            # A matching line with fewer than three fields has no type name
            # to report; skip it rather than raise IndexError.
            if len(fields) > 2:
                details_list.append(fields[2])
        return details_list

    def _get_cros_usb_peripherals(self, peripherals_to_check):
        """
        Queries cros for connected USB devices.

        @param peripherals_to_check: A list of peripherals to query. If a
                connected USB device is not in this list it will be ignored.
        @returns A _Peripherals object
        """
        cros_peripherals = _Peripherals()
        device_collector = usb_device_collector.UsbDeviceCollector(self.client)
        for device in device_collector.get_usb_devices():
            vid_pid = device.vid_pid
            device_spec = cfm_usb_devices.get_usb_device_spec(vid_pid)
            # Filter first so the remote lsusb query below only runs for
            # devices this test actually cares about.
            if device_spec not in peripherals_to_check:
                continue
            device_types = self._get_usb_device_types(vid_pid)
            # A single device may expose several functions, e.g. a webcam
            # with a built-in microphone, so every type is checked.
            if 'Microphone' in device_types:
                cros_peripherals.add_mic(device_spec)
            if 'Speaker' in device_types:
                cros_peripherals.add_speaker(device_spec)
            if 'Video' in device_types:
                cros_peripherals.add_camera(device_spec)
        return cros_peripherals

    def _enroll_device_and_skip_oobe(self):
        """Enroll device into CFM and skip CFM oobe."""
        self.cfm_facade.enroll_device()
        self.cfm_facade.skip_oobe_after_enrollment()
        self.cfm_facade.wait_for_hangouts_telemetry_commands()

    def _get_connected_cfm_hangouts_peripherals(self, peripherals_to_check):
        """
        Gets peripheral information as reported by Hangouts.

        @param peripherals_to_check: A list of peripherals to query. If a
                connected USB device is not in this list it will be ignored.
        @returns A _Peripherals object
        """
        cfm_peripherals = _Peripherals()

        for mic_name in self.cfm_facade.get_mic_devices():
            mic_spec = _get_usb_spec_for_hangouts_device_name(
                    mic_name, _HANGOUTS_UI_MICROPHONE_NAMES)
            if mic_spec and mic_spec in peripherals_to_check:
                cfm_peripherals.add_mic(mic_spec)

        for speaker_name in self.cfm_facade.get_speaker_devices():
            speaker_spec = _get_usb_spec_for_hangouts_device_name(
                    speaker_name, _HANGOUTS_UI_SPEAKER_NAMES)
            if speaker_spec and speaker_spec in peripherals_to_check:
                cfm_peripherals.add_speaker(speaker_spec)

        for camera_name in self.cfm_facade.get_camera_devices():
            logging.info("camera_name: %s", camera_name)
            camera_spec = _get_usb_spec_for_hangouts_device_name(
                    camera_name, _HANGOUTS_UI_CAMERA_NAMES)
            logging.info("camera_spec: %s", camera_spec)
            if camera_spec and camera_spec in peripherals_to_check:
                cfm_peripherals.add_camera(camera_spec)

        return cfm_peripherals

    def _upload_crash_count(self, count):
        """
        Uploads crash count based on length of crash_files list.

        @param count: Number of crashes; converted to int before upload.
        """
        self.output_perf_value(description='number_of_crashes',
                               value=int(count),
                               units='count', higher_is_better=False)

    def run_once(self, host, peripherals_to_check):
        """
        Main function to run autotest.

        @param host: Host object representing the DUT.
        @param peripherals_to_check: List of USB specs to check.
        @raises error.TestFail: If enrolling into CFM or querying Hangouts
                raises, or if CrOS reports a checked peripheral that
                Hangouts does not.
        """
        self.client = host
        self.crash_list = []

        factory = remote_facade_factory.RemoteFacadeFactory(
                host, no_chrome=True)
        self.cfm_facade = factory.create_cfm_facade()

        detect_crash = crash_detector.CrashDetector(self.client)

        tpm_utils.ClearTPMOwnerRequest(self.client)

        # NOTE(review): the facade is created a second time after the TPM
        # owner is cleared, presumably because clearing invalidates the
        # earlier connection -- confirm whether the first creation is needed.
        factory = remote_facade_factory.RemoteFacadeFactory(
                host, no_chrome=True)
        self.cfm_facade = factory.create_cfm_facade()

        if detect_crash.is_new_crash_present():
            self.crash_list.append('New Warning or Crash Detected before ' +
                                   'plugging in usb peripherals.')

        # Turns on the USB port on the servo so that any peripheral connected
        # to the DUT is visible.
        if self.client.servo:
            self.client.servo.switch_usbkey('dut')
            self.client.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
            time.sleep(_SHORT_TIMEOUT)
            self.client.servo.set('dut_hub1_rst1', 'off')
            time.sleep(_SHORT_TIMEOUT)

        if detect_crash.is_new_crash_present():
            self.crash_list.append('New Warning or Crash Detected after ' +
                                   'plugging in usb peripherals.')

        cros_peripherals = self._get_cros_usb_peripherals(
                peripherals_to_check)
        logging.info('Peripherals detected by CrOS: %s', cros_peripherals)

        try:
            self._enroll_device_and_skip_oobe()
            cfm_peripherals = self._get_connected_cfm_hangouts_peripherals(
                    peripherals_to_check)
            logging.info('Peripherals detected by hangouts: %s',
                         cfm_peripherals)
        except Exception as e:
            # Any failure here is reported as a test failure, with the crash
            # notes collected so far appended to the original message.
            exception_msg = str(e)
            if self.crash_list:
                crash_identified_at = (' ').join(self.crash_list)
                exception_msg += '. ' + crash_identified_at
            self._upload_crash_count(len(detect_crash.get_crash_files()))
            raise error.TestFail(exception_msg)

        if detect_crash.is_new_crash_present():
            self.crash_list.append('New Warning or Crash detected after '
                                   'device enrolled into CFM.')

        tpm_utils.ClearTPMOwnerRequest(self.client)

        if self.crash_list:
            crash_identified_at = (', ').join(self.crash_list)
        else:
            crash_identified_at = 'No Crash or Warning detected.'

        # Query the crash files once so the uploaded perf value and the
        # failure message below always report the same number.
        crash_count = len(detect_crash.get_crash_files())
        self._upload_crash_count(crash_count)

        # get_diff() is one-directional: this reports peripherals seen by
        # CrOS that Hangouts did not list.
        peripherals_diff = cfm_peripherals.get_diff(cros_peripherals)
        if peripherals_diff:
            raise error.TestFail(
                    'Peripherals do not match.\n'
                    'Diff: {0} \n Cros: {1} \n Hangouts: {2} \n.'
                    'No of Crashes: {3}. Crashes: {4}'.format(
                            peripherals_diff, cros_peripherals,
                            cfm_peripherals, crash_count,
                            crash_identified_at))