blob: 238099b721ab5f0820fadd507084c91c87c42691 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import fcntl
import glib
import logging
import os
import re
import termios
import tty
import task_loop
class ATChannel(object):
"""
Send a single AT command in either direction asynchronously.
This class represents the AT command channel. The program can
(1) Request *one* AT command to be sent on the channel.
(2) Get notified of a received AT command.
"""
CHANNEL_READ_CHUNK_SIZE = 128
GLIB_CB_CONDITION_STR = {
glib.IO_IN: 'glib.IO_IN',
glib.IO_OUT: 'glib.IO_OUT',
glib.IO_PRI: 'glib.IO_PRI',
glib.IO_ERR: 'glib.IO_ERR',
glib.IO_HUP: 'glib.IO_HUP'
}
# And exception with error code 11 is raised when a write to some file
# descriptor fails because the channel is full.
IO_ERROR_CHANNEL_FULL = 11
def __init__(self, receiver_callback, channel, channel_name='',
at_prefix='', at_suffix='\r\n'):
"""
@param receiver_callback: The callback function to be called when an AT
command is received over the channel. The signature of the
callback must be
def receiver_callback(self, command)
@param channel: The file descriptor for channel, as returned by e.g.
os.open().
@param channel_name: [Optional] Name of the channel to be used for
logging.
@param at_prefix: AT commands sent out on this channel will be prefixed
with |at_prefix|. Default ''.
@param at_suffix: AT commands sent out on this channel will be
terminated with |at_suffix|. Default '\r\n'.
@raises IOError if some file operation on |channel| fails.
"""
super(ATChannel, self).__init__()
assert receiver_callback and channel
self._receiver_callback = receiver_callback
self._channel = channel
self._channel_name = channel_name
self._at_prefix = at_prefix
self._at_suffix = at_suffix
self._logger = logging.getLogger(__name__)
self._task_loop = task_loop.get_instance()
self._received_command = '' # Used to store partially received command.
flags = fcntl.fcntl(self._channel, fcntl.F_GETFL)
flags = flags | os.O_RDWR | os.O_NONBLOCK
fcntl.fcntl(self._channel, fcntl.F_SETFL, flags)
try:
tty.setraw(self._channel, tty.TCSANOW)
except termios.error as ttyerror:
raise IOError(ttyerror.args)
# glib does not raise errors, merely prints to stderr.
# If we've come so far, assume channel is well behaved.
self._channel_cb_handler = glib.io_add_watch(
self._channel,
glib.IO_IN | glib.IO_PRI | glib.IO_ERR | glib.IO_HUP,
self._handle_channel_cb,
priority=glib.PRIORITY_HIGH)
@property
def at_prefix(self):
""" The string used to prefix AT commands sent on the channel. """
return self._at_prefix
@at_prefix.setter
def at_prefix(self, value):
"""
Set the string to use to prefix AT commands.
This can vary by the modem being used.
@param value: The string prefix.
"""
self._logger.debug('AT command prefix set to: |%s|', value)
self._at_prefix = value
@property
def at_suffix(self):
""" The string used to terminate AT commands sent on the channel. """
return self._at_suffix
@at_suffix.setter
def at_suffix(self, value):
"""
Set the string to use to terminate AT commands.
This can vary by the modem being used.
@param value: The string terminator.
"""
self._logger.debug('AT command suffix set to: |%s|', value)
self._at_suffix = value
def __del__(self):
glib.source_remove(self._channel_cb_handler)
def send(self, at_command):
"""
Send an AT command on the channel.
@param at_command: The AT command to send.
@return: True if send was successful, False if send failed because the
channel was full.
@raises: OSError if send failed for any reason other than that the
channel was full.
"""
at_command = self._prepare_for_send(at_command)
try:
os.write(self._channel, at_command)
except OSError as write_error:
if write_error.args[0] == self.IO_ERROR_CHANNEL_FULL:
self._logger.warning('%s Send Failed: |%s|',
self._channel_name, repr(at_command))
return False
raise write_error
self._logger.debug('%s Sent: |%s|',
self._channel_name, repr(at_command))
return True
def _process_received_command(self):
"""
Process a command from the channel once it has been fully received.
"""
self._logger.debug('%s Received: |%s|',
self._channel_name, repr(self._received_command))
self._task_loop.post_task(self._receiver_callback,
self._received_command)
def _handle_channel_cb(self, channel, cb_condition):
"""
Callback used by the channel when there is any data to read.
@param channel: The channel which issued the signal.
@param cb_condition: one of glib.IO_* conditions that caused the signal.
@return: True, so as to continue watching the channel for further
signals.
"""
if channel != self._channel:
self._logger.warning('%s Signal received on unknown channel. '
'Expected: |%d|, obtained |%d|. Ignoring.',
self._channel_name, self._channel, channel)
return True
if cb_condition == glib.IO_IN or cb_condition == glib.IO_PRI:
self._read_channel()
return True
self._logger.warning('%s Unexpected cb condition %s received. Ignored.',
self._channel_name,
self.GLIB_CB_CONDITION_STR[cb_condition])
return True
def _read_channel(self):
"""
Read data from channel when the channel indicates available data.
"""
incoming_list = []
try:
while True:
s = os.read(self._channel, self.CHANNEL_READ_CHUNK_SIZE)
if not s:
break
incoming_list.append(s)
except OSError as read_error:
if not read_error.args[0] == self.IO_ERROR_CHANNEL_FULL:
raise read_error
if not incoming_list:
return
incoming = ''.join(incoming_list)
if not incoming:
return
# TODO(pprabhu) Currently, we split incoming AT commands on '\r' or
# '\n'. It may be that some modems that expect the terminator sequence
# to be '\r\n' send spurious '\r's on the channel. If so, we must ignore
# spurious '\r' or '\n'.
# (1) replace ; by \rAT.
# ';' can be used to string together AT commands.
# So
# AT1;2
# is the same as sending two commands:
# AT1
# AT2
incoming = re.sub(';', '\rAT', incoming)
# (2) Replace any occurence of a terminator with '\r\r'.
# This ensures that splitting at the terminator actually gives us an
# empty part. viz --
# 'some_string\nother_string' --> 'some_string\r\rother_string'
# --> ['some_string', '', 'other_string']
# We use the empty string generated to detect completed commands.
incoming = re.sub('\r|\n|;', '\r\r', incoming)
# (3) Split into AT commands.
parts = re.split('\r', incoming)
for part in parts:
if (not part) and self._received_command:
self._process_received_command()
self._received_command = ''
elif part:
self._received_command = self._received_command + part
def _prepare_for_send(self, command):
"""
Sanitize AT command before sending on channel.
@param command: The command to sanitize.
@reutrn: The sanitized command.
"""
command = command.strip()
assert command.find('\r') == -1
assert command.find('\n') == -1
command = self.at_prefix + command + self.at_suffix
return command