| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import re |
| import signal |
| import time |
| |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib.cros.network import interface |
| from autotest_lib.client.common_lib.cros.network import iw_runner |
| from autotest_lib.client.common_lib.cros.network import ping_runner |
| from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes |
| from autotest_lib.client.cros import constants |
| from autotest_lib.server import autotest |
| from autotest_lib.server import site_linux_system |
| from autotest_lib.server.cros import remote_command |
| from autotest_lib.server.cros import wifi_test_utils |
| from autotest_lib.server.cros.network import packet_capturer |
| from autotest_lib.server.cros.network import wpa_cli_proxy |
| from autotest_lib.server.hosts import adb_host |
| |
| |
| class WiFiClient(object): |
| """WiFiClient is a thin layer of logic over a remote DUT in wifitests.""" |
| |
| XMLRPC_BRINGUP_TIMEOUT_SECONDS = 60 |
| |
| IW_LINK_KEY_BEACON_INTERVAL = 'beacon int' |
| IW_LINK_KEY_DTIM_PERIOD = 'dtim period' |
| IW_LINK_KEY_FREQUENCY = 'freq' |
| |
| DEFAULT_PING_COUNT = 10 |
| COMMAND_PING = 'ping' |
| |
| UNKNOWN_BOARD_TYPE = 'unknown' |
| |
| |
| @property |
| def board(self): |
| """@return string self reported board of this device.""" |
| if self._board is None: |
| # Remove 'board:' prefix. |
| self._board = self.host.get_board().split(':')[1] |
| return self._board |
| |
| |
| @property |
| def machine_id(self): |
| """@return string unique to a particular board/cpu configuration.""" |
| if self._machine_id: |
| return self._machine_id |
| |
| uname_result = self.host.run('uname -m', ignore_status=True) |
| kernel_arch = '' |
| if not uname_result.exit_status and uname_result.stdout.find(' ') < 0: |
| kernel_arch = uname_result.stdout.strip() |
| cpu_info = self.host.run('cat /proc/cpuinfo').stdout.splitlines() |
| cpu_count = len(filter(lambda x: x.lower().startswith('bogomips'), |
| cpu_info)) |
| cpu_count_str = '' |
| if cpu_count: |
| cpu_count_str = 'x%d' % cpu_count |
| ghz_value = '' |
| ghz_pattern = re.compile('([0-9.]+GHz)') |
| for line in cpu_info: |
| match = ghz_pattern.search(line) |
| if match is not None: |
| ghz_value = '_' + match.group(1) |
| break |
| |
| return '%s_%s%s%s' % (self.board, kernel_arch, ghz_value, cpu_count_str) |
| |
| |
| @property |
| def powersave_on(self): |
| """@return bool True iff WiFi powersave mode is enabled.""" |
| result = self.host.run("iw dev %s get power_save" % self.wifi_if) |
| output = result.stdout.rstrip() # NB: chop \n |
| # Output should be either "Power save: on" or "Power save: off". |
| find_re = re.compile('([^:]+):\s+(\w+)') |
| find_results = find_re.match(output) |
| if not find_results: |
| raise error.TestFail('Failed to find power_save parameter ' |
| 'in iw results.') |
| |
| return find_results.group(2) == 'on' |
| |
| |
| @property |
| def capabilities(self): |
| """@return list of WiFi capabilities as parsed by LinuxSystem.""" |
| return self._capabilities |
| |
| |
| @property |
| def host(self): |
| """@return host object representing the remote DUT.""" |
| return self._host |
| |
| |
| @property |
| def shill(self): |
| """@return shill RPCProxy object.""" |
| return self._shill_proxy |
| |
| |
| @property |
| def client(self): |
| """Deprecated accessor for the client host. |
| |
| The term client is used very loosely in old autotests and this |
| accessor should not be used in new code. Use host() instead. |
| |
| @return host object representing a remote DUT. |
| |
| """ |
| return self._host |
| |
| |
| @property |
| def command_ifconfig(self): |
| """@return string path to ifconfig command.""" |
| return self._command_ifconfig |
| |
| |
| @property |
| def command_ip(self): |
| """@return string path to ip command.""" |
| return self._command_ip |
| |
| |
| @property |
| def command_iptables(self): |
| """@return string path to iptables command.""" |
| return self._command_iptables |
| |
| |
| @property |
| def command_iw(self): |
| """@return string path to iw command.""" |
| return self._command_iw |
| |
| |
| @property |
| def command_netperf(self): |
| """@return string path to netperf command.""" |
| return self._command_netperf |
| |
| |
| @property |
| def command_netserv(self): |
| """@return string path to netserv command.""" |
| return self._command_netserv |
| |
| |
| @property |
| def command_ping6(self): |
| """@return string path to ping6 command.""" |
| return self._command_ping6 |
| |
| |
| @property |
| def command_wpa_cli(self): |
| """@return string path to wpa_cli command.""" |
| return self._command_wpa_cli |
| |
| |
| @property |
| def wifi_if(self): |
| """@return string wifi device on machine (e.g. mlan0).""" |
| return self._wifi_if |
| |
| |
| @property |
| def wifi_mac(self): |
| """@return string MAC address of self.wifi_if.""" |
| return self._interface.mac_address |
| |
| |
| @property |
| def wifi_ip(self): |
| """@return string IPv4 address of self.wifi_if.""" |
| return self._interface.ipv4_address |
| |
| |
| @property |
| def wifi_signal_level(self): |
| """Returns the signal level of this DUT's WiFi interface. |
| |
| @return int signal level of connected WiFi interface or None (e.g. -67). |
| |
| """ |
| return self._interface.signal_level |
| |
| |
| def wifi_noise_level(self, frequency_mhz): |
| """Returns the noise level of this DUT's WiFi interface. |
| |
| @param frequency_mhz: frequency at which the noise level should be |
| measured and reported. |
| @return int signal level of connected WiFi interface in dBm (e.g. -67) |
| or None if the value is unavailable. |
| |
| """ |
| return self._interface.noise_level(frequency_mhz) |
| |
| |
| def __init__(self, client_host, result_dir): |
| """ |
| Construct a WiFiClient. |
| |
| @param client_host host object representing a remote host. |
| @param result_dir string directory to store test logs/packet caps. |
| |
| """ |
| super(WiFiClient, self).__init__() |
| self._board = None |
| self._command_ip = 'ip' |
| self._command_iptables = 'iptables' |
| self._command_iw = 'iw' |
| self._command_netperf = 'netperf' |
| self._command_netserv = 'netserver' |
| self._command_ping6 = 'ping6' |
| self._command_wpa_cli = 'wpa_cli' |
| self._host = client_host |
| self._machine_id = None |
| self._ping_runner = ping_runner.PingRunner(host=self.host) |
| self._ping_thread = None |
| self._result_dir = result_dir |
| # Look up the WiFi device (and its MAC) on the client. |
| devs = wifi_test_utils.get_wlan_devs(self.host, self.command_iw) |
| if not devs: |
| raise error.TestFail('No wlan devices found on %s.' % |
| self.host.hostname) |
| |
| if len(devs) > 1: |
| logging.warning('Warning, found multiple WiFi devices on %s: %r', |
| self.host.hostname, devs) |
| self._wifi_if = devs[0] |
| self._interface = interface.Interface(self._wifi_if, host=self.host) |
| if isinstance(self.host, adb_host.ADBHost): |
| self._shill_proxy = wpa_cli_proxy.WpaCliProxy( |
| self.host, self._wifi_if) |
| else: |
| # Make sure the client library is on the device so that the proxy |
| # code is there when we try to call it. |
| client_at = autotest.Autotest(self.host) |
| client_at.install() |
| # Start up the XMLRPC proxy on the client |
| self._shill_proxy = self.host.xmlrpc_connect( |
| constants.SHILL_XMLRPC_SERVER_COMMAND, |
| constants.SHILL_XMLRPC_SERVER_PORT, |
| command_name=constants.SHILL_XMLRPC_SERVER_CLEANUP_PATTERN, |
| ready_test_name=constants.SHILL_XMLRPC_SERVER_READY_METHOD, |
| timeout_seconds=self.XMLRPC_BRINGUP_TIMEOUT_SECONDS) |
| # These commands aren't known to work with ADB hosts. |
| self._command_ifconfig = 'ifconfig' |
| self._raise_logging_level() |
| # Used for packet captures. |
| self._packet_capturer = packet_capturer.get_packet_capturer( |
| self.host, host_description='client', ignore_failures=True) |
| self._result_dir = result_dir |
| |
| self._firewall_rules = [] |
| # Turn off powersave mode by default. |
| self.powersave_switch(False) |
| # It is tempting to make WiFiClient a type of LinuxSystem, but most of |
| # the functionality there only makes sense for systems that want to |
| # manage their own WiFi interfaces. On client devices however, shill |
| # does that work. |
| system = site_linux_system.LinuxSystem(self.host, {}, 'client') |
| self._capabilities = system.capabilities |
| |
| |
| def _raise_logging_level(self): |
| """Raises logging levels for WiFi on DUT.""" |
| self.host.run('wpa_debug excessive') |
| self.host.run('ff_debug --level -5') |
| self.host.run('ff_debug +wifi') |
| |
| |
| def close(self): |
| """Tear down state associated with the client.""" |
| if self._ping_thread is not None: |
| self.ping_bg_stop() |
| self.stop_capture() |
| self.powersave_switch(False) |
| self.shill.clean_profiles() |
| # This kills the RPC server. |
| logging.debug('Cleaning up host object for client') |
| self._host.close() |
| |
| |
| def ping(self, ping_config): |
| """Ping an address from the client and return the command output. |
| |
| @param ping_config parameters for the ping command. |
| @return a PingResult object. |
| |
| """ |
| logging.info('Pinging from the client.') |
| return self._ping_runner.ping(ping_config) |
| |
| |
| def ping_bg(self, ping_ip, ping_args): |
| """Ping an address from the client in the background. |
| |
| Only one instance of a background ping is supported at a time. |
| |
| @param ping_ip string IPv4 address for the client to ping. |
| @param ping_args dict of parameters understood by |
| wifi_test_utils.ping_args(). |
| |
| """ |
| if self._ping_thread is not None: |
| raise error.TestFail('Tried to start a background ping without ' |
| 'stopping an earlier ping.') |
| cmd = '%s %s %s' % (self.COMMAND_PING, |
| wifi_test_utils.ping_args(ping_args), |
| ping_ip) |
| self._ping_thread = remote_command.Command( |
| self.host, cmd, pkill_argument=self.COMMAND_PING) |
| |
| |
| def ping_bg_stop(self): |
| """Stop pinging an address from the client in the background. |
| |
| Clean up state from a previous call to ping_bg. If requested, |
| statistics from the background ping run may be saved. |
| |
| """ |
| if self._ping_thread is None: |
| logging.info('Tried to stop a bg ping, but none was started') |
| return |
| # Sending SIGINT gives us stats at the end, how nice. |
| self._ping_thread.join(signal.SIGINT) |
| self._ping_thread = None |
| |
| |
| def firewall_open(self, proto, src): |
| """Opens up firewall to run netperf tests. |
| |
| By default, we have a firewall rule for NFQUEUE (see crbug.com/220736). |
| In order to run netperf test, we need to add a new firewall rule BEFORE |
| this NFQUEUE rule in the INPUT chain. |
| |
| @param proto a string, test traffic protocol, e.g. udp, tcp. |
| @param src a string, subnet/mask. |
| |
| @return a string firewall rule added. |
| |
| """ |
| rule = 'INPUT -s %s/32 -p %s -m %s -j ACCEPT' % (src, proto, proto) |
| self.host.run('%s -I %s' % (self._command_iptables, rule)) |
| self._firewall_rules.append(rule) |
| return rule |
| |
| |
| def firewall_cleanup(self): |
| """Cleans up firewall rules.""" |
| for rule in self._firewall_rules: |
| self.host.run('%s -D %s' % (self._command_iptables, rule)) |
| self._firewall_rules = [] |
| |
| |
| def start_capture(self, snaplen=None): |
| """Start a packet capture. |
| |
| Attempt to start a host based OTA capture. If the driver stack does |
| not support creating monitor interfaces, fall back to managed interface |
| packet capture. Only one ongoing packet capture is supported at a time. |
| |
| @param snaplen int number of byte to retain per captured frame. |
| |
| """ |
| self.stop_capture() |
| devname = self._packet_capturer.create_managed_monitor(self.wifi_if) |
| if devname is None: |
| logging.warning('Failure creating monitor interface; doing ' |
| 'managed packet capture instead.') |
| devname = self.wifi_if |
| self._packet_capturer.start_capture(devname, self._result_dir, |
| snaplen=snaplen) |
| |
| |
| def stop_capture(self): |
| """Stop a packet capture and copy over the results.""" |
| self._packet_capturer.close() |
| |
| |
| def sync_host_times(self): |
| """Set time on our DUT to match local time.""" |
| epoch_seconds = time.time() |
| self.shill.sync_time_to(epoch_seconds) |
| |
| |
| def check_iw_link_value(self, iw_link_key, desired_value): |
| """Assert that the current wireless link property is |desired_value|. |
| |
| @param iw_link_key string one of IW_LINK_KEY_* defined above. |
| @param desired_value string desired value of iw link property. |
| |
| """ |
| result = self.host.run('%s dev %s link' % (self.command_iw, |
| self.wifi_if)) |
| find_re = re.compile('\s*%s:\s*(.*\S)\s*$' % iw_link_key) |
| find_results = filter(bool, map(find_re.match, |
| result.stdout.splitlines())) |
| if not find_results: |
| raise error.TestFail('Could not find iw link property %s.' % |
| iw_link_key) |
| |
| actual_value = find_results[0].group(1) |
| desired_value = str(desired_value) |
| if actual_value != str(desired_value): |
| raise error.TestFail('Wanted iw link property %s value %s, but ' |
| 'got %s instead.' % (iw_link_key, |
| desired_value, |
| actual_value)) |
| |
| logging.info('Found iw link key %s with value %s.', |
| iw_link_key, actual_value) |
| |
| |
| def powersave_switch(self, turn_on): |
| """Toggle powersave mode for the DUT. |
| |
| @param turn_on bool True iff powersave mode should be turned on. |
| |
| """ |
| mode = 'off' |
| if turn_on: |
| mode = 'on' |
| self.host.run('iw dev %s set power_save %s' % (self.wifi_if, mode)) |
| |
| |
| def scan(self, frequencies, ssids, timeout_seconds=10): |
| """Request a scan and check that certain SSIDs appear in the results. |
| |
| This method will retry for a default of |timeout_seconds| until it is |
| able to successfully kick off a scan. Sometimes, the device on the DUT |
| claims to be busy and rejects our requests. |
| |
| @param frequencies list of int WiFi frequencies to scan for. |
| @param ssids list of string ssids to probe request for. |
| @param timeout_seconds: float number of seconds to retry scanning |
| if the interface is busy. This does not retry if certain |
| SSIDs are missing from the results. |
| |
| """ |
| runner = iw_runner.IwRunner(remote_host=self.host, |
| command_iw=self.command_iw) |
| start_time = time.time() |
| while time.time() - start_time < timeout_seconds: |
| bss_list = runner.scan(self.wifi_if, frequencies=frequencies, |
| ssids=ssids) |
| if bss_list is not None: |
| break |
| |
| time.sleep(0.5) |
| else: |
| raise error.TestFail('Unable to trigger scan on client.') |
| |
| for ssid in ssids: |
| if not ssid: |
| continue |
| |
| for bss in bss_list: |
| if bss.ssid == ssid: |
| break |
| |
| else: |
| raise error.TestFail('SSID %s is not in scan results: %r' % |
| (ssid, bss_list)) |
| |
| |
| def configure_bgscan(self, configuration): |
| """Control wpa_supplicant bgscan. |
| |
| @param configuration BgscanConfiguration describes a configuration. |
| |
| """ |
| configuration.interface = self.wifi_if |
| if not self._shill_proxy.configure_bgscan(configuration): |
| raise error.TestError('Background scan configuration failed.') |
| |
| logging.info('bgscan configured.') |
| |
| |
| def disable_bgscan(self): |
| """Disable wpa_supplicant bgscan.""" |
| params = xmlrpc_datatypes.BgscanConfiguration() |
| params.interface = self.wifi_if |
| params.method = xmlrpc_datatypes.BgscanConfiguration.SCAN_METHOD_NONE |
| self.configure_bgscan(params) |
| |
| |
| def enable_bgscan(self): |
| """Enable wpa_supplicant bgscan.""" |
| klass = xmlrpc_datatypes.BgscanConfiguration |
| params = xmlrpc_datatypes.BgscanConfiguration( |
| interface=self.wifi_if, |
| method=klass.SCAN_METHOD_DEFAULT, |
| short_interval=klass.DEFAULT_SHORT_INTERVAL_SECONDS, |
| long_interval=klass.DEFAULT_LONG_INTERVAL_SECONDS) |
| self.configure_bgscan(params) |
| |
| |
| def wait_for_service_states(self, ssid, states, timeout_seconds): |
| """Waits for a WiFi service to achieve one of |states|. |
| |
| @param ssid string name of network being queried |
| @param states tuple list of states for which the caller is waiting |
| @param timeout_seconds int seconds to wait for a state in |states| |
| |
| """ |
| logging.info('Waiting for %s to reach one of %r...', ssid, states) |
| success, state, time = self._shill_proxy.wait_for_service_states( |
| ssid, states, timeout_seconds) |
| logging.info('...ended up in state \'%s\' (%s) after %f seconds.', |
| state, 'success' if success else 'failure', time) |
| return success, state, time |
| |
| |
| def do_suspend(self, seconds): |
| """Puts the DUT in suspend power state for |seconds| seconds. |
| |
| @param seconds: The number of seconds to suspend the device. |
| |
| """ |
| logging.info('Suspending DUT for %d seconds...', seconds) |
| self._shill_proxy.do_suspend(seconds) |
| logging.info('...done suspending') |
| |
| |
| def do_suspend_bg(self, seconds): |
| """Suspend DUT using the power manager - non-blocking. |
| |
| @param seconds: The number of seconds to suspend the device. |
| |
| """ |
| logging.info('Suspending DUT (in background) for %d seconds...', |
| seconds) |
| self._shill_proxy.do_suspend_bg(seconds) |
| |
| |
| def clear_supplicant_blacklist(self): |
| """Clear's the AP blacklist on the DUT. |
| |
| @return stdout and stderror returns passed from |
| self._shill_proxy.clear_supplicant_blacklist() |
| |
| """ |
| stdoutdata, stderrdata = self._shill_proxy.clear_supplicant_blacklist() |
| logging.info('wpa_cli blacklist clear: out:%r err:%r', stdoutdata, |
| stderrdata) |
| return stdoutdata, stderrdata |
| |
| |
| def get_roam_threshold(self, wifi_interace): |
| """Get wpa_supplicant's roaming theshold for the specified interface. |
| |
| @param wifi_interface: string name of the wifi_interface. |
| @return integer roam threshold or False if something went wrong. |
| |
| """ |
| return self._shill_proxy.get_roam_threshold(wifi_interace) |
| |
| |
| def set_roam_threshold(self, wifi_interace, value): |
| """Set wpa_supplicant's roaming theshold for the specified interface. |
| |
| @param wifi_interace: string name of the wifi_interface. |
| @param value: integer to which to set the roam_threshold |
| @return True if it worked; False, otherwise |
| |
| """ |
| return self._shill_proxy.set_roam_threshold(wifi_interace, value) |