blob: 1545467c3e548cf39ee0e5beeaf8a38ae1cc3347 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import subprocess
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib.cros import site_eap_certs
class HostapdServer(object):
"""Hostapd server instance wrapped in a context manager.
Simple interface to starting and controlling a hsotapd instance.
This can be combined with a virtual-ethernet setup to test 802.1x
on a wired interface.
Example usage:
with hostapd_server.HostapdServer(interface='veth_master') as hostapd:
hostapd.send_eap_packets()
"""
CONFIG_TEMPLATE = """
interface=%(interface)s
driver=%(driver)s
logger_syslog=-1
logger_syslog_level=2
logger_stdout=-1
logger_stdout_level=2
dump_file=%(config_directory)s/hostapd.dump
ctrl_interface=%(config_directory)s/%(control_directory)s
ieee8021x=1
eapol_key_index_workaround=0
eap_server=1
eap_user_file=%(config_directory)s/%(user_file)s
ca_cert=%(config_directory)s/%(ca_cert)s
server_cert=%(config_directory)s/%(server_cert)s
private_key=%(config_directory)s/%(server_key)s
use_pae_group_addr=1
eap_reauth_period=10
"""
CA_CERTIFICATE_FILE = 'ca.crt'
CONFIG_FILE = 'hostapd.conf'
CONTROL_DIRECTORY = 'hostapd.ctl'
EAP_PASSWORD = 'password'
EAP_PHASE2 = 'MSCHAPV2'
EAP_TYPE = 'PEAP'
EAP_USERNAME = 'test'
HOSTAPD_EXECUTABLE = 'hostapd'
HOSTAPD_CLIENT_EXECUTABLE = 'hostapd_cli'
SERVER_CERTIFICATE_FILE = 'server.crt'
SERVER_PRIVATE_KEY_FILE = 'server.key'
USER_AUTHENTICATION_TEMPLATE = """* %(type)s
"%(username)s"\t%(phase2)s\t"%(password)s"\t[2]
"""
USER_FILE = 'hostapd.eap_user'
# This is the default group MAC address to which EAP challenges
# are sent, absent any prior knowledge of a specific client on
# the link.
PAE_NEAREST_ADDRESS = '01:80:c2:00:00:03'
def __init__(self,
interface=None,
driver='wired',
config_directory='/tmp/hostapd-test'):
super(HostapdServer, self).__init__()
self._interface = interface
self._config_directory = config_directory
self._control_directory = '%s/%s' % (self._config_directory,
self.CONTROL_DIRECTORY)
self._driver = driver
self._process = None
def __enter__(self):
self.start()
return self
def __exit__(self, exception, value, traceback):
self.stop()
def write_config(self):
"""Write out a hostapd configuration file-set based on the caller
supplied parameters.
@return the file name of the top-level configuration file written.
"""
if not os.path.exists(self._config_directory):
os.mkdir(self._config_directory)
config_params = {
'ca_cert': self.CA_CERTIFICATE_FILE,
'config_directory' : self._config_directory,
'control_directory': self.CONTROL_DIRECTORY,
'driver': self._driver,
'interface': self._interface,
'server_cert': self.SERVER_CERTIFICATE_FILE,
'server_key': self.SERVER_PRIVATE_KEY_FILE,
'user_file': self.USER_FILE
}
authentication_params = {
'password': self.EAP_PASSWORD,
'phase2': self.EAP_PHASE2,
'username': self.EAP_USERNAME,
'type': self.EAP_TYPE
}
for filename, contents in (
( self.CA_CERTIFICATE_FILE, site_eap_certs.ca_cert_1 ),
( self.CONFIG_FILE, self.CONFIG_TEMPLATE % config_params),
( self.SERVER_CERTIFICATE_FILE, site_eap_certs.server_cert_1 ),
( self.SERVER_PRIVATE_KEY_FILE,
site_eap_certs.server_private_key_1 ),
( self.USER_FILE,
self.USER_AUTHENTICATION_TEMPLATE % authentication_params )):
config_file = '%s/%s' % (self._config_directory, filename)
with open(config_file, 'w') as f:
f.write(contents)
return '%s/%s' % (self._config_directory, self.CONFIG_FILE)
def start(self):
"""Start the hostap server."""
config_file = self.write_config()
self._process = subprocess.Popen(
[self.HOSTAPD_EXECUTABLE, '-dd', config_file])
def stop(self):
"""Stop the hostapd server."""
if self._process:
self._process.terminate()
self._process.wait()
self._process = None
def running(self):
"""Tests whether the hostapd process is still running.
@return True if the hostapd process is still running, False otherwise.
"""
if not self._process:
return False
if self._process.poll() != None:
# We have essentially reaped the proces, and it is no more.
self._process = None
return False
return True
def send_eap_packets(self):
"""Start sending EAP packets to the nearest neighbor."""
self.send_command('new_sta %s' % self.PAE_NEAREST_ADDRESS)
def get_client_mib(self, client_mac_address):
"""Get a dict representing the MIB properties for |client_mac_address|.
@param client_mac_address string MAC address of the client.
@return dict containing mib properties.
"""
# Expected output of "hostapd cli <client_mac_address>":
#
# Selected interface 'veth_master'
# b6:f1:39:1d:ad:10
# dot1xPaePortNumber=0
# dot1xPaePortProtocolVersion=2
# [...]
result = self.send_command('sta %s' % client_mac_address)
client_mib = {}
found_client = False
for line in result.splitlines():
if found_client:
parts = line.split('=', 1)
if len(parts) == 2:
client_mib[parts[0]] = parts[1]
elif line == client_mac_address:
found_client = True
return client_mib
def send_command(self, command):
"""Send a command to the hostapd instance.
@param command string containing the command to run on hostapd.
@return string output of the command.
"""
return utils.system_output('%s -p %s %s' %
(self.HOSTAPD_CLIENT_EXECUTABLE,
self._control_directory, command))
def client_has_authenticated(self, client_mac_address):
"""Return whether |client_mac_address| has successfully authenticated.
@param client_mac_address string MAC address of the client.
@return True if client is authenticated.
"""
mib = self.get_client_mib(client_mac_address)
return mib.get('dot1xAuthAuthSuccessesWhileAuthenticating', '') == '1'
def client_has_logged_off(self, client_mac_address):
"""Return whether |client_mac_address| has logged-off.
@param client_mac_address string MAC address of the client.
@return True if client has logged off.
"""
mib = self.get_client_mib(client_mac_address)
return mib.get('dot1xAuthAuthEapLogoffWhileAuthenticated', '') == '1'