# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import re
import time
from autotest_lib.client.common_lib import error
from autotest_lib.server import site_linux_router
def isLinuxCrosRouter(router):
    """Detect if a remote system is a CrOS router (stumpy cell).

    Reads /etc/lsb-release on the remote system and looks for the
    CHROMEOS_RELEASE marker in its contents.  Errors from the remote command
    are ignored (ignore_status=True) rather than raised.

    @param router Host object representing the router.
    @return True iff |router| is a host running CrOS.

    """
    router_lsb = router.run('cat /etc/lsb-release', ignore_status=True).stdout
    # Collapse the match object to a real boolean so the return value agrees
    # with the documented contract.
    return re.search('CHROMEOS_RELEASE', router_lsb) is not None
class LinuxCrosRouter(site_linux_router.LinuxRouter):
    """
    Linux/mac80211-style WiFi Router support for WiFiTest class.

    As compared to LinuxRouter, LinuxCrosRouter is specialized for routers
    running a ChromiumOS image.

    NOTE(review): remote commands in this class are issued through
    self.router.run(); this assumes the LinuxRouter base class keeps the
    router's host handle in self.router -- confirm against
    site_linux_router.

    """

    # Seconds to pause between successive checks of the hostapd log while
    # waiting for a newly started AP to come up.
    _HOSTAPD_POLL_INTERVAL_SECONDS = 0.5

    def get_capabilities(self):
        """@return iterable object of AP capabilities for this system."""
        base_capabilities = super(LinuxCrosRouter, self).get_capabilities()
        # NOTE(review): CAPABILITY_IBSS is assumed to be the capability this
        # subclass adds to the base set -- confirm against LinuxRouter.
        return base_capabilities.union([self.CAPABILITY_IBSS])

    def __init__(self, host, params, test_name):
        """Construct the router with CrOS-specific parameters applied.

        @param host object for the router; passed through to LinuxRouter.
        @param params dict of router parameters.  It is copied, so the
                caller's dict is not modified.
        @param test_name passed through to LinuxRouter.

        """
        cros_params = params.copy()
        # These entries replace any same-named keys supplied in |params|.
        # NOTE(review): presumably 'phy_bus_preference' tells the base class
        # which bus to prefer when choosing a phy for each interface type --
        # confirm against LinuxRouter.
        cros_params.update({
            'force_local_server': None,
            'phy_bus_preference': {
                'monitor': 'usb',
                'managed': 'pci'
            }})
        super(LinuxCrosRouter, self).__init__(host, cros_params, test_name)

    def get_hostapd_start_command(self, log_file, pid_file, conf_file):
        """Build the shell command line used to launch hostapd.

        @param log_file string path given to hostapd's -f option.
        @param pid_file string path given to hostapd's -P option.
        @param conf_file string path of the hostapd configuration file.
        @return string command line, starting with self.cmd_hostapd.

        """
        return '%s -dd -B -t -f %s -P %s %s' % (
                self.cmd_hostapd, log_file, pid_file, conf_file)

    def _pre_start_hook(self, config):
        """Stop wpa_supplicant on the router; failure to stop is ignored.

        @param config not used by this implementation.

        """
        # Make sure a supplicant instance is not running.
        self.router.run('stop wpasupplicant', ignore_status=True)

    def start_dhcp_server(self, interface):
        """Write a dnsmasq configuration for |interface| and launch dnsmasq.

        @param interface string name of the interface to serve DHCP on; it
                must match the 'interface' key of an entry in
                self.local_servers.
        @raises RuntimeError if no entry in self.local_servers matches.

        """
        for server in self.local_servers:
            if server['interface'] == interface:
                params = server
                break
        else:
            raise RuntimeError('Could not find local server to match '
                               'interface')

        dhcpd_conf_file = self.dhcpd_conf % interface
        dhcp_conf = '\n'.join([
            'port=0',  # disables DNS server
            # The stored range is space separated; dnsmasq wants commas.
            'dhcp-range=%s' % params['dhcp_range'].replace(' ', ','),
            'interface=%s' % params['interface'],
            'dhcp-leasefile=%s' % self.dhcpd_leases])
        # Create the configuration file on the router via a here-document.
        self.router.run('cat <<EOF >%s\n%s\nEOF\n' %
                        (dhcpd_conf_file, dhcp_conf))
        self.router.run('dnsmasq --conf-file=%s' % dhcpd_conf_file)

    def stop_dhcp_server(self, instance):
        """Kill the dnsmasq process selected by |instance|.

        @param instance passed through to _kill_process_instance.

        """
        # NOTE(review): the meaning of the trailing 0 is defined by
        # _kill_process_instance in the base class -- confirm there.
        self._kill_process_instance('dnsmasq', instance, 0)

    def _post_start_hook(self, params):
        """Block until the most recently started hostapd reports it is up.

        Repeatedly greps the hostapd log on the router for its completion
        message, bailing out early on a reported initialization failure or
        if the hostapd process has gone away.

        @param params not used by this implementation.
        @raises error.TestFail if hostapd reports an initialization failure,
                if its process terminates, or if STARTUP_TIMEOUT_SECONDS
                pass without the completion message appearing.

        """
        hostapd_instance = self.hostapd_instances[-1]
        log_file = hostapd_instance['log_file']
        pid_file = hostapd_instance['pid_file']
        # Wait for confirmation that the router came up.
        pid = int(self.router.run('cat %s' % pid_file).stdout)
        logging.info('Waiting for hostapd to startup.')
        start_time = time.time()
        while time.time() - start_time < self.STARTUP_TIMEOUT_SECONDS:
            success = self.router.run(
                    'grep "Completing interface initialization" %s' % log_file,
                    ignore_status=True).exit_status == 0
            if success:
                return

            # A common failure is to request an invalid router configuration.
            # Detect this and exit early if we see it.
            bad_config = self.router.run(
                    'grep "Interface initialization failed" %s' % log_file,
                    ignore_status=True).exit_status == 0
            if bad_config:
                raise error.TestFail('hostapd failed to initialize AP '
                                     'interface.')

            if pid:
                # kill -0 delivers no signal; a nonzero exit status means the
                # process could not be signalled, i.e. it is gone.
                early_exit = self.router.run('kill -0 %d' % pid,
                                             ignore_status=True).exit_status
                if early_exit:
                    raise error.TestFail('hostapd process terminated.')

            # Brief pause so the router is not flooded with remote commands.
            time.sleep(self._HOSTAPD_POLL_INTERVAL_SECONDS)

        raise error.TestFail('Timed out while waiting for hostapd '
                             'to start.')