# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import re
import signal
import time
from autotest_lib.client.common_lib import error
from import interface
from import iw_runner
from import ping_runner
from import xmlrpc_datatypes
from autotest_lib.client.cros import constants
from autotest_lib.server import autotest
from autotest_lib.server import site_linux_system
from autotest_lib.server.cros import remote_command
from autotest_lib.server.cros import wifi_test_utils
from import packet_capturer
from import wpa_cli_proxy
from autotest_lib.server.hosts import adb_host
class WiFiClient(object):
"""WiFiClient is a thin layer of logic over a remote DUT in wifitests."""
# Label of the DTIM period entry; intended to be passed as the
# |iw_link_key| argument of check_iw_link_value().
IW_LINK_KEY_DTIM_PERIOD = 'dtim period'
def board(self):
    """Return the board name of this device, looked up once and cached.

    @return string self reported board of this device.

    """
    if self._board is None:
        # The host reports a label of the form 'board:<name>'; keep only
        # the part after the 'board:' prefix.
        self._board = self.host.get_board().split(':')[1]
    return self._board
def machine_id(self):
    """Build an identifier for this board/cpu configuration.

    The identifier is '<board>_<kernel arch><_N.NNGHz><xCPUS>', where the
    trailing parts are omitted when they cannot be determined. The value
    is cached after the first successful computation.

    @return string unique to a particular board/cpu configuration.

    """
    if self._machine_id:
        return self._machine_id

    uname_result = self.host.run('uname -m', ignore_status=True)
    kernel_arch = ''
    # Only trust the output if the command succeeded and printed a single
    # token (no embedded spaces).
    if not uname_result.exit_status and uname_result.stdout.find(' ') < 0:
        kernel_arch = uname_result.stdout.strip()

    cpu_info = self.host.run('cat /proc/cpuinfo').stdout.splitlines()
    # One 'bogomips' line is counted per CPU entry.
    cpu_count = sum(1 for line in cpu_info
                    if line.lower().startswith('bogomips'))
    cpu_count_str = ''
    if cpu_count:
        cpu_count_str = 'x%d' % cpu_count

    ghz_value = ''
    ghz_pattern = re.compile(r'([0-9.]+GHz)')
    for line in cpu_info:
        match = ghz_pattern.search(line)
        if match is not None:
            ghz_value = '_' + match.group(1)
            break

    self._machine_id = '%s_%s%s%s' % (self.board, kernel_arch, ghz_value,
                                      cpu_count_str)
    return self._machine_id
def powersave_on(self):
    """Report whether WiFi powersave mode is enabled.

    Runs `iw dev <wifi_if> get power_save` on the host and parses the
    "<name>: <value>" line it prints.

    @return bool True iff WiFi powersave mode is enabled.
    @raises error.TestFail if the output cannot be parsed.

    """
    result = self.host.run("iw dev %s get power_save" % self.wifi_if)
    output = result.stdout.rstrip()  # NB: chop \n
    # Output should be either "Power save: on" or "Power save: off".
    find_results = re.match(r'([^:]+):\s+(\w+)', output)
    if not find_results:
        raise error.TestFail('Failed to find power_save parameter '
                             'in iw results.')

    # Group 2 is the value that follows the colon.
    return find_results.group(2) == 'on'
def capabilities(self):
    """Accessor for the capabilities recorded for this client.

    @return list of WiFi capabilities as parsed by LinuxSystem.

    """
    return self._capabilities
def host(self):
    """Accessor for the host this client wraps.

    @return host object representing the remote DUT.

    """
    return self._host
def shill(self):
    """Accessor for the proxy stored in self._shill_proxy.

    @return shill RPCProxy object.

    """
    return self._shill_proxy
def client(self):
    """Deprecated accessor for the client host.

    "Client" is used very loosely in old autotests, so new code should
    not rely on this accessor; use host() instead, which returns the
    same object.

    @return host object representing a remote DUT.

    """
    return self._host
def command_ifconfig(self):
    """Accessor for the configured ifconfig command.

    @return string path to ifconfig command.

    """
    return self._command_ifconfig
def command_ip(self):
    """Accessor for the configured ip command.

    @return string path to ip command.

    """
    return self._command_ip
def command_iptables(self):
    """Accessor for the configured iptables command.

    @return string path to iptables command.

    """
    return self._command_iptables
def command_iw(self):
    """Accessor for the configured iw command.

    @return string path to iw command.

    """
    return self._command_iw
def command_netperf(self):
    """Accessor for the configured netperf command.

    @return string path to netperf command.

    """
    return self._command_netperf
def command_netserv(self):
    """Accessor for the configured netserv command.

    @return string path to netserv command.

    """
    return self._command_netserv
def command_ping6(self):
    """Accessor for the configured ping6 command.

    @return string path to ping6 command.

    """
    return self._command_ping6
def command_wpa_cli(self):
    """Accessor for the configured wpa_cli command.

    @return string path to wpa_cli command.

    """
    return self._command_wpa_cli
def wifi_if(self):
    """Accessor for the name of the WiFi interface in use.

    @return string wifi device on machine (e.g. mlan0).

    """
    return self._wifi_if
def wifi_mac(self):
    """Read the MAC address from the interface object.

    @return string MAC address of self.wifi_if.

    """
    iface = self._interface
    return iface.mac_address
def wifi_ip(self):
    """Read the IPv4 address from the interface object.

    @return string IPv4 address of self.wifi_if.

    """
    iface = self._interface
    return iface.ipv4_address
def wifi_signal_level(self):
    """Read the signal level of this DUT's WiFi interface.

    @return int signal level of connected WiFi interface or None (e.g. -67).

    """
    iface = self._interface
    return iface.signal_level
def wifi_noise_level(self, frequency_mhz):
    """Ask the interface object for the noise level at a frequency.

    @param frequency_mhz: frequency at which the noise level should be
           measured and reported.
    @return int signal level of connected WiFi interface in dBm (e.g. -67)
            or None if the value is unavailable.

    """
    iface = self._interface
    return iface.noise_level(frequency_mhz)
def __init__(self, client_host, result_dir):
"""
Construct a WiFiClient.

@param client_host host object representing a remote host.
@param result_dir string directory to store test logs/packet caps.

"""
# NOTE(review): several statements in this constructor are incomplete in
# this copy (missing call receivers/arguments, and the ADB branch below has
# no visible else:) -- compare against upstream before relying on it.
super(WiFiClient, self).__init__()
self._board = None
self._command_ip = 'ip'
self._command_iptables = 'iptables'
self._command_iw = 'iw'
self._command_netperf = 'netperf'
self._command_netserv = 'netserver'
self._command_ping6 = 'ping6'
self._command_wpa_cli = 'wpa_cli'
self._host = client_host
self._machine_id = None
self._ping_runner = ping_runner.PingRunner(
self._ping_thread = None
self._result_dir = result_dir
# Look up the WiFi device (and its MAC) on the client.
devs = wifi_test_utils.get_wlan_devs(, self.command_iw)
if not devs:
raise error.TestFail('No wlan devices found on %s.' %
if len(devs) > 1:
logging.warning('Warning, found multiple WiFi devices on %s: %r',, devs)
# The first device found is the one used from here on.
self._wifi_if = devs[0]
self._interface = interface.Interface(self._wifi_if,
if isinstance(, adb_host.ADBHost):
self._shill_proxy = wpa_cli_proxy.WpaCliProxy(, self._wifi_if)
# Make sure the client library is on the device so that the proxy
# code is there when we try to call it.
client_at = autotest.Autotest(
# Start up the XMLRPC proxy on the client
self._shill_proxy =
# These commands aren't known to work with ADB hosts.
self._command_ifconfig = 'ifconfig'
# Used for packet captures.
self._packet_capturer = packet_capturer.get_packet_capturer(, host_description='client', ignore_failures=True)
self._result_dir = result_dir
# Rules added to the firewall; firewall_cleanup() iterates this list.
self._firewall_rules = []
# Turn off powersave mode by default.
# It is tempting to make WiFiClient a type of LinuxSystem, but most of
# the functionality there only makes sense for systems that want to
# manage their own WiFi interfaces. On client devices however, shill
# does that work.
system = site_linux_system.LinuxSystem(, {}, 'client')
self._capabilities = system.capabilities
def _raise_logging_level(self):
    """Raises logging levels for WiFi on DUT.

    Runs the wpa_debug and ff_debug commands on the host, in order.

    """
    for command in ('wpa_debug excessive',
                    'ff_debug --level -5',
                    'ff_debug +wifi'):
        self.host.run(command)
def close(self):
"""Tear down state associated with the client."""
# NOTE(review): the body of the branch below and the statements that the
# following comment refers to are missing in this copy -- restore them
# from upstream.
if self._ping_thread is not None:
# This kills the RPC server.
logging.debug('Cleaning up host object for client')
def ping(self, ping_config):
    """Ping an address from the client and return the command output.

    Delegates to the PingRunner created in the constructor.

    @param ping_config parameters for the ping command.
    @return a PingResult object.

    """
    logging.info('Pinging from the client.')
    return self._ping_runner.ping(ping_config)
def ping_bg(self, ping_ip, ping_args):
"""Ping an address from the client in the background.

Only one instance of a background ping is supported at a time.

@param ping_ip string IPv4 address for the client to ping.
@param ping_args dict of parameters for the ping command.
@raises error.TestFail if a background ping is already running.

"""
if self._ping_thread is not None:
raise error.TestFail('Tried to start a background ping without '
'stopping an earlier ping.')
# NOTE(review): the format arguments of cmd and the first argument of
# remote_command.Command are cut off in this copy -- restore from upstream.
cmd = '%s %s %s' % (self.COMMAND_PING,
self._ping_thread = remote_command.Command(, cmd, pkill_argument=self.COMMAND_PING)
def ping_bg_stop(self):
"""Stop pinging an address from the client in the background.

Clean up state from a previous call to ping_bg. If requested,
statistics from the background ping run may be saved.

"""
# NOTE(review): the logging call and early exit for the "nothing running"
# case, and the statement that signals the ping process, are incomplete or
# missing in this copy -- restore from upstream.
if self._ping_thread is None:'Tried to stop a bg ping, but none was started')
# Sending SIGINT gives us stats at the end, how nice.
# Forget the handle so a new background ping may be started.
self._ping_thread = None
def firewall_open(self, proto, src):
    """Opens up firewall to run netperf tests.

    By default, we have a firewall rule for NFQUEUE. In order to run
    netperf test, we need to add a new firewall rule BEFORE this NFQUEUE
    rule in the INPUT chain, which is why the rule is inserted (-I) rather
    than appended.

    @param proto a string, test traffic protocol, e.g. udp, tcp.
    @param src a string, source address; '/32' is appended to it.
    @return a string firewall rule added.

    """
    rule = 'INPUT -s %s/32 -p %s -m %s -j ACCEPT' % (src, proto, proto)
    self.host.run('%s -I %s' % (self._command_iptables, rule))
    # Remember the rule so that firewall_cleanup() can delete it later.
    self._firewall_rules.append(rule)
    return rule
def firewall_cleanup(self):
    """Cleans up firewall rules.

    Deletes (-D) every rule recorded in self._firewall_rules from the
    host and then empties the list.

    """
    for rule in self._firewall_rules:
        self.host.run('%s -D %s' % (self._command_iptables, rule))
    self._firewall_rules = []
def start_capture(self, snaplen=None):
    """Start a packet capture.

    Attempt to start a host based OTA capture. If the driver stack does
    not support creating monitor interfaces, fall back to managed interface
    packet capture. Only one ongoing packet capture is supported at a time.

    @param snaplen int number of byte to retain per captured frame.

    """
    devname = self._packet_capturer.create_managed_monitor(self.wifi_if)
    if devname is None:
        logging.warning('Failure creating monitor interface; doing '
                        'managed packet capture instead.')
        # Capture on the managed WiFi interface itself.
        devname = self.wifi_if
    self._packet_capturer.start_capture(devname, self._result_dir,
                                        snaplen=snaplen)
def stop_capture(self):
"""Stop a packet capture and copy over the results."""
# NOTE(review): no statements of this method are present in this copy --
# restore the body from upstream.
def sync_host_times(self):
"""Set time on our DUT to match local time."""
# Local wall-clock time, as seconds since the epoch.
# NOTE(review): the statement that applies this value to the DUT is missing
# in this copy -- restore it from upstream.
epoch_seconds = time.time()
def check_iw_link_value(self, iw_link_key, desired_value):
    """Assert that the current wireless link property is |desired_value|.

    Runs `<iw> dev <wifi_if> link` on the host and compares the value of
    the first output line of the form "<iw_link_key>: <value>" with
    str(desired_value).

    @param iw_link_key string one of IW_LINK_KEY_* defined above.
    @param desired_value string desired value of iw link property.
    @raises error.TestFail if the property is absent or has another value.

    """
    result = self.host.run('%s dev %s link' % (self.command_iw,
                                               self.wifi_if))
    find_re = re.compile(r'\s*%s:\s*(.*\S)\s*$' % iw_link_key)
    # Build a real list: the result is tested for emptiness and indexed.
    find_results = [match
                    for match in map(find_re.match,
                                     result.stdout.splitlines())
                    if match]
    if not find_results:
        raise error.TestFail('Could not find iw link property %s.' %
                             iw_link_key)

    actual_value = find_results[0].group(1)
    desired_value = str(desired_value)
    if actual_value != desired_value:
        raise error.TestFail('Wanted iw link property %s value %s, but '
                             'got %s instead.' % (iw_link_key,
                                                  desired_value,
                                                  actual_value))

    logging.info('Found iw link key %s with value %s.',
                 iw_link_key, actual_value)
def powersave_switch(self, turn_on):
    """Toggle powersave mode for the DUT.

    Runs `iw dev <wifi_if> set power_save on|off` on the host.

    @param turn_on bool True iff powersave mode should be turned on.

    """
    mode = 'on' if turn_on else 'off'
    self.host.run('iw dev %s set power_save %s' % (self.wifi_if, mode))
def scan(self, frequencies, ssids, timeout_seconds=10):
"""Request a scan and check that certain SSIDs appear in the results.

This method will retry for a default of |timeout_seconds| until it is
able to successfully kick off a scan. Sometimes, the device on the DUT
claims to be busy and rejects our requests.

@param frequencies list of int WiFi frequencies to scan for.
@param ssids list of string ssids to probe request for.
@param timeout_seconds: float number of seconds to retry scanning
if the interface is busy. This does not retry if certain
SSIDs are missing from the results.
@raises error.TestFail if no scan could be triggered or an SSID is
missing from the scan results.

"""
# NOTE(review): this copy has lost call arguments and the loop-control
# statements (the bodies of several if/while/for headers below are
# missing) -- restore them from upstream before relying on this method.
runner = iw_runner.IwRunner(,
start_time = time.time()
# Keep retrying the scan until the timeout elapses.
while time.time() - start_time < timeout_seconds:
bss_list = runner.scan(self.wifi_if, frequencies=frequencies,
if bss_list is not None:
raise error.TestFail('Unable to trigger scan on client.')
for ssid in ssids:
if not ssid:
for bss in bss_list:
if bss.ssid == ssid:
raise error.TestFail('SSID %s is not in scan results: %r' %
(ssid, bss_list))
def configure_bgscan(self, configuration):
    """Control wpa_supplicant bgscan.

    The configuration's interface field is overwritten with this client's
    WiFi interface before it is handed to the proxy.

    @param configuration BgscanConfiguration describes a configuration.
    @raises error.TestError if the proxy reports failure.

    """
    configuration.interface = self.wifi_if
    if not self._shill_proxy.configure_bgscan(configuration):
        raise error.TestError('Background scan configuration failed.')

    logging.info('bgscan configured.')
def disable_bgscan(self):
    """Disable wpa_supplicant bgscan.

    Builds a BgscanConfiguration whose method is SCAN_METHOD_NONE and
    applies it through configure_bgscan().

    """
    params = xmlrpc_datatypes.BgscanConfiguration()
    params.interface = self.wifi_if
    params.method = xmlrpc_datatypes.BgscanConfiguration.SCAN_METHOD_NONE
    # Without this call the configuration above is built and discarded.
    self.configure_bgscan(params)
def enable_bgscan(self):
"""Enable wpa_supplicant bgscan."""
klass = xmlrpc_datatypes.BgscanConfiguration
# NOTE(review): the constructor arguments and whatever follows are cut off
# in this copy -- restore from upstream.
params = xmlrpc_datatypes.BgscanConfiguration(
def wait_for_service_states(self, ssid, states, timeout_seconds):
    """Waits for a WiFi service to achieve one of |states|.

    @param ssid string name of network being queried
    @param states tuple list of states for which the caller is waiting
    @param timeout_seconds int seconds to wait for a state in |states|
    @return the (success, state, seconds) tuple reported by the proxy.

    """
    logging.info('Waiting for %s to reach one of %r...', ssid, states)
    # Named |duration| rather than |time| to avoid shadowing the module.
    success, state, duration = self._shill_proxy.wait_for_service_states(
            ssid, states, timeout_seconds)
    logging.info('...ended up in state \'%s\' (%s) after %f seconds.',
                 state, 'success' if success else 'failure', duration)
    return success, state, duration
def do_suspend(self, seconds):
    """Puts the DUT in suspend power state for |seconds| seconds.

    The suspend itself is delegated to the proxy; this method logs before
    and after that call.

    @param seconds: The number of seconds to suspend the device.

    """
    logging.info('Suspending DUT for %d seconds...', seconds)
    self._shill_proxy.do_suspend(seconds)
    logging.info('...done suspending')
def do_suspend_bg(self, seconds):
"""Suspend DUT using the power manager - non-blocking.

@param seconds: The number of seconds to suspend the device.

# NOTE(review): the logging call below has lost its prefix and arguments,
# and the statement that starts the suspend is missing in this copy --
# restore from upstream.
"""'Suspending DUT (in background) for %d seconds...',
def clear_supplicant_blacklist(self):
    """Clears the AP blacklist on the DUT.

    @return stdout and stderr data returned by the proxy's
            clear_supplicant_blacklist() call.

    """
    stdoutdata, stderrdata = self._shill_proxy.clear_supplicant_blacklist()
    logging.info('wpa_cli blacklist clear: out:%r err:%r', stdoutdata,
                 stderrdata)
    return stdoutdata, stderrdata
def get_roam_threshold(self, wifi_interace):
    """Query wpa_supplicant's roaming threshold for an interface.

    @param wifi_interace: string name of the wifi interface.
    @return integer roam threshold or False if something went wrong.

    """
    proxy = self._shill_proxy
    return proxy.get_roam_threshold(wifi_interace)
def set_roam_threshold(self, wifi_interace, value):
    """Change wpa_supplicant's roaming threshold for an interface.

    @param wifi_interace: string name of the wifi interface.
    @param value: integer to which to set the roam_threshold
    @return True if it worked; False, otherwise

    """
    proxy = self._shill_proxy
    return proxy.set_roam_threshold(wifi_interace, value)