blob: 03eecfcdc193872261eb6693f866e69aae9f1da9 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.network import attenuator
from autotest_lib.server.cros.network import attenuator_hosts
from autotest_lib.utils.frozen_chromite.lib import timeout_util
# Fixed attenuation tables from attenuator_hosts.py, looked up by the
# attenuator's hostname with any '.cros...' suffix removed.
HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS
# Placeholder entry used for an attenuator that has not been added to the
# attenuator_hosts.py file.
FAKE_HOST = HOST_TO_FIXED_ATTENUATIONS['fake-atten-host']
class AttenuatorController(object):
    """Represents a minicircuits variable attenuator.

    This device is used to vary the attenuation between a router and a client.
    This allows us to measure throughput as a function of signal strength and
    test some roaming situations.  The throughput vs signal strength tests
    are referred to as rate vs range (RvR) tests in places.

    Fixed attenuations should be recorded in attenuator_hosts.py, else
    TestError will be raised when fixed attenuations are accessed.

    """

    @property
    def supported_attenuators(self):
        """@return list of int attenuators supported on this host."""
        # A real list (not a dict view) so callers can index it as well as
        # iterate over it and take its length.
        return list(self._fixed_attenuations)


    def __init__(self, hostname):
        """Construct a AttenuatorController.

        Looks up the fixed attenuation table for the host (falling back to
        the placeholder entry when the host is unknown), opens the variable
        attenuator and sets every attenuator to 0 dB.

        @param hostname: Hostname representing minicircuits attenuator.

        """
        self.hostname = hostname
        super(AttenuatorController, self).__init__()
        # The table is keyed by the part of the hostname before '.cros'.
        part = hostname.split('.cros', 1)[0]
        if part in HOST_TO_FIXED_ATTENUATIONS:
            self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[part]
        else:
            logging.debug('Attenuator %s not found in attenuator_host list',
                          part)
            self._fixed_attenuations = FAKE_HOST
        num_atten = len(self.supported_attenuators)
        self._attenuator = attenuator.Attenuator(hostname, num_atten)
        self.set_variable_attenuation(0)


    def _approximate_frequency(self, attenuator_num, freq):
        """Finds an approximate frequency to freq.

        In case freq is not present in self._fixed_attenuations, we use a value
        from a nearby channel as an approximation.

        @param attenuator_num: attenuator in question on the remote host.  Each
                attenuator has a different fixed path loss per frequency.
        @param freq: int frequency in MHz.
        @returns int approximate frequency from self._fixed_attenuations.
        @raises TestError if attenuator is not in attenuator_hosts.py or has
                no frequencies recorded.

        """
        self._fail_if_fake()
        defined_freqs = self._fixed_attenuations[attenuator_num]
        if not defined_freqs:
            raise error.TestError(
                    'No fixed attenuations recorded for attenuator %r on %r' %
                    (attenuator_num, self.hostname))
        # min() returns the first of several equally close frequencies, the
        # same choice a strict '<' scan over the table would make.
        approx_freq = min(defined_freqs,
                          key=lambda defined_freq: abs(defined_freq - freq))
        logging.debug('Approximating attenuation for frequency %d with '
                      'constants for frequency %d.', freq, approx_freq)
        return approx_freq


    def close(self):
        """Close variable attenuator connection."""
        self._attenuator.close()


    def set_total_attenuation(self, atten_db, frequency_mhz,
                              attenuator_num=None):
        """Set the total attenuation on one or all attenuators.

        The variable attenuation programmed is atten_db minus the fixed loss
        recorded for the frequency closest to frequency_mhz.

        @param atten_db: int level of attenuation in dB.  This must be
                higher than the fixed attenuation level of the affected
                attenuators.
        @param frequency_mhz: int frequency for which to calculate the
                total attenuation.  The fixed component of attenuation
                varies with frequency.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.
        @raises TestError if attenuator is not in attenuator_hosts.py, or if
                atten_db is lower than the fixed attenuation.

        """
        self._fail_if_fake()
        if attenuator_num is None:
            affected_attenuators = self.supported_attenuators
        else:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            freq_to_fixed_loss = self._fixed_attenuations[atten]
            approx_freq = self._approximate_frequency(atten, frequency_mhz)
            fixed_loss_db = freq_to_fixed_loss[approx_freq]
            variable_atten_db = atten_db - fixed_loss_db
            if variable_atten_db < 0:
                # The variable stage can only add loss; fail with a clear
                # message instead of programming a negative value.
                raise error.TestError(
                        'Requested total attenuation %r is lower than the '
                        'fixed attenuation %r of attenuator %r' %
                        (atten_db, fixed_loss_db, atten))
            self.set_variable_attenuation(variable_atten_db,
                                          attenuator_num=atten)


    def _set_and_verify(self, atten, atten_db):
        """Program one attenuator and check that the value reads back.

        @param atten: int attenuator to change.
        @param atten_db: int non-negative level of attenuation in dB.
        @raises TestError if the value read back differs from atten_db.

        """
        self._attenuator.set_atten(atten, atten_db)
        if int(self._attenuator.get_atten(atten)) != atten_db:
            raise error.TestError('Attenuation did not set as expected '
                                  'on attenuator %d' % atten)


    def set_variable_attenuation(self, atten_db, attenuator_num=None):
        """Set the variable attenuation on one or all attenuators.

        Each attenuator is programmed and read back.  On a TestError the
        attenuator is re-opened and the operation is retried once; a second
        failure propagates to the caller.

        @param atten_db: int non-negative level of attenuation in dB.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.
        @raises TestError if the attenuation still does not read back as
                atten_db after the retry.

        """
        if attenuator_num is None:
            affected_attenuators = self.supported_attenuators
        else:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            try:
                self._set_and_verify(atten, atten_db)
            except error.TestError:
                self._attenuator.reopen(self.hostname)
                self._set_and_verify(atten, atten_db)
            logging.info('%ddb attenuation set successfully on attenautor %d',
                         atten_db, atten)


    def get_minimal_total_attenuation(self):
        """Get attenuator's maximum fixed attenuation value.

        This is pulled from the current attenuator's lines and becomes the
        minimal total attenuation when stepping through attenuation levels.

        @return maximum starting attenuation value (0 if nothing larger is
                recorded).
        @raises TestError if attenuator is not in attenuator_hosts.py

        """
        self._fail_if_fake()
        max_atten = 0
        # values() exists on both Python 2 and 3 dicts; iterkeys() does not.
        for freq_to_fixed_loss in self._fixed_attenuations.values():
            if freq_to_fixed_loss:
                max_atten = max(max_atten, max(freq_to_fixed_loss.values()))
        return max_atten


    def set_signal_level(self, client_context, requested_sig_level,
            min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
        """Set wifi signal to desired level by changing attenuation.

        Repeatedly reassociates the client, reads its signal level and moves
        the variable attenuation by 1 dB towards the requested level until
        the signal is within tolerance.  The first adjustment programs 0 dB
        so the search starts from a known baseline.

        @param client_context: Client context object.
        @param requested_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param min_sig_level_allowed: Minimum signal level allowed; this is to
                ensure that we don't set a signal that is too weak and DUT can
                not associate.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.
        @param timeout: Limit handed to timeout_util.Timeout for the whole
                adjustment loop (presumably seconds; confirm against
                timeout_util).
        @raises TestError if the request is out of bounds, no signal is
                detected, or the level cannot be reached.

        """
        starting_sig_level = client_context.wifi_signal_level
        if not starting_sig_level:
            raise error.TestError("No signal detected.")
        if not (min_sig_level_allowed <= requested_sig_level <=
                starting_sig_level):
            raise error.TestError("Requested signal level (%d) is either "
                                  "higher than current signal level (%r) with "
                                  "0db attenuation or lower than minimum "
                                  "signal level (%d) allowed." %
                                  (requested_sig_level,
                                  starting_sig_level,
                                  min_sig_level_allowed))

        # Attenuation most recently programmed by this call; None until the
        # first adjustment has been made.
        applied_db = None
        try:
            with timeout_util.Timeout(timeout):
                while True:
                    client_context.reassociate(timeout_seconds=1)
                    current_sig_level = client_context.wifi_signal_level
                    logging.info("Current signal level %r", current_sig_level)
                    if not current_sig_level:
                        raise error.TestError("No signal detected.")
                    if self.signal_in_range(requested_sig_level,
                            current_sig_level, tolerance_percent):
                        logging.info("Signal level set to %r.",
                                     current_sig_level)
                        break
                    if applied_db is None:
                        target_db = 0
                    elif current_sig_level > requested_sig_level:
                        # Signal too strong: add attenuation.
                        target_db = applied_db + 1
                    elif current_sig_level < requested_sig_level:
                        # Signal too weak: remove attenuation.
                        target_db = applied_db - 1
                    else:
                        target_db = applied_db
                    if target_db < 0:
                        # Nothing left to remove; never program a negative
                        # attenuation.
                        raise error.TestError(
                                "Signal level (%r) is below the requested "
                                "level (%d) even with 0db attenuation." %
                                (current_sig_level, requested_sig_level))
                    self.set_variable_attenuation(target_db)
                    applied_db = target_db
        except (timeout_util.TimeoutError, error.TestError,
                error.TestFail) as e:
            raise error.TestError("Not able to set wifi signal to requested "
                                  "level. \n%s" % e)


    def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
        """Check if wifi signal is within the threshold of requested signal.

        The accepted window is req_sig_level plus or minus tolerance_percent
        percent of its magnitude.

        @param req_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param curr_sig_level: Current wifi signal level seen by the DUT.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.

        @returns True if wifi signal is in the desired range.

        """
        # abs() keeps the window the right way round whatever the sign of
        # the requested level.
        margin = abs(req_sig_level * tolerance_percent / 100)
        min_sig = req_sig_level - margin
        max_sig = req_sig_level + margin
        return min_sig <= curr_sig_level <= max_sig


    def _fail_if_fake(self):
        """Raises test error if this attenuator is missing.

        If an attenuator is missing, we use a fake entry.  This function
        will fail the test if the current attenuator is fake.

        @raises TestError if this controller is using the fake entry.

        """
        # Identity check: only the placeholder table itself marks a missing
        # host, not a real host whose values happen to compare equal.
        if self._fixed_attenuations is FAKE_HOST:
            raise error.TestError(
                    'Attenuator %r not found in attenuator_hosts.py' %
                    self.hostname)