blob: e0feb94f0a5471ac85664258c9f78fa4167e4eee [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import ping_runner
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
from autotest_lib.server import hosts
from autotest_lib.server import site_linux_router
from autotest_lib.server import site_linux_system
from autotest_lib.server.cros import dnsname_mangler
from autotest_lib.server.cros.network import attenuator_controller
from autotest_lib.server.cros.network import wifi_client
class WiFiTestContextManager(object):
"""A context manager for state used in WiFi autotests.
Some of the building blocks we use in WiFi tests need to be cleaned up
after use. For instance, we start an XMLRPC server on the client
which should be shut down so that the next test can start its instance.
It is convenient to manage this setup and teardown through a context
manager rather than building it into the test class logic.
"""
CMDLINE_ATTEN_ADDR = 'atten_addr'
CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture'
CMDLINE_CONDUCTIVE_RIG = 'conductive_rig'
CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen'
CMDLINE_ROUTER_ADDR = 'router_addr'
CMDLINE_PCAP_ADDR = 'pcap_addr'
CMDLINE_PACKET_CAPTURES = 'packet_capture'
CMDLINE_USE_WPA_CLI = 'use_wpa_cli'
@property
def attenuator(self):
"""@return attenuator object (e.g. a BeagleBone)."""
if self._attenuator is None:
raise error.TestNAError('No attenuator available in this setup.')
return self._attenuator
@property
def client(self):
"""@return WiFiClient object abstracting the DUT."""
return self._client_proxy
@property
def router(self):
"""@return router object (e.g. a LinuxCrosRouter)."""
return self._router
@property
def pcap_host(self):
"""@return Dedicated packet capture host or None."""
return self._pcap_host
@property
def capture_host(self):
"""@return Dedicated pcap_host or the router itself."""
return self.pcap_host or self.router
def __init__(self, test_name, host, cmdline_args, debug_dir):
"""Construct a WiFiTestContextManager.
Optionally can pull addresses of the server address, router address,
or router port from cmdline_args.
@param test_name string descriptive name for this test.
@param host host object representing the DUT.
@param cmdline_args dict of key, value settings from command line.
"""
super(WiFiTestContextManager, self).__init__()
self._test_name = test_name
self._cmdline_args = cmdline_args.copy()
self._client_proxy = wifi_client.WiFiClient(
host, debug_dir,
self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False))
self._attenuator = None
self._router = None
self._pcap_host = None
self._enable_client_packet_captures = False
self._enable_packet_captures = False
self._packet_capture_snaplen = None
def __enter__(self):
self.setup()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.teardown()
def _get_bool_cmdline_value(self, key, default_value):
"""Returns a bool value for the given key from the cmdline args.
@param key string cmdline args key.
@param default_value value to return if the key is not specified in the
cmdline args.
@return True/False or default_value if key is not specified in the
cmdline args.
"""
if key in self._cmdline_args:
value = self._cmdline_args[key].lower()
if value in ('1', 'true', 'yes', 'y'):
return True
else:
return False
else:
return default_value
def get_wifi_addr(self, ap_num=0):
"""Return an IPv4 address pingable by the client on the WiFi subnet.
@param ap_num int number of AP. Only used in stumpy cells.
@return string IPv4 address.
"""
return self.router.local_server_address(ap_num)
def get_wifi_if(self, ap_num=0):
"""Returns the interface name for the IP address of self.get_wifi_addr.
@param ap_num int number of AP. Only used in stumpy cells.
@return string interface name "e.g. wlan0".
"""
return self.router.get_hostapd_interface(ap_num)
def get_wifi_host(self):
"""@return host object representing a pingable machine."""
return self.router.host
def configure(self, ap_config, multi_interface=None, is_ibss=None,
configure_pcap=False):
"""Configure a router with the given config.
Configures an AP according to the specified config and
enables whatever packet captures are appropriate. Will deconfigure
existing APs unless |multi_interface| is specified.
@param ap_config HostapConfig object.
@param multi_interface True iff having multiple configured interfaces
is expected for this configure call.
@param is_ibss True iff this is an IBSS endpoint.
@param configure_pcap True iff pcap_host should be configured for this
configure call. Raises a TestNAError if |self._pcap_as_router|
is False.
"""
if configure_pcap and not self._pcap_as_router:
raise error.TestNAError('pcap was not configured as router.')
if not self.client.is_frequency_supported(ap_config.frequency):
raise error.TestNAError('DUT does not support frequency: %s' %
ap_config.frequency)
if ap_config.require_vht:
self.client.require_capabilities(
[site_linux_system.LinuxSystem.CAPABILITY_VHT])
router = self.router
if configure_pcap:
router = self.pcap_host
ap_config.security_config.install_router_credentials(router.host,
router.logdir)
if is_ibss:
if multi_interface:
raise error.TestFail('IBSS mode does not support multiple '
'interfaces.')
if not self.client.is_ibss_supported():
raise error.TestNAError('DUT does not support IBSS mode')
router.ibss_configure(ap_config)
else:
router.hostap_configure(ap_config, multi_interface=multi_interface)
if self._enable_client_packet_captures:
self.client.start_capture(ap_config.frequency,
snaplen=self._packet_capture_snaplen)
if self._enable_packet_captures:
self.capture_host.start_capture(ap_config.frequency,
width_type=ap_config.packet_capture_mode,
snaplen=self._packet_capture_snaplen)
def _setup_router(self):
"""Set up the router device."""
self._router = site_linux_router.build_router_proxy(
test_name=self._test_name,
client_hostname=self.client.host.hostname,
router_addr=self._cmdline_args.get(
self.CMDLINE_ROUTER_ADDR, None))
self.router.sync_host_times()
def _setup_pcap(self, pcap_as_router=False):
"""
Set up the pcap device.
@param pcap_as_router optional bool which should be True if the pcap
should be configured as a router.
"""
ping_helper = ping_runner.PingRunner()
pcap_addr = dnsname_mangler.get_pcap_addr(
self.client.host.hostname,
allow_failure=True,
cmdline_override=self._cmdline_args.get(self.CMDLINE_PCAP_ADDR,
None))
if pcap_addr and ping_helper.simple_ping(pcap_addr):
if pcap_as_router:
self._pcap_host = site_linux_router.LinuxRouter(
hosts.create_host(pcap_addr),
role='pcap',
test_name=self._test_name)
else:
self._pcap_host = site_linux_system.LinuxSystem(
hosts.create_host(pcap_addr),'pcap')
def _setup_attenuator(self):
"""
Set up the attenuator device.
The attenuator host gives us the ability to attenuate particular
antennas on the router. Most setups don't have this capability
and most tests do not require it. We use this for RvR
(network_WiFi_AttenuatedPerf) and some roaming tests.
"""
ping_helper = ping_runner.PingRunner()
attenuator_addr = dnsname_mangler.get_attenuator_addr(
self.client.host.hostname,
cmdline_override=self._cmdline_args.get(
self.CMDLINE_ATTEN_ADDR, None),
allow_failure=True)
if attenuator_addr and ping_helper.simple_ping(attenuator_addr):
self._attenuator = attenuator_controller.AttenuatorController(
attenuator_addr)
def setup(self, include_router=True, include_pcap=True,
include_attenuator=True, pcap_as_router=False):
"""
Construct the state used in a WiFi test.
@param include_router optional bool which should be False if the router
is not used by the test
@param include_pcap optional bool which should be False if the pcap
device is not used by the test
@param include_attenuator optional bool which should be False if the
attenuator is not used by the test
@param pcap_as_router optional bool which should be True if the pcap
should be configured as a router.
"""
if include_router:
self._setup_router()
if include_pcap:
self._setup_pcap(pcap_as_router)
self._pcap_as_router = pcap_as_router
if include_attenuator:
self._setup_attenuator()
# Set up a clean context to conduct WiFi tests in.
self.client.shill.init_test_network_state()
self.client.sync_host_times()
if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args:
self._enable_client_packet_captures = True
if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args:
self._enable_packet_captures = True
if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args:
self._packet_capture_snaplen = int(
self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN])
self.client.conductive = self._get_bool_cmdline_value(
self.CMDLINE_CONDUCTIVE_RIG, None)
def teardown(self):
"""Teardown the state used in a WiFi test."""
logging.debug('Tearing down the test context.')
for system in [self._attenuator, self._client_proxy,
self._router, self._pcap_host]:
if system is not None:
system.close()
def assert_connect_wifi(self, wifi_params, description=None):
"""Connect to a WiFi network and check for success.
Connect a DUT to a WiFi network and check that we connect successfully.
@param wifi_params AssociationParameters describing network to connect.
@param description string Additional text for logging messages.
@returns AssociationResult if successful; None if wifi_params
contains expect_failure; asserts otherwise.
"""
if description:
connect_name = '%s (%s)' % (wifi_params.ssid, description)
else:
connect_name = '%s' % wifi_params.ssid
logging.info('Connecting to %s.', connect_name)
assoc_result = xmlrpc_datatypes.deserialize(
self.client.shill.connect_wifi(wifi_params))
logging.info('Finished connection attempt to %s with times: '
'discovery=%.2f, association=%.2f, configuration=%.2f.',
connect_name,
assoc_result.discovery_time,
assoc_result.association_time,
assoc_result.configuration_time)
if assoc_result.success and wifi_params.expect_failure:
raise error.TestFail(
'Expected connection to %s to fail, but it was successful.' %
connect_name)
if not assoc_result.success and not wifi_params.expect_failure:
raise error.TestFail(
'Expected connection to %s to succeed, '
'but it failed with reason: %s.' % (
connect_name, assoc_result.failure_reason))
if wifi_params.expect_failure:
logging.info('Unable to connect to %s, as intended.',
connect_name)
return None
logging.info('Connected successfully to %s, signal level: %r.',
connect_name, self.client.wifi_signal_level)
return assoc_result
def assert_ping_from_dut(self, ping_config=None, ap_num=None):
"""Ping a host on the WiFi network from the DUT.
Ping a host reachable on the WiFi network from the DUT, and
check that the ping is successful. The host we ping depends
on the test setup, sometimes that host may be the server and
sometimes it will be the router itself. Ping-ability may be
used to confirm that a WiFi network is operating correctly.
@param ping_config optional PingConfig object to override defaults.
@param ap_num int which AP to ping if more than one is configured.
"""
if ap_num is None:
ap_num = 0
if ping_config is None:
ping_ip = self.router.get_wifi_ip(ap_num=ap_num)
ping_config = ping_runner.PingConfig(ping_ip)
self.client.ping(ping_config)
def assert_ping_from_server(self, ping_config=None):
"""Ping the DUT across the WiFi network from the server.
Check that the ping is mostly successful and fail the test if it
is not.
@param ping_config optional PingConfig object to override defaults.
"""
logging.info('Pinging from server.')
if ping_config is None:
ping_ip = self.client.wifi_ip
ping_config = ping_runner.PingConfig(ping_ip)
self.router.ping(ping_config)
def wait_for_connection(self, ssid, freq=None, ap_num=None,
timeout_seconds=30):
"""Verifies a connection to network ssid on frequency freq.
@param ssid string ssid of the network to check.
@param freq int frequency of network to check.
@param ap_num int AP to which to connect
@param timeout_seconds int number of seconds to wait for
connection on the given frequency.
@returns a named tuple of (state, time)
"""
if ap_num is None:
ap_num = 0
desired_subnet = self.router.get_wifi_ip_subnet(ap_num)
wifi_ip = self.router.get_wifi_ip(ap_num)
return self.client.wait_for_connection(
ssid, timeout_seconds=timeout_seconds, freq=freq,
ping_ip=wifi_ip, desired_subnet=desired_subnet)