| # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| |
| import common |
| from autotest_lib.client.cros.cellular.pseudomodem import modem_3gpp |
| from autotest_lib.client.cros.cellular.pseudomodem import pm_constants |
| from autotest_lib.client.cros.cellular.pseudomodem import state_machine |
| from autotest_lib.client.cros.cellular.pseudomodem import state_machine_factory |
| |
| class InteractiveStateMachineFactory(state_machine_factory.StateMachineFactory): |
| """ Run relevant state machines in interactive mode. """ |
| def __init__(self): |
| super(InteractiveStateMachineFactory, self).__init__() |
| self.SetInteractive(pm_constants.STATE_MACHINE_ENABLE) |
| self.SetInteractive(pm_constants.STATE_MACHINE_REGISTER) |
| |
| |
| class ScanMachine(state_machine.StateMachine): |
| """ |
| Handle shill initiated 3GPP scan request. |
| |
| A simple machine that allows the test to hook into the Scan asynchronous |
| call. |
| |
| """ |
| # State machine states. |
| SCAN_STATE = 'Scan' |
| DONE_STATE = 'Done' |
| |
| def __init__(self, modem): |
| super(ScanMachine, self).__init__(modem) |
| self._state = ScanMachine.SCAN_STATE |
| |
| |
| def _HandleScanState(self): |
| """ The only real state in this machine. """ |
| self._modem.DoScan() |
| self._state = ScanMachine.DONE_STATE |
| return True |
| |
| |
| def _GetCurrentState(self): |
| return self._state |
| |
| |
| def _GetModemStateFunctionMap(self): |
| return { |
| ScanMachine.SCAN_STATE: ScanMachine._HandleScanState, |
| # ScanMachine.DONE_STATE is the final state. So, no handler. |
| } |
| |
| |
| def _ShouldStartStateMachine(self): |
| return True |
| |
| |
| class ScanStateMachineFactory(state_machine_factory.StateMachineFactory): |
| """ Extend StateMachineFactory to create an interactive ScanMachine. """ |
| def ScanMachine(self, *args, **kwargs): |
| """ Create a ScanMachine when needed in the modem. """ |
| machine = ScanMachine(*args, **kwargs) |
| machine.EnterInteractiveMode(self._bus) |
| return machine |
| |
| |
| class AsyncScanModem(modem_3gpp.Modem3gpp): |
| """ 3GPP modem that uses ScanMachine for the Scan call. """ |
| def __init__(self): |
| super(AsyncScanModem, self).__init__( |
| state_machine_factory=ScanStateMachineFactory()) |
| |
| |
| def Scan(self, return_cb, raise_cb): |
| """ Overriden from Modem3gpp. """ |
| # Stash away the scan_ok callback for when the Scan finishes. |
| logging.debug('Network scan initiated.') |
| self._scan_ok_callback = return_cb |
| self._scan_failed_callback = raise_cb |
| self._scan_machine = self._state_machine_factory.ScanMachine(self) |
| self._scan_machine.Start() |
| |
| |
| def DoScan(self): |
| """ Defer to Modem3gpp to take the original |SyncScan| action. """ |
| # We're done scanning, drop |_scan_machine| reference. |
| self._scan_machine = None |
| try: |
| scan_result = super(AsyncScanModem, self).SyncScan() |
| except dbus.exceptions.DBusException as e: |
| logging.warning('Network scan failed') |
| self._scan_failed_callback(e) |
| return |
| |
| logging.debug('Network scan completed.') |
| self._scan_ok_callback(scan_result) |