blob: 268c4efc6afa915b955a2ac14e1c8086b1a98ff9 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# TODO(tgao): use XMLHTTP instead of repeated SSH commands. See
# and the code for
# remote_pyauto in client/cros for examples
import logging
import os
from autotest_lib.client.common_lib import error
class ScriptNotFound(Exception):
"""Raised when attenuator scripts cannot be found."""
def __init__(self, script_name):
@param script_name: a string.
super(ScriptNotFound, self).__init__(
'Script %s not found in search path' % script_name)
class Attenuator(object):
"""Attenuator support for WiFiTest class.
This class implements wifi test methods that communicate with a variable
attenuator over SSH in control network.
LAB_DOMAIN = '.cros'
# Host names in RvR test cells.
HOST1 = 'chromeos1-grover-host1'
HOST2 = 'chromeos1-grover-host2'
# Look up fixed path loss by AP frequency, host name and port number.
FREQ_LOSS_MAP = {2437: {HOST1: [44, 44], HOST2: [43, 43]},
5220: {HOST1: [49, 46], HOST2: [46, 46]},
5765: {HOST1: [49, 47], HOST2: [47, 48]}}
# We only use 2 ports out of the 4 available.
PORTS = [0, 1]
# The scripts that run on the attenuator limit the attenuation to this
# plus the fixed attenuation for the specific port.
# TODO(tgao): refactor & merge this w/ site_wifitest.install_script()
def _copy_script(self, script_name, *support_scripts):
"""Copies scripts to DUT.
@param script_name: a string, main script to copy.
@param support_scripts: a list of strings, dependent scripts.
@return a string, script_name if it's copied successfully.
@raises ScriptNotFound if any source script is not found.
if script_name in self._installed_scripts:
return self._installed_scripts[script_name]
script_dir = self._host.get_tmp_dir()
script_file = os.path.join(script_dir, script_name)
for copy_file in [script_name] + list(support_scripts):
# Look either relative to the current location of this file or
# relative to ../client/common_lib/cros/site_attenuator/
# for the script.
script_relative_paths = [['.'],
['..', 'client', 'common_lib', 'cros',
for rel_path in script_relative_paths:
src_file = os.path.join(
*(rel_path + [copy_file]))
if os.path.exists(src_file):
raise ScriptNotFound(copy_file)
dest_file = os.path.join(script_dir,
self._host.send_file(src_file, dest_file, delete_dest=True)
self._installed_scripts[script_name] = script_file
return script_file
def _run_init_script(self, port, cleanup=False):
"""Initializes attenuator port.
@param port: an integer, Beaglebone I/O port number (0 or 1).
@param cleanup: a boolean, True == unexport GPIO pins/reset port.
# TODO(tgao): bundle these scripts as part of a test image?
flag = '-c' if cleanup else '''python "%s" -p %s %s 2>&1' %
(self._init_script, port, flag))
def _init_ports(self):
"""Initializes attenuator port."""
for port in self.PORTS:
def __init__(self, host):
@param host: an Autotest host object, representing the attenuator.
self._host = host
self._installed_scripts = dict()
self.fixed_loss = [] # Fixed path loss on ports 0 and 1 in dB.
# Install Python scripts on attenuator
self._init_script = self._copy_script('',
self._config_script = self._copy_script('',
def cleanup(self):
"""Resets attenuator ports."""
for port in self.PORTS:
self._run_init_script(port, cleanup=True)
def get_attenuation(self, port):
"""Reads current attenuation level in dB.
@param port: an integer, Beaglebone I/O port number (0 or 1).
"""'python "%s" -p %d 2>&1' % (self._config_script, port))
def set_variable_attenuation_on_port(self, port, variable_db):
"""Sets desired variable attenuation in dB.
@param port: port to attenuate.
@param variable_db: an integer, variable attenuation in dB.
fixed_db = self.fixed_loss[port]
total_db = fixed_db + variable_db'python "%s" -p %d -f %d -t %d 2>&1' %
(self._config_script, port, fixed_db, total_db))
def set_variable_attenuation(self, variable_db):
"""Sets desired variable attenuation in dB.
@param variable_db: an integer, variable attenuation in dB.
for port in self.PORTS:
self.set_variable_attenuation_on_port(port, variable_db)
def set_total_attenuation_on_port(self, port, total_db):
"""Sets desired total attenuation in dB.
@param port: port to attenuate.
@param total_db: an integer, total attenuation in dB.
"""'python "%s" -p %d -f %d -t %d 2>&1' %
(self._config_script, port, self.fixed_loss[port],
def set_total_attenuation(self, total_db):
"""Sets desired total attenuation in dB.
@param total_db: an integer, total attenuation in dB.
for port in self.PORTS:
self.set_total_attenuation_on_port(port, total_db)
def _approximate_frequency(freq):
"""Finds an approximate frequency to freq.
In case freq is not present in FREQ_LOSS_MAP, we use a value
from a nearby channel as an approximation.
@param freq an integer, frequency in MHz.
@returns an integer, approximate frequency from FREQ_LOSS_MAP.
old_offset = None
approx_freq = None
for f in sorted(Attenuator.FREQ_LOSS_MAP.keys()):
new_offset = abs(f - freq)
if old_offset is not None and new_offset > old_offset:
old_offset = new_offset
approx_freq = f'Approximating attenuation for frequency %d with '
'constants for frequency %d.', freq, approx_freq)
return approx_freq
def config(self, hostname, freq):
"""Configures variable attenuator for a given frequency.
@param hostname a string, DUT host name.
@param freq an integer, frequency in MHz.
@raises TestError if DUT hostname is unexpected.
if hostname.endswith(self.LAB_DOMAIN):
hostname = hostname[:-len(self.LAB_DOMAIN)]
if hostname not in self.VALID_HOSTS:
raise error.TestError('Unexpected RvR host name %r.' % hostname)
# Look up path loss by frequency. Approximate if needed.
freq_used = freq
if freq not in self.FREQ_LOSS_MAP:
freq_used = self._approximate_frequency(freq)'Looking up fixed path loss on freq %d', freq_used)
self.fixed_loss = self.FREQ_LOSS_MAP[freq_used][hostname]