| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Utilities for cellular tests.""" |
| import copy, dbus, os, tempfile |
| |
| # TODO(thieule): Consider renaming mm.py, mm1.py, modem.py, etc to be more |
| # descriptive (crosbug.com/37060). |
| import common |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros.cellular import cellular |
| from autotest_lib.client.cros.cellular import cellular_system_error |
| from autotest_lib.client.cros.cellular import mm |
| from autotest_lib.client.cros.cellular import modem |
| |
| from autotest_lib.client.cros import flimflam_test_path |
| import flimflam |
| |
| |
| TIMEOUT = 30 |
| SERVICE_TIMEOUT = 60 |
| |
| import cellular_logging |
| |
| logger = cellular_logging.SetupCellularLogging('cell_tools') |
| |
| |
def ConnectToCellular(flim, timeout=TIMEOUT):
    """Attempts to connect to a cell network using FlimFlam.

    If the cellular service is already in a connected state ('portal' or
    'online') no connect request is issued and the service is returned
    as-is.  Otherwise a connect is requested and the service is given
    another |timeout| seconds to reach a connected state.

    Args:
        flim: A flimflam object
        timeout: Timeout (in seconds) before giving up on connect.  The
            same value is passed to every individual wait below (service
            lookup, state waits, association and configuration).

    Returns:
        a tuple of the service and the service state

    Raises:
        cellular_system_error.ConnectionFailure: no cellular service was
            found, or the connect request failed for a reason other than
            'Error.AlreadyConnected'.
        cellular_system_error.BadState: the service did not end up in one
            of the connected states.
    """
    service = flim.FindCellularService(timeout=timeout)
    if not service:
        raise cellular_system_error.ConnectionFailure(
            'Could not find cell service')

    # Purely diagnostic output: nothing has gone wrong at this point, so
    # this must not be reported at error level.
    properties = service.GetProperties(utf8_strings=True)
    logger.info('Properties are: %s', properties)

    logger.info('Connecting to cell service: %s', service)

    connected_states = ['portal', 'online']

    # 'idle' is accepted here only so that the wait finishes for a service
    # that is not connected yet; it is not a connected state.
    state = flim.WaitForServiceState(
        service=service,
        expected_states=connected_states + ['idle'],
        timeout=timeout,
        ignore_failure=True)[0]
    logger.debug('Cell connection state : %s ', state)

    if state in connected_states:
        logger.debug('Looks good, skip ConnectService')
        return service, state

    logger.debug('Trying to ConnectService')
    success, status = flim.ConnectService(
        service=service,
        assoc_timeout=timeout,
        config_timeout=timeout)

    if not success:
        logger.error('Connect failed: %s', status)
        # TODO(rochberg): Turn off autoconnect
        # An AlreadyConnected failure is tolerated; the state check below
        # decides whether the service is really usable.
        if 'Error.AlreadyConnected' not in status['reason']:
            raise cellular_system_error.ConnectionFailure(
                'Could not connect: %s.' % status)

    state = flim.WaitForServiceState(
        service=service,
        expected_states=connected_states,
        timeout=timeout,
        ignore_failure=True)[0]
    if state not in connected_states:
        raise cellular_system_error.BadState(
            'Still in state %s, expecting one of: %s ' %
            (state, str(connected_states)))

    return service, state
| |
| |
def FindLastGoodAPN(service, default=None):
    """Returns the 'apn' entry of the service's Cellular.LastGoodAPN property.

    Args:
        service: A service object providing GetProperties(); may be falsy.
        default: Value returned when the APN cannot be determined.

    Returns:
        The 'apn' value of the 'Cellular.LastGoodAPN' property, or
        |default| when |service| is falsy, the property is absent, or the
        property has no 'apn' entry.
    """
    if not service:
        return default
    try:
        last_good_apn = service.GetProperties()['Cellular.LastGoodAPN']
    except KeyError:
        return default
    return last_good_apn.get('apn', default)
| |
| |
def DisconnectFromCellularService(bs, flim, service):
    """Attempts to disconnect from the supplied cellular service.

    Args:
        bs: A basestation object.  Pass None to skip basestation-side checks
        flim: A flimflam object
        service: A cellular service object

    Raises:
        cellular_system_error.BadState: passed to the poll as the exception
            to use when the base station does not see the modem as
            disconnected within 20 seconds.
    """
    flim.DisconnectService(service)  # Waits for flimflam state to go to idle

    if not bs:
        return

    air_state = bs.GetAirStateVerifier()
    settled_statuses = [
        cellular.UeGenericDataStatus.REGISTERED,
        cellular.UeGenericDataStatus.NONE,
    ]

    # This is racy: The modem is free to report itself as disconnected
    # before it actually finishes tearing down its RF connection, so
    # DISCONNECTING is still acceptable for this first check.
    air_state.AssertDataStatusIn(
        [cellular.UeGenericDataStatus.DISCONNECTING] + settled_statuses)

    utils.poll_for_condition(
        lambda: air_state.IsDataStatusIn(settled_statuses),
        timeout=20,
        exception=cellular_system_error.BadState(
            'modem not disconnected from base station'))
| |
| |
def _EnumerateModems(manager):
    """Get a set of modem paths.

    The path is taken from index 1 of each entry that
    mm.EnumerateDevices(manager) yields.
    """
    return set(device[1] for device in mm.EnumerateDevices(manager))
| |
| |
def _SawNewModem(manager, preexisting_modems, old_modem):
    """Returns True once old_modem is gone and the modem set has changed.

    Args:
        manager: Modem manager passed on to _EnumerateModems.
        preexisting_modems: Set of modem paths to compare against.
        old_modem: Path that must no longer be listed.
    """
    visible_modems = _EnumerateModems(manager)
    # NB: This fails if an unrelated modem disappears.  Not fixing
    # until we support > 1 modem
    return (old_modem not in visible_modems and
            preexisting_modems != visible_modems)
| |
| |
def _WaitForModemToReturn(manager, preexisting_modems_original, modem_path):
    """Waits for a modem to re-appear after a settings change.

    Args:
        manager: Modem manager used to enumerate modems.
        preexisting_modems_original: Modem paths seen before the change.
            This collection is not modified.
        modem_path: Path of the modem that is expected to go away.

    Returns:
        The path of the single newly appeared modem.

    Raises:
        cellular_system_error.BadState: passed to the poll as the exception
            to use if no new modem shows up within 50 seconds; also raised
            here if the number of new modems is not exactly one.
    """
    # Work on a private copy so the caller's snapshot stays intact.
    # discard() rather than remove(): a snapshot that does not contain
    # modem_path must not surface as an unrelated KeyError.
    preexisting_modems = set(preexisting_modems_original)
    preexisting_modems.discard(modem_path)

    utils.poll_for_condition(
        lambda: _SawNewModem(manager, preexisting_modems, modem_path),
        timeout=50,
        exception=cellular_system_error.BadState(
            'Modem did not come back after settings change'))

    current_modems = _EnumerateModems(manager)

    new_modems = list(current_modems - preexisting_modems)
    if len(new_modems) != 1:
        # Report the sets before and after the change; those are what is
        # needed to understand the unexpected difference.
        raise cellular_system_error.BadState(
            'Unexpected modem list change: %s vs %s' %
            (preexisting_modems, current_modems))

    logger.info('New modem: %s', new_modems[0])
    return new_modems[0]
| |
| |
def SetFirmwareForTechnologyFamily(manager, modem_path, family):
    """Set the modem to firmware. Return potentially-new modem path.

    Args:
        manager: Modem manager that owns the modem.
        modem_path: Path of the modem to reconfigure.
        family: Target cellular.TechnologyFamily value.

    Returns:
        The modem path to use from now on: |modem_path| unchanged for LTE
        (nothing is reconfigured), otherwise the path under which the modem
        re-appeared after the carrier change.

    Raises:
        cellular_system_error.BadScpiCommand: the modem is not a Gobi modem.
        KeyError: |family| has no entry in the carrier table below.
    """
    # todo(byronk): put this in a modem object?
    if family == cellular.TechnologyFamily.LTE:
        # nothing to set up on a Pixel. todo(byronk) how about others?
        # Hand the path back so callers that assign the result keep a
        # usable modem path.
        return modem_path

    logger.debug('SetFirmwareForTechnologyFamily : manager : %s ', manager)
    logger.debug('SetFirmwareForTechnologyFamily : modem_path : %s ',
                 modem_path)
    logger.debug('SetFirmwareForTechnologyFamily : family : %s ', family)
    preexisting_modems = _EnumerateModems(manager)

    # We do not currently support any multi-family modems besides Gobi
    gobi = manager.GetModem(modem_path).GobiModem()
    if not gobi:
        # Both values must be passed as one tuple; '%' binds tighter than
        # the comma, so without the parentheses formatting itself fails.
        raise cellular_system_error.BadScpiCommand(
            'Modem %s does not support %s, cannot change technologies' %
            (modem_path, family))

    logger.info('Changing firmware to technology family %s', family)

    family_to_carrier = {
        cellular.TechnologyFamily.UMTS: 'Generic UMTS',
        cellular.TechnologyFamily.CDMA: 'Verizon Wireless',
    }

    gobi.SetCarrier(family_to_carrier[family])
    return _WaitForModemToReturn(manager, preexisting_modems, modem_path)
| |
| |
| # A test PRL that has an ID of 3333 and sets the device to acquire the |
| # default config of an 8960 with system_id 331. Base64 encoding |
| # generated with "base64 < prl". |
| |
| TEST_PRL_3333 = ( |
| 'ADENBQMAAMAAAYADAgmABgIKDQsEAYAKDUBAAQKWAAICQGAJApYAAgIw8BAAAQDhWA=='. |
| decode('base64_codec')) |
| |
| |
| # A modem with this MDN will always report itself as activated |
| TESTING_MDN = dbus.String('1115551212', variant_level=1) |
| |
| |
def _IsCdmaModemConfiguredCorrectly(manager, modem_path):
    """Returns true iff the CDMA modem at modem_path is configured correctly.

    The modem's simple status is compared against the MDN, MIN and PRL
    version the tests require.  Every mismatch is logged, not just the
    first one.

    Args:
        manager: Modem manager that owns the modem.
        modem_path: Path of the CDMA modem to inspect.
    """
    # We don't test for systemID because the PRL should take care of
    # that.
    status = manager.GetModem(modem_path).SimpleModem().GetStatus()

    required_settings = {'mdn': TESTING_MDN,
                         'min': TESTING_MDN,
                         'prl_version': 3333}
    configured_correctly = True

    # No early exit on purpose, so that all wrong settings get reported.
    for key, expected in required_settings.items():
        if key not in status or expected != status[key]:
            logger.error(
                '_IsCdmaModemConfiguredCorrectly: %s: expected %s, got %s',
                key, expected, status.get(key))
            configured_correctly = False
    return configured_correctly
| |
| |
def PrepareCdmaModem(manager, modem_path):
    """Configure a CDMA device (including PRL, MIN, and MDN).

    Args:
        manager: Modem manager that owns the modem.
        modem_path: Path of the CDMA modem to configure.

    Returns:
        |modem_path| if the modem was already configured correctly,
        otherwise the path under which the modem re-appeared after manual
        activation.

    Raises:
        cellular_system_error.BadState: the modem is still not configured
            correctly after activation.
    """
    if _IsCdmaModemConfiguredCorrectly(manager, modem_path):
        return modem_path

    logger.info('Updating modem settings')
    modems_before = _EnumerateModems(manager)
    cdma = manager.GetModem(modem_path).CdmaModem()

    # The PRL is handed over by file name, so the temporary file has to
    # exist (and be flushed) while ActivateManual runs.
    with tempfile.NamedTemporaryFile() as prl_file:
        os.chmod(prl_file.name, 0o744)
        prl_file.write(TEST_PRL_3333)
        prl_file.flush()
        logger.info('Calling ActivateManual to change PRL')

        activation_settings = {
            'mdn': TESTING_MDN,
            'min': TESTING_MDN,
            'prlfile': dbus.String(prl_file.name, variant_level=1),
            # Default 8960 SID
            'system_id': dbus.UInt16(331, variant_level=1),
            'spc': dbus.String('000000'),
        }
        cdma.ActivateManual(activation_settings)
        new_path = _WaitForModemToReturn(
            manager, modems_before, modem_path)

    if _IsCdmaModemConfiguredCorrectly(manager, new_path):
        return new_path
    raise cellular_system_error.BadState('Modem configuration failed')
| |
| |
def PrepareModemForTechnology(modem_path, target_technology):
    """Prepare modem for the technology: Sets things like firmware, PRL.

    Args:
        modem_path: Modem selector handed to mm.PickOneModem.
        target_technology: Key into cellular.TechnologyToFamily naming the
            technology the modem should be prepared for.

    Returns:
        The modem path to use afterwards; it may differ from the path that
        was picked initially if the modem was reconfigured.
    """
    manager, modem_path = mm.PickOneModem(modem_path)

    logger.info('Found modem %s', modem_path)

    # todo(byronk) : This returns TechnologyFamily:UMTS on a Pixel. ????
    current_family = manager.GetModem(modem_path).GetCurrentTechnologyFamily()
    target_family = cellular.TechnologyToFamily[target_technology]

    if current_family != target_family:
        logger.debug('Modem Current Family: %s ', current_family)
        logger.debug('Modem Target Family : %s ', target_family)
        new_path = SetFirmwareForTechnologyFamily(
            manager, modem_path, target_family)
        # The firmware switch can come back without a path (nothing to do
        # for LTE).  Keep the path we already have in that case instead of
        # overwriting it with None.
        if new_path:
            modem_path = new_path

    if target_family == cellular.TechnologyFamily.CDMA:
        modem_path = PrepareCdmaModem(manager, modem_path)
        # Force the modem to report that it has been activated since we
        # use a custom PRL and have already manually activated it.
        manager.GetModem(modem_path).GobiModem().ForceModemActivatedStatus()

    # When testing EVDO, we need to force the modem to register with EVDO
    # directly (bypassing CDMA 1x RTT) else the modem will not register
    # properly because it looks for CDMA 1x RTT first but can't find it
    # because the call box can only emulate one technology at a time (EVDO).
    try:
        if target_technology == cellular.Technology.EVDO_1X:
            network_preference = modem.Modem.NETWORK_PREFERENCE_EVDO_1X
        else:
            network_preference = modem.Modem.NETWORK_PREFERENCE_AUTOMATIC
        gobi = manager.GetModem(modem_path).GobiModem()
        gobi.SetNetworkPreference(network_preference)
    except AttributeError:
        # Not a Gobi modem: setting the preference is best-effort only.
        pass

    return modem_path
| |
| |
def FactoryResetModem(modem_pattern, spc='000000'):
    """Factory resets modem, returns DBus pathname of modem after reset.

    Args:
        modem_pattern: Modem selector handed to mm.PickOneModem.
        spc: Code passed to the modem's FactoryReset call.
    """
    manager, modem_path = mm.PickOneModem(modem_pattern)
    modems_before = _EnumerateModems(manager)
    # Local name chosen so it does not hide the imported 'modem' module.
    modem_proxy = manager.GetModem(modem_path).Modem()
    modem_proxy.FactoryReset(spc)
    return _WaitForModemToReturn(manager, modems_before, modem_path)
| |
| |
class OtherDeviceShutdownContext(object):
    """Context manager that shuts down other devices.

    On entry every device except those of the given type is shut down via
    flimflam.DeviceManager; on exit the devices are restored.

    Usage:
        with cell_tools.OtherDeviceShutdownContext('cellular'):
            block

    TODO(rochberg): Replace flimflam.DeviceManager with this
    """

    def __init__(self, device_type):
        """Remembers the device type that is to be kept running."""
        self.device_manager = None
        self.device_type = device_type

    def __enter__(self):
        """Shuts down all devices except those of self.device_type."""
        manager = flimflam.DeviceManager(flimflam.FlimFlam())
        self.device_manager = manager
        manager.ShutdownAllExcept(self.device_type)
        return self

    def __exit__(self, exception, value, traceback):
        """Restores the devices; never suppresses an exception."""
        manager = self.device_manager
        if manager:
            manager.RestoreDevices()
        return False
| |
| |
class AutoConnectContext(object):
    """Context manager which sets autoconnect to either true or false.

    Enable or Disable autoconnect for the cellular service.
    Restore it when done.

    Usage:
        with cell_tools.AutoConnectContext(device, flim, autoconnect):
            block
    """

    def __init__(self, device, flim, autoconnect):
        """Stores the arguments; nothing is changed until __enter__.

        Args:
            device: flimflam device that gets powered on.
            flim: A flimflam object used to look up the cellular service.
            autoconnect: The AutoConnect value to apply inside the block.
        """
        self.device = device
        self.flim = flim
        self.autoconnect = autoconnect
        # Set by __enter__ when the service's AutoConnect value really
        # differed from the requested one; __exit__ restores only then.
        self.autoconnect_changed = False

    def PowerOnDevice(self, device):
        """Power on a flimflam device, ignoring in progress errors."""
        # Read the property once so that the logged value is the one the
        # decision below is based on.
        powered = device.GetProperties()['Powered']
        logger.info('powered = %s', powered)
        if powered:
            return
        try:
            device.Enable()
        except dbus.exceptions.DBusException as e:
            if e.get_dbus_name() != 'org.chromium.flimflam.Error.InProgress':
                # Bare raise keeps the original traceback.
                raise

    def __enter__(self):
        """Power up device, get the service and disable autoconnect.

        Raises:
            error.TestFail: no cellular service was found, or the service
                does not report the requested AutoConnect value after it
                was set.
        """
        changed = False
        self.PowerOnDevice(self.device)

        # Use SERVICE_TIMEOUT*2 here because it may take SERVICE_TIMEOUT
        # seconds for the modem to disconnect when the base emulator is taken
        # offline for reconfiguration and then another SERVICE_TIMEOUT
        # seconds for the modem to reconnect after the base emulator is
        # brought back online.
        #
        # TODO(jglasgow): generalize to use services associated with device
        service = self.flim.FindCellularService(timeout=SERVICE_TIMEOUT*2)
        if not service:
            raise error.TestFail('No cellular service available.')

        # Always set the AutoConnect property even if the requested value
        # is the same so that shill will retain the AutoConnect property, else
        # shill may override it.
        props = service.GetProperties()
        autoconnect = props['AutoConnect']
        logger.info('AutoConnect = %s', autoconnect)
        logger.info('Setting AutoConnect = %s.', self.autoconnect)
        service.SetProperty('AutoConnect', dbus.Boolean(self.autoconnect))

        if autoconnect != self.autoconnect:
            # Re-read the properties to verify that the new value took.
            props = service.GetProperties()
            autoconnect = props['AutoConnect']
            changed = True

        # Make sure the cellular service gets persisted by taking it out of
        # the ephemeral profile.
        if not props['Profile']:
            manager_props = self.flim.manager.GetProperties()
            active_profile = manager_props['ActiveProfile']
            logger.info("Setting cellular service profile to %s",
                        active_profile)
            service.SetProperty('Profile', active_profile)

        if autoconnect != self.autoconnect:
            raise error.TestFail('AutoConnect is %s, but we want it to be %s' %
                                 (autoconnect, self.autoconnect))

        self.autoconnect_changed = changed

        return self

    def __exit__(self, exception, value, traceback):
        """Restore autoconnect state if we changed it.

        Always returns False, so an exception raised inside the with block
        is never suppressed.
        """
        if not self.autoconnect_changed:
            return False

        try:
            self.PowerOnDevice(self.device)
        except Exception as e:
            if exception:
                # The exception from the with block is the interesting
                # one; report this one and let the original propagate.
                logger.error(
                    'Exiting AutoConnectContext with one exception, but ' +
                    'PowerOnDevice raised another')
                logger.error(
                    'Swallowing PowerOnDevice exception %s', e)
                return False
            else:
                raise

        # TODO(jglasgow): generalize to use services associated with
        # device, and restore state only on changed services
        service = self.flim.FindCellularService()
        if not service:
            logger.error('Cannot find cellular service. '
                         'Autoconnect state not restored.')
            return False
        # autoconnect_changed is only set when the previous value differed
        # from the requested one, so the previous value is its negation.
        service.SetProperty('AutoConnect', dbus.Boolean(not self.autoconnect))

        return False