blob: 90d91f3c6afa254b34b1c5a3e004975ddddd2a36 [file] [log] [blame]
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.network import attenuator
from autotest_lib.server.cros.network import attenuator_hosts
from chromite.lib import timeout_util
HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS
class AttenuatorController(object):
"""Represents a minicircuits variable attenuator.
This device is used to vary the attenuation between a router and a client.
This allows us to measure throughput as a function of signal strength and
test some roaming situations. The throughput vs signal strength tests
are referred to rate vs range (RvR) tests in places.
"""
@property
def supported_attenuators(self):
"""@return iterable of int attenuators supported on this host."""
return self._fixed_attenuations.keys()
def __init__(self, hostname):
"""Construct a AttenuatorController.
@param hostname: Hostname representing minicircuits attenuator.
"""
self.hostname = hostname
super(AttenuatorController, self).__init__()
part = hostname.split('.', 1)[0]
if part not in HOST_TO_FIXED_ATTENUATIONS.keys():
raise error.TestError('Unexpected RvR host name %r.' % hostname)
self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[part]
num_atten = len(self.supported_attenuators)
self._attenuator = attenuator.Attenuator(hostname, num_atten)
self.set_variable_attenuation(0)
def _approximate_frequency(self, attenuator_num, freq):
"""Finds an approximate frequency to freq.
In case freq is not present in self._fixed_attenuations, we use a value
from a nearby channel as an approximation.
@param attenuator_num: attenuator in question on the remote host. Each
attenuator has a different fixed path loss per frequency.
@param freq: int frequency in MHz.
@returns int approximate frequency from self._fixed_attenuations.
"""
old_offset = None
approx_freq = None
for defined_freq in self._fixed_attenuations[attenuator_num].keys():
new_offset = abs(defined_freq - freq)
if old_offset is None or new_offset < old_offset:
old_offset = new_offset
approx_freq = defined_freq
logging.debug('Approximating attenuation for frequency %d with '
'constants for frequency %d.', freq, approx_freq)
return approx_freq
def close(self):
"""Close variable attenuator connection."""
self._attenuator.close()
def set_total_attenuation(self, atten_db, frequency_mhz,
attenuator_num=None):
"""Set the total attenuation on one or all attenuators.
@param atten_db: int level of attenuation in dB. This must be
higher than the fixed attenuation level of the affected
attenuators.
@param frequency_mhz: int frequency for which to calculate the
total attenuation. The fixed component of attenuation
varies with frequency.
@param attenuator_num: int attenuator to change, or None to
set all variable attenuators.
"""
affected_attenuators = self.supported_attenuators
if attenuator_num is not None:
affected_attenuators = [attenuator_num]
for atten in affected_attenuators:
freq_to_fixed_loss = self._fixed_attenuations[atten]
approx_freq = self._approximate_frequency(atten,
frequency_mhz)
variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq]
self.set_variable_attenuation(variable_atten_db,
attenuator_num=atten)
def set_variable_attenuation(self, atten_db, attenuator_num=None):
"""Set the variable attenuation on one or all attenuators.
@param atten_db: int non-negative level of attenuation in dB.
@param attenuator_num: int attenuator to change, or None to
set all variable attenuators.
"""
affected_attenuators = self.supported_attenuators
if attenuator_num is not None:
affected_attenuators = [attenuator_num]
for atten in affected_attenuators:
try:
self._attenuator.set_atten(atten, atten_db)
if int(self._attenuator.get_atten(atten)) != atten_db:
raise error.TestError('Attenuation did not set as expected '
'on attenuator %d' % atten)
except error.TestError:
self._attenuator.reopen(self.hostname)
self._attenuator.set_atten(atten, atten_db)
if int(self._attenuator.get_atten(atten)) != atten_db:
raise error.TestError('Attenuation did not set as expected '
'on attenuator %d' % atten)
logging.info('%ddb attenuation set successfully on attenautor %d',
atten_db, atten)
def get_minimal_total_attenuation(self):
"""Get attenuator's maximum fixed attenuation value.
This is pulled from the current attenuator's lines and becomes the
minimal total attenuation when stepping through attenuation levels.
@return maximum starting attenuation value
"""
max_atten = 0
for atten_num in self._fixed_attenuations.iterkeys():
atten_values = self._fixed_attenuations[atten_num].values()
max_atten = max(max(atten_values), max_atten)
return max_atten
def set_signal_level(self, client_context, requested_sig_level,
min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
"""Set wifi signal to desired level by changing attenuation.
@param client_context: Client context object.
@param requested_sig_level: Negative int value in dBm for wifi signal
level to be set.
@param min_sig_level_allowed: Minimum signal level allowed; this is to
ensure that we don't set a signal that is too weak and DUT can
not associate.
@param tolerance_percent: Percentage to be used to calculate the desired
range for the wifi signal level.
"""
atten_db = 0
starting_sig_level = client_context.wifi_signal_level
if not starting_sig_level:
raise error.TestError("No signal detected.")
if not (min_sig_level_allowed <= requested_sig_level <=
starting_sig_level):
raise error.TestError("Requested signal level (%d) is either "
"higher than current signal level (%r) with "
"0db attenuation or lower than minimum "
"signal level (%d) allowed." %
(requested_sig_level,
starting_sig_level,
min_sig_level_allowed))
try:
with timeout_util.Timeout(timeout):
while True:
client_context.reassociate(timeout_seconds=1)
current_sig_level = client_context.wifi_signal_level
logging.info("Current signal level %r", current_sig_level)
if not current_sig_level:
raise error.TestError("No signal detected.")
if self.signal_in_range(requested_sig_level,
current_sig_level, tolerance_percent):
logging.info("Signal level set to %r.",
current_sig_level)
break
if current_sig_level > requested_sig_level:
self.set_variable_attenuation(atten_db)
atten_db +=1
if current_sig_level < requested_sig_level:
self.set_variable_attenuation(atten_db)
atten_db -= 1
except (timeout_util.TimeoutError, error.TestError,
error.TestFail) as e:
raise error.TestError("Not able to set wifi signal to requested "
"level. \n%s" % e)
def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
"""Check if wifi signal is within the threshold of requested signal.
@param req_sig_level: Negative int value in dBm for wifi signal
level to be set.
@param curr_sig_level: Current wifi signal level seen by the DUT.
@param tolerance_percent: Percentage to be used to calculate the desired
range for the wifi signal level.
@returns True if wifi signal is in the desired range.
"""
min_sig = req_sig_level + (req_sig_level * tolerance_percent / 100)
max_sig = req_sig_level - (req_sig_level * tolerance_percent / 100)
if min_sig <= curr_sig_level <= max_sig:
return True
return False