| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib.cros.network import ping_runner |
| from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes |
| from autotest_lib.server import hosts |
| from autotest_lib.server import site_linux_router |
| from autotest_lib.server import site_linux_system |
| from autotest_lib.server.cros import dnsname_mangler |
| from autotest_lib.server.cros.network import attenuator_controller |
| from autotest_lib.server.cros.network import wifi_client |
| |
| class WiFiTestContextManager(object): |
| """A context manager for state used in WiFi autotests. |
| |
| Some of the building blocks we use in WiFi tests need to be cleaned up |
| after use. For instance, we start an XMLRPC server on the client |
| which should be shut down so that the next test can start its instance. |
| It is convenient to manage this setup and teardown through a context |
| manager rather than building it into the test class logic. |
| |
| """ |
| CMDLINE_ATTEN_ADDR = 'atten_addr' |
| CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture' |
| CMDLINE_CONDUCTIVE_RIG = 'conductive_rig' |
| CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen' |
| CMDLINE_ROUTER_ADDR = 'router_addr' |
| CMDLINE_PCAP_ADDR = 'pcap_addr' |
| CMDLINE_PACKET_CAPTURES = 'packet_capture' |
| CMDLINE_USE_WPA_CLI = 'use_wpa_cli' |
| |
| |
| @property |
| def attenuator(self): |
| """@return attenuator object (e.g. a BeagleBone).""" |
| if self._attenuator is None: |
| raise error.TestNAError('No attenuator available in this setup.') |
| |
| return self._attenuator |
| |
| |
| @property |
| def client(self): |
| """@return WiFiClient object abstracting the DUT.""" |
| return self._client_proxy |
| |
| |
| @property |
| def router(self): |
| """@return router object (e.g. a LinuxCrosRouter).""" |
| return self._router |
| |
| |
| @property |
| def pcap_host(self): |
| """@return Dedicated packet capture host or None.""" |
| return self._pcap_host |
| |
| |
| @property |
| def capture_host(self): |
| """@return Dedicated pcap_host or the router itself.""" |
| return self.pcap_host or self.router |
| |
| |
| def __init__(self, test_name, host, cmdline_args, debug_dir): |
| """Construct a WiFiTestContextManager. |
| |
| Optionally can pull addresses of the server address, router address, |
| or router port from cmdline_args. |
| |
| @param test_name string descriptive name for this test. |
| @param host host object representing the DUT. |
| @param cmdline_args dict of key, value settings from command line. |
| |
| """ |
| super(WiFiTestContextManager, self).__init__() |
| self._test_name = test_name |
| self._cmdline_args = cmdline_args.copy() |
| self._client_proxy = wifi_client.WiFiClient( |
| host, debug_dir, |
| self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False)) |
| self._attenuator = None |
| self._router = None |
| self._pcap_host = None |
| self._enable_client_packet_captures = False |
| self._enable_packet_captures = False |
| self._packet_capture_snaplen = None |
| |
| |
| def __enter__(self): |
| self.setup() |
| return self |
| |
| |
| def __exit__(self, exc_type, exc_value, traceback): |
| self.teardown() |
| |
| |
| def _get_bool_cmdline_value(self, key, default_value): |
| """Returns a bool value for the given key from the cmdline args. |
| |
| @param key string cmdline args key. |
| @param default_value value to return if the key is not specified in the |
| cmdline args. |
| |
| @return True/False or default_value if key is not specified in the |
| cmdline args. |
| |
| """ |
| if key in self._cmdline_args: |
| value = self._cmdline_args[key].lower() |
| if value in ('1', 'true', 'yes', 'y'): |
| return True |
| else: |
| return False |
| else: |
| return default_value |
| |
| |
| def get_wifi_addr(self, ap_num=0): |
| """Return an IPv4 address pingable by the client on the WiFi subnet. |
| |
| @param ap_num int number of AP. Only used in stumpy cells. |
| @return string IPv4 address. |
| |
| """ |
| return self.router.local_server_address(ap_num) |
| |
| |
| def get_wifi_if(self, ap_num=0): |
| """Returns the interface name for the IP address of self.get_wifi_addr. |
| |
| @param ap_num int number of AP. Only used in stumpy cells. |
| @return string interface name "e.g. wlan0". |
| |
| """ |
| return self.router.get_hostapd_interface(ap_num) |
| |
| |
| def get_wifi_host(self): |
| """@return host object representing a pingable machine.""" |
| return self.router.host |
| |
| |
| def configure(self, ap_config, multi_interface=None, is_ibss=None, |
| configure_pcap=False): |
| """Configure a router with the given config. |
| |
| Configures an AP according to the specified config and |
| enables whatever packet captures are appropriate. Will deconfigure |
| existing APs unless |multi_interface| is specified. |
| |
| @param ap_config HostapConfig object. |
| @param multi_interface True iff having multiple configured interfaces |
| is expected for this configure call. |
| @param is_ibss True iff this is an IBSS endpoint. |
| @param configure_pcap True iff pcap_host should be configured for this |
| configure call. Raises a TestNAError if |self._pcap_as_router| |
| is False. |
| """ |
| if configure_pcap and not self._pcap_as_router: |
| raise error.TestNAError('pcap was not configured as router.') |
| if not self.client.is_frequency_supported(ap_config.frequency): |
| raise error.TestNAError('DUT does not support frequency: %s' % |
| ap_config.frequency) |
| if ap_config.require_vht: |
| self.client.require_capabilities( |
| [site_linux_system.LinuxSystem.CAPABILITY_VHT]) |
| router = self.router |
| if configure_pcap: |
| router = self.pcap_host |
| ap_config.security_config.install_router_credentials(router.host, |
| router.logdir) |
| if is_ibss: |
| if multi_interface: |
| raise error.TestFail('IBSS mode does not support multiple ' |
| 'interfaces.') |
| if not self.client.is_ibss_supported(): |
| raise error.TestNAError('DUT does not support IBSS mode') |
| router.ibss_configure(ap_config) |
| else: |
| router.hostap_configure(ap_config, multi_interface=multi_interface) |
| if self._enable_client_packet_captures: |
| self.client.start_capture(ap_config.frequency, |
| snaplen=self._packet_capture_snaplen) |
| if self._enable_packet_captures: |
| self.capture_host.start_capture(ap_config.frequency, |
| width_type=ap_config.packet_capture_mode, |
| snaplen=self._packet_capture_snaplen) |
| |
| |
| def _setup_router(self): |
| """Set up the router device.""" |
| self._router = site_linux_router.build_router_proxy( |
| test_name=self._test_name, |
| client_hostname=self.client.host.hostname, |
| router_addr=self._cmdline_args.get( |
| self.CMDLINE_ROUTER_ADDR, None)) |
| self.router.sync_host_times() |
| |
| |
| def _setup_pcap(self, pcap_as_router=False): |
| """ |
| Set up the pcap device. |
| |
| @param pcap_as_router optional bool which should be True if the pcap |
| should be configured as a router. |
| """ |
| ping_helper = ping_runner.PingRunner() |
| pcap_addr = dnsname_mangler.get_pcap_addr( |
| self.client.host.hostname, |
| allow_failure=True, |
| cmdline_override=self._cmdline_args.get(self.CMDLINE_PCAP_ADDR, |
| None)) |
| if pcap_addr and ping_helper.simple_ping(pcap_addr): |
| if pcap_as_router: |
| self._pcap_host = site_linux_router.LinuxRouter( |
| hosts.create_host(pcap_addr), |
| role='pcap', |
| test_name=self._test_name) |
| else: |
| self._pcap_host = site_linux_system.LinuxSystem( |
| hosts.create_host(pcap_addr),'pcap') |
| |
| |
| def _setup_attenuator(self): |
| """ |
| Set up the attenuator device. |
| |
| The attenuator host gives us the ability to attenuate particular |
| antennas on the router. Most setups don't have this capability |
| and most tests do not require it. We use this for RvR |
| (network_WiFi_AttenuatedPerf) and some roaming tests. |
| """ |
| ping_helper = ping_runner.PingRunner() |
| attenuator_addr = dnsname_mangler.get_attenuator_addr( |
| self.client.host.hostname, |
| cmdline_override=self._cmdline_args.get( |
| self.CMDLINE_ATTEN_ADDR, None), |
| allow_failure=True) |
| if attenuator_addr and ping_helper.simple_ping(attenuator_addr): |
| self._attenuator = attenuator_controller.AttenuatorController( |
| attenuator_addr) |
| |
| def setup(self, include_router=True, include_pcap=True, |
| include_attenuator=True, pcap_as_router=False): |
| """ |
| Construct the state used in a WiFi test. |
| |
| @param include_router optional bool which should be False if the router |
| is not used by the test |
| @param include_pcap optional bool which should be False if the pcap |
| device is not used by the test |
| @param include_attenuator optional bool which should be False if the |
| attenuator is not used by the test |
| @param pcap_as_router optional bool which should be True if the pcap |
| should be configured as a router. |
| """ |
| if include_router: |
| self._setup_router() |
| if include_pcap: |
| self._setup_pcap(pcap_as_router) |
| self._pcap_as_router = pcap_as_router |
| if include_attenuator: |
| self._setup_attenuator() |
| |
| # Set up a clean context to conduct WiFi tests in. |
| self.client.shill.init_test_network_state() |
| self.client.sync_host_times() |
| |
| if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args: |
| self._enable_client_packet_captures = True |
| if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args: |
| self._enable_packet_captures = True |
| if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args: |
| self._packet_capture_snaplen = int( |
| self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN]) |
| self.client.conductive = self._get_bool_cmdline_value( |
| self.CMDLINE_CONDUCTIVE_RIG, None) |
| |
| |
| def teardown(self): |
| """Teardown the state used in a WiFi test.""" |
| logging.debug('Tearing down the test context.') |
| for system in [self._attenuator, self._client_proxy, |
| self._router, self._pcap_host]: |
| if system is not None: |
| system.close() |
| |
| |
| def assert_connect_wifi(self, wifi_params, description=None): |
| """Connect to a WiFi network and check for success. |
| |
| Connect a DUT to a WiFi network and check that we connect successfully. |
| |
| @param wifi_params AssociationParameters describing network to connect. |
| @param description string Additional text for logging messages. |
| |
| @returns AssociationResult if successful; None if wifi_params |
| contains expect_failure; asserts otherwise. |
| |
| """ |
| if description: |
| connect_name = '%s (%s)' % (wifi_params.ssid, description) |
| else: |
| connect_name = '%s' % wifi_params.ssid |
| logging.info('Connecting to %s.', connect_name) |
| assoc_result = xmlrpc_datatypes.deserialize( |
| self.client.shill.connect_wifi(wifi_params)) |
| logging.info('Finished connection attempt to %s with times: ' |
| 'discovery=%.2f, association=%.2f, configuration=%.2f.', |
| connect_name, |
| assoc_result.discovery_time, |
| assoc_result.association_time, |
| assoc_result.configuration_time) |
| |
| if assoc_result.success and wifi_params.expect_failure: |
| raise error.TestFail( |
| 'Expected connection to %s to fail, but it was successful.' % |
| connect_name) |
| |
| if not assoc_result.success and not wifi_params.expect_failure: |
| raise error.TestFail( |
| 'Expected connection to %s to succeed, ' |
| 'but it failed with reason: %s.' % ( |
| connect_name, assoc_result.failure_reason)) |
| |
| if wifi_params.expect_failure: |
| logging.info('Unable to connect to %s, as intended.', |
| connect_name) |
| return None |
| |
| logging.info('Connected successfully to %s, signal level: %r.', |
| connect_name, self.client.wifi_signal_level) |
| return assoc_result |
| |
| |
| def assert_ping_from_dut(self, ping_config=None, ap_num=None): |
| """Ping a host on the WiFi network from the DUT. |
| |
| Ping a host reachable on the WiFi network from the DUT, and |
| check that the ping is successful. The host we ping depends |
| on the test setup, sometimes that host may be the server and |
| sometimes it will be the router itself. Ping-ability may be |
| used to confirm that a WiFi network is operating correctly. |
| |
| @param ping_config optional PingConfig object to override defaults. |
| @param ap_num int which AP to ping if more than one is configured. |
| |
| """ |
| if ap_num is None: |
| ap_num = 0 |
| if ping_config is None: |
| ping_ip = self.router.get_wifi_ip(ap_num=ap_num) |
| ping_config = ping_runner.PingConfig(ping_ip) |
| self.client.ping(ping_config) |
| |
| |
| def assert_ping_from_server(self, ping_config=None): |
| """Ping the DUT across the WiFi network from the server. |
| |
| Check that the ping is mostly successful and fail the test if it |
| is not. |
| |
| @param ping_config optional PingConfig object to override defaults. |
| |
| """ |
| logging.info('Pinging from server.') |
| if ping_config is None: |
| ping_ip = self.client.wifi_ip |
| ping_config = ping_runner.PingConfig(ping_ip) |
| self.router.ping(ping_config) |
| |
| |
| def wait_for_connection(self, ssid, freq=None, ap_num=None, |
| timeout_seconds=30): |
| """Verifies a connection to network ssid on frequency freq. |
| |
| @param ssid string ssid of the network to check. |
| @param freq int frequency of network to check. |
| @param ap_num int AP to which to connect |
| @param timeout_seconds int number of seconds to wait for |
| connection on the given frequency. |
| |
| @returns a named tuple of (state, time) |
| """ |
| if ap_num is None: |
| ap_num = 0 |
| desired_subnet = self.router.get_wifi_ip_subnet(ap_num) |
| wifi_ip = self.router.get_wifi_ip(ap_num) |
| return self.client.wait_for_connection( |
| ssid, timeout_seconds=timeout_seconds, freq=freq, |
| ping_ip=wifi_ip, desired_subnet=desired_subnet) |