blob: ff2ef26867ae443094a654d006f5410cbe82dc01 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import glob
import hashlib
import logging
import os
import re
import time
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import factory_setup_modules
from cros.factory.test import factory
from cros.factory.event_log import EventLog
from cros.factory.test import leds
from autotest_lib.client.cros.rf import agilent_scpi
from autotest_lib.client.cros.rf import lan_scpi
from autotest_lib.client.cros.rf import rf_utils
from autotest_lib.client.cros.rf.config import PluggableConfig
# Default configuration for the WiFi power test.  factory_Wifi._run loads it
# through base_config.Read(config_path, ...); presumably a file at
# config_path can override these defaults -- confirm against PluggableConfig.
base_config = PluggableConfig({
    'channels': [
        # One tuple per measurement, in this field order:
        #   antenna, channel, freq, fixed_rate, range, level,
        #   power_adjustment, min_avg_power, max_avg_power
        #
        # How factory_Wifi._run uses each field:
        #   antenna          - argument for "iw phy <phy> set antenna"; None
        #                      skips the antenna set-up entirely.
        #   channel          - argument for "iw wlan0 set channel".
        #   freq             - passed to the tester's MeasurePower call
        #                      (looks like Hz -- TODO confirm).
        #   fixed_rate       - written to the ath9k "fixed_rate" debugfs file.
        #   range, level     - forwarded to MeasurePower as keyword arguments.
        #   power_adjustment - subtracted from the measured average and peak
        #                      power before the range check.
        #   min_avg_power,
        #   max_avg_power    - limits for the adjusted average power.
        #
        # The test will fail if the observed average power is not
        # between min_avg_power and max_avg_power (checked with
        # rf_utils.IsInRange; None presumably means "no limit" -- verify
        # against rf_utils).
        ('1 1', 1, 2412e6, 11, 0, -14, -28, None, None),
        ('2 2', 6, 2437e6, 11, 0, -14, -28, None, None),
        ('1 1', 11, 2462e6, 11, 0, -14, -28, None, None),
        ('2 3', 36, 5180e6, 7, -10, -40, -28, None, None),
        (None, 64, 5320e6, 7, -10, -40, -32, None, None),
        (None, 157, 5785e6, 7, -10, -40, -40, None, None),
    ]
})
# Pause, in seconds, that most run_cmd call sites in this file pass as the
# post-command delay.
delay_secs = 0.5
def run_cmd(command, delay_secs=0, ignore_status=False):
    """Runs a command via utils.system_output, then logs its output.

    Args:
        command: Command line handed to utils.system_output.
        delay_secs: Seconds to sleep after the command has returned.
        ignore_status: Forwarded to utils.system_output.  When true, any
            error raised while running the command is logged and swallowed
            instead of being propagated.

    Returns:
        The command's output, or None when the command failed and
        ignore_status is true.

    Raises:
        Whatever utils.system_output (or the sleep/logging that follows it)
        raises, unless ignore_status is true.
    """
    try:
        ret = utils.system_output(command, ignore_status=ignore_status)
        time.sleep(delay_secs)
        logging.info("Command %s, returned %s", command, ret)
        return ret
    except Exception:
        # Deliberately not a bare "except:": KeyboardInterrupt and SystemExit
        # must keep propagating even for best-effort (ignore_status) calls.
        logging.exception("Command Err: %s", command)
        if not ignore_status:
            raise
        return None
def get_ath9k_path():
    """Returns the absolute path of the ath9k debugfs directory.

    Globs /sys/kernel/debug/ieee80211/phy*/ath9k and requires exactly one
    hit.

    Raises:
        error.TestError: if the glob yields zero or several paths.
    """
    debugfs_glob = "/sys/kernel/debug/ieee80211/phy*/ath9k"
    found = glob.glob(debugfs_glob)
    if len(found) == 1:
        return found[0]
    raise error.TestError('Expected one match for %s but got %s'
                          % (debugfs_glob, found))
def get_phy_name(ath9k_path):
    """Extracts the phy name (e.g. 'phy0') from an ath9k debugfs path.

    Args:
        ath9k_path: Path containing 'ieee80211/<phy name>/ath9k'.

    Returns:
        The phy component of the path.

    Raises:
        error.TestError: if the path does not contain a phy component.
    """
    found = re.search(r'ieee80211\/(phy[\d]*)\/ath9k', ath9k_path)
    if not found:
        raise error.TestError('Expected to find phy name in path[%s]' %
                              ath9k_path)
    phy_name = found.group(1)
    logging.info('Name of phy[%s]' % phy_name)
    return phy_name
class factory_Wifi(test.test):
    """Factory test that checks WiFi transmit power against an N4010A tester.

    For each entry of the 'channels' configuration the ath9k radio is put
    into TX99 (continuous transmit) mode and the power reported by the
    tester is compared with the configured limits.
    """
    version = 5

    def run_once(self, n4010a_host, n4010a_port=5025, module_paths=None,
                 config_path=None, interface=None, set_ethernet_ip=None,
                 retries=5):
        """Optionally swaps in custom kernel modules, then runs the test.

        Args:
            n4010a_host: Host of the tester, passed to
                agilent_scpi.N4010ASCPI.
            n4010a_port: Port of the tester.
            module_paths: Optional list of paths to '.ko' files that are
                insmod'ed for the duration of the test.  Every occurrence
                of '${kernel_release}' in a path is replaced by the output
                of `uname -r`.  Each path must exist and end in '.ko'.
            config_path: Passed to base_config.Read to load the channel
                configuration.
            interface: Passed to rf_utils.SetEthernetIp together with
                set_ethernet_ip.
            set_ethernet_ip: When set, rf_utils.SetEthernetIp is called
                with this value before measuring.
            retries: Number of measurement attempts per channel; must be
                at least 1.
        """
        assert retries >= 1
        module_names = []
        try:
            kernel_release = utils.system_output('uname -r').strip()
            if module_paths:
                module_paths = [
                    path.replace('${kernel_release}', kernel_release)
                    for path in module_paths]
                for path in module_paths:
                    assert os.path.exists(path), path
                    assert path.endswith('.ko'), path
                    # Remove .ko suffix to get the module name
                    module_names.append(os.path.splitext(
                        os.path.basename(path))[0])
                # Remove the current versions of the modules (in reverse,
                # since there may be dependencies)
                for module_name in reversed(module_names):
                    run_cmd('rmmod %s' % module_name,
                            delay_secs, ignore_status=True)
                # Insert the custom modules
                for module in module_paths:
                    run_cmd('insmod %s' % module, delay_secs)
            # TODO(itspeter): Check services are stopped by ConnectionManager
            if set_ethernet_ip:
                rf_utils.SetEthernetIp(set_ethernet_ip, interface)
            # Blink the keyboard LEDs while the measurement is in progress.
            with leds.Blinker(((leds.LED_NUM, 0.25),
                               (leds.LED_CAP, 0.25),
                               (leds.LED_SCR, 0.25),
                               (leds.LED_CAP, 0.25))):
                self._run(n4010a_host, n4010a_port, config_path, retries)
        finally:
            if module_names:
                # Try to remove the custom modules
                run_cmd('rmmod %s' % ' '.join(reversed(module_names)),
                        delay_secs, ignore_status=True)
                # Try to modprobe the default modules back in
                for module_name in module_names:
                    run_cmd('modprobe %s' % module_name,
                            delay_secs, ignore_status=True)

    def _start_tx99(self, antenna, channel, fixed_rate):
        """Configures wlan0 for one measurement and starts TX99.

        Args:
            antenna: Argument for "iw phy <phy> set antenna", or None to
                leave the antenna routing untouched.
            channel: WiFi channel number to transmit on.
            fixed_rate: Value written to the ath9k 'fixed_rate' file.
        """
        if antenna is not None:
            # Assign specific antenna routing; the interface is taken down
            # around the change.
            run_cmd("ifconfig wlan0 down", delay_secs)
            run_cmd("iw phy %s set antenna %s" % (
                get_phy_name(get_ath9k_path()), antenna))
            run_cmd("ifconfig wlan0 up")
        # Best-effort stop of any transmission left over from a previous
        # try or channel.
        run_cmd("echo 0 > %s/tx99" % get_ath9k_path(),
                delay_secs, ignore_status=True)
        # Set up TX99 (continuous transmit) and begin sending.
        run_cmd("iw wlan0 set channel %d" % channel, delay_secs)
        run_cmd("echo %d > %s/fixed_rate" % (
            fixed_rate, get_ath9k_path()), delay_secs)
        run_cmd("echo 1 > %s/tx99" % get_ath9k_path(),
                delay_secs)

    def _run(self, n4010a_host, n4010a_port, config_path, retries):
        """Measures every configured channel and reports the results.

        The measurements (None for a channel whose every try timed out)
        are written to the event log under 'wifi_power', and TX99 is
        switched off again, whether or not the test passes.

        Args:
            n4010a_host: Host of the tester.
            n4010a_port: Port of the tester.
            config_path: Passed to base_config.Read.
            retries: Number of measurement attempts per channel.

        Raises:
            error.TestError: if any channel's average power is out of
                range, or the tester timed out on every try for a channel.
        """
        event_log = EventLog.ForAutoTest()
        config = base_config.Read(config_path, event_log=event_log)
        n4010a = agilent_scpi.N4010ASCPI(n4010a_host, n4010a_port, timeout=5)
        logging.info("Tester ID: %s", n4010a.id)
        power_by_config = {}
        failures = []
        try:
            for channel_info in config['channels']:
                # "power_range" rather than "range" so the builtin is not
                # shadowed; MeasurePower still receives it as range=.
                (antenna, channel, freq, fixed_rate, power_range, level,
                 power_adjustment, min_avg_power, max_avg_power) = channel_info
                config_key = (channel, antenna, fixed_rate)
                # Never carry over a previous channel's measurement if the
                # retry loop body does not run.
                power = None
                for try_number in xrange(retries):
                    logging.info("Try %d for channel %s, antenna %s",
                                 try_number + 1, channel_info, antenna)
                    self._start_tx99(antenna, channel, fixed_rate)
                    try:
                        power = n4010a.MeasurePower(freq, range=power_range,
                                                    level=level)
                        power.avg_power -= power_adjustment
                        power.peak_power -= power_adjustment
                        power.tries = try_number + 1
                        if not rf_utils.IsInRange(
                                power.avg_power, min_avg_power,
                                max_avg_power):
                            failures.append('Power for config %s is %s, '
                                            'out of range (%s,%s)'
                                            % (config_key,
                                               power.avg_power,
                                               min_avg_power, max_avg_power))
                        break  # Success: Don't retry
                    except lan_scpi.TimeoutError:
                        # Try again
                        logging.info("Timeout on config %s", config_key)
                        n4010a.Reopen()
                        power = None
                else:
                    # Every try timed out.
                    failures.append("Timeout on config (%s, %s, %s)" %
                                    config_key)
                power_by_config[config_key] = power
            if failures:
                raise error.TestError('; '.join(failures))
        finally:
            try:
                event_log.Log(
                    'wifi_power',
                    power_by_config=dict(
                        (k, v and v.__dict__)
                        for k, v in power_by_config.iteritems()))
                logging.info("Power: %s" % power_by_config)
            finally:
                # Always stop continuous transmit, even if logging the
                # results above failed.
                run_cmd("echo 0 > %s/tx99" % get_ath9k_path(),
                        delay_secs, ignore_status=True)