wardmodem: Implement multiplexed serial channel end point.
- Implement serial end-point with read/write at AT command granularity.
- Implement a multiplexer that can send data from modem manager to both the
physical modem and wardmodem, and verify the responses.
BUG=chromium:242402
TEST=run unit-tests.
(1) python at_channel_unittest.py
(2) python at_transceiver_unittest.py
Change-Id: I59a4eefc8708fb418cede4a67adaa7904ebd82b0
Reviewed-on: https://gerrit.chromium.org/gerrit/57533
Commit-Queue: Prathmesh Prabhu <pprabhu@chromium.org>
Reviewed-by: Prathmesh Prabhu <pprabhu@chromium.org>
Tested-by: Prathmesh Prabhu <pprabhu@chromium.org>
diff --git a/client/cros/cellular/wardmodem/at_channel.py b/client/cros/cellular/wardmodem/at_channel.py
new file mode 100644
index 0000000..9684864
--- /dev/null
+++ b/client/cros/cellular/wardmodem/at_channel.py
@@ -0,0 +1,223 @@
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import fcntl
+import glib
+import logging
+import os
+import re
+import tty
+
+import task_loop
+
+CHANNEL_READ_CHUNK_SIZE = 128
+
+glib_cb_condition_str = {
+ glib.IO_IN: 'glib.IO_IN',
+ glib.IO_OUT: 'glib.IO_OUT',
+ glib.IO_PRI: 'glib.IO_PRI',
+ glib.IO_ERR: 'glib.IO_ERR',
+ glib.IO_HUP: 'glib.IO_HUP'
+}
+
+# And exception with error code 11 is raised when a write to some file
+# descriptor fails because the channel is full.
+IO_ERROR_CHANNEL_FULL = 11
+
+class ATChannel(object):
+ """
+ Send a single AT command in either direction asynchronously.
+
+ This class represents the AT command channel. The program can
+ (1) Request *one* AT command to be sent on the channel.
+ (2) Get notified of a received AT command.
+
+ """
+
+ def __init__(self, receiver_callback, channel, channel_name=''):
+ """
+ @param receiver_callback: The callback function to be called when an AT
+ command is received over the channel. The signature of the
+ callback must be
+
+ def receiver_callback(self, command)
+
+ @param channel: The file descriptor for channel, as returned by e.g.
+ os.open().
+
+ @param channel_name: [Optional] Name of the channel to be used for
+ logging.
+
+ @raises IOError if some file operation on |channel| fails.
+
+ """
+ super(ATChannel, self).__init__()
+ assert receiver_callback and channel
+
+ self._receiver_callback = receiver_callback
+ self._channel = channel
+ self._channel_name = channel_name
+
+ self._logger = logging.getLogger(__name__)
+ self._task_loop = task_loop.get_instance()
+ self._received_command = '' # Used to store partially received command.
+ self._at_terminator = '\r\n' # Terminate AT commands thus.
+
+ flags = fcntl.fcntl(self._channel, fcntl.F_GETFL)
+ flags = flags | os.O_RDWR | os.O_NONBLOCK
+ fcntl.fcntl(self._channel, fcntl.F_SETFL, flags)
+ try:
+ tty.setraw(self._channel)
+ except termios.error as ttyerror:
+ raise IOError(ttyerror.args)
+
+ # glib does not raise errors, merely prints to stderr.
+ # If we've come so far, assume channel is well behaved.
+ self._channel_cb_handler = glib.io_add_watch(
+ self._channel,
+ glib.IO_IN | glib.IO_PRI | glib.IO_ERR | glib.IO_HUP,
+ self._handle_channel_cb,
+ priority=glib.PRIORITY_HIGH)
+
+
+ @property
+ def at_terminator(self):
+ """
+ The string used to terminate AT commands sent / received on the channel.
+
+ Default value: '\r\n'
+ """
+ return self._at_terminator
+
+
+ @at_terminator.setter
+ def at_terminator(self, value):
+ """
+ Set the string to use to terminate AT commands.
+
+ This can vary by the modem being used.
+
+ @param value: The string terminator.
+
+ """
+ self._logger.debug('AT command terminator set to: |%s|', value)
+ self._at_terminator = value
+
+
+ def __del__(self):
+ glib.source_remove(self._channel_cb_handler)
+
+
+ def send(self, at_command):
+ """
+ Send an AT command on the channel.
+
+ @param at_command: The AT command to send.
+
+ @return: True if send was successful, False if send failed because the
+ channel was full.
+
+ @raises: OSError if send failed for any reason other than that the
+ channel was full.
+
+ """
+ at_command = self._prepare_for_send(at_command)
+ try:
+ os.write(self._channel, at_command)
+ except OSError as write_error:
+ if write_error.args[0] == IO_ERROR_CHANNEL_FULL:
+ self._logger.warning('%sSend Failed: |%s|',
+ self._channel_name, repr(at_command))
+ return False
+ raise write_error
+
+ self._logger.debug('%sSent: |%s|', self._channel_name, repr(at_command))
+ return True
+
+
+ def _process_received_command(self):
+ """
+ Process a command from the channel once it has been fully received.
+
+ """
+ self._logger.debug('%sReceived: |%s|',
+ self._channel_name, repr(self._received_command))
+ self._task_loop.post_task(self._receiver_callback,
+ self._received_command)
+
+
+ def _handle_channel_cb(self, channel, cb_condition):
+ """
+ Callback used by the channel when there is any data to read.
+
+ @param channel: The channel which issued the signal.
+
+ @param cb_condition: one of glib.IO_* conditions that caused the signal.
+
+ @return: True, so as to continue watching the channel for further
+ signals.
+
+ """
+ if channel != self._channel:
+ self._logger.warning('%sSignal received on unknown channel. '
+ 'Expected: |%d|, obtained |%d|. Ignoring.',
+ self._channel_name, self._channel, channel)
+ return True
+ if cb_condition == glib.IO_IN or cb_condition == glib.IO_PRI:
+ self._read_channel()
+ return True
+ self._logger.warning('%sUnexpected cb condition %s received. Ignoring.',
+ self._channel_name,
+ _glib_cb_condition_str[cb_condition])
+ return True
+
+
+ def _read_channel(self):
+ """
+ Read data from channel when the channel indicates available data.
+
+ """
+ incoming_list = []
+ try:
+ while True:
+ s = os.read(self._channel, CHANNEL_READ_CHUNK_SIZE)
+ if not s:
+ break
+ incoming_list.append(s)
+ except OSError as read_error:
+ if not read_error.args[0] == IO_ERROR_CHANNEL_FULL:
+ raise read_error
+ if not incoming_list:
+ return
+ incoming = ''.join(incoming_list)
+ if not incoming:
+ return
+
+ # TODO(pprabhu) Currently, we split incoming AT commands on '\r' or
+ # '\n'. It may be that some modems that expect the terminator sequence
+ # to be '\r\n' send spurious '\r's on the channel. If so, we must ignore
+ # spurious '\r' or '\n'.
+ parts = re.split('\r|\n', incoming)
+ for part in parts:
+ if (not part) and self._received_command:
+ self._process_received_command()
+ self._received_command = ''
+ elif part:
+ self._received_command = self._received_command + part
+
+
+ def _prepare_for_send(self, command):
+ """
+ Sanitize AT command before sending on channel.
+
+ @param command: The command to sanitize.
+
+ @reutrn: The sanitized command.
+
+ """
+ command = command.strip()
+ assert command.find('\r') == -1
+ assert command.find('\n') == -1
+ command = command + self.at_terminator
+ return command
diff --git a/client/cros/cellular/wardmodem/at_channel_unittest.py b/client/cros/cellular/wardmodem/at_channel_unittest.py
new file mode 100644
index 0000000..8824f85
--- /dev/null
+++ b/client/cros/cellular/wardmodem/at_channel_unittest.py
@@ -0,0 +1,184 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import at_channel
+
+import fcntl
+import functools
+import glib
+import logging
+import mox
+import os
+import tempfile
+import unittest
+
+import task_loop
+
+class ATChannelTestCase(unittest.TestCase):
+ """
+ Test fixture for ATChannel class.
+
+ """
+
+ def setUp(self):
+ self.mox = mox.Mox()
+
+ master, slave = os.openpty()
+ self._at_channel = at_channel.ATChannel(
+ self._recieve_command_local_callback, slave, 'test')
+
+ # Replace the channel inside _at_channel with a tempfile
+ # We will use the tempfile to simulate a tty pair.
+ os.close(master)
+ os.close(slave)
+ self._channel_file = tempfile.TemporaryFile(mode = 'w+')
+ # These properties are a copy of the properties set in ATChannel for the
+ # tty pair.
+ flags = fcntl.fcntl(self._channel_file.fileno(), fcntl.F_GETFL)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(self._channel_file.fileno(), fcntl.F_SETFL, flags)
+ self._at_channel._channel = self._channel_file.fileno()
+ # We need to seek() to the beginning of the file to simulate tty read.
+ # So remember the head of the file.
+ self._channel_file_head = self._channel_file.tell()
+
+ # Also mock out the task_loop
+ self._mox_task_loop = self.mox.CreateMock(task_loop.TaskLoop)
+ self._at_channel._task_loop = self._mox_task_loop
+
+
+ def tearDown(self):
+ self._channel_file.close()
+
+ # ##########################################################################
+ # Tests
+
+ def test_successful_send(self):
+ """
+ Test that a single AT command can be sent on the channel.
+
+ """
+ payload = 'A not so huge AT+CEREG command.'
+ self._at_channel.send(payload)
+ received_command = self._recieve_command_remote()
+ self.assertTrue(received_command.endswith('\r\n'))
+ self.assertEqual(payload.strip(), received_command.strip())
+
+ # Change the AT command termination string and check again.
+ self._at_channel.at_terminator = '$$'
+ payload = 'A not so huge AT+CEREG command.'
+ self._at_channel.send(payload)
+ received_command = self._recieve_command_remote()
+ self.assertTrue(received_command.endswith('$$'))
+ self.assertEqual(payload.strip(), received_command.strip('$$'))
+
+
+ def test_recieve_single_at_command(self):
+ """
+ Test that a single AT command can be received together on the channel.
+
+ """
+ payload = 'We send you our AT+good wishes too!\r\n'
+ callback = lambda channel, payload: None
+ self._at_channel._receiver_callback = callback
+ self._mox_task_loop.post_task(callback, payload.strip())
+ self.mox.ReplayAll()
+ self._send_command_remote(payload)
+ self._at_channel._handle_channel_cb(self._channel_file.fileno(),
+ glib.IO_IN)
+ self.mox.VerifyAll()
+
+
+ def test_recieve_at_commands_in_parts(self):
+ """
+ Test that a multiple AT commands can be received in parts on the
+ channel.
+
+ """
+ payloads = ['AT1', '11\r\n', '\r\nAT22', '2\r\nAT333', '\r\n']
+ callback = lambda channel, payload: None
+ self._at_channel._receiver_callback = callback
+ self._mox_task_loop.post_task(callback, 'AT111')
+ self._mox_task_loop.post_task(callback, 'AT222')
+ self._mox_task_loop.post_task(callback, 'AT333')
+
+ self.mox.ReplayAll()
+ for payload in payloads:
+ self._send_command_remote(payload)
+ self._at_channel._handle_channel_cb(self._channel_file.fileno(),
+ glib.IO_IN)
+ self.mox.VerifyAll()
+
+
+ def test_recieve_long_at_commands(self):
+ """
+ Test that a multiple AT commands can be received in parts on the
+ channel.
+
+ """
+ payloads = ['AT1+',
+ '123456789\r\nAT2+123456789\r\nAT3+1234567',
+ '89\r\n']
+ callback = lambda channel, payload: None
+ self._at_channel._receiver_callback = callback
+ self._mox_task_loop.post_task(callback, 'AT1+123456789')
+ self._mox_task_loop.post_task(callback, 'AT2+123456789')
+ self._mox_task_loop.post_task(callback, 'AT3+123456789')
+
+ self.mox.ReplayAll()
+ at_channel.CHANNEL_READ_CHUNK_SIZE = 4
+ for payload in payloads:
+ self._send_command_remote(payload)
+ self._at_channel._handle_channel_cb(self._channel_file.fileno(),
+ glib.IO_IN)
+ self.mox.VerifyAll()
+
+ # ##########################################################################
+ # Helper functions
+
+ def _clean_channel_file(self):
+ """
+ Clean the tempfile used to simulate tty, and reset the r/w head.
+
+ """
+ self._channel_file.truncate(0)
+ self._channel_file_head = self._channel_file.tell()
+
+
+ def _send_command_remote(self, payload):
+ """
+ Simulate a command being sent from the remote tty port.
+
+ @param payload: The command to send.
+
+ """
+ self._clean_channel_file()
+ self._channel_file.write(payload)
+ self._channel_file.flush()
+ self._channel_file.seek(self._channel_file_head)
+
+
+ def _recieve_command_remote(self):
+ """
+ Simluate a command being received at the remote tty port.
+
+ """
+ self._channel_file.flush()
+ self._channel_file.seek(self._channel_file_head)
+ payload_list = []
+ for buf in iter(functools.partial(self._channel_file.read, 128), ''):
+ payload_list.append(buf)
+ self._clean_channel_file()
+ return ''.join(payload_list)
+
+
+ def _recieve_command_local_callback(self, payload):
+ pass
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG)
+ unittest.main()
diff --git a/client/cros/cellular/wardmodem/at_transceiver.py b/client/cros/cellular/wardmodem/at_transceiver.py
new file mode 100644
index 0000000..4ae350f
--- /dev/null
+++ b/client/cros/cellular/wardmodem/at_transceiver.py
@@ -0,0 +1,335 @@
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import collections
+import logging
+
+import at_channel
+import task_loop
+
+MODEM_RESPONSE_TIMEOUT_MILLISECONDS = 30000
+
+class ATTransceiverMode(object):
+ """
+ Enum to specify what mode the ATTransceiver is operating in.
+
+ There are three modes. These modes determine how the commands to/from
+ the modemmanager are routed.
+ WARDMODEM: modemmanager interacts with wardmodem alone.
+ SPLIT_VERIFY: modemmanager commands are sent to both the wardmodem
+ and the physical modem on the device. Responses from
+ wardmodem are verified against responses from the physical
+ modem. In case of a mismatch, wardmodem's response is
+ chosen, and a warning is issued.
+ PASS_THROUGH: modemmanager commands are routed to/from the physical
+ modem. Frankly, wardmodem isn't running in this mode.
+
+ """
+ WARDMODEM = 0
+ SPLIT_VERIFY = 1
+ PASS_THROUGH = 2
+
+ MODE_NAME = {
+ WARDMODEM: 'WARDMODEM',
+ SPLIT_VERIFY: 'SPLIT_VERIFY',
+ PASS_THROUGH: 'PASS_THROUGH'
+ }
+
+
+ @classmethod
+ def to_string(cls, value):
+ """
+ A class method to obtain string representation of the enum values.
+
+ @param value: the enum value to stringify.
+ """
+ return "%s.%s" % (cls.__name__, cls.MODE_NAME[value])
+
+
+class ATTransceiver(object):
+ """
+ A world facing multiplexer class that orchestrates the communication between
+ modem manager, the physical modem, and wardmodem back-end.
+
+ """
+
+ def __init__(self, mm_at_port, modem_at_port=None):
+ """
+ @param mm_at_port: File descriptor for AT port used by modem manager.
+ Can not be None.
+
+ @param modem_at_port: File descriptor for AT port used by the modem. May
+ be None, but that forces ATTransceiverMode.WARDMODEM. Default:
+ None.
+
+ """
+ super(ATTransceiver, self).__init__()
+ assert mm_at_port is not None
+
+ self._logger = logging.getLogger(__name__)
+ self._task_loop = task_loop.get_instance()
+ self._mode = ATTransceiverMode.WARDMODEM
+ # The time we wait for any particular response from physical modem.
+ self._modem_response_timeout_milliseconds = (
+ MODEM_RESPONSE_TIMEOUT_MILLISECONDS)
+ # We keep a queue of responses from the wardmodem and physical modem,
+ # so that we can verify they match.
+ self._cached_modem_responses = collections.deque()
+ self._cached_wardmodem_responses = collections.deque()
+ # When a wardmodem response has been received but the corresponding
+ # physical modem response hasn't arrived, we post a task to wait for the
+ # response.
+ self._modem_response_wait_task = None
+
+ if modem_at_port is not None:
+ self._modem_channel = at_channel.ATChannel(
+ self._process_modem_at_command,
+ modem_at_port,
+ 'modem_primary_channel')
+ else:
+ self._modem_channel = None
+
+ self._mm_channel = at_channel.ATChannel(self._process_mm_at_command,
+ mm_at_port,
+ 'mm_primary_channel')
+
+
+ # Verification failure reasons
+ VERIFICATION_FAILED_MISMATCH = 1
+ VERIFICATION_FAILED_TIME_OUT = 2
+
+
+ @property
+ def mode(self):
+ """
+ ATTranscieverMode value. Determines how commands are routed.
+
+ @see ATTransceiverMode
+
+ """
+ return self._mode
+
+
+ @mode.setter
+ def mode(self, value):
+ """
+ Set mode.
+
+ @param value: The value to set. Type: ATTransceiverMode.
+
+ """
+ if value != ATTransceiverMode.WARDMODEM and self._modem_channel is None:
+ self._logger.warning(
+ 'Can not switch to %s mode. No modem port provided.',
+ ATTransceiverMode.to_string(value))
+ return
+ self._logger.info('Set mode to %s',
+ ATTransceiverMode.to_string(value))
+ self._mode = value
+
+
+ @property
+ def at_terminator(self):
+ """
+ The string used to terminate AT commands sent / received on the channel.
+
+ Default value: '\r\n'
+ """
+ return self._mm_channel.at_terminator
+
+ @at_terminator.setter
+ def at_terminator(self, value):
+ """
+ Set the string to use to terminate AT commands.
+
+ This can vary by the modem being used.
+
+ @param value: The string terminator.
+
+ """
+ assert self._mm_channel
+ self._mm_channel.at_terminator = value
+ if self._modem_channel:
+ self._modem_channel.at_terminator = value
+
+
+ def process_wardmodem_response(self, response):
+ """
+ TODO(pprabhu)
+
+ @param response: wardmodem response to be translated to AT response to
+ the modem manager.
+
+ """
+ raise NotImplementedError()
+
+ # ##########################################################################
+ # Callbacks -- These are the functions that process events from the
+ # ATChannel or the TaskLoop. These functions are either
+ # (1) set as callbacks in the ATChannel, or
+ # (2) called internally to process the AT command to/from the TaskLoop.
+
+ def _process_modem_at_command(self, command):
+ """
+ Callback called by the physical modem channel when an AT response is
+ received.
+
+ @param command: AT command sent by the physical modem.
+
+ """
+ assert self.mode != ATTransceiverMode.WARDMODEM
+ self._logger.debug('Command {modem ==> []}: |%s|', command)
+ if self.mode == ATTransceiverMode.PASS_THROUGH:
+ self._logger.debug('Command {[] ==> mm}: |%s|' , command)
+ self._mm_channel.send(command)
+ else:
+ self._cached_modem_responses.append(command)
+ self._verify_and_send_mm_commands()
+
+
+ def _process_mm_at_command(self, command):
+ """
+ Callback called by the modem manager channel when an AT command is
+ received.
+
+ @param command: AT command sent by modem manager.
+
+ """
+ self._logger.debug('Command {mm ==> []}: |%s|', command)
+ if(self.mode == ATTransceiverMode.PASS_THROUGH or
+ self.mode == ATTransceiverMode.SPLIT_VERIFY):
+ self._logger.debug('Command {[] ==> modem}: |%s|', command)
+ self._modem_channel.send(command)
+ if(self.mode == ATTransceiverMode.WARDMODEM or
+ self.mode == ATTransceiverMode.SPLIT_VERIFY):
+ self._logger.debug('Command {[] ==> wardmodem}: |%s|', command)
+ self._post_wardmodem_request(command)
+
+
+ def _process_wardmodem_at_command(self, command):
+ """
+ Function called to process an AT command response of wardmodem.
+
+ This function is called after the response from the task loop has been
+ converted to an AT command.
+
+ @param command: The AT command response of wardmodem.
+
+ """
+ assert self.mode != ATTransceiverMode.PASS_THROUGH
+ self._logger.debug('Command {wardmodem ==> []: |%s|', command)
+ if self.mode == ATTransceiverMode.WARDMODEM:
+ self._logger.debug('Command {[] ==> mm}: |%s|', command)
+ self._mm_channel.send(command)
+ else:
+ self._cached_wardmodem_responses.append(command)
+ self._verify_and_send_mm_commands()
+
+
+ def _post_wardmodem_request(self, request):
+ """
+ TODO(pprabhu)
+
+ @param request: wardmodem request posted to satisfy a modemmanager AT
+ command.
+
+ """
+ raise NotImplementedError()
+
+ # ##########################################################################
+ # Helper functions
+
+ def _verify_and_send_mm_commands(self):
+ """
+ While there are corresponding responses from wardmodem and physical
+ modem, verify that they match and respond to modem manager.
+
+ """
+ if not self._cached_wardmodem_responses:
+ return
+ elif not self._cached_modem_responses:
+ if self._modem_response_wait_task is not None:
+ return
+ self._modem_response_wait_task = (
+ self._task_loop.post_task_after_delay(
+ self._modem_response_timed_out,
+ self._modem_response_timeout_milliseconds))
+ else:
+ if self._modem_response_wait_task is not None:
+ self._task_loop.cancel_posted_task(
+ self._modem_response_wait_task)
+ self._modem_response_wait_task = None
+ self._verify_and_send_mm_command(
+ self._cached_modem_responses.popleft(),
+ self._cached_wardmodem_responses.popleft())
+ self._verify_and_send_mm_commands()
+
+
+ def _verify_and_send_mm_command(self, modem_response, wardmodem_response):
+ """
+ Verify that the two AT commands match and respond to modem manager.
+
+ @param modem_response: AT command response of the physical modem.
+
+ @param wardmodem_response: AT command response of wardmodem.
+
+ """
+ # TODO(pprabhu) This can not handle unsolicited commands yet.
+ # Unsolicited commands from either of the modems will push the lists out
+ # of sync.
+ if wardmodem_response != modem_response:
+ self._logger.warning('Response verification failed.')
+ self._logger.warning('modem response: |%s|', modem_response)
+ self._logger.warning('wardmodem response: |%s|', wardmodem_response)
+ self._logger.warning('wardmodem response takes precedence.')
+ self._report_verification_failure(
+ self.VERIFICATION_FAILED_MISMATCH,
+ modem_response,
+ wardmodem_response)
+ self._logger.debug('Command {[] ==> mm}: |%s|' , wardmodem_response)
+ self._mm_channel.send(wardmodem_response)
+
+
+ def _modem_response_timed_out(self):
+ """
+ Callback called when we time out waiting for physical modem response for
+ some wardmodem response. Can't do much -- log physical modem failure and
+ forward wardmodem response anyway.
+
+ """
+ assert (not self._cached_modem_responses and
+ self._cached_wardmodem_responses)
+ wardmodem_response = self._cached_wardmodem_responses.popleft()
+ self._logger.warning('modem response timed out. '
+ 'Forwarding wardmodem response |%s| anyway.',
+ wardmodem_response)
+ self._logger.debug('Command {[] ==> mm}: |%s|' , wardmodem_response)
+ self._report_verification_failure(
+ self.VERIFICATION_FAILED_TIME_OUT,
+ None,
+ wardmodem_response)
+ self._mm_channel.send(wardmodem_response)
+ self._modem_response_wait_task = None
+ self._verify_and_send_mm_commands()
+
+
+ def _report_verification_failure(self, failure, modem_response,
+ wardmodem_response):
+ """
+ Failure to verify the wardmodem response will call this non-public
+ method.
+
+ At present, it is only used by unittests to detect failure.
+
+ @param failure: The cause of failure. Must be one of
+ VERIFICATION_FAILED_MISMATCH or VERIFICATION_FAILED_TIME_OUT.
+
+ @param modem_response: The received modem response (if any).
+
+ @param wardmodem_response: The received wardmodem response.
+
+ """
+ pass
+
+
diff --git a/client/cros/cellular/wardmodem/at_transceiver_unittest.py b/client/cros/cellular/wardmodem/at_transceiver_unittest.py
new file mode 100644
index 0000000..d7f9c1d
--- /dev/null
+++ b/client/cros/cellular/wardmodem/at_transceiver_unittest.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import at_transceiver
+
+import logging
+import mox
+import os
+import unittest
+
+import at_channel
+import task_loop
+
+class ATTransceiverTestCase(unittest.TestCase):
+ """
+ Base test fixture for ATTransceiver class.
+
+ """
+
+ def setUp(self):
+ self._mox = mox.Mox()
+
+ # Create a temporary pty pair for the ATTransceiver constructor
+ master, slave = os.openpty()
+
+ self._at_transceiver = at_transceiver.ATTransceiver(slave, slave)
+
+ # Now replace internal objects in _at_transceiver with mocks
+ self._at_transceiver._modem_response_timeout_milliseconds = 0
+ self._mock_modem_channel = self._mox.CreateMock(at_channel.ATChannel)
+ self._at_transceiver._modem_channel = self._mock_modem_channel
+ self._mock_mm_channel = self._mox.CreateMock(at_channel.ATChannel)
+ self._at_transceiver._mm_channel = self._mock_mm_channel
+ self._mock_task_loop = self._mox.CreateMock(task_loop.TaskLoop)
+ self._at_transceiver._task_loop = self._mock_task_loop
+
+
+ def test_successful_mode_selection(self):
+ """
+ Test that all modes can be selected, when both channels are provided.
+
+ """
+ self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM
+ self.assertEqual(self._at_transceiver.mode,
+ at_transceiver.ATTransceiverMode.WARDMODEM)
+ self._at_transceiver.mode = (
+ at_transceiver.ATTransceiverMode.PASS_THROUGH)
+ self.assertEqual(self._at_transceiver.mode,
+ at_transceiver.ATTransceiverMode.PASS_THROUGH)
+ self._at_transceiver.mode = (
+ at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
+ self.assertEqual(self._at_transceiver.mode,
+ at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
+
+ def test_unsuccessful_mode_selection(self):
+ """
+ Test that only WARDMODEM mode can be selected if the modem channel is
+ missing.
+
+ """
+ self._at_transceiver._modem_channel = None
+ self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM
+ self.assertEqual(self._at_transceiver.mode,
+ at_transceiver.ATTransceiverMode.WARDMODEM)
+ self._at_transceiver.mode = (
+ at_transceiver.ATTransceiverMode.PASS_THROUGH)
+ self.assertEqual(self._at_transceiver.mode,
+ at_transceiver.ATTransceiverMode.WARDMODEM)
+ self._at_transceiver.mode = (
+ at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
+ self.assertEqual(self._at_transceiver.mode,
+ at_transceiver.ATTransceiverMode.WARDMODEM)
+
+
+class ATTransceiverWardModemTestCase(ATTransceiverTestCase):
+ """
+ Test ATTransceiver class in the WARDMODEM mode.
+
+ """
+
+ def setUp(self):
+ super(ATTransceiverWardModemTestCase, self).setUp()
+ self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM
+
+
+ def test_wardmodem_at_command(self):
+ """
+ Test the case when AT command is received from wardmodem.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ self._mock_mm_channel.send(at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_wardmodem_at_command(at_command)
+ self._mox.VerifyAll()
+
+
+ def test_mm_at_command(self):
+ """
+ Test the case when AT command is received from modem manager.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ self._mox.StubOutWithMock(self._at_transceiver,
+ '_post_wardmodem_request')
+
+ self._at_transceiver._post_wardmodem_request(at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_mm_at_command(at_command)
+ self._mox.UnsetStubs()
+ self._mox.VerifyAll()
+
+
+class ATTransceiverPassThroughTestCase(ATTransceiverTestCase):
+ """
+ Test ATTransceiver class in the PASS_THROUGH mode.
+
+ """
+
+ def setUp(self):
+ super(ATTransceiverPassThroughTestCase, self).setUp()
+ self._at_transceiver.mode = (
+ at_transceiver.ATTransceiverMode.PASS_THROUGH)
+
+
+ def test_modem_at_command(self):
+ """
+ Test the case when AT command received from physical modem.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ self._mock_mm_channel.send(at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_modem_at_command(at_command)
+ self._mox.VerifyAll()
+
+
+ def test_mm_at_command(self):
+ """
+ Test the case when AT command is received from modem manager.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ self._mock_modem_channel.send(at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_mm_at_command(at_command)
+ self._mox.VerifyAll()
+
+
+class ATTransceiverSplitVerifyTestCase(ATTransceiverTestCase):
+ """
+ Test ATTransceiver class in the SPLIT_VERIFY mode.
+
+ """
+
+ def setUp(self):
+ super(ATTransceiverSplitVerifyTestCase, self).setUp()
+ self._at_transceiver.mode = (
+ at_transceiver.ATTransceiverMode.SPLIT_VERIFY)
+
+
+ def test_mm_at_command(self):
+ """
+ Test that that incoming modem manager command is multiplexed to
+ wardmodem and physical modem.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ self._mox.StubOutWithMock(self._at_transceiver,
+ '_post_wardmodem_request')
+ self._mock_modem_channel.send(at_command).InAnyOrder()
+ self._at_transceiver._post_wardmodem_request(at_command).InAnyOrder()
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_mm_at_command(at_command)
+ self._mox.UnsetStubs()
+ self._mox.VerifyAll()
+
+
+ def test_successful_single_at_response_modem_wardmodem(self):
+ """
+ Test the case when one AT response is received successfully.
+ In this case, physical modem command comes first.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ self._mock_mm_channel.send(at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_modem_at_command(at_command)
+ self._at_transceiver._process_wardmodem_at_command(at_command)
+ self._mox.VerifyAll()
+
+
+ def test_successful_single_at_response_wardmodem_modem(self):
+ """
+ Test the case when one AT response is received successfully.
+ In this case, wardmodem command comes first.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ task_id = 3
+ self._mock_task_loop.post_task_after_delay(
+ self._at_transceiver._modem_response_timed_out,
+ mox.IgnoreArg()).AndReturn(task_id)
+ self._mock_task_loop.cancel_posted_task(task_id)
+ self._mock_mm_channel.send(at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_wardmodem_at_command(at_command)
+ self._at_transceiver._process_modem_at_command(at_command)
+ self._mox.VerifyAll()
+
+ def test_mismatched_at_response(self):
+ """
+ Test the case when both responses arrive, but are not identical.
+
+ """
+ wardmodem_command = 'AT+wardmodem'
+ modem_command = 'AT+modem'
+ self._mox.StubOutWithMock(self._at_transceiver,
+ '_report_verification_failure')
+ self._at_transceiver._report_verification_failure(
+ self._at_transceiver.VERIFICATION_FAILED_MISMATCH,
+ modem_command,
+ wardmodem_command)
+ self._mock_mm_channel.send(wardmodem_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_modem_at_command(modem_command)
+ self._at_transceiver._process_wardmodem_at_command(wardmodem_command)
+ self._mox.UnsetStubs()
+ self._mox.VerifyAll()
+
+
+ def test_modem_response_times_out(self):
+ """
+ Test the case when the physical modem fails to respond.
+
+ """
+ at_command = 'AT+commmmmmmmmand'
+ task_id = 3
+ self._mox.StubOutWithMock(self._at_transceiver,
+ '_report_verification_failure')
+
+ self._mock_task_loop.post_task_after_delay(
+ self._at_transceiver._modem_response_timed_out,
+ mox.IgnoreArg()).AndReturn(task_id)
+ self._at_transceiver._report_verification_failure(
+ self._at_transceiver.VERIFICATION_FAILED_TIME_OUT,
+ None,
+ at_command)
+ self._mock_mm_channel.send(at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_wardmodem_at_command(at_command)
+ self._at_transceiver._modem_response_timed_out()
+ self._mox.UnsetStubs()
+ self._mox.VerifyAll()
+
+
+ def test_multiple_successful_responses(self):
+ """
+ Test the case two wardmodem responses are queued, and then two matching
+ modem responses are received.
+
+ """
+ first_at_command = 'AT+first'
+ second_at_command = 'AT+second'
+ first_task_id = 3
+ second_task_id = 4
+
+ self._mock_task_loop.post_task_after_delay(
+ self._at_transceiver._modem_response_timed_out,
+ mox.IgnoreArg()).AndReturn(first_task_id)
+ self._mock_task_loop.cancel_posted_task(first_task_id)
+ self._mock_mm_channel.send(first_at_command)
+ self._mock_task_loop.post_task_after_delay(
+ self._at_transceiver._modem_response_timed_out,
+ mox.IgnoreArg()).AndReturn(second_task_id)
+ self._mock_task_loop.cancel_posted_task(second_task_id)
+ self._mock_mm_channel.send(second_at_command)
+
+ self._mox.ReplayAll()
+ self._at_transceiver._process_wardmodem_at_command(first_at_command)
+ self._at_transceiver._process_wardmodem_at_command(second_at_command)
+ self._at_transceiver._process_modem_at_command(first_at_command)
+ self._at_transceiver._process_modem_at_command(second_at_command)
+ self._mox.VerifyAll()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG)
+ unittest.main()