# blob: b52d05bafffcb6c201892f3b2db48a4e7d5c8395 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import random
import stat
import string
import sys
import tempfile
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import xmlrpc_types
def deserialize(serialized):
    """Rebuild a SecurityConfig from its serialized dictionary form.

    The work is delegated to xmlrpc_types.deserialize, which is handed this
    module as its |module| argument.

    @param serialized dict representing a serialized SecurityConfig.
    @return a SecurityConfig object built from |serialized|.

    """
    this_module = sys.modules[__name__]
    return xmlrpc_types.deserialize(serialized, module=this_module)
class SecurityConfig(xmlrpc_types.XmlRpcStruct):
    """Abstracts the security configuration for a WiFi network.

    This bundle of credentials can be passed to both HostapConfig and
    AssociationParameters so that both shill and hostapd can set up and connect
    to an encrypted WiFi network.  By default, we'll assume we're connecting
    to an open network.

    """
    SERVICE_PROPERTY_PASSPHRASE = 'Passphrase'

    def __init__(self, security='none'):
        """Construct a SecurityConfig.

        @param security string label stored as |self.security|; the default
                'none' corresponds to an open network.

        """
        super(SecurityConfig, self).__init__()
        self.security = security

    def get_hostapd_config(self):
        """@return dict fragment of hostapd configuration for security."""
        return {}

    def get_shill_service_properties(self):
        """@return dict of shill service properties."""
        return {}

    def get_wpa_cli_properties(self):
        """@return dict values to be set with wpa_cli set_network."""
        return {'key_mgmt': 'NONE'}

    def install_router_credentials(self, host):
        """Install the necessary credentials on the router.

        @param host host object representing the router.

        """
        pass  # Many authentication methods have no special router credentials.

    def install_client_credentials(self, tpm_store):
        """Install credentials on the local host (hopefully a DUT).

        Only call this if we're running on a DUT in a WiFi test.  This
        method can do things like install credentials into the TPM.

        @param tpm_store TPMStore object representing the TPM on our DUT.

        """
        pass  # Many authentication methods have no special client credentials.

    def __repr__(self):
        """@return string of the form ClassName(attr=value, ...)."""
        # items() exists on both Python 2 and Python 3 (iteritems() is
        # Python 2 only).  Sorting by attribute name keeps the output stable
        # from run to run, which makes logs easier to compare.
        attributes = sorted(vars(self).items())
        rendered = ', '.join('%s=%r' % item for item in attributes)
        return '%s(%s)' % (self.__class__.__name__, rendered)
class WEPConfig(SecurityConfig):
    """Abstracts security configuration for a WiFi network using static WEP."""

    # Open system authentication means that we don't do a 4 way AUTH handshake,
    # and simply start using the WEP keys after association finishes.
    AUTH_ALGORITHM_OPEN = 1
    # This refers to a mode where the AP sends a plaintext challenge and the
    # client sends back the challenge encrypted with the WEP key as part of a 4
    # part auth handshake.
    AUTH_ALGORITHM_SHARED = 2
    AUTH_ALGORITHM_DEFAULT = AUTH_ALGORITHM_OPEN

    @staticmethod
    def _format_key(key, ascii_key_formatter):
        """Return a WEP key formatted for its consumer.

        Keys of length 10 or 26 are treated as hex strings and returned
        untouched.  Keys of length 5 or 13 are treated as ASCII (or at least
        N-byte) strings and are passed through |ascii_key_formatter|, because
        each consumer wants such keys escaped in its own way.  Any other
        length is rejected.

        @param key string a 40/104 bit WEP key.
        @param ascii_key_formatter converter function that escapes a WEP
                string-encoded passphrase.  This conversion varies in format
                depending on the consumer.
        @return string correctly formatted WEP key.
        @raises error.TestFail if |key| has an unsupported length.

        """
        key_length = len(key)
        if key_length in (10, 26):
            # Hex encoded byte strings can be given raw.
            return key
        if key_length in (5, 13):
            return ascii_key_formatter(key)
        raise error.TestFail('Invalid WEP key: %r' % key)

    def __init__(self, wep_keys, wep_default_key=0,
                 auth_algorithm=AUTH_ALGORITHM_DEFAULT):
        """Construct a WEPConfig object.

        @param wep_keys list of string WEP keys.
        @param wep_default_key int 0 based index into |wep_keys| for the default
                key.
        @param auth_algorithm int bitfield of AUTH_ALGORITHM_* defined above.
        @raises error.TestFail if |auth_algorithm| has bits outside of
                AUTH_ALGORITHM_* set, or if more than 4 keys are given.

        """
        super(WEPConfig, self).__init__(security='wep')
        self.wep_keys = wep_keys
        self.wep_default_key = wep_default_key
        self.auth_algorithm = auth_algorithm

        known_algorithms = (self.AUTH_ALGORITHM_OPEN |
                            self.AUTH_ALGORITHM_SHARED)
        if self.auth_algorithm & ~known_algorithms:
            raise error.TestFail('Invalid authentication mode specified (%d).' %
                                 self.auth_algorithm)

        if self.wep_keys and len(self.wep_keys) > 4:
            raise error.TestFail('More than 4 WEP keys specified (%d).' %
                                 len(self.wep_keys))

    def get_hostapd_config(self):
        """@return dict fragment of hostapd configuration for security."""
        def in_double_quotes(text):
            """Wrap an ASCII key in plain double quotes."""
            return '"%s"' % text

        config = dict(
                ('wep_key%d' % index, self._format_key(key, in_double_quotes))
                for index, key in enumerate(self.wep_keys))
        config['wep_default_key'] = self.wep_default_key
        config['auth_algs'] = self.auth_algorithm
        return config

    def get_shill_service_properties(self):
        """@return dict of shill service properties."""
        default_index = self.wep_default_key
        passphrase = '%d:%s' % (default_index, self.wep_keys[default_index])
        return {self.SERVICE_PROPERTY_PASSPHRASE: passphrase}

    def get_wpa_cli_properties(self):
        """@return dict values to be set with wpa_cli set_network."""
        def quote_for_wpa_cli(text):
            """Wrap an ASCII key in single quotes around escaped double quotes."""
            return '\'\\"%s\\"\'' % text

        properties = super(WEPConfig, self).get_wpa_cli_properties()
        properties.update(
                ('wep_key%d' % index, self._format_key(key, quote_for_wpa_cli))
                for index, key in enumerate(self.wep_keys))
        properties['wep_tx_keyidx'] = self.wep_default_key
        # Only the pure shared-key mode is called out explicitly.
        if self.auth_algorithm == self.AUTH_ALGORITHM_SHARED:
            properties['auth_alg'] = 'SHARED'
        return properties
class WPAConfig(SecurityConfig):
    """Abstracts security configuration for a WPA encrypted WiFi network."""

    # We have the option of turning on WPA, WPA2, or both via a bitfield.
    MODE_PURE_WPA = 1
    MODE_PURE_WPA2 = 2
    MODE_MIXED_WPA = MODE_PURE_WPA | MODE_PURE_WPA2
    MODE_DEFAULT = MODE_MIXED_WPA

    # WPA2 mandates the use of AES in CCMP mode.
    # WPA allows the use of 'ordinary' AES, but mandates support for TKIP.
    # The protocol however seems to indicate that you just list a bunch of
    # different ciphers that you support and we'll start speaking one.
    CIPHER_CCMP = 'CCMP'
    CIPHER_TKIP = 'TKIP'

    def __init__(self, psk='', wpa_mode=MODE_DEFAULT, wpa_ciphers=[],
                 wpa2_ciphers=[], wpa_ptk_rekey_period=None,
                 wpa_gtk_rekey_period=None, wpa_gmk_rekey_period=None,
                 use_strict_rekey=None):
        """Construct a WPAConfig.

        @param psk string a passphrase (64 hex characters or an ASCII phrase up
                to 63 characters long).
        @param wpa_mode int one of MODE_* above.
        @param wpa_ciphers list of ciphers to advertise in the WPA IE.
        @param wpa2_ciphers list of ciphers to advertise in the WPA2 IE.
                hostapd will fall back on WPA ciphers for WPA2 if this is
                left unpopulated.
        @param wpa_ptk_rekey_period int number of seconds between PTK rekeys.
        @param wpa_gtk_rekey_period int number of seconds between GTK rekeys.
        @param wpa_gmk_rekey_period int number of seconds between GMK rekeys.
                The GMK is a key internal to hostapd used to generate GTK.
                It is the 'master' key.
        @param use_strict_rekey bool True iff hostapd should refresh the GTK
                whenever any client leaves the group.
        @raises error.TestFail if |psk| is longer than 64 characters, or is
                exactly 64 characters long but not purely hex digits.

        """
        super(WPAConfig, self).__init__(security='psk')
        self.psk = psk
        self.wpa_mode = wpa_mode
        # Store private copies of the cipher lists.  The signature keeps its
        # historical [] defaults, but those default objects are shared between
        # calls; without the copy, every default-constructed config would alias
        # one list, and mutating it on one instance would leak into the rest.
        self.wpa_ciphers = list(wpa_ciphers or [])
        self.wpa2_ciphers = list(wpa2_ciphers or [])
        self.wpa_ptk_rekey_period = wpa_ptk_rekey_period
        self.wpa_gtk_rekey_period = wpa_gtk_rekey_period
        self.wpa_gmk_rekey_period = wpa_gmk_rekey_period
        self.use_strict_rekey = use_strict_rekey

        if len(psk) > 64:
            raise error.TestFail('WPA passphrases can be no longer than 63 '
                                 'characters (or 64 hex digits).')

        # A 64 character value is interpreted as a raw key and must be hex.
        if len(psk) == 64 and any(c not in string.hexdigits for c in psk):
            raise error.TestFail('Invalid PMK: %r' % psk)

    def get_hostapd_config(self):
        """Build the hostapd settings for this WPA network.

        @return dict fragment of hostapd configuration for security.
        @raises error.TestFail if no mode is set, if WPA is enabled without
                WPA ciphers, or if no ciphers are configured at all.

        """
        if not self.wpa_mode:
            raise error.TestFail('Cannot configure WPA unless we know which '
                                 'mode to use.')

        if self.MODE_PURE_WPA & self.wpa_mode and not self.wpa_ciphers:
            raise error.TestFail('Cannot configure WPA unless we know which '
                                 'ciphers to use.')

        if not self.wpa_ciphers and not self.wpa2_ciphers:
            raise error.TestFail('Cannot configure WPA2 unless we have some '
                                 'ciphers.')

        ret = {'wpa': self.wpa_mode,
               'wpa_key_mgmt': 'WPA-PSK'}
        # 64 character values go in as a raw key; anything else is a
        # passphrase.
        if len(self.psk) == 64:
            ret['wpa_psk'] = self.psk
        else:
            ret['wpa_passphrase'] = self.psk

        if self.wpa_ciphers:
            ret['wpa_pairwise'] = ' '.join(self.wpa_ciphers)
        if self.wpa2_ciphers:
            ret['rsn_pairwise'] = ' '.join(self.wpa2_ciphers)
        # Rekey settings are only emitted when set to a truthy value.
        if self.wpa_ptk_rekey_period:
            ret['wpa_ptk_rekey'] = self.wpa_ptk_rekey_period
        if self.wpa_gtk_rekey_period:
            ret['wpa_group_rekey'] = self.wpa_gtk_rekey_period
        if self.wpa_gmk_rekey_period:
            ret['wpa_gmk_rekey'] = self.wpa_gmk_rekey_period
        if self.use_strict_rekey:
            ret['wpa_strict_rekey'] = 1
        return ret

    def get_shill_service_properties(self):
        """@return dict of shill service properties."""
        return {self.SERVICE_PROPERTY_PASSPHRASE: self.psk}

    def get_wpa_cli_properties(self):
        """@return dict values to be set with wpa_cli set_network."""
        properties = super(WPAConfig, self).get_wpa_cli_properties()
        # TODO(wiley) This probably doesn't work for raw PMK.
        protos = []
        if self.wpa_mode & self.MODE_PURE_WPA:
            protos.append('WPA')
        if self.wpa_mode & self.MODE_PURE_WPA2:
            protos.append('RSN')
        properties.update({'psk': '\'\\"%s\\"\'' % self.psk,
                           'key_mgmt': 'WPA-PSK',
                           'proto': ' '.join(protos)})
        return properties
class EAPConfig(SecurityConfig):
    """Abstract superclass that implements certificate/key installation."""

    DEFAULT_EAP_USERS = '* TLS'
    DEFAULT_EAP_IDENTITY = 'chromeos'

    SERVICE_PROPERTY_CA_CERT_PEM = 'EAP.CACertPEM'
    SERVICE_PROPERTY_CLIENT_CERT_ID = 'EAP.CertID'
    SERVICE_PROPERTY_EAP_IDENTITY = 'EAP.Identity'
    SERVICE_PROPERTY_EAP_KEY_MGMT = 'EAP.KeyMgmt'
    SERVICE_PROPERTY_EAP_PASSWORD = 'EAP.Password'
    SERVICE_PROPERTY_EAP_PIN = 'EAP.PIN'
    SERVICE_PROPERTY_INNER_EAP = 'EAP.InnerEAP'
    SERVICE_PROPERTY_PRIVATE_KEY_ID = 'EAP.KeyID'
    SERVICE_PROPERTY_USE_SYSTEM_CAS = 'EAP.UseSystemCAs'

    # Class-wide counter backing reserve_TPM_id().
    last_tpm_id = 8800

    @staticmethod
    def reserve_TPM_id():
        """@return session unique TPM identifier."""
        reserved = EAPConfig.last_tpm_id
        EAPConfig.last_tpm_id = reserved + 1
        return str(reserved)

    def __init__(self, security='802_1x', file_suffix=None, use_system_cas=None,
                 server_ca_cert=None, server_cert=None, server_key=None,
                 server_eap_users=None,
                 client_ca_cert=None, client_cert=None, client_key=None,
                 client_cert_id=None, client_key_id=None,
                 eap_identity=None):
        """Construct an EAPConfig.

        @param security string security type handed to SecurityConfig.
        @param file_suffix string unique file suffix on DUT.  A random
                10 character suffix is chosen when this is None.
        @param use_system_cas False iff we should ignore server certificates.
        @param server_ca_cert string PEM encoded CA certificate for the server.
        @param server_cert string PEM encoded identity certificate for server.
        @param server_key string PEM encoded private key for server.
        @param server_eap_users string contents of EAP user file.
        @param client_ca_cert string PEM encoded CA certificate for client.
        @param client_cert string PEM encoded identity certificate for client.
        @param client_key string PEM encoded private key for client.
        @param client_cert_id string identifier for client certificate in TPM.
        @param client_key_id string identifier for client private key in TPM.
        @param eap_identity string user to authenticate as during EAP.

        """
        super(EAPConfig, self).__init__(security=security)
        self.use_system_cas = use_system_cas

        # Server side credentials.
        self.server_ca_cert = server_ca_cert
        self.server_cert = server_cert
        self.server_key = server_key
        if server_eap_users:
            self.server_eap_users = server_eap_users
        else:
            self.server_eap_users = self.DEFAULT_EAP_USERS

        # Client side credentials.
        self.client_ca_cert = client_ca_cert
        self.client_cert = client_cert
        self.client_key = client_key

        if file_suffix is None:
            alphabet = string.ascii_lowercase + string.digits
            file_suffix = ''.join([random.choice(alphabet)
                                   for _ in range(10)])
            logging.debug('Choosing unique file_suffix %s.', file_suffix)
        self.server_ca_cert_file = '/tmp/hostapd_ca_cert_file.' + file_suffix
        self.server_cert_file = '/tmp/hostapd_cert_file.' + file_suffix
        self.server_key_file = '/tmp/hostapd_key_file.' + file_suffix
        self.server_eap_user_file = '/tmp/hostapd_eap_user_file.' + file_suffix
        # While these paths won't make it across the network, the suffix will.
        self.file_suffix = file_suffix

        # Fresh TPM identifiers are only reserved for ids the caller left out.
        if client_cert_id:
            self.client_cert_id = client_cert_id
        else:
            self.client_cert_id = self.reserve_TPM_id()
        if client_key_id:
            self.client_key_id = client_key_id
        else:
            self.client_key_id = self.reserve_TPM_id()

        # This gets filled in at install time.
        self.pin = None

        if eap_identity:
            self.eap_identity = eap_identity
        else:
            self.eap_identity = self.DEFAULT_EAP_IDENTITY

    def install_router_credentials(self, host):
        """Install the necessary credentials on the router.

        Each credential that was supplied is written to a local temporary
        file and copied to its destination path with host.send_file.

        @param host host object representing the router.

        """
        read_write_for_all = (stat.S_IRUSR | stat.S_IWUSR |
                              stat.S_IRGRP | stat.S_IWGRP |
                              stat.S_IROTH | stat.S_IWOTH)
        candidates = ((self.server_ca_cert, self.server_ca_cert_file),
                      (self.server_cert, self.server_cert_file),
                      (self.server_key, self.server_key_file),
                      (self.server_eap_users, self.server_eap_user_file))
        # If we omit a parameter, just omit copying a file over.
        to_install = [(content, path) for content, path in candidates
                      if content is not None]
        for content, remote_path in to_install:
            # Write the contents to local disk first so we can use the easy
            # built in mechanism to do this.
            with tempfile.NamedTemporaryFile() as local_file:
                local_file.write(content)
                local_file.flush()
                os.chmod(local_file.name, read_write_for_all)
                host.send_file(local_file.name, remote_path, delete_dest=True)

    def install_client_credentials(self, tpm_store):
        """Install credentials on the local host (hopefully a DUT).

        Only call this if we're running on a DUT in a WiFi test.  The client
        certificate and private key, when present, are installed through
        |tpm_store| and its PIN is recorded on this object.

        @param tpm_store TPMStore object representing the TPM on our DUT.

        """
        if self.client_cert:
            tpm_store.install_certificate(self.client_cert, self.client_cert_id)
            self.pin = tpm_store.PIN
        if self.client_key:
            tpm_store.install_private_key(self.client_key, self.client_key_id)
            self.pin = tpm_store.PIN

    def get_shill_service_properties(self):
        """@return dict of shill service properties."""
        properties = {}
        properties[self.SERVICE_PROPERTY_EAP_IDENTITY] = self.eap_identity
        if self.pin:
            properties[self.SERVICE_PROPERTY_EAP_PIN] = self.pin
        if self.client_ca_cert:
            # Technically, we could accept a list of certificates here, but we
            # have no such tests.
            properties[self.SERVICE_PROPERTY_CA_CERT_PEM] = [
                    self.client_ca_cert]
        if self.client_cert:
            properties[self.SERVICE_PROPERTY_CLIENT_CERT_ID] = (
                    self.client_cert_id)
        if self.client_key:
            properties[self.SERVICE_PROPERTY_PRIVATE_KEY_ID] = (
                    self.client_key_id)
        if self.use_system_cas is not None:
            properties[self.SERVICE_PROPERTY_USE_SYSTEM_CAS] = (
                    self.use_system_cas)
        return properties

    def get_hostapd_config(self):
        """@return dict fragment of hostapd configuration for security."""
        config = {}
        config['ieee8021x'] = 1  # Enable 802.1x support.
        config['eap_server'] = 1  # Do EAP inside hostapd to avoid RADIUS.
        config['ca_cert'] = self.server_ca_cert_file
        config['server_cert'] = self.server_cert_file
        config['private_key'] = self.server_key_file
        config['eap_user_file'] = self.server_eap_user_file
        return config
class DynamicWEPConfig(EAPConfig):
    """Configuration settings bundle for dynamic WEP.

    This is a WEP encrypted connection where the keys are negotiated after the
    client authenticates via 802.1x.

    """

    DEFAULT_REKEY_PERIOD = 20

    def __init__(self, use_short_keys=False,
                 wep_rekey_period=DEFAULT_REKEY_PERIOD,
                 server_ca_cert=None, server_cert=None, server_key=None,
                 client_ca_cert=None, client_cert=None, client_key=None,
                 file_suffix=None, client_cert_id=None, client_key_id=None):
        """Construct a DynamicWEPConfig.

        @param use_short_keys bool force hostapd to use 40 bit WEP keys.
        @param wep_rekey_period int number of seconds between rekeys.
        @param server_ca_cert string PEM encoded CA certificate for the server.
        @param server_cert string PEM encoded identity certificate for server.
        @param server_key string PEM encoded private key for server.
        @param client_ca_cert string PEM encoded CA certificate for client.
        @param client_cert string PEM encoded identity certificate for client.
        @param client_key string PEM encoded private key for client.
        @param file_suffix string unique file suffix on DUT.
        @param client_cert_id string identifier for client certificate in TPM.
        @param client_key_id string identifier for client private key in TPM.

        """
        super(DynamicWEPConfig, self).__init__(
                security='wep',
                file_suffix=file_suffix,
                server_ca_cert=server_ca_cert,
                server_cert=server_cert,
                server_key=server_key,
                client_ca_cert=client_ca_cert,
                client_cert=client_cert,
                client_key=client_key,
                client_cert_id=client_cert_id,
                client_key_id=client_key_id)
        self.use_short_keys = use_short_keys
        self.wep_rekey_period = wep_rekey_period

    def get_hostapd_config(self):
        """@return dict fragment of hostapd configuration for security."""
        # 5 bytes: 64 bit WEP, 40 bits of secret.
        # 13 bytes: 128 bit WEP, 104 secret bits.
        key_length = 5 if self.use_short_keys else 13
        config = super(DynamicWEPConfig, self).get_hostapd_config()
        config['wep_key_len_broadcast'] = key_length
        config['wep_key_len_unicast'] = key_length
        config['wep_rekey_period'] = self.wep_rekey_period
        return config

    def get_shill_service_properties(self):
        """@return dict of shill service properties."""
        properties = super(DynamicWEPConfig,
                           self).get_shill_service_properties()
        properties[self.SERVICE_PROPERTY_EAP_KEY_MGMT] = 'IEEE8021X'
        return properties
class WPAEAPConfig(EAPConfig):
    """Security type to set up a WPA tunnel via EAP-TLS negotiation."""

    def __init__(self, file_suffix=None, use_system_cas=None,
                 server_ca_cert=None, server_cert=None, server_key=None,
                 client_ca_cert=None, client_cert=None, client_key=None,
                 client_cert_id=None, client_key_id=None,
                 eap_identity=None, server_eap_users=None,
                 wpa_mode=WPAConfig.MODE_PURE_WPA):
        """Construct a WPAEAPConfig.

        @param file_suffix string unique file suffix on DUT.
        @param use_system_cas False iff we should ignore server certificates.
        @param server_ca_cert string PEM encoded CA certificate for the server.
        @param server_cert string PEM encoded identity certificate for server.
        @param server_key string PEM encoded private key for server.
        @param client_ca_cert string PEM encoded CA certificate for client.
        @param client_cert string PEM encoded identity certificate for client.
        @param client_key string PEM encoded private key for client.
        @param client_cert_id string identifier for client certificate in TPM.
        @param client_key_id string identifier for client private key in TPM.
        @param eap_identity string user to authenticate as during EAP.
        @param server_eap_users string contents of server EAP users file.
        @param wpa_mode int one of WPAConfig.MODE_*; emitted as the hostapd
                'wpa' setting.

        """
        super(WPAEAPConfig, self).__init__(
                file_suffix=file_suffix,
                use_system_cas=use_system_cas,
                server_ca_cert=server_ca_cert,
                server_cert=server_cert,
                server_key=server_key,
                client_ca_cert=client_ca_cert,
                client_cert=client_cert,
                client_key=client_key,
                client_cert_id=client_cert_id,
                client_key_id=client_key_id,
                eap_identity=eap_identity,
                server_eap_users=server_eap_users)
        self.wpa_mode = wpa_mode

    def get_hostapd_config(self):
        """@return dict fragment of hostapd configuration for security."""
        config = super(WPAEAPConfig, self).get_hostapd_config()
        # If we wanted to expand test coverage to WPA2/PEAP combinations
        # or particular ciphers, we'd have to let people set these
        # settings manually.  But for now, do the simple thing.
        config['wpa'] = self.wpa_mode
        config['wpa_pairwise'] = WPAConfig.CIPHER_CCMP
        config['wpa_key_mgmt'] = 'WPA-EAP'
        return config
class Tunneled1xConfig(WPAEAPConfig):
    """Security type to set up a TTLS/PEAP connection.

    Both PEAP and TTLS are tunneled protocols which use EAP inside of a TLS
    secured tunnel.  The secured tunnel is a symmetric key encryption scheme
    negotiated under the protection of a public key in the server certificate.
    Thus, we'll see server credentials in the form of certificates, but client
    credentials in the form of passwords and a CA Cert to root the trust chain.

    """

    TTLS_PREFIX = 'TTLS-'

    LAYER1_TYPE_PEAP = 'PEAP'
    LAYER1_TYPE_TTLS = 'TTLS'

    LAYER2_TYPE_MSCHAPV2 = 'MSCHAPV2'
    LAYER2_TYPE_MD5 = 'MD5'
    LAYER2_TYPE_TTLS_MSCHAPV2 = TTLS_PREFIX + 'MSCHAPV2'
    LAYER2_TYPE_TTLS_MSCHAP = TTLS_PREFIX + 'MSCHAP'
    LAYER2_TYPE_TTLS_PAP = TTLS_PREFIX + 'PAP'

    def __init__(self, server_ca_cert, server_cert, server_key,
                 client_ca_cert, eap_identity, password,
                 outer_protocol=LAYER1_TYPE_PEAP,
                 inner_protocol=LAYER2_TYPE_MD5,
                 client_password=None, file_suffix=None):
        """Construct a Tunneled1xConfig.

        @param server_ca_cert string PEM encoded CA certificate for the server.
        @param server_cert string PEM encoded identity certificate for server.
        @param server_key string PEM encoded private key for server.
        @param client_ca_cert string PEM encoded CA certificate for client.
        @param eap_identity string user to authenticate as during EAP.
        @param password string password written into the server's EAP users
                file for |eap_identity|.
        @param outer_protocol string one of LAYER1_TYPE_* above.
        @param inner_protocol string one of LAYER2_TYPE_* above.
        @param client_password string password for the client to use instead
                of |password|, or None to use |password| on both sides.
        @param file_suffix string unique file suffix on DUT.

        """
        # Overriding the password used on the client lets us set bad
        # passwords for testing.  The server config below always uses the
        # real |password|.
        if client_password is None:
            self.password = password
        else:
            self.password = client_password
        self.inner_protocol = inner_protocol

        # hostapd wants the identity and password surrounded in double quotes.
        quoted_identity = '"' + eap_identity + '"'
        quoted_password = '"' + password + '"'
        outer_entry = ' '.join(('*', outer_protocol))
        inner_entry = ' '.join((quoted_identity, inner_protocol,
                                quoted_password, '[2]'))
        super(Tunneled1xConfig, self).__init__(
                server_ca_cert=server_ca_cert,
                server_cert=server_cert,
                server_key=server_key,
                server_eap_users='\n'.join([outer_entry, inner_entry]),
                client_ca_cert=client_ca_cert,
                eap_identity=eap_identity,
                file_suffix=file_suffix)

    def get_shill_service_properties(self):
        """@return dict of shill service properties."""
        properties = super(Tunneled1xConfig,
                           self).get_shill_service_properties()
        properties[self.SERVICE_PROPERTY_EAP_PASSWORD] = self.password
        prefix = self.TTLS_PREFIX
        if self.inner_protocol.startswith(prefix):
            # Strip the TTLS marker and hand the remainder over as 'auth=...'.
            inner_method = self.inner_protocol[len(prefix):]
            properties[self.SERVICE_PROPERTY_INNER_EAP] = 'auth=' + inner_method
        return properties