| #!/usr/bin/env python |
| |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import at_channel |
| |
| import fcntl |
| import functools |
| import glib |
| import logging |
| import mox |
| import os |
| import tempfile |
| import unittest |
| |
| import task_loop |
| |
| class ATChannelTestCase(unittest.TestCase): |
| """ |
| Test fixture for ATChannel class. |
| |
| """ |
| |
| def setUp(self): |
| self.mox = mox.Mox() |
| |
| master, slave = os.openpty() |
| self._at_channel = at_channel.ATChannel( |
| self._recieve_command_local_callback, slave, 'test') |
| |
| # Replace the channel inside _at_channel with a tempfile |
| # We will use the tempfile to simulate a tty pair. |
| os.close(master) |
| os.close(slave) |
| self._channel_file = tempfile.TemporaryFile(mode = 'w+') |
| # These properties are a copy of the properties set in ATChannel for the |
| # tty pair. |
| flags = fcntl.fcntl(self._channel_file.fileno(), fcntl.F_GETFL) |
| flags = flags | os.O_NONBLOCK |
| fcntl.fcntl(self._channel_file.fileno(), fcntl.F_SETFL, flags) |
| self._at_channel._channel = self._channel_file.fileno() |
| # We need to seek() to the beginning of the file to simulate tty read. |
| # So remember the head of the file. |
| self._channel_file_head = self._channel_file.tell() |
| |
| # Also mock out the task_loop |
| self._mox_task_loop = self.mox.CreateMock(task_loop.TaskLoop) |
| self._at_channel._task_loop = self._mox_task_loop |
| |
| |
| def tearDown(self): |
| self._channel_file.close() |
| |
| # ########################################################################## |
| # Tests |
| |
| def test_successful_send(self): |
| """ |
| Test that a single AT command can be sent on the channel. |
| |
| """ |
| payload = 'A not so huge AT+CEREG command.' |
| self._at_channel.send(payload) |
| received_command = self._recieve_command_remote() |
| self.assertTrue(received_command.endswith('\r\n')) |
| self.assertEqual(payload.strip(), received_command.strip()) |
| |
| # Change the AT command guard strings and check again. |
| self._at_channel.at_prefix = '$$' |
| self._at_channel.at_suffix = '##' |
| payload = 'A not so huge AT+CEREG command.' |
| self._at_channel.send(payload) |
| received_command = self._recieve_command_remote() |
| self.assertTrue(received_command.startswith('$$')) |
| self.assertTrue(received_command.endswith('##')) |
| self.assertEqual(payload.strip(), |
| received_command.strip('$$').strip('##')) |
| |
| |
| def test_recieve_single_at_command(self): |
| """ |
| Test that a single AT command can be received together on the channel. |
| |
| """ |
| payload = 'We send you our AT+good wishes too!\r\n' |
| callback = lambda channel, payload: None |
| self._at_channel._receiver_callback = callback |
| self._mox_task_loop.post_task(callback, payload.strip()) |
| self.mox.ReplayAll() |
| self._send_command_remote(payload) |
| self._at_channel._handle_channel_cb(self._channel_file.fileno(), |
| glib.IO_IN) |
| self.mox.VerifyAll() |
| |
| |
| def test_receive_at_commands_differet_terminators(self): |
| """ |
| Test that AT commands are recieved correctly when different supported |
| termination strings are being used. |
| |
| """ |
| # ; is a continuation marker. AT1;2 == AT1\r\nAT2 |
| payloads = ['AT1\r\nA', 'T2\rA', 'T3\nA', 'T4;', '5\r\n'] |
| callback = lambda channel, payload: None |
| self._at_channel._receiver_callback = callback |
| self._mox_task_loop.post_task(callback, 'AT1') |
| self._mox_task_loop.post_task(callback, 'AT2') |
| self._mox_task_loop.post_task(callback, 'AT3') |
| self._mox_task_loop.post_task(callback, 'AT4') |
| self._mox_task_loop.post_task(callback, 'AT5') |
| |
| self.mox.ReplayAll() |
| for payload in payloads: |
| self._send_command_remote(payload) |
| self._at_channel._handle_channel_cb(self._channel_file.fileno(), |
| glib.IO_IN) |
| self.mox.VerifyAll() |
| |
| |
| def test_recieve_at_commands_in_parts(self): |
| """ |
| Test that a multiple AT commands can be received in parts on the |
| channel. |
| |
| """ |
| payloads = ['AT1', '11\r\n', '\r\nAT22', '2\r\nAT333', '\r\n'] |
| callback = lambda channel, payload: None |
| self._at_channel._receiver_callback = callback |
| self._mox_task_loop.post_task(callback, 'AT111') |
| self._mox_task_loop.post_task(callback, 'AT222') |
| self._mox_task_loop.post_task(callback, 'AT333') |
| |
| self.mox.ReplayAll() |
| for payload in payloads: |
| self._send_command_remote(payload) |
| self._at_channel._handle_channel_cb(self._channel_file.fileno(), |
| glib.IO_IN) |
| self.mox.VerifyAll() |
| |
| |
| def test_recieve_long_at_commands(self): |
| """ |
| Test that a multiple AT commands can be received in parts on the |
| channel. |
| |
| """ |
| payloads = ['AT1+', |
| '123456789\r\nAT2+123456789\r\nAT3+1234567', |
| '89\r\n'] |
| callback = lambda channel, payload: None |
| self._at_channel._receiver_callback = callback |
| self._mox_task_loop.post_task(callback, 'AT1+123456789') |
| self._mox_task_loop.post_task(callback, 'AT2+123456789') |
| self._mox_task_loop.post_task(callback, 'AT3+123456789') |
| |
| self.mox.ReplayAll() |
| at_channel.CHANNEL_READ_CHUNK_SIZE = 4 |
| for payload in payloads: |
| self._send_command_remote(payload) |
| self._at_channel._handle_channel_cb(self._channel_file.fileno(), |
| glib.IO_IN) |
| self.mox.VerifyAll() |
| |
| # ########################################################################## |
| # Helper functions |
| |
| def _clean_channel_file(self): |
| """ |
| Clean the tempfile used to simulate tty, and reset the r/w head. |
| |
| """ |
| self._channel_file.truncate(0) |
| self._channel_file_head = self._channel_file.tell() |
| |
| |
| def _send_command_remote(self, payload): |
| """ |
| Simulate a command being sent from the remote tty port. |
| |
| @param payload: The command to send. |
| |
| """ |
| self._clean_channel_file() |
| self._channel_file.write(payload) |
| self._channel_file.flush() |
| self._channel_file.seek(self._channel_file_head) |
| |
| |
| def _recieve_command_remote(self): |
| """ |
| Simluate a command being received at the remote tty port. |
| |
| """ |
| self._channel_file.flush() |
| self._channel_file.seek(self._channel_file_head) |
| payload_list = [] |
| for buf in iter(functools.partial(self._channel_file.read, 128), ''): |
| payload_list.append(buf) |
| self._clean_channel_file() |
| return ''.join(payload_list) |
| |
| |
| def _recieve_command_local_callback(self, payload): |
| pass |
| |
| |
if __name__ == '__main__':
    # Turn on debug-level logging before handing control to the unittest
    # runner, so log output is emitted while the tests execute.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()