blob: 16ea3407b5199d2c83cf51885c8f209be7fbb3bd [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.server.cros.network import attenuator_controller
from autotest_lib.server.cros.network import hostap_config
from autotest_lib.server.cros.network import wifi_cell_test_base
ATTENUATION_STEP = 4
FINAL_ATTENUATION = 90
ATTENUATORS_PER_PHY = 2
LOW_POWER_SIGNAL = -75
class AttenuatorInfo(object):
"""Contains debug information about an attenuator."""
def __init__(self):
self.attenuator_zeros = False
self.zeroed_scan_signal = None
self.allows_connection = False
self._zeroed_linked_signal = None
@property
def zeroed_linked_signal(self):
"""Returns the linked signal as a float."""
return self._zeroed_linked_signal
@zeroed_linked_signal.setter
def zeroed_linked_signal(self, value):
"""Sets the linked signal to a float.
@param value: the linked signal as a float
"""
if (self._zeroed_linked_signal is None or
value > self._zeroed_linked_signal):
self._zeroed_linked_signal = value
def healthy_attenuator(self):
"""Returns True if the attenuator looks good; False otherwise."""
if (not self.allows_connection and
self.zeroed_scan_signal is None):
return False
elif not self.attenuator_zeros:
return False
if (self.zeroed_scan_signal < LOW_POWER_SIGNAL and
self.zeroed_linked_signal < LOW_POWER_SIGNAL):
return False
return True
class network_WiFi_VerifyAttenuator(wifi_cell_test_base.WiFiCellTestBase):
"""Test that all connected attenuators are functioning correctly."""
version = 1
def _refresh_ap_ssids(self, frequency):
"""Start up new APs, with unique SSIDs.
Doing this before each connection attempt in the test prevents
spillover from previous connection attempts interfering with
our intentions.
@param frequency: int WiFi frequency to configure the APs on.
"""
ap_config = hostap_config.HostapConfig(
frequency=frequency,
mode=hostap_config.HostapConfig.MODE_11N_PURE)
self.context.router.deconfig_aps()
self._all_ssids = list()
for i in range(self.num_phys):
self.context.configure(ap_config, multi_interface=True)
self._all_ssids.append(self.context.router.get_ssid(instance=i))
def _get_phy_num_for_instance(self, instance):
"""Get the phy number corresponding to a hostapd instance.
@param instance: int hostapd instance to test against.
@return int phy number corresponding to that AP (e.g.
for phy0 return 0).
"""
phy = self.context.router.get_hostapd_phy(instance)
if not phy.startswith('phy'):
raise error.TestError('Unexpected phy name %s' % phy)
return int(phy[3:])
def _wait_for_good_signal_levels(self, ssid, attenuator_info):
"""Verify the desired SSID is available with a good signal.
@param ssid: the ssid as a string
@param attenuator_info: dictionary with information about the
current attenuator
@returns an updated attenuator_info dictionary
"""
# In practice it has been observed that going from max attuation
# to 0 attenuation may take several scans until the signal is what
# is desirable.
for _ in range(5):
scan_result = self._client_iw_runner.wait_for_scan_result(
self._client_if, ssids=[ssid], timeout_seconds=10)
if scan_result is None or len(scan_result) == 0:
# Device is busy or not results at this time, try again
continue
for network in scan_result:
if network.ssid == ssid and network.signal < LOW_POWER_SIGNAL:
logging.info('WARNING: Signal strength is less than '
'optimal (%f) consider re-calibrating or '
'check the conductive cabling.',
network.signal)
attenuator_info.zeroed_scan_signal = network.signal
return attenuator_info
elif network.ssid == ssid and network.signal > LOW_POWER_SIGNAL:
logging.info('Scan found an acceptable signal strength %f',
network.signal)
attenuator_info.zeroed_scan_signal = network.signal
return attenuator_info
raise error.TestError('The desired SSID is not visible, the '
'attenuator may be stuck or broken. '
'OR the AP is in a bad state or is '
'bad, try swapping.')
def _verify_attenuator(self, ap_num, frequency_mhz, attenuator_num):
"""Verify that each phy has two attenuators controlling its signal.
@param ap_num: int hostapd instance to test against.
@param frequency_mhz: int frequency of the AP.
@param attenuator_num: int attenuator num controlling one antenna on
the AP.
@return AttenuatorInfo object
"""
logging.info('Verifying attenuator functionality')
ai = AttenuatorInfo()
# Remove knowledge of previous networks from shill.
self.context.client.shill.init_test_network_state()
# Isolate the client entirely.
self.context.attenuator.set_variable_attenuation(
attenuator_controller.MAX_VARIABLE_ATTENUATION)
logging.info('Removing variable attenuation for attenuator=%d',
attenuator_num)
# But allow one antenna on this phy.
self.context.attenuator.set_variable_attenuation(
0, attenuator_num=attenuator_num)
client_conf = xmlrpc_datatypes.AssociationParameters(
ssid=self.context.router.get_ssid(instance=ap_num))
logging.info('Waiting for client signal levels to settle.')
ai = self._wait_for_good_signal_levels(client_conf.ssid, ai)
logging.info('Connecting to %s', client_conf.ssid)
assoc_result = xmlrpc_datatypes.deserialize(
self.context.client.shill.connect_wifi(client_conf))
if not assoc_result.success:
logging.error('Failed to connect to AP %d on attenuator %d',
ap_num, attenuator_num)
return ai
ai.allows_connection = True
ai.zeroed_linked_signal = self.context.client.wifi_signal_level
logging.info('Connected successfully')
start_atten = self.context.attenuator.get_minimal_total_attenuation()
for atten in range(start_atten,
min(start_atten + 20, FINAL_ATTENUATION),
ATTENUATION_STEP):
self.context.attenuator.set_total_attenuation(
atten, frequency_mhz, attenuator_num=attenuator_num)
time.sleep(2)
logging.info('Attenuator %d signal at attenuation=%d is %d dBm.',
attenuator_num, atten,
self.context.client.wifi_signal_level)
return ai
def _debug_phy_attenuator_correspondence(self, visible_ssid, hidden_ssid):
"""Verify that the non-attenuated SSID is the only one that is visble.
If everything is working correctly then all the DUT should see is one
SSID that is not the one which is attenuated. Here are the different
possible failure scenarios:
- Two network_<blah> SSIDs are visible, both with a strong signal
(something greater than -80 dBm) means the rainbow cables on the
attenuation rig are backwards.
- Two network_<blah> SSIDs are visble, the one which should be
hidden is visible with something less than -80 dBm means one
of the attenuators is broken.
- The attenuated SSID is the only visible one, means that rainbow
cables are in the wrong order.
- The visible SSID is not seen, means that both attenuators are
stuck at max attenuation or there is a cabling problem.
@param visible_ssid: string of the SSID that should be visible.
@param hidden_ssid: string of the SSID that should be hidden
"""
scan_result = self._client_iw_runner.wait_for_scan_result(
self._client_if, ssids=[visible_ssid, hidden_ssid])
if scan_result is None or len(scan_result) == 0:
raise error.TestFail('No visible SSIDs. Check cables, the '
'attenuators may be stuck')
elif (len(scan_result) == 1 and scan_result[0].ssid == hidden_ssid):
raise error.TestFail('The wrong network is visible, the rainbow '
'cables are in the wrong order.')
elif len(scan_result) > 1:
for network in scan_result:
if (network.ssid == hidden_ssid):
# The SSID that should be hidden from the DUT is not,
# along with what is presumably the network that should
# be visible. Check the signal strength.
if network.signal > LOW_POWER_SIGNAL:
raise error.TestFail('Two SSIDs are visible, the '
'rainbow cables may be '
'connected backwards.')
else:
logging.warning('The attenuated SSID is visible with '
'very low power (%f), the attenuator '
'may be broken, or this is ghost '
'signal; will attempt to connect',
network.signal)
def _verify_phy_attenuator_correspondence(self, instance):
"""Verify that we cannot connect to a phy when it is attenuated.
Check that putting maximum attenuation on the attenuators expected
to gate a particular phy produces the expected result. We should
be unable to connect to the corresponding SSID.
@param instance: int hostapd instance to verify corresponds to
a particular 2 attenuators.
"""
logging.info('Verifying attenuator correspondence')
# Turn up all attenuation.
self.context.attenuator.set_variable_attenuation(
attenuator_controller.MAX_VARIABLE_ATTENUATION)
# Turn down attenuation for phys other than the instance we're
# interested in.
for other_instance in [x for x in range(self.num_phys)
if x != instance]:
other_phy_num = self._get_phy_num_for_instance(other_instance)
for attenuator_offset in range(ATTENUATORS_PER_PHY):
attenuator_num = (other_phy_num * ATTENUATORS_PER_PHY +
attenuator_offset)
self.context.attenuator.set_variable_attenuation(
0, attenuator_num=attenuator_num)
# The other SSID should be available.
self._debug_phy_attenuator_correspondence(
self.context.router.get_ssid(instance=other_instance),
self.context.router.get_ssid(instance=instance))
# We should be unable to connect.
client_conf = xmlrpc_datatypes.AssociationParameters(
ssid=self.context.router.get_ssid(instance=instance),
expect_failure=True)
self.context.assert_connect_wifi(client_conf)
def run_once(self):
"""For each PHY on a router, for 2 and 5 Ghz bands on a PHY:
1) Set up an AP on the PHY.
2) Walk the attenuators from low to high attenuations.
3) Measure AP signal as attenuation increases.
4) Tester should manually inspect that signal levels decrease linearly
and are consistent from attenuator to attenuator.
"""
# Create some re-usable client objects
self._client_iw_runner = self.context.client.iw_runner
self._client_if = self.context.client.wifi_if
# Verify the client cell is clean
scan_result = self._client_iw_runner.scan(self._client_if)
if scan_result and len(scan_result) > 0:
raise error.TestError('SSIDs found, the cell is not closed or '
'is not cabled correctly.')
attenuators_info = list()
self.num_phys = len(self.context.router.iw_runner.list_phys())
# Pick channels other than the calibrated ones.
for frequency in (2447, 5660):
for instance in range(self.num_phys):
if self.num_phys > 1:
self._refresh_ap_ssids(frequency)
self._verify_phy_attenuator_correspondence(instance)
phy_num = self._get_phy_num_for_instance(instance)
for attenuator_offset in range(ATTENUATORS_PER_PHY):
attenuator_num = (phy_num * ATTENUATORS_PER_PHY +
attenuator_offset)
self._refresh_ap_ssids(frequency)
attenuator_info = self._verify_attenuator(
instance, frequency, attenuator_num)
attenuators_info.append(attenuator_info)
for info in attenuators_info:
if info.healthy_attenuator is False:
raise error.TestFail('One or more attenuators are broken!')