blob: 0cf2e28496083c1f00bc6d4f4910e8d817d81155 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import pprint
import time
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros import wifi_test_utils
from autotest_lib.server.cros.chaos_ap_configurators import ap_cartridge
from autotest_lib.server.cros.chaos_ap_configurators import \
ap_configurator_config
from autotest_lib.server.cros.chaos_ap_configurators import \
download_chromium_prebuilt
from autotest_lib.server.cros.chaos_config import ChaosAP
from autotest_lib.server.cros.wlan import connector, disconnector
from autotest_lib.server.cros.network import profile_manager
class WiFiChaosConnectionTest(object):
"""Base class for simple (connect/disconnect) dynamic Chaos test.
@attribute host: an Autotest host object, DUT.
@attribute connector: a TracingConnector object.
@attribute disconnector: a Disconnector object.
@attribute error_list: a list of errors, intermediate test failures.
@attribute ap_config: an APConfiguratorConfig object.
@attribute factory: an APConfiguratorFactory object.
@attribute psk_password: a string, password used for PSK authentication.
@attribute PSK: a string, WiFi Pre-Shared Key (Personal) mode.
"""
PSK = 'psk'
FAILED_CONFIG_MSG = 'AP Configuration Failed!'
@property
def psk_password(self):
"""@returns PSK password."""
return self._psk_password
@psk_password.setter
def psk_password(self, password):
"""Sets PSK password.
@param password: a string, PSK password.
"""
self._psk_password = password
def _get_dut_wlan_mac(self):
"""Extracts MAC addr of DUT's WLAN interface.
@return a string, MAC address of a WLAN interface.
@raises TestFail: if error looking up wifi device or its MAC address.
"""
devs = wifi_test_utils.get_wlan_devs(self.host, 'iw')
if not devs:
raise error.TestFail('No wifi devices found on %s.',
self.host.hostname)
logging.info('Found wifi device %s on %s', devs[0], self.host.hostname)
mac = wifi_test_utils.get_interface_mac(self.host, devs[0], 'ip')
if not mac:
raise error.TestFail('No MAC address found for %s on %s.',
devs[0], self.host.hostname)
logging.info('%s has MAC addr %s', devs[0], mac)
return mac
def __init__(self, host, capturer):
"""Initialize.
@param host: an Autotest host object, device under test (DUT).
@param capturer: a PacketCaptureManager object, packet tracer.
"""
self.host = host
self.dut_mac_addr = self._get_dut_wlan_mac()
self.capturer = capturer
self.connector = connector.TracingConnector(self.host, self.capturer)
self.disconnector = disconnector.Disconnector(self.host)
self.error_list = []
self.ap_config = ap_configurator_config.APConfiguratorConfig()
self.psk_password = ''
download_chromium_prebuilt.check_webdriver_ready()
# Test on channel 5 for 2.4GHz band and channel 48 for 5GHz band.
# TODO(tgao): support user-specified channel.
self.band_channel_map = {self.ap_config.BAND_2GHZ: 5,
self.ap_config.BAND_5GHZ: 48}
def __repr__(self):
"""@returns class name, DUT name + MAC addr and packet tracer name."""
return 'class: %s, DUT: %s (MAC addr: %s), capturer: %s' % (
self.__class__.__name__,
self.host.hostname,
self.dut_mac_addr,
self.capturer)
def run_connect_disconnect_test(self, ap_info):
"""Attempts to connect to an AP.
@param ap_info: a dict of attributes of a specific AP.
@return a string (error message) or None.
"""
self.disconnector.disconnect(ap_info['ssid'])
self.connector.set_frequency(ap_info['frequency'])
# Use profile manager to prevent fallback connections.
with profile_manager.ProfileManager(self.host) as pm:
try:
self.connector.connect(
ap_info['ssid'],
security=ap_info.get('security', ''),
psk=ap_info.get(self.PSK, ''),
frequency=ap_info['frequency'])
except (connector.ConnectException,
connector.ConnectFailed,
connector.ConnectTimeout) as e:
error = str(e)
logging.error(error)
return error
finally:
self.disconnector.disconnect(ap_info['ssid'])
def run_ap_test(self, ap_info, tries, log_dir):
"""Runs test on a configured AP.
@param ap_info: a dict of attributes of a specific AP.
@param tries: an integer, number of connection attempts.
@param log_dir: a string, directory to store test logs.
"""
# Enable logging
self.host.run('restart wpasupplicant WPA_DEBUG=excessive')
self.host.run('restart shill SHILL_LOG_SCOPES=wifi SHILL_LOG_LEVEL=-5')
ap_info['failed_iterations'] = []
# Check the AP was successfully configured
if not ap_info['configurator'].get_configuration_success():
ap_info['failed_iterations'].append(
{'error': self.FAILED_CONFIG_MSG,
'try': 0})
self.error_list.append(ap_info)
# Capture screenshot when configuration fails
for image in ap_info['configurator'].screenshot_list:
error = os.path.join(log_dir,
'config_error_screenshot_%d.png' %
ap_info['configurator'].screenshot_list.index(image))
f = open(error, 'wb')
f.write(image.decode('base64'))
f.close()
return
# Make iteration 1-indexed
for iteration in range(1, tries+1):
logging.info('Connection try %d', iteration)
filename = os.path.join(log_dir,
'connect_try_%d' % iteration)
self.connector.set_filename(filename)
resp = self.run_connect_disconnect_test(ap_info)
if resp:
ap_info['failed_iterations'].append({'error': resp,
'try': iteration})
if ap_info['failed_iterations']:
self.error_list.append(ap_info)
def _config_one_ap(self, ap, band, security, mode, visibility):
"""Configures an AP for the test.
@param ap: an APConfigurator object.
@param band: a string, 2.4GHz or 5GHz.
@param security: a string, AP security method.
@param mode: a hexadecimal, 802.11 mode.
@param visibility: a boolean
@returns a dict representing one band of a configured AP.
"""
# Setting the band gets you the bss
ap.set_band(band)
# Remove all white space from the ssid
sanitized_short_name = ap.get_router_short_name().replace(' ', '_')
ssid = '_'.join([sanitized_short_name,
str(self.band_channel_map[band]),
str(band).replace('.', '_')])
ap.power_up_router()
ap.set_channel(self.band_channel_map[band])
ap.set_radio(enabled=True)
ap.set_ssid(ssid)
if ap.is_visibility_supported():
ap.set_visibility(visible=visibility)
ap.set_mode(mode)
if security == self.PSK:
logging.debug('Use PSK security w/ password %s', self.psk_password)
ap.set_security_wpapsk(self.psk_password)
else: # Testing open system, i.e. security = ''
ap.set_security_disabled()
# DO NOT apply_settings() here. Cartridge is used to apply config
# settings to multiple APs in parallel, see config_aps().
return {'configurator': ap,
'bss': ap.get_bss(),
'band': band,
'channel': self.band_channel_map[band],
'frequency': ChaosAP.FREQUENCY_TABLE[
self.band_channel_map[band]],
'radio': True,
'ssid': ssid,
'visibility': visibility,
'security': security,
self.PSK: self.psk_password,
'brand': ap.config_data.get_brand(),
'model': ap.get_router_short_name()}
def _get_mode_type(self, ap, band):
"""Gets 802.11 mode for ap at band.
@param ap: an APConfigurator object.
@param band: a string, 2.4GHz or 5GHz.
@returns a hexadecimal, 802.11 mode or None.
"""
for mode in ap.get_supported_modes():
if mode['band'] == band:
for mode_type in mode['modes']:
if (mode_type & self.ap_config.MODE_N !=
self.ap_config.MODE_N):
return mode_type
def _mark_ap_to_unlock(self, ap, band):
"""Checks if an AP can be unlocked after testing on band.
Assumption: we always test 2.4GHz before 5GHz, enforced in
WiFiChaosTest.run() in chaos_interop_test.py
Rules for unlocking an AP:
- a single-band ap can be unlocked after testing on 2.4GHz band
- a dual-band ap can only be unlocked after testing on 5GHz band
@param band: a string, 2.4GHz or 5GHz.
@returns a boolean True == OK to unlock AP after testing on band.
"""
supported_bands = ap.get_supported_bands()
bands_supported = [d['band'] for d in supported_bands]
if band in bands_supported:
if len(bands_supported) == 1 or band == self.ap_config.BAND_5GHZ:
return True
return False
def config_aps(self, aps, band, security='', visibility=True):
"""Configures a list of APs.
@param aps: a list of APConfigurator objects.
@param band: a string, 2.4GHz or 5GHz.
@param security: a string, AP security method. Defaults to empty string
(i.e. open system). Other possible value is self.PSK.
@param visibility: a boolean. Defaults to True.
@returns a list of dicts, each a return by _config_one_ap().
"""
configured_aps = []
scan_list = []
cartridge = ap_cartridge.APCartridge()
for ap in aps:
if not ap.is_band_and_channel_supported(
band, self.band_channel_map[band]):
logging.info('Skip %s: band %s and channel %d not supported',
ap.get_router_name(), band,
self.band_channel_map[band])
continue
logging.info('Configuring AP %s', ap.get_router_name())
mode = self._get_mode_type(ap, band)
ap_info = self._config_one_ap(ap, band, security, mode, visibility)
ap_info['ok_to_unlock'] = self._mark_ap_to_unlock(ap, band)
configured_aps.append(ap_info)
cartridge.push_configurator(ap)
scan_list.append(ap)
# Apply config settings to multiple APs in parallel.
cartridge.run_configurators()
# iw mlan0 scan for ARM and iw wlan0 scan for x86
scan_bss = 'for device in $(iw dev | grep Interface | awk \
\'{ print $2 }\'); do iw $device scan; done'
start_time = int(time.time())
# Setting 180s as timeout
logging.info('Waiting for the DUT to find BSS... ')
while (int(time.time()) - start_time) < 180 and len(scan_list):
# If command failed: Device or resource busy (-16), run again.
scan_result = self.host.run(scan_bss, ignore_status=True)
if 'busy' in str(scan_result):
continue
for ap in scan_list:
# If configuration failed, do not wait for the bss.
if not ap.get_configuration_success():
scan_list.remove(ap)
continue
bss = ap.get_bss()
if bss in str(scan_result):
# Remove ap from list if we found bss in scan
logging.debug('Found bss %s in scan', bss)
scan_list.remove(ap)
else:
continue
if len(scan_list):
logging.error('These APs were not listed in scan:')
for ap_info in configured_aps:
if ap_info['configurator'] in scan_list:
logging.error('Brand:%s\n\tModel:%s\n\tSSID:%s\n'
'\tBSS:%s'.expandtabs(16),
ap_info['brand'], ap_info['model'],
ap_info['ssid'], ap_info['bss'])
ap_info['configurator'].reset_command_list()
return configured_aps
def power_down(self, ap):
"""Powers down ap.
@param ap: an APConfigurator object.
"""
self.power_down_aps([ap])
def power_down_aps(self, aps):
"""Powers down a list of aps.
@param aps: a list of APConfigurator objects.
"""
cartridge = ap_cartridge.APCartridge()
for ap in aps:
ap.power_down_router()
cartridge.push_configurator(ap)
cartridge.run_configurators()
def check_test_error(self):
"""Checks if any intermediate test failed.
@raises TestError: if the AP could not be configured
@raises TestFail: if self.error_list is not empty and
the AP was configured.
"""
if len(self.error_list) == 0:
return
failures = self.error_list[0]['failed_iterations']
config_failure = False
if failures[0]['error'] == self.FAILED_CONFIG_MSG:
config_failure = True
if config_failure:
msg = ('\nThe AP was not configured correctly, '
'see the ERROR log for more info.\n')
else:
msg = '\nFailed with the following errors:\n'
msg += pprint.pformat(self.error_list)
# This is shared across tests; reset for the next AP.
self.error_list = []
if config_failure:
raise error.TestError(msg)
else:
raise error.TestFail(msg)
def run_once(self, tries=1):
"""Main entry function for autotest.
@param tries: an integer, number of connection attempts.
"""
raise NotImplementedError('Child class must implement this!')