| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| |
| import pm_errors |
| import state_machine |
| |
| from autotest_lib.client.cros.cellular import mm1_constants |
| |
| class RegisterMachine(state_machine.StateMachine): |
| """ |
| RegisterMachine handles the state transitions involved in bringing the |
| modem to the REGISTERED state. |
| |
| """ |
| def __init__(self, modem, operator_code="", return_cb=None, raise_cb=None): |
| super(RegisterMachine, self).__init__(modem) |
| self._networks = None |
| self._operator_code = operator_code |
| self._return_cb = return_cb |
| self._raise_cb = raise_cb |
| |
| |
| def Cancel(self): |
| """ Overriden from superclass. """ |
| logging.info('RegisterMachine: Canceling register.') |
| super(RegisterMachine, self).Cancel() |
| state = self._modem.Get(mm1_constants.I_MODEM, 'State') |
| reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED |
| if state == mm1_constants.MM_MODEM_STATE_SEARCHING: |
| logging.info('RegisterMachine: Setting state to ENABLED.') |
| self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_ENABLED, |
| reason) |
| self._modem.SetRegistrationState( |
| mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE) |
| self._modem.register_step = None |
| if self._raise_cb: |
| self._raise_cb( |
| pm_errors.MMCoreError(pm_errors.MMCoreError.CANCELLED, |
| 'Cancelled')) |
| |
| |
| def _HandleEnabledState(self): |
| logging.info('RegisterMachine: Modem is ENABLED.') |
| logging.info('RegisterMachine: Setting registration state ' |
| 'to SEARCHING.') |
| self._modem.SetRegistrationState( |
| mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_SEARCHING) |
| logging.info('RegisterMachine: Setting state to SEARCHING.') |
| reason = mm1_constants.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED |
| self._modem.ChangeState(mm1_constants.MM_MODEM_STATE_SEARCHING, reason) |
| logging.info('RegisterMachine: Starting network scan.') |
| try: |
| self._networks = self._modem.SyncScan() |
| except pm_errors.MMError as e: |
| self._modem.register_step = None |
| logging.error('An error occurred during network scan: ' + str(e)) |
| self._modem.ChangeState( |
| mm1_constants.MM_MODEM_STATE_ENABLED, |
| mm1_constants.MODEM_STATE_CHANGE_REASON_UNKNOWN) |
| self._modem.SetRegistrationState( |
| mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE) |
| if self._raise_cb: |
| self._raise_cb(e) |
| return False |
| logging.info('RegisterMachine: Found networks: ' + str(self._networks)) |
| return True |
| |
| |
| def _HandleSearchingState(self): |
| logging.info('RegisterMachine: Modem is SEARCHING.') |
| if not self._networks: |
| logging.info('RegisterMachine: Scan returned no networks.') |
| logging.info('RegisterMachine: Setting state to ENABLED.') |
| self._modem.ChangeState( |
| mm1_constants.MM_MODEM_STATE_ENABLED, |
| mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN) |
| # TODO(armansito): Figure out the correct registration |
| # state to transition to when no network is present. |
| logging.info('RegisterMachine: Setting registration state ' |
| 'to IDLE.') |
| self._modem.SetRegistrationState( |
| mm1_constants.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE) |
| self._modem.register_step = None |
| if self._raise_cb: |
| self._raise_cb(pm_errors.MMMobileEquipmentError( |
| pm_errors.MMMobileEquipmentError.NO_NETWORK, |
| 'No networks were found to register.')) |
| return False |
| |
| # Pick the last network in the list. Roaming networks will come before |
| # the home network which makes the last item in the list the home |
| # network. |
| if self._operator_code: |
| if not self._operator_code in self._modem.scanned_networks: |
| if self._raise_cb: |
| self._raise_cb(pm_errors.MMCoreError( |
| pm_errors.MMCoreError.FAILED, |
| "Unknown network: " + self._operator_code)) |
| return False |
| network = self._modem.scanned_networks[self._operator_code] |
| else: |
| network = self._networks[-1] |
| logging.info( |
| 'RegisterMachine: Registering to network: ' + str(network)) |
| self._modem.SetRegistered( |
| network['operator-code'], |
| network['operator-long']) |
| |
| # The previous call should have set the state to REGISTERED. |
| self._modem.register_step = None |
| |
| if self._return_cb: |
| self._return_cb() |
| return False |
| |
| |
| def _GetModemStateFunctionMap(self): |
| return { |
| mm1_constants.MM_MODEM_STATE_ENABLED: |
| RegisterMachine._HandleEnabledState, |
| mm1_constants.MM_MODEM_STATE_SEARCHING: |
| RegisterMachine._HandleSearchingState |
| } |
| |
| |
| def _ShouldStartStateMachine(self): |
| if self._modem.register_step and self._modem.register_step != self: |
| # There is already an ongoing register operation. |
| message = 'Register operation already in progress.' |
| logging.info(message) |
| error = pm_errors.MMCoreError(pm_errors.MMCoreError.IN_PROGRESS, |
| message) |
| if self._raise_cb: |
| self._raise_cb(error) |
| else: |
| raise error |
| elif self._modem.register_step is None: |
| # There is no register operation going on, canceled or otherwise. |
| state = self._modem.Get(mm1_constants.I_MODEM, 'State') |
| if state != mm1_constants.MM_MODEM_STATE_ENABLED: |
| message = 'Cannot initiate register while in state %d, ' \ |
| 'state needs to be ENABLED.' % state |
| error = pm_errors.MMCoreError(pm_errors.MMCoreError.WRONG_STATE, |
| message) |
| if self._raise_cb: |
| self._raise_cb(error) |
| else: |
| raise error |
| |
| logging.info('Starting Register.') |
| self._modem.register_step = self |
| return True |