| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import collections |
| import inspect |
| import logging |
| |
| import at_channel |
| import task_loop |
| import wardmodem_exceptions as wme |
| |
| MODEM_RESPONSE_TIMEOUT_MILLISECONDS = 30000 |
| ARG_PLACEHOLDER = '*' |
| |
| class ATTransceiverMode(object): |
| """ |
| Enum to specify what mode the ATTransceiver is operating in. |
| |
| There are three modes. These modes determine how the commands to/from |
| the modemmanager are routed. |
| WARDMODEM: modemmanager interacts with wardmodem alone. |
| SPLIT_VERIFY: modemmanager commands are sent to both the wardmodem |
| and the physical modem on the device. Responses from |
| wardmodem are verified against responses from the physical |
| modem. In case of a mismatch, wardmodem's response is |
| chosen, and a warning is issued. |
| PASS_THROUGH: modemmanager commands are routed to/from the physical |
| modem. Frankly, wardmodem isn't running in this mode. |
| |
| """ |
| WARDMODEM = 0 |
| SPLIT_VERIFY = 1 |
| PASS_THROUGH = 2 |
| |
| MODE_NAME = { |
| WARDMODEM: 'WARDMODEM', |
| SPLIT_VERIFY: 'SPLIT_VERIFY', |
| PASS_THROUGH: 'PASS_THROUGH' |
| } |
| |
| |
| @classmethod |
| def to_string(cls, value): |
| """ |
| A class method to obtain string representation of the enum values. |
| |
| @param value: the enum value to stringify. |
| |
| """ |
| return "%s.%s" % (cls.__name__, cls.MODE_NAME[value]) |
| |
| |
| class ATTransceiver(object): |
| """ |
| A world facing multiplexer class that orchestrates the communication between |
| modem manager, the physical modem, and wardmodem back-end. |
| |
| """ |
| |
| def __init__(self, mm_at_port, modem_conf, |
| modem_at_port=None): |
| """ |
| @param mm_at_port: File descriptor for AT port used by modem manager. |
| Can not be None. |
| |
| @param modem_conf: A ModemConfiguration object containing the |
| configuration data for the current modem. |
| |
| @param modem_at_port: File descriptor for AT port used by the modem. May |
| be None, but that forces ATTransceiverMode.WARDMODEM. Default: |
| None. |
| |
| """ |
| super(ATTransceiver, self).__init__() |
| assert mm_at_port is not None |
| |
| self._logger = logging.getLogger(__name__) |
| self._task_loop = task_loop.get_instance() |
| self._mode = ATTransceiverMode.WARDMODEM |
| # The time we wait for any particular response from physical modem. |
| self._modem_response_timeout_milliseconds = ( |
| MODEM_RESPONSE_TIMEOUT_MILLISECONDS) |
| # We keep a queue of responses from the wardmodem and physical modem, |
| # so that we can verify they match. |
| self._cached_modem_responses = collections.deque() |
| self._cached_wardmodem_responses = collections.deque() |
| |
| # When a wardmodem response has been received but the corresponding |
| # physical modem response hasn't arrived, we post a task to wait for the |
| # response. |
| self._modem_response_wait_task = None |
| |
| # We use a map from a set of well known state machine names to actual |
| # objects to dispatch state machine calls. This allows tests to provide |
| # alternative implementations of any state machine to wardmodem. |
| self._state_machines = {} |
| |
| # If registered with a non-None machine, the fallback machine is used to |
| # service all AT commands that are not matched with any other machine. |
| self._fallback_state_machine = None |
| self._fallback_machine_function = None |
| |
| # Maps an incoming AT command from modemmanager to an internal wardmodem |
| # action. |
| self._at_to_wm_action_map = {} |
| # Maps an internal response from wardmodem to an AT command to be sent |
| # to modemmanager. |
| self._wm_response_to_at_map = {} |
| |
| # Load mapping between AT commands and wardmodem actions. |
| self._update_at_to_wm_action_map(modem_conf.base_at_to_wm_action_map) |
| self._update_at_to_wm_action_map(modem_conf.plugin_at_to_wm_action_map) |
| self._update_wm_response_to_at_map( |
| modem_conf.base_wm_response_to_at_map) |
| self._update_wm_response_to_at_map( |
| modem_conf.plugin_wm_response_to_at_map) |
| self._logger.debug('Finished loading AT --> wardmodem configuration.') |
| self._logger.debug(self._at_to_wm_action_map) |
| self._logger.debug('Finished loading wardmodem --> AT configuration.') |
| self._logger.debug(self._wm_response_to_at_map) |
| |
| # Initialize channels -- let the session begin. |
| if modem_at_port is not None: |
| self._modem_channel = at_channel.ATChannel( |
| self._process_modem_at_command, |
| modem_at_port, |
| 'modem_primary_channel') |
| self._modem_channel.at_prefix = modem_conf.mm_to_modem_at_prefix |
| self._modem_channel.at_suffix = modem_conf.mm_to_modem_at_suffix |
| else: |
| self._modem_channel = None |
| |
| self._mm_channel = at_channel.ATChannel(self._process_mm_at_command, |
| mm_at_port, |
| 'mm_primary_channel') |
| self._mm_channel.at_prefix = modem_conf.modem_to_mm_at_prefix |
| self._mm_channel.at_suffix = modem_conf.modem_to_mm_at_suffix |
| |
| |
| # Verification failure reasons |
| VERIFICATION_FAILED_MISMATCH = 1 |
| VERIFICATION_FAILED_TIME_OUT = 2 |
| |
| |
| @property |
| def mode(self): |
| """ |
| ATTranscieverMode value. Determines how commands are routed. |
| |
| @see ATTransceiverMode |
| |
| """ |
| return self._mode |
| |
| |
| @mode.setter |
| def mode(self, value): |
| """ |
| Set mode. |
| |
| @param value: The value to set. Type: ATTransceiverMode. |
| |
| """ |
| if value != ATTransceiverMode.WARDMODEM and self._modem_channel is None: |
| self._logger.warning( |
| 'Can not switch to %s mode. No modem port provided.', |
| ATTransceiverMode.to_string(value)) |
| return |
| self._logger.info('Set mode to %s', |
| ATTransceiverMode.to_string(value)) |
| self._mode = value |
| |
| |
| def get_state_machine(self, well_known_name): |
| """ |
| Get the registered state machine for the given well known name. |
| |
| @param well_known_name: The name of the desired machine. |
| |
| @return: The machine. None if not found. |
| |
| """ |
| return self._state_machines.get(well_known_name, None) |
| |
| |
| def register_state_machine(self, state_machine): |
| """ |
| Register a new state machine. |
| |
| We maintain a map from the well known name of the state machine to the |
| object. Any older object mapped to the same name will be replaced. |
| |
| @param state_machine: [StateMachine object] The state machine |
| object to be used to dispatch calls. |
| |
| """ |
| state_machine_name = state_machine.get_well_known_name() |
| self._state_machines[state_machine_name] = state_machine |
| |
| |
| def register_fallback_state_machine(self, state_machine_name, function): |
| """ |
| Register the fallback state machine to forward AT commands to. |
| |
| If this machine is registered, all AT commands for which no matching |
| rule is found will result in the call |state_machine|.|function|(at). |
| where |at| is the actual AT command that could not be matched. |
| |
| @param state_machine_name: Well known name of the machine to fallback on |
| if no machine matches an incoming AT command. |
| |
| @param function: The function in |state_machine| to call. |
| |
| """ |
| if state_machine_name not in self._state_machines: |
| self._setup_error('Machine %s, set as fallback, has not been ' |
| 'registered. ' % state_machine_name) |
| self._fallback_state_machine = state_machine_name |
| self._fallback_machine_function = function |
| |
| |
| def process_wardmodem_response(self, response, *args): |
| """ |
| Convert responses from the wardmodem into AT commands and send them to |
| modemmanager. |
| |
| @param response: wardmodem response to be translated to AT response to |
| the modem manager. |
| |
| @param *args: arguments to the wardmodem response. |
| |
| @raises: ATTransceiverError if the response can not be translated into |
| an AT command. |
| |
| """ |
| self._logger.debug('Processing wardmodem response %s%s', |
| response, str(args) if args else '') |
| if response not in self._wm_response_to_at_map: |
| self._runtime_error('Unknown wardmodem response |%s|' % response) |
| at_response = self._construct_at_response( |
| self._wm_response_to_at_map[response], *args) |
| self._process_wardmodem_at_command(at_response) |
| |
| # ########################################################################## |
| # Callbacks -- These are the functions that process events from the |
| # ATChannel or the TaskLoop. These functions are either |
| # (1) set as callbacks in the ATChannel, or |
| # (2) called internally to process the AT command to/from the TaskLoop. |
| |
| def _process_modem_at_command(self, command): |
| """ |
| Callback called by the physical modem channel when an AT response is |
| received. |
| |
| @param command: AT command sent by the physical modem. |
| |
| """ |
| assert self.mode != ATTransceiverMode.WARDMODEM |
| self._logger.debug('Command {modem ==> []}: |%s|', command) |
| if self.mode == ATTransceiverMode.PASS_THROUGH: |
| self._logger.debug('Command {[] ==> mm}: |%s|' , command) |
| self._mm_channel.send(command) |
| else: |
| self._cached_modem_responses.append(command) |
| self._verify_and_send_mm_commands() |
| |
| |
| def _process_mm_at_command(self, command): |
| """ |
| Callback called by the modem manager channel when an AT command is |
| received. |
| |
| @param command: AT command sent by modem manager. |
| |
| """ |
| self._logger.debug('Command {mm ==> []}: |%s|', command) |
| if(self.mode == ATTransceiverMode.PASS_THROUGH or |
| self.mode == ATTransceiverMode.SPLIT_VERIFY): |
| self._logger.debug('Command {[] ==> modem}: |%s|', command) |
| self._modem_channel.send(command) |
| if(self.mode == ATTransceiverMode.WARDMODEM or |
| self.mode == ATTransceiverMode.SPLIT_VERIFY): |
| self._logger.debug('Command {[] ==> wardmodem}: |%s|', command) |
| self._post_wardmodem_request(command) |
| |
| |
| def _process_wardmodem_at_command(self, command): |
| """ |
| Function called to process an AT command response of wardmodem. |
| |
| This function is called after the response from the task loop has been |
| converted to an AT command. |
| |
| @param command: The AT command response of wardmodem. |
| |
| """ |
| assert self.mode != ATTransceiverMode.PASS_THROUGH |
| self._logger.debug('Command {wardmodem ==> []: |%s|', command) |
| if self.mode == ATTransceiverMode.WARDMODEM: |
| self._logger.debug('Command {[] ==> mm}: |%s|', command) |
| self._mm_channel.send(command) |
| else: |
| self._cached_wardmodem_responses.append(command) |
| self._verify_and_send_mm_commands() |
| |
| |
| def _post_wardmodem_request(self, command): |
| """ |
| For an AT command, find out the action to be taken on wardmodem and post |
| the action. |
| |
| @param command: AT command for which a request must be posted to |
| wardmodem. |
| |
| @raises: ATTransceiverException if no valid action exists for the given |
| AT command. |
| |
| """ |
| action = self._find_wardmodem_action_for_at(command) |
| state_machine_name, function_name, args = action |
| try: |
| state_machine = self._state_machines[state_machine_name] |
| except KeyError: |
| self._runtime_error( |
| 'Malformed action registered for AT command -- Unknown ' |
| 'state machine. AT command: |%s|. Action: |%s|' % |
| (command, action)) |
| try: |
| function = getattr(state_machine, function_name) |
| except AttributeError: |
| self._runtime_error( |
| 'Malformed action registered for AT command -- Unkonwn ' |
| 'function name. AT command: |%s|. Action: |%s|. Object ' |
| 'dictionary: %s.' % (command, action, dir(state_machine))) |
| |
| self._task_loop.post_task( |
| self._execute_state_machine_function, command, action, function, |
| *args) |
| |
| # ########################################################################## |
| # Helper functions |
| |
| def _execute_state_machine_function(self, at_command, action, function, |
| *args): |
| """ |
| A thin wrapper to execute state_machine.function(args). Instead of |
| posting the call directly, this method is posted for better error |
| reporting in case of failure. |
| |
| @param at_command: The AT command for which this function was called. |
| |
| @param action: The matching wardmodem action which led to this function |
| call. |
| |
| @param function: The function to call. |
| |
| @param *args: Arguments to be passed to function. |
| |
| """ |
| try: |
| function(*args) |
| except TypeError as e: |
| self._logger.error( |
| 'Possible malformed action registered for AT command -- ' |
| 'Incorrect arguments. AT command: |%s|. Action: |%s|. ' |
| 'Expected function signature: %s. ' |
| 'Original error raised: |%s|', |
| at_command, action, inspect.getargspec(function), str(e)) |
| # use 'raise' here to preserve the original backtrace. |
| raise |
| |
| |
| def _update_at_to_wm_action_map(self, raw_map): |
| """ |
| Update the dictionary that maps AT commands and their arguments to the |
| action to be taken by wardmodem. |
| |
| The internal map updated is |
| {at_command, {(arg1, arg2, ...), (state_machine_name, |
| function, |
| (idx1, idx2, ...))}} |
| Here, |
| - at_command [string] is the AT Command received, |
| - (arg1, arg2, ...) [tuple of string] is possibly empty, and |
| specifies the arguments that need to be matched. It may contain |
| the special symbol '*' to mean ignore that argument while |
| matching. |
| - state_machine_name [string] is name of a state machine in the |
| state machine map. |
| - function [string] is a function exported by the state machine |
| mapped to by state_machine_name |
| - (idx1, idx2, ...) [tuple of int] lists the (string) arguments that |
| should be passed on from the AT command to the called function. |
| |
| @param raw_map: The raw map from AT command to function read in from the |
| configuration file. For the format of this map, see the comment |
| at the head of a configuration file. |
| |
| @raises WardModemSetupException if raw_map was not well-formed, and the |
| update failed. Absolutely no guarantees about the state of the |
| map if the update fails. |
| |
| """ |
| for atcom in raw_map: |
| try: |
| at, args = self._parse_at_command(atcom) |
| except wme.ATTransceiverException as e: |
| self._setup_error(e.args) |
| action = self._sanitize_wardmodem_action(raw_map[atcom]) |
| |
| if at not in self._at_to_wm_action_map: |
| self._at_to_wm_action_map[at] = {} |
| if args in self._at_to_wm_action_map[at]: |
| self._logger.debug('Updated at_to_wm_action_map: ' |
| '|%s(%s): [%s --> %s]|', |
| at, args, |
| str(self._at_to_wm_action_map[at][args]), |
| str(action)) |
| else: |
| self._logger.debug('Added to at_to_wm_action_map: |%s(%s): %s|', |
| at, args, str(action)) |
| self._at_to_wm_action_map[at][args] = action |
| |
| |
| def _update_wm_response_to_at_map(self, raw_map): |
| """ |
| Update the dictionary that maps wardmodem responses to AT commands. |
| |
| The internal map updated is of the same form as raw_map: |
| {response_function: at_response} |
| where both response_function and at_response are of type string. |
| at_resposne may contain special placeholder charachters '*'. |
| |
| @param raw_map: The map read in from the configuration file. |
| |
| """ |
| for response_function, at_response in raw_map.iteritems(): |
| if response_function in self._wm_response_to_at_map: |
| self._logger.debug( |
| 'Updated wm_response_to_at_map: |%s: [%s --> %s]|', |
| response_function, |
| self._wm_response_to_at_map[response_function], |
| at_response) |
| else: |
| self._logger.debug( |
| 'Added to wm_response_to_at_map: |%s: %s|', |
| response_function, at_response) |
| self._wm_response_to_at_map[response_function] = at_response |
| |
| |
| def _sanitize_wardmodem_action(self, action): |
| """ |
| Test that the action specified in the AT command --> wardmodem action |
| map is sane and normalize to simplify handling later. |
| |
| Currently, this only checks that the action consists of tuples of the |
| right size / type. It might make sense to make this check a lot stricter |
| so that ill-formed configuration files are caught early. |
| |
| Returns the normalized form: 3-tuple with the last item being a tuple of |
| integers. |
| |
| @param action: The action tuple to check. |
| |
| @return action: Sanitized action tuple. Normalized form is (string, |
| string, (int*)). |
| |
| @raises: WardModemSetupException if action is ill-formed. |
| |
| """ |
| errstr = ('Ill formed action |%s|. Action must be of the form: ' |
| '(state_machine_name, function_name, (index_tuple)) ' |
| 'Here, index_tuple is a tuple of integers.' % str(action)) |
| sanitized_action = [] |
| if type(action) is not tuple: |
| self._setup_error(errstr) |
| if len(action) != 2 and len(action) != 3: |
| self._setup_error(errstr) |
| if type(action[0]) != str or type(action[1]) != str: |
| self._setup_error(errstr) |
| sanitized_action.append(action[0]) |
| sanitized_action.append(action[1]) |
| if len(action) != 3: |
| sanitized_action.append(()) |
| else: |
| if type(action[2]) == tuple: |
| for idx in action[2]: |
| if type(idx) != int: |
| self._setup_error(errstr) |
| sanitized_action.append(action[2]) |
| else: |
| if type(action[2]) != int: |
| self._setup_error(errstr) |
| sanitized_action.append((action[2],)) |
| return tuple(sanitized_action) |
| |
| |
| def _parse_at_command(self, atcom): |
| """ |
| Parse an AT command into the command and its arguments |
| |
| Examples: |
| 'AT?' --> ('AT?', ()) |
| 'AT+XX' --> ('AT+XX', ()) |
| 'AT%SCF=1,2' --> ('AT%SCF=', ('1', '2')) |
| 'ATX=*' --> ('ATX=', ('*',)) |
| |
| @param atcom: [string] the AT command to parse |
| |
| @return: [(string, (string))] A tuple of the AT command proper and a |
| tuple of arguments. If no arguments are present, an empty argument |
| tuple is included. |
| |
| @raises ATTransceiverError if atcom is not well-formed. |
| |
| """ |
| parts = atcom.split('=') |
| if len(parts) > 2: |
| self._runtime_error('Parsing error: |%s|' % atcom) |
| if len(parts) == 1: |
| return (atcom, ()) |
| # Note: Include the trailing '=' in the AT commmand. |
| at = parts[0] + '=' |
| if parts[1] == '': |
| # This was a command of the form 'ATXXX='. |
| # Treat this as having no arguments, instead of a single '' |
| # argument. |
| return (at, ()) |
| else: |
| return (at, tuple(parts[1].split(','))) |
| |
| |
| def _find_wardmodem_action_for_at(self, atcom): |
| """ |
| For the given AT command, find the appropriate action from wardmodem. |
| This will attempt to find a rule matching |atcom|. If that fails, and if |
| |_fallback_state_machine| exists, the default action from this machine |
| is returned. |
| |
| @param atcom: The AT command to find action for. Type: str. |
| |
| @return: Returns the tuple of (state_machine_name, function, |
| (arguments,)) for the corresponding action. The action to be |
| taken is roughly |
| state_machine.function(arguments) |
| Type: (string, string, (string,)) |
| |
| @raises: ATTransceiverException if the at command is ill-formed or we |
| don't have a corresponding action. |
| |
| """ |
| try: |
| at, args = self._parse_at_command(atcom) |
| except wme.ATTransceiverException as e: |
| self._runtime_error( |
| 'Ill formed AT command received. %s' % str(e.args)) |
| if at not in self._at_to_wm_action_map: |
| if self._fallback_state_machine: |
| return (self._fallback_state_machine, |
| self._fallback_machine_function, |
| (atcom,)) |
| self._runtime_error('Unknown AT command: |%s|' % atcom) |
| |
| for candidate_args in self._at_to_wm_action_map[at]: |
| candidate_action = self._at_to_wm_action_map[at][candidate_args] |
| if self._args_match(args, candidate_args): |
| # Found corresponding entry, now replace the indices of the |
| # arguments in the action with actual arguments. |
| machine, function, idxs = candidate_action |
| fargs = [] |
| for idx in idxs: |
| fargs.append(args[idx]) |
| return machine, function, tuple(fargs) |
| |
| if self._fallback_state_machine: |
| return (self._fallback_state_machine, |
| self._fallback_machine_function, |
| (atcom,)) |
| self._runtime_error('Unhandled arguments: |%s|' % atcom) |
| |
| |
| def _args_match(self, args, matches): |
| """ |
| Check whether args are captured by regexp. |
| |
| @param args: A tuple of strings, the arguments to check for inclusion. |
| |
| @param matches: A similar tuple, but may contain the wild-card '*'. |
| |
| @return True if args is represented by regexp, False otherwise. |
| |
| """ |
| if len(args) != len(matches): |
| return False |
| for i in range(len(args)): |
| arg = args[i] |
| match = matches[i] |
| if match == ARG_PLACEHOLDER: |
| return True |
| if arg != match: |
| return False |
| return True |
| |
| def _construct_at_response(self, raw_at, *args): |
| """ |
| Replace palceholders in an AT command template with actual arguments. |
| |
| @param raw_at: An AT command with '*' placeholders where arguments |
| should be provided. |
| |
| @param *args: Arguments to fill in the placeholders in |raw_at|. |
| |
| @return: AT command with placeholders replaced by arguments. |
| |
| @raises: ATTransceiverException if the number of arguments does not |
| match the number of placeholders. |
| |
| """ |
| parts = raw_at.split(ARG_PLACEHOLDER) |
| if len(args) < (len(parts) - 1): |
| self._runtime_error( |
| 'Failed to construct AT response from |%s|. Expected %d ' |
| 'arguments, found %d.' % |
| (raw_at, len(parts) - 1, len(args))) |
| if len(args) > (len(parts) - 1): |
| self._logger.warning( |
| 'Number of arguments in wardmodem response greater than ' |
| 'expected. Some of the arguments from %s will not be used ' |
| 'in the reconstruction of %s', str(args), raw_at) |
| |
| ret = [] |
| for i in range(len(parts) - 1): |
| ret += parts[i] |
| ret += str(args[i]) |
| ret += parts[len(parts) - 1] |
| return ''.join(ret) |
| |
| |
| def _verify_and_send_mm_commands(self): |
| """ |
| While there are corresponding responses from wardmodem and physical |
| modem, verify that they match and respond to modem manager. |
| |
| """ |
| if not self._cached_wardmodem_responses: |
| return |
| elif not self._cached_modem_responses: |
| if self._modem_response_wait_task is not None: |
| return |
| self._modem_response_wait_task = ( |
| self._task_loop.post_task_after_delay( |
| self._modem_response_timed_out, |
| self._modem_response_timeout_milliseconds)) |
| else: |
| if self._modem_response_wait_task is not None: |
| self._task_loop.cancel_posted_task( |
| self._modem_response_wait_task) |
| self._modem_response_wait_task = None |
| self._verify_and_send_mm_command( |
| self._cached_modem_responses.popleft(), |
| self._cached_wardmodem_responses.popleft()) |
| self._verify_and_send_mm_commands() |
| |
| |
| def _verify_and_send_mm_command(self, modem_response, wardmodem_response): |
| """ |
| Verify that the two AT commands match and respond to modem manager. |
| |
| @param modem_response: AT command response of the physical modem. |
| |
| @param wardmodem_response: AT command response of wardmodem. |
| |
| """ |
| # TODO(pprabhu) This can not handle unsolicited commands yet. |
| # Unsolicited commands from either of the modems will push the lists out |
| # of sync. |
| if wardmodem_response != modem_response: |
| self._logger.warning('Response verification failed.') |
| self._logger.warning('modem response: |%s|', modem_response) |
| self._logger.warning('wardmodem response: |%s|', wardmodem_response) |
| self._logger.warning('wardmodem response takes precedence.') |
| self._report_verification_failure( |
| self.VERIFICATION_FAILED_MISMATCH, |
| modem_response, |
| wardmodem_response) |
| self._logger.debug('Command {[] ==> mm}: |%s|' , wardmodem_response) |
| self._mm_channel.send(wardmodem_response) |
| |
| |
| def _modem_response_timed_out(self): |
| """ |
| Callback called when we time out waiting for physical modem response for |
| some wardmodem response. Can't do much -- log physical modem failure and |
| forward wardmodem response anyway. |
| |
| """ |
| assert (not self._cached_modem_responses and |
| self._cached_wardmodem_responses) |
| wardmodem_response = self._cached_wardmodem_responses.popleft() |
| self._logger.warning('modem response timed out. ' |
| 'Forwarding wardmodem response |%s| anyway.', |
| wardmodem_response) |
| self._logger.debug('Command {[] ==> mm}: |%s|' , wardmodem_response) |
| self._report_verification_failure( |
| self.VERIFICATION_FAILED_TIME_OUT, |
| None, |
| wardmodem_response) |
| self._mm_channel.send(wardmodem_response) |
| self._modem_response_wait_task = None |
| self._verify_and_send_mm_commands() |
| |
| |
| def _report_verification_failure(self, failure, modem_response, |
| wardmodem_response): |
| """ |
| Failure to verify the wardmodem response will call this non-public |
| method. |
| |
| At present, it is only used by unittests to detect failure. |
| |
| @param failure: The cause of failure. Must be one of |
| VERIFICATION_FAILED_MISMATCH or VERIFICATION_FAILED_TIME_OUT. |
| |
| @param modem_response: The received modem response (if any). |
| |
| @param wardmodem_response: The received wardmodem response. |
| |
| """ |
| pass |
| |
| |
| def _runtime_error(self, error_message): |
| """ |
| Log the message at error level and raise ATTransceiverException. |
| |
| @param error_message: The error message. |
| |
| @raises: ATTransceiverException. |
| |
| """ |
| self._logger.error(error_message) |
| raise wme.ATTransceiverException(error_message) |
| |
| |
| def _setup_error(self, error_message): |
| """ |
| Log the message at error level and raise WardModemSetupException. |
| |
| @param error_message: The error message. |
| |
| @raises: WardModemSetupException. |
| |
| """ |
| self._logger.error(error_message) |
| raise wme.WardModemSetupException(error_message) |