blob: 4c532c860bfacf8c1d2f0c93454cc3ba8aeeb55b [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import at_channel
import fcntl
import functools
import glib
import logging
import mox
import os
import tempfile
import unittest
import task_loop
class ATChannelTestCase(unittest.TestCase):
"""
Test fixture for ATChannel class.
"""
def setUp(self):
self.mox = mox.Mox()
master, slave = os.openpty()
self._at_channel = at_channel.ATChannel(
self._recieve_command_local_callback, slave, 'test')
# Replace the channel inside _at_channel with a tempfile
# We will use the tempfile to simulate a tty pair.
os.close(master)
os.close(slave)
self._channel_file = tempfile.TemporaryFile(mode = 'w+')
# These properties are a copy of the properties set in ATChannel for the
# tty pair.
flags = fcntl.fcntl(self._channel_file.fileno(), fcntl.F_GETFL)
flags = flags | os.O_NONBLOCK
fcntl.fcntl(self._channel_file.fileno(), fcntl.F_SETFL, flags)
self._at_channel._channel = self._channel_file.fileno()
# We need to seek() to the beginning of the file to simulate tty read.
# So remember the head of the file.
self._channel_file_head = self._channel_file.tell()
# Also mock out the task_loop
self._mox_task_loop = self.mox.CreateMock(task_loop.TaskLoop)
self._at_channel._task_loop = self._mox_task_loop
def tearDown(self):
self._channel_file.close()
# ##########################################################################
# Tests
def test_successful_send(self):
"""
Test that a single AT command can be sent on the channel.
"""
payload = 'A not so huge AT+CEREG command.'
self._at_channel.send(payload)
received_command = self._recieve_command_remote()
self.assertTrue(received_command.endswith('\r\n'))
self.assertEqual(payload.strip(), received_command.strip())
# Change the AT command guard strings and check again.
self._at_channel.at_prefix = '$$'
self._at_channel.at_suffix = '##'
payload = 'A not so huge AT+CEREG command.'
self._at_channel.send(payload)
received_command = self._recieve_command_remote()
self.assertTrue(received_command.startswith('$$'))
self.assertTrue(received_command.endswith('##'))
self.assertEqual(payload.strip(),
received_command.strip('$$').strip('##'))
def test_recieve_single_at_command(self):
"""
Test that a single AT command can be received together on the channel.
"""
payload = 'We send you our AT+good wishes too!\r\n'
callback = lambda channel, payload: None
self._at_channel._receiver_callback = callback
self._mox_task_loop.post_task(callback, payload.strip())
self.mox.ReplayAll()
self._send_command_remote(payload)
self._at_channel._handle_channel_cb(self._channel_file.fileno(),
glib.IO_IN)
self.mox.VerifyAll()
def test_receive_at_commands_differet_terminators(self):
"""
Test that AT commands are recieved correctly when different supported
termination strings are being used.
"""
# ; is a continuation marker. AT1;2 == AT1\r\nAT2
payloads = ['AT1\r\nA', 'T2\rA', 'T3\nA', 'T4;', '5\r\n']
callback = lambda channel, payload: None
self._at_channel._receiver_callback = callback
self._mox_task_loop.post_task(callback, 'AT1')
self._mox_task_loop.post_task(callback, 'AT2')
self._mox_task_loop.post_task(callback, 'AT3')
self._mox_task_loop.post_task(callback, 'AT4')
self._mox_task_loop.post_task(callback, 'AT5')
self.mox.ReplayAll()
for payload in payloads:
self._send_command_remote(payload)
self._at_channel._handle_channel_cb(self._channel_file.fileno(),
glib.IO_IN)
self.mox.VerifyAll()
def test_recieve_at_commands_in_parts(self):
"""
Test that a multiple AT commands can be received in parts on the
channel.
"""
payloads = ['AT1', '11\r\n', '\r\nAT22', '2\r\nAT333', '\r\n']
callback = lambda channel, payload: None
self._at_channel._receiver_callback = callback
self._mox_task_loop.post_task(callback, 'AT111')
self._mox_task_loop.post_task(callback, 'AT222')
self._mox_task_loop.post_task(callback, 'AT333')
self.mox.ReplayAll()
for payload in payloads:
self._send_command_remote(payload)
self._at_channel._handle_channel_cb(self._channel_file.fileno(),
glib.IO_IN)
self.mox.VerifyAll()
def test_recieve_long_at_commands(self):
"""
Test that a multiple AT commands can be received in parts on the
channel.
"""
payloads = ['AT1+',
'123456789\r\nAT2+123456789\r\nAT3+1234567',
'89\r\n']
callback = lambda channel, payload: None
self._at_channel._receiver_callback = callback
self._mox_task_loop.post_task(callback, 'AT1+123456789')
self._mox_task_loop.post_task(callback, 'AT2+123456789')
self._mox_task_loop.post_task(callback, 'AT3+123456789')
self.mox.ReplayAll()
at_channel.CHANNEL_READ_CHUNK_SIZE = 4
for payload in payloads:
self._send_command_remote(payload)
self._at_channel._handle_channel_cb(self._channel_file.fileno(),
glib.IO_IN)
self.mox.VerifyAll()
# ##########################################################################
# Helper functions
def _clean_channel_file(self):
"""
Clean the tempfile used to simulate tty, and reset the r/w head.
"""
self._channel_file.truncate(0)
self._channel_file_head = self._channel_file.tell()
def _send_command_remote(self, payload):
"""
Simulate a command being sent from the remote tty port.
@param payload: The command to send.
"""
self._clean_channel_file()
self._channel_file.write(payload)
self._channel_file.flush()
self._channel_file.seek(self._channel_file_head)
def _recieve_command_remote(self):
"""
Simluate a command being received at the remote tty port.
"""
self._channel_file.flush()
self._channel_file.seek(self._channel_file_head)
payload_list = []
for buf in iter(functools.partial(self._channel_file.read, 128), ''):
payload_list.append(buf)
self._clean_channel_file()
return ''.join(payload_list)
def _recieve_command_local_callback(self, payload):
pass
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main()