blob: 3739a5554dc4e6c6c59838be8de23c904077f746 [file] [log] [blame]
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import datetime
import collections
import logging
import time
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import iw_runner
from autotest_lib.client.common_lib.cros.network import ping_runner
from autotest_lib.server.cros import wifi_test_utils
from autotest_lib.server.cros.network import packet_capturer
NetDev = collections.namedtuple('NetDev',
['inherited', 'phy', 'if_name', 'if_type'])
class LinuxSystem(object):
"""Superclass for test machines running Linux.
Provides a common point for routines that use the cfg80211 userspace tools
to manipulate the wireless stack, regardless of the role they play.
Currently the commands shared are the init, which queries for wireless
devices, along with start_capture and stop_capture. More commands may
migrate from site_linux_router as appropriate to share.
"""
CAPABILITY_5GHZ = '5ghz'
CAPABILITY_MULTI_AP = 'multi_ap'
CAPABILITY_MULTI_AP_SAME_BAND = 'multi_ap_same_band'
CAPABILITY_IBSS = 'ibss_supported'
CAPABILITY_SEND_MANAGEMENT_FRAME = 'send_management_frame'
CAPABILITY_TDLS = 'tdls'
@property
def capabilities(self):
"""@return iterable object of AP capabilities for this system."""
if self._capabilities is None:
self._capabilities = self.get_capabilities()
logging.info('%s system capabilities: %r',
self.role, self._capabilities)
return self._capabilities
def __init__(self, host, role, inherit_interfaces=False):
# Command locations.
cmd_iw = wifi_test_utils.must_be_installed(
host, '/usr/sbin/iw')
self.cmd_ip = wifi_test_utils.must_be_installed(
host, '/usr/sbin/ip')
self.cmd_readlink = '%s -l' % wifi_test_utils.must_be_installed(
host, '/bin/ls')
self.host = host
self.role = role
self._packet_capturer = packet_capturer.get_packet_capturer(
self.host, host_description=role, cmd_ip=self.cmd_ip,
cmd_iw=cmd_iw, ignore_failures=True)
self.iw_runner = iw_runner.IwRunner(remote_host=host, command_iw=cmd_iw)
self._phy_list = None
self.phys_for_frequency, self.phy_bus_type = self._get_phy_info()
self._interfaces = []
for interface in self.iw_runner.list_interfaces():
if inherit_interfaces:
self._interfaces.append(NetDev(inherited=True,
if_name=interface.if_name,
if_type=interface.if_type,
phy=interface.phy))
else:
self.iw_runner.remove_interface(interface.if_name)
self._wlanifs_in_use = []
self._capture_interface = None
# Some uses of LinuxSystem don't use the interface allocation facility.
# Don't force us to remove all the existing interfaces if this facility
# is not desired.
self._wlanifs_initialized = False
self._capabilities = None
self._ping_runner = ping_runner.PingRunner(host=self.host)
@property
def phy_list(self):
"""@return iterable object of PHY descriptions for this system."""
if self._phy_list is None:
self._phy_list = self.iw_runner.list_phys()
return self._phy_list
def _get_phy_info(self):
"""Get information about WiFi devices.
Parse the output of 'iw list' and some of sysfs and return:
A dict |phys_for_frequency| which maps from each frequency to a
list of phys that support that channel.
A dict |phy_bus_type| which maps from each phy to the bus type for
each phy.
@return phys_for_frequency, phy_bus_type tuple as described.
"""
phys_for_frequency = {}
phy_caps = {}
phy_list = []
for phy in self.phy_list:
phy_list.append(phy.name)
for band in phy.bands:
for mhz in band.frequencies:
if mhz not in phys_for_frequency:
phys_for_frequency[mhz] = [phy.name]
else:
phys_for_frequency[mhz].append(phy.name)
phy_bus_type = {}
for phy in phy_list:
phybus = 'unknown'
command = '%s /sys/class/ieee80211/%s' % (self.cmd_readlink, phy)
devpath = self.host.run(command).stdout
if '/usb' in devpath:
phybus = 'usb'
elif '/mmc' in devpath:
phybus = 'sdio'
elif '/pci' in devpath:
phybus = 'pci'
phy_bus_type[phy] = phybus
logging.debug('Got phys for frequency: %r', phys_for_frequency)
return phys_for_frequency, phy_bus_type
def remove_interface(self, interface):
"""Remove an interface from a WiFi device.
@param interface string interface to remove (e.g. wlan0).
"""
self.release_interface(interface)
self.host.run('%s link set %s down' % (self.cmd_ip, interface))
self.iw_runner.remove_interface(interface)
for net_dev in self._interfaces:
if net_dev.if_name == interface:
self._interfaces.remove(net_dev)
break
def close(self):
"""Close global resources held by this system."""
logging.debug('Cleaning up host object for %s', self.role)
self._packet_capturer.close()
# Release and remove any interfaces that we create.
for net_dev in self._wlanifs_in_use:
self.release_interface(net_dev.if_name)
for net_dev in self._interfaces:
if net_dev.inherited:
continue
self.remove_interface(net_dev.if_name)
self.host.close()
self.host = None
def get_capabilities(self):
caps = set()
phymap = self.phys_for_frequency
if [freq for freq in phymap.iterkeys() if freq > 5000]:
# The frequencies are expressed in megaherz
caps.add(self.CAPABILITY_5GHZ)
if [freq for freq in phymap.iterkeys() if len(phymap[freq]) > 1]:
caps.add(self.CAPABILITY_MULTI_AP_SAME_BAND)
caps.add(self.CAPABILITY_MULTI_AP)
elif len(self.phy_bus_type) > 1:
caps.add(self.CAPABILITY_MULTI_AP)
for phy in self.phy_list:
if 'tdls_mgmt' in phy.commands or 'tdls_oper' in phy.commands:
caps.add(self.CAPABILITY_TDLS)
return caps
def start_capture(self, frequency, ht_type=None, snaplen=None):
"""Start a packet capture.
@param frequency int frequency of channel to capture on.
@param ht_type string one of (None, 'HT20', 'HT40+', 'HT40-').
@param snaplen int number of bytes to retain per capture frame.
"""
if self._packet_capturer.capture_running:
self.stop_capture()
self._capture_interface = self.get_wlanif(frequency, 'monitor')
full_interface = [net_dev for net_dev in self._interfaces
if net_dev.if_name == self._capture_interface][0]
# If this is the only interface on this phy, we ought to configure
# the phy with a channel and ht_type. Otherwise, inherit the settings
# of the phy as they stand.
if len([net_dev for net_dev in self._interfaces
if net_dev.phy == full_interface.phy]) == 1:
self._packet_capturer.configure_raw_monitor(
self._capture_interface, frequency, ht_type=ht_type)
else:
self.host.run('%s link set %s up' %
(self.cmd_ip, self._capture_interface))
# Start the capture.
self._packet_capturer.start_capture(self._capture_interface, './debug/',
snaplen=snaplen)
def stop_capture(self, save_dir=None, save_filename=None):
"""Stop a packet capture.
@param save_dir string path to directory to save pcap files in.
@param save_filename string basename of file to save pcap in locally.
"""
if not self._packet_capturer.capture_running:
return
results = self._packet_capturer.stop_capture(
local_save_dir=save_dir, local_pcap_filename=save_filename)
self.release_interface(self._capture_interface)
self._capture_interface = None
return results
def sync_host_times(self):
"""Set time on our DUT to match local time."""
epoch_seconds = time.time()
busybox_format = '%Y%m%d%H%M.%S'
busybox_date = datetime.datetime.utcnow().strftime(busybox_format)
self.host.run('date -u --set=@%s 2>/dev/null || date -u %s' %
(epoch_seconds, busybox_date))
def _get_phy_for_frequency(self, frequency, phytype):
"""Get a phy appropriate for a frequency and phytype.
Return the most appropriate phy interface for operating on the
frequency |frequency| in the role indicated by |phytype|. Prefer idle
phys to busy phys if any exist. Secondarily, show affinity for phys
that use the bus type associated with this phy type.
@param frequency int WiFi frequency of phy.
@param phytype string key of phytype registered at construction time.
@return string name of phy to use.
"""
phys = self.phys_for_frequency[frequency]
busy_phys = set(net_dev.phy for net_dev in self._wlanifs_in_use)
idle_phys = [phy for phy in phys if phy not in busy_phys]
phys = idle_phys or phys
preferred_bus = {'monitor': 'usb', 'managed': 'pci'}.get(phytype)
preferred_phys = [phy for phy in phys
if self.phy_bus_type[phy] == preferred_bus]
phys = preferred_phys or phys
return phys[0]
def get_wlanif(self, frequency, phytype, same_phy_as=None):
"""Get a WiFi device that supports the given frequency and type.
@param frequency int WiFi frequency to support.
@param phytype string type of phy (e.g. 'monitor').
@param same_phy_as string create the interface on the same phy as this.
@return string WiFi device.
"""
if same_phy_as:
for net_dev in self._interfaces:
if net_dev.if_name == same_phy_as:
phy = net_dev.phy
break
else:
raise error.TestFail('Unable to find phy for interface %s' %
same_phy_as)
elif frequency in self.phys_for_frequency:
phy = self._get_phy_for_frequency(frequency, phytype)
else:
raise error.TestFail('Unable to find phy for frequency %d' %
frequency)
# If we have a suitable unused interface sitting around on this
# phy, reuse it.
for net_dev in set(self._interfaces) - set(self._wlanifs_in_use):
if net_dev.phy == phy and net_dev.if_type == phytype:
self._wlanifs_in_use.append(net_dev)
return net_dev.if_name
# Because we can reuse interfaces, we have to iteratively find a good
# interface name.
name_exists = lambda name: bool([net_dev
for net_dev in self._interfaces
if net_dev.if_name == name])
if_name = lambda index: '%s%d' % (phytype, index)
if_index = len(self._interfaces)
while name_exists(if_name(if_index)):
if_index += 1
net_dev = NetDev(phy=phy, if_name=if_name(if_index), if_type=phytype,
inherited=False)
self._interfaces.append(net_dev)
self._wlanifs_in_use.append(net_dev)
self.iw_runner.add_interface(phy, net_dev.if_name, phytype)
return net_dev.if_name
def release_interface(self, wlanif):
"""Release a device allocated throuhg get_wlanif().
@param wlanif string name of device to release.
"""
for net_dev in self._wlanifs_in_use:
if net_dev.if_name == wlanif:
self._wlanifs_in_use.remove(net_dev)
def require_capabilities(self, requirements, fatal_failure=False):
"""Require capabilities of this LinuxSystem.
Check that capabilities in |requirements| exist on this system.
Raise and exception to skip but not fail the test if said
capabilities are not found. Pass |fatal_failure| to cause this
error to become a test failure.
@param requirements list of CAPABILITY_* defined above.
@param fatal_failure bool True iff failures should be fatal.
"""
to_be_raised = error.TestNAError
if fatal_failure:
to_be_raised = error.TestFail
missing = [cap for cap in requirements if not cap in self.capabilities]
if missing:
raise to_be_raised('AP on %s is missing required capabilites: %r' %
(self.role, missing))
def set_antenna_bitmap(self, tx_bitmap, rx_bitmap):
"""Setup antenna bitmaps for all the phys.
@param tx_bitmap int bitmap of allowed antennas to use for TX
@param rx_bitmap int bitmap of allowed antennas to use for RX
"""
for phy in self.phy_list:
if not phy.supports_setting_antenna_mask:
continue
self.iw_runner.set_antenna_bitmap(phy.name, tx_bitmap, rx_bitmap)
def set_default_antenna_bitmap(self):
"""Setup default antenna bitmaps for all the phys."""
for phy in self.phy_list:
if not phy.supports_setting_antenna_mask:
continue
self.iw_runner.set_antenna_bitmap(phy.name, phy.avail_tx_antennas,
phy.avail_rx_antennas)
def ping(self, ping_config):
"""Ping an IP from this system.
@param ping_config PingConfig object describing the ping command to run.
@return a PingResult object.
"""
logging.info('Pinging from the %s.', self.role)
return self._ping_runner.ping(ping_config)