blob: 1951ecc823836be3492160e717e4c8670212a2a4 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utilities for cellular tests."""
import copy, dbus, logging, os, string, tempfile
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cellular import cellular
from autotest_lib.client.cros import flimflam_test_path
import flimflam, mm
class Error(Exception):
    """Exception raised by the cellular test utilities in this module."""
# Default timeout, in seconds, used by ConnectToCellular when the caller
# does not supply one.
TIMEOUT = 30
def ConnectToCellular(flim, timeout=TIMEOUT):
    """Attempts to connect to a cell network using FlimFlam.

    Args:
        flim: A flimflam object
        timeout: Timeout (in seconds) before giving up on connect.  The
            same value is used for the association timeout, the
            configuration timeout and the final wait for a connected state.

    Returns:
        a tuple of the service and the service state

    Raises:
        Error if connection fails or times out
    """
    service = flim.FindCellularService()
    if not service:
        raise Error('Could not find cell service')

    # Dumping the properties is routine diagnostics, not an error condition.
    properties = service.GetProperties(utf8_strings=True)
    logging.info('Properties are: %s', properties)

    logging.info('Connecting to cell service: %s', service)
    success, status = flim.ConnectService(
        service=service,
        assoc_timeout=timeout,
        config_timeout=timeout)

    if not success:
        logging.error('Connect failed: %s', status)
        # TODO(rochberg):  Turn off autoconnect
        # Being connected already is fine.  Anything else, including a
        # status that carries no reason at all, is a genuine failure and is
        # reported as Error rather than as a KeyError on the missing key.
        if 'Error.AlreadyConnected' not in status.get('reason', ''):
            raise Error('Could not connect: %s.' % (status,))

    connected_states = ['portal', 'online']
    # The service may need a while to reach a connected state; wait for up
    # to |timeout| seconds and then check what state we ended up in.
    state = flim.WaitForServiceState(service=service,
                                     expected_states=connected_states,
                                     timeout=timeout,
                                     ignore_failure=True)[0]
    if state not in connected_states:
        raise Error('Still in state %s' % state)

    return (service, state)
def FindLastGoodAPN(service, default=None):
    """Returns the 'apn' entry of the service's Cellular.LastGoodAPN property.

    Args:
        service: A service object exposing GetProperties(), or a false value
        default: Value to return when no APN can be found

    Returns:
        The 'apn' value of the Cellular.LastGoodAPN property, or |default|
        if the service is false, the property is absent, or the property
        has no 'apn' entry.
    """
    # A missing service is treated like a service without any properties.
    properties = service.GetProperties() if service else {}
    if 'Cellular.LastGoodAPN' in properties:
        return properties['Cellular.LastGoodAPN'].get('apn', default)
    return default
def DisconnectFromCellularService(bs, flim, service):
    """Attempts to disconnect from the supplied cellular service.

    Args:
        bs: A basestation object.  Pass None to skip basestation-side checks
        flim: A flimflam object
        service: A cellular service object

    Raises:
        Error (via utils.poll_for_condition) if a basestation was supplied
        and it does not see the modem as disconnected in time.
    """
    # Waits for flimflam state to go to idle
    flim.DisconnectService(service)
    if not bs:
        return

    verifier = bs.GetAirStateVerifier()
    data_status = cellular.UeGenericDataStatus

    # This is racy: The modem is free to report itself as
    # disconnected before it actually finishes tearing down its RF
    # connection.  So first accept DISCONNECTING as well...
    verifier.AssertDataStatusIn([
        data_status.DISCONNECTING,
        data_status.REGISTERED,
        data_status.NONE,
    ])

    # ...and then poll until the teardown has really completed.
    utils.poll_for_condition(
        lambda: verifier.IsDataStatusIn([data_status.REGISTERED,
                                         data_status.NONE]),
        timeout=20,
        exception=Error('modem not disconnected from base station'))
def _EnumerateModems(manager):
    """Return the set of modem paths reported for |manager|.

    The path is taken from index 1 of each entry yielded by
    mm.EnumerateDevices.
    """
    return set(device[1] for device in mm.EnumerateDevices(manager))
def _SawNewModem(manager, preexisting_modems, old_modem):
    """Return True once |old_modem| is gone and the modem set has changed.

    Args:
        manager: Modem manager to enumerate modems on
        preexisting_modems: Set of modem paths to compare against
        old_modem: Path of the modem that is expected to go away

    Returns:
        False while |old_modem| is still enumerated; otherwise whether the
        current set of modems differs from |preexisting_modems|.
    """
    modems_now = _EnumerateModems(manager)
    # NB: This fails if an unrelated modem disappears.  Not fixing
    # until we support > 1 modem
    return (old_modem not in modems_now and
            modems_now != preexisting_modems)
def _WaitForModemToReturn(manager, preexisting_modems_original, modem_path):
    """Wait for a modem to re-enumerate and return its new path.

    Args:
        manager: Modem manager to enumerate modems on
        preexisting_modems_original: Set of modem paths seen before the
            change was triggered; must contain |modem_path|.  Not modified.
        modem_path: Path of the modem that is expected to go away

    Returns:
        The path of the single modem that was not in the original set
        (ignoring |modem_path| itself).

    Raises:
        Error if exactly one new modem cannot be identified.  The same
        exception type is handed to utils.poll_for_condition for the case
        where the modem does not come back within 50 seconds.
    """
    # Work on a copy so the caller's set is left untouched.
    baseline = copy.copy(preexisting_modems_original)
    baseline.remove(modem_path)

    def _ModemCameBack():
        return _SawNewModem(manager, baseline, modem_path)

    utils.poll_for_condition(
        _ModemCameBack,
        timeout=50,
        exception=Error('Modem did not come back after settings change'))

    current_modems = _EnumerateModems(manager)
    new_modems = list(current_modems - baseline)
    if len(new_modems) != 1:
        raise Error('Unexpected modem list change: %s vs %s' % (
            current_modems, new_modems))

    new_modem = new_modems[0]
    logging.info('New modem: %s' % new_modem)
    return new_modem
def SetFirmwareForTechnologyFamily(manager, modem_path, family):
    """Set the modem to firmware.  Return potentially-new modem path.

    Args:
        manager: Modem manager that owns the modem
        modem_path: Path of the modem whose firmware should be changed
        family: A cellular.TechnologyFamily value (UMTS or CDMA)

    Returns:
        The path of the modem after it has re-enumerated.

    Raises:
        Error if the modem is not a Gobi modem or does not come back.
        KeyError if |family| has no known carrier string.
    """
    # Snapshot the modems before the change so the new one can be spotted.
    preexisting_modems = _EnumerateModems(manager)

    # We do not currently support any multi-family modems besides Gobi
    gobi = manager.GobiModem(modem_path)
    if not gobi:
        # Both values must be passed as one tuple: '%' binds tighter than
        # ',', so without the parentheses the format itself would fail.
        raise Error('Modem %s does not support %s, cannot change technologies' %
                    (modem_path, family))

    logging.info('Changing firmware to technology family %s', family)

    FamilyToCarrierString = {
        cellular.TechnologyFamily.UMTS: 'Generic UMTS',
        cellular.TechnologyFamily.CDMA: 'Verizon Wireless',
    }

    gobi.SetCarrier(FamilyToCarrierString[family])
    return _WaitForModemToReturn(manager, preexisting_modems, modem_path)
# A test PRL that has an ID of 3333 and sets the device to acquire the
# default config of an 8960 with system_id 331.  The literal below is the
# base64 encoding of the PRL (generated with "base64 < prl"); it is decoded
# when the module is imported.
TEST_PRL_3333 = (
    'ADENBQMAAMAAAYADAgmABgIKDQsEAYAKDUBAAQKWAAIAQGAJApYAAgAw8BAAAAD/Uw=='.
    decode('base64_codec'))

# A modem with this MDN will always report itself as activated
TESTING_MDN = dbus.String('1115551212', variant_level=1)
def _IsCdmaModemConfiguredCorrectly(manager, modem_path):
    """Returns true iff the CDMA modem at modem_path is configured correctly.

    Args:
        manager: Modem manager that owns the modem
        modem_path: Path of the CDMA modem to inspect

    Returns:
        True if the modem status reports the testing MDN, the testing MIN
        and PRL version 3333; False otherwise.
    """
    # We don't test for systemID because the PRL should take care of
    # that.
    status = manager.SimpleModem(modem_path).GetStatus()

    required_settings = {'mdn': TESTING_MDN,
                         'min': TESTING_MDN,
                         'prl_version': 3333}
    configured_correctly = True

    # Deliberately no early exit: every mismatching setting gets logged.
    for rk, rv in required_settings.items():
        if rk not in status or rv != status[rk]:
            logging.error(
                '_IsCdmaModemConfiguredCorrectly: %s: expected %s, got %s',
                rk, rv, status.get(rk))
            configured_correctly = False
    return configured_correctly
def PrepareCdmaModem(manager, modem_path):
    """Configure a CDMA device (including PRL, MIN, and MDN).

    Args:
        manager: Modem manager that owns the modem
        modem_path: Path of the CDMA modem to configure

    Returns:
        |modem_path| if the modem was already configured correctly,
        otherwise the path of the modem after it re-enumerated.

    Raises:
        Error if the modem is still misconfigured after activation.
    """
    already_configured = _IsCdmaModemConfiguredCorrectly(manager, modem_path)
    if already_configured:
        return modem_path

    logging.info('Updating modem settings')
    modems_before = _EnumerateModems(manager)
    cdma_modem = manager.CdmaModem(modem_path)

    # The PRL is handed over by file name, so it is written to a temporary
    # file that stays alive until the modem has come back.
    with tempfile.NamedTemporaryFile() as prl_file:
        # Presumably widened so the modem manager process can read the file.
        os.chmod(prl_file.name, 0o744)
        prl_file.write(TEST_PRL_3333)
        # Push the data out so it is visible through the file's path.
        prl_file.flush()
        logging.info('Calling ActivateManual to change PRL')

        activation_settings = {
            'mdn': TESTING_MDN,
            'min': TESTING_MDN,
            'prlfile': dbus.String(prl_file.name, variant_level=1),
            # Default 8960 SID
            'system_id': dbus.UInt16(331, variant_level=1),
            'spc': dbus.String('000000'),
        }
        cdma_modem.ActivateManual(activation_settings)
        new_path = _WaitForModemToReturn(manager, modems_before, modem_path)

    if _IsCdmaModemConfiguredCorrectly(manager, new_path):
        return new_path
    raise Error('Modem configuration failed')
def GetCurrentTechnologyFamily(manager, modem_path):
    """Returns the technology family of the specified modem.

    The modem is probed for the GSM card interface: if reading its
    properties succeeds the modem is reported as UMTS, and if the read
    raises a DBusException it is reported as CDMA.

    Args:
        manager: Modem manager that owns the modem
        modem_path: Path of the modem to probe

    Returns:
        cellular.TechnologyFamily.UMTS or cellular.TechnologyFamily.CDMA
    """
    try:
        manager.GetAll(mm.ModemManager.GSM_CARD_INTERFACE, modem_path)
    except dbus.exceptions.DBusException:
        return cellular.TechnologyFamily.CDMA
    else:
        return cellular.TechnologyFamily.UMTS
def PrepareModemForTechnology(modem_path, target_technology):
    """Prepare modem for the technology: Sets things like firmware, PRL.

    Args:
        modem_path: Value passed to mm.PickOneModem to select the modem
        target_technology: Key into cellular.TechnologyToFamily

    Returns:
        The path of the modem once it has been prepared; this can differ
        from the path that was originally picked.
    """
    manager, modem_path = mm.PickOneModem(modem_path)
    logging.info('Found modem %s' % modem_path)

    current_family = GetCurrentTechnologyFamily(manager, modem_path)
    target_family = cellular.TechnologyToFamily[target_technology]

    # Switch firmware only when the modem is in the wrong family.
    needs_firmware_change = current_family != target_family
    if needs_firmware_change:
        modem_path = SetFirmwareForTechnologyFamily(
            manager, modem_path, target_family)

    # CDMA modems additionally need their PRL, MIN and MDN set up.
    wants_cdma = target_family == cellular.TechnologyFamily.CDMA
    if wants_cdma:
        modem_path = PrepareCdmaModem(manager, modem_path)

    return modem_path
def FactoryResetModem(modem_pattern, spc='000000'):
    """Factory resets modem, returns DBus pathname of modem after reset.

    Args:
        modem_pattern: Value passed to mm.PickOneModem to select the modem
        spc: Code passed through to the modem's FactoryReset call

    Returns:
        The path of the modem after it has re-enumerated.
    """
    manager, modem_path = mm.PickOneModem(modem_pattern)
    # Snapshot the modems first so the re-enumerated one can be recognised.
    modems_before = _EnumerateModems(manager)
    manager.Modem(modem_path).FactoryReset(spc)
    return _WaitForModemToReturn(manager, modems_before, modem_path)
class OtherDeviceShutdownContext(object):
    """Context manager that shuts down other devices.

    Note that the devices are shut down when the object is constructed,
    not when the context is entered; they are restored when the context
    exits.

    Usage:
        with cell_tools.OtherDeviceShutdownContext('cellular', flim):
            block

    TODO(rochberg):  Replace flimflam.DeviceManager with this
    """

    def __init__(self, device_type, flim):
        """Shut down every device except those of |device_type|.

        Args:
            device_type: Device type to leave running, e.g. 'cellular'
            flim: A flimflam object
        """
        self.device_manager = flimflam.DeviceManager(flim)
        self.device_manager.ShutdownAllExcept(device_type)

    def __enter__(self):
        return self

    def __exit__(self, exception, value, traceback):
        """Restore the devices; never suppresses an in-flight exception."""
        self.device_manager.RestoreDevices()
        return False
class BlackholeContext(object):
    """Context manager which uses IP tables to black hole access to hosts.

    hosts is an iterable of (host, chain) pairs.  chain is the iptables
    chain to insert the rule into: for 'OUTPUT' the host is matched as the
    destination (-d), for any other chain as the source (-s).

    A host can be either a hostname or an IP address.  Using a
    hostname is potentially troublesome here due to DNS inconsistencies
    and load balancing, but iptables is generally smart with hostnames,
    inserting rules for each of the N ip addresses returned by a name
    lookup.

    Usage:
        with cell_tools.BlackholeContext(hosts):
            block
    """

    def __init__(self, hosts):
        self.hosts = hosts

    def _rules(self):
        """Return the current OUTPUT and INPUT rules as a set of strings."""
        rules = utils.system_output('iptables -S OUTPUT').splitlines()
        rules += utils.system_output('iptables -S INPUT').splitlines()
        return set(rules)

    def __enter__(self):
        """Preserve original list of rules and blacklist self.hosts."""
        self.original_rules = self._rules()

        for host, chain in self.hosts:
            # Outgoing traffic is matched on its destination, everything
            # else on its source.
            if chain == 'OUTPUT':
                host_flag = '-d'
            else:
                host_flag = '-s'

            cmd = ' '.join(['iptables',
                            '-I %s' % chain,
                            '%s %s' % (host_flag, host),
                            '-j REJECT'])
            utils.run(cmd)
        return self

    def __exit__(self, exception, value, traceback):
        """Remove all rules not in the original list."""
        for rule in self._rules():
            if rule in self.original_rules:
                logging.info('preserving %s', rule)
                continue
            # "iptables -S" lists rules in "-A ..." form; turning the first
            # -A into -D yields the command that deletes that rule.
            rule = rule.replace('-A', '-D', 1)
            logging.info('removing %s', rule)
            utils.run('iptables %s' % rule)
        return False
class AutoConnectContext(object):
    """Context manager which sets autoconnect to either true or false.

    Enable or Disable autoconnect for the cellular service.
    Restore it when done.

    Usage:
        with cell_tools.AutoConnectContext(device, flim, autoconnect):
            block
    """

    def __init__(self, device, flim, autoconnect):
        """Remember the arguments; nothing is changed until __enter__.

        Args:
            device: A flimflam device object; it is powered on if needed
            flim: A flimflam object
            autoconnect: The AutoConnect value wanted inside the context
        """
        self.device = device
        self.flim = flim
        self.autoconnect = autoconnect
        # Set by __enter__ when it actually flipped the property, so that
        # __exit__ knows whether there is anything to restore.
        self.autoconnect_changed = False

    def PowerOnDevice(self, device):
        """Power on a flimflam device, ignoring in progress errors."""
        # Read the property once so the logged and the tested value agree.
        powered = device.GetProperties()['Powered']
        logging.info('powered = %s', powered)
        if powered:
            return
        try:
            device.Enable()
        except dbus.exceptions.DBusException as e:
            # An enable that is already under way is good enough for us.
            if e.get_dbus_name() != 'org.chromium.flimflam.Error.InProgress':
                raise

    def __enter__(self):
        """Power up device, get the service and disable autoconnect.

        Raises:
            error.TestFail if no cellular service is found, or if Favorite
            or AutoConnect do not end up with the wanted values.
        """
        changed = False
        self.PowerOnDevice(self.device)

        # TODO(jglasgow): generalize to use services associated with device
        service = self.flim.FindCellularService(timeout=40)
        if not service:
            raise error.TestFail('No cellular service available.')

        props = service.GetProperties()
        favorite = props['Favorite']

        if not favorite:
            logging.info('Enabling Favorite by connecting to service.')
            ConnectToCellular(self.flim)
            props = service.GetProperties()
            favorite = props['Favorite']

        autoconnect = props['AutoConnect']
        logging.info('Favorite = %s, AutoConnect = %s',
                     favorite, autoconnect)

        if autoconnect != self.autoconnect:
            logging.info('Setting AutoConnect = %s.', self.autoconnect)
            service.SetProperty('AutoConnect', dbus.Boolean(self.autoconnect))

            # Read the properties back to verify the change took effect.
            props = service.GetProperties()
            favorite = props['Favorite']
            autoconnect = props['AutoConnect']
            changed = True

        if not favorite:
            raise error.TestFail('Favorite=False, but we want it to be True')

        if autoconnect != self.autoconnect:
            raise error.TestFail('AutoConnect is %s, but we want it to be %s' %
                                 (autoconnect, self.autoconnect))

        self.autoconnect_changed = changed
        return self

    def __exit__(self, exception, value, traceback):
        """Restore autoconnect state if we changed it."""
        if not self.autoconnect_changed:
            return False

        try:
            self.PowerOnDevice(self.device)
        except Exception as e:
            if not exception:
                raise
            # The block already failed; keep that exception as the one the
            # caller sees and only log the secondary failure.
            logging.error(
                'Exiting AutoConnectContext with one exception, but '
                'PowerOnDevice raised another')
            logging.error(
                'Swallowing PowerOnDevice exception %s', e)
            return False

        # TODO(jglasgow): generalize to use services associated with
        # device, and restore state only on changed services
        service = self.flim.FindCellularService()
        if not service:
            logging.error('Cannot find cellular service.  '
                          'Autoconnect state not restored.')
            return False

        # We only get here when __enter__ flipped the value, so the
        # original state is the opposite of the one that was requested.
        service.SetProperty('AutoConnect', dbus.Boolean(not self.autoconnect))
        return False