blob: 39b70daf1afb045ab3a64d48329090555c33b88c [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import at_transceiver
import logging
import mox
import os
import unittest
import at_channel
import modem_configuration
import task_loop
import wardmodem_exceptions as wme
class ATTransceiverTestCase(unittest.TestCase):
    """
    Base test fixture for ATTransceiver class.

    setUp constructs an ATTransceiver on top of a temporary pty pair and then
    replaces its modem channel, modem manager channel and task loop with mox
    mocks. tearDown unsets any stubs and closes the pty file descriptors, so
    that descriptors do not accumulate from one test to the next.

    """

    class TestMachine(object):
        """ Stub test machine used by tests below. """

        def test_function(self, _):
            """
            A stub StateMachine API function.

            wardmodem calls will be placed to this function.

            @param _: Ignored.

            """
            pass

        # Needed in a test machine.
        def get_well_known_name(self):
            """ Get the well known name of this machine as str. """
            return "test_machine"

    def setUp(self):
        """
        Build the ATTransceiver under test and swap its internals for mocks.

        """
        self._mox = mox.Mox()

        # Create a temporary pty pair for the ATTransceiver constructor. Both
        # descriptors are remembered so that they can be closed again.
        self._pty_fds = os.openpty()
        _, slave = self._pty_fds
        try:
            self._modem_conf = modem_configuration.ModemConfiguration()
            self._at_transceiver = at_transceiver.ATTransceiver(
                    slave, self._modem_conf, slave)
        except Exception:
            # unittest does not run tearDown when setUp raises, so release
            # the pty here before propagating the failure.
            self._close_pty_fds()
            raise

        # Now replace internal objects in _at_transceiver with mocks
        self._at_transceiver._modem_response_timeout_milliseconds = 0
        self._mock_modem_channel = self._mox.CreateMock(at_channel.ATChannel)
        self._at_transceiver._modem_channel = self._mock_modem_channel
        self._mock_mm_channel = self._mox.CreateMock(at_channel.ATChannel)
        self._at_transceiver._mm_channel = self._mock_mm_channel
        self._mock_task_loop = self._mox.CreateMock(task_loop.TaskLoop)
        self._at_transceiver._task_loop = self._mock_task_loop

        # Also empty out the internal maps, so that actual loaded configuration
        # does not interfere with the test.
        self._at_transceiver._at_to_wm_action_map = {}
        self._at_transceiver._wm_response_to_at_map = {}

    def tearDown(self):
        """
        Undo stubs and release the pty pair opened by setUp.

        """
        # Tests call UnsetStubs themselves on success; repeating it here
        # covers the case where a test fails before reaching that call.
        self._mox.UnsetStubs()
        self._close_pty_fds()

    def _close_pty_fds(self):
        """
        Close every pty file descriptor recorded by setUp, at most once.

        """
        # Forget the descriptors before closing them, so that a repeated call
        # can never close an unrelated descriptor that reused the same number.
        fds, self._pty_fds = self._pty_fds, ()
        for fd in fds:
            try:
                os.close(fd)
            except OSError:
                # Best effort: the descriptor may already have been closed.
                pass
class ATTransceiverCommonTestCase(ATTransceiverTestCase):
    """
    Tests common to all three modes of ATTransceiver.

    """

    def test_successful_mode_selection(self):
        """
        Test that every mode can be selected when both channels are present.

        """
        modes = at_transceiver.ATTransceiverMode
        for wanted in (modes.WARDMODEM,
                       modes.PASS_THROUGH,
                       modes.SPLIT_VERIFY):
            self._at_transceiver.mode = wanted
            self.assertEqual(self._at_transceiver.mode, wanted)

    def test_unsuccessful_mode_selection(self):
        """
        Test that the mode stays WARDMODEM when the modem channel is missing,
        whichever mode is requested.

        """
        modes = at_transceiver.ATTransceiverMode
        self._at_transceiver._modem_channel = None
        for requested in (modes.WARDMODEM,
                          modes.PASS_THROUGH,
                          modes.SPLIT_VERIFY):
            self._at_transceiver.mode = requested
            self.assertEqual(self._at_transceiver.mode, modes.WARDMODEM)

    def test_update_at_to_wm_action_map(self):
        """
        Test that _at_to_wm_action_map is parsed correctly on first load and
        merged correctly on a later update.

        """
        # The diffs if this test fails can be rather long.
        self.maxDiff = None
        self._at_transceiver._at_to_wm_action_map = {}

        # Test initialization
        initial_raw = {
            'AT1=': ('STATE_MACHINE1', 'function1'),
            'AT2=1,2': ('STATE_MACHINE2', 'function2'),
            'AT3=*,care,do': ('STATE_MACHINE3', 'function3', (0, 1)),
            'AT4?': ('STATE_MACHINE4', 'function4'),
            'AT5=': ('STATE_MACHINE5', 'function5', ()),
            'AT5=*': ('STATE_MACHINE6', 'function6'),
        }
        expected = {
            'AT1=': {(): ('STATE_MACHINE1', 'function1', ())},
            'AT2=': {('1', '2'): ('STATE_MACHINE2', 'function2', ())},
            'AT3=': {('*', 'care', 'do'): ('STATE_MACHINE3',
                                           'function3',
                                           (0, 1))},
            'AT4?': {(): ('STATE_MACHINE4', 'function4', ())},
            'AT5=': {(): ('STATE_MACHINE5', 'function5', ()),
                     ('*',): ('STATE_MACHINE6', 'function6', ())},
        }
        self._at_transceiver._update_at_to_wm_action_map(initial_raw)
        self.assertEqual(expected, self._at_transceiver._at_to_wm_action_map)

        # Test update
        update_raw = {
            'AT1=': ('STATE_MACHINE7', 'function7'),
            'AT5=2': ('STATE_MACHINE8', 'function8', 0),
            'AT6?': ('STATE_MACHINE9', 'function9'),
        }
        # The update replaces AT1=, adds one more AT5= variant and adds AT6?.
        expected['AT1='] = {(): ('STATE_MACHINE7', 'function7', ())}
        expected['AT5='][('2',)] = ('STATE_MACHINE8', 'function8', (0,))
        expected['AT6?'] = {(): ('STATE_MACHINE9', 'function9', ())}
        self._at_transceiver._update_at_to_wm_action_map(update_raw)
        self.assertEqual(expected, self._at_transceiver._at_to_wm_action_map)

    def test_find_wardmodem_action_for_at(self):
        """
        Populate _at_to_wm_action_map and then test that the right action is
        found for a number of AT commands.

        """
        self._at_transceiver._update_at_to_wm_action_map({
            'AT1=': ('STATE_MACHINE1', 'function1'),
            'AT2=1,2': ('STATE_MACHINE2', 'function2'),
            'AT3=*,b,c': ('STATE_MACHINE3', 'function3', (0, 1)),
            'AT4?': ('STATE_MACHINE4', 'function4'),
            'AT5=': ('STATE_MACHINE5', 'function5', ()),
            'AT5=*': ('STATE_MACHINE6', 'function6'),
        })
        find_action = self._at_transceiver._find_wardmodem_action_for_at

        # (AT command, expected action) pairs.
        successful_cases = (
            ('AT1=', ('STATE_MACHINE1', 'function1', ())),
            ('AT2=1,2', ('STATE_MACHINE2', 'function2', ())),
            ('AT3=a,b,c', ('STATE_MACHINE3', 'function3', ('a', 'b'))),
            ('AT3=,b,c', ('STATE_MACHINE3', 'function3', ('', 'b'))),
            ('AT5=', ('STATE_MACHINE5', 'function5', ())),
            ('AT5=s', ('STATE_MACHINE6', 'function6', ())),
        )
        for at_command, action in successful_cases:
            self.assertEqual(action, find_action(at_command))

        # Unsuccessful cases
        self.assertRaises(wme.ATTransceiverException,
                          find_action,
                          'DOESNOTEXIST')

    def test_find_wardmodem_action_for_at_returns_fallback(self):
        """
        Test that when a fallback machine is set up, an unmatched AT command
        is forwarded to this machine.

        """
        fallback_machine = self._mox.CreateMock(self.TestMachine)
        fallback_machine.get_well_known_name().MultipleTimes().AndReturn(
                'FALLBACK_MACHINE')

        self._mox.ReplayAll()
        self._at_transceiver.register_state_machine(fallback_machine)
        fallback_name = fallback_machine.get_well_known_name()
        self._at_transceiver.register_fallback_state_machine(fallback_name,
                                                             'act_on')
        found = self._at_transceiver._find_wardmodem_action_for_at(
                'DOESNOTEXIST')
        self.assertEqual(('FALLBACK_MACHINE', 'act_on', ('DOESNOTEXIST',)),
                         found)
        self._mox.VerifyAll()

    def test_post_wardmodem_request(self):
        """
        Test that posting a wardmodem request schedules the mapped state
        machine function on the task loop.

        """
        action_map = {'AT=*': ('TestMachine', 'test_function', 0)}
        argument = 'fake_arg'
        at_command = 'AT=%s' % argument

        machine = self._mox.CreateMock(self.TestMachine)
        self._at_transceiver._update_at_to_wm_action_map(action_map)

        machine.get_well_known_name().AndReturn('TestMachine')
        self._mock_task_loop.post_task(
                self._at_transceiver._execute_state_machine_function,
                at_command,
                mox.IgnoreArg(),
                machine.test_function,
                argument)

        self._mox.ReplayAll()
        self._at_transceiver.register_state_machine(machine)
        self._at_transceiver._post_wardmodem_request(at_command)
        self._mox.VerifyAll()

    def test_update_wm_response_to_at_map(self):
        """
        Test that the wm_response_to_at_map is correctly updated.

        """
        first_update = {
            'some_function': 'AT=some_function',
            'some_other_function': 'AT=some_other_function',
        }
        self._at_transceiver._update_wm_response_to_at_map(first_update)
        self.assertEqual(first_update,
                         self._at_transceiver._wm_response_to_at_map)

        second_update = {
            'some_other_function': 'AT=overwritten_function',
            'some_new_function': 'AT=this_is_new_too',
        }
        # Entries from the second update win over those from the first.
        merged = {'some_function': 'AT=some_function'}
        merged.update(second_update)
        self._at_transceiver._update_wm_response_to_at_map(second_update)
        self.assertEqual(merged,
                         self._at_transceiver._wm_response_to_at_map)

    def test_construct_at_response(self):
        """
        Test that construct_at_response substitutes the given arguments for
        the wildcards in the response template.

        """
        construct = self._at_transceiver._construct_at_response

        # (expected response, arguments passed to _construct_at_response)
        successful_cases = (
            ('AT=arg1,some,arg2', ('AT=*,some,*', 'arg1', 'arg2')),
            ('AT=1,some,thing', ('AT=*,some,thing', 1)),
            ('AT=some,other,thing', ('AT=some,other,thing',)),
            ('AT=needsnone', ('AT=needsnone', 'butonegiven')),
        )
        for response, call_args in successful_cases:
            self.assertEqual(response, construct(*call_args))

        # Unsuccessful cases
        self.assertRaises(wme.ATTransceiverException,
                          construct,
                          'AT=*,needstwo,*',
                          'onlyonegiven')

    def test_process_wardmodem_response(self):
        """
        A basic test for process_wardmodem_response.

        """
        self._mox.StubOutWithMock(self._at_transceiver,
                                  '_process_wardmodem_at_command')
        self._at_transceiver._update_wm_response_to_at_map({
            'func1': 'AT=*,given,*',
            'func2': 'AT=nothing,needed',
        })

        # Expected calls on the stubbed-out method, in order.
        self._at_transceiver._process_wardmodem_at_command('AT=a,given,2')
        self._at_transceiver._process_wardmodem_at_command(
                'AT=nothing,needed')

        self._mox.ReplayAll()
        self._at_transceiver.process_wardmodem_response('func1', 'a', 2)
        self._at_transceiver.process_wardmodem_response('func2')
        self._mox.UnsetStubs()
        self._mox.VerifyAll()
class ATTransceiverWardModemTestCase(ATTransceiverTestCase):
    """
    Test ATTransceiver class in the WARDMODEM mode.

    """

    def setUp(self):
        """
        Run the base fixture, then switch the transceiver to WARDMODEM mode.

        """
        super(ATTransceiverWardModemTestCase, self).setUp()
        wardmodem_mode = at_transceiver.ATTransceiverMode.WARDMODEM
        self._at_transceiver.mode = wardmodem_mode

    def test_wardmodem_at_command(self):
        """
        Test that an AT command coming from wardmodem is sent on the modem
        manager channel.

        """
        command = 'AT+commmmmmmmmand'

        self._mock_mm_channel.send(command)

        self._mox.ReplayAll()
        self._at_transceiver._process_wardmodem_at_command(command)
        self._mox.VerifyAll()

    def test_mm_at_command(self):
        """
        Test that an AT command coming from modem manager is posted as a
        wardmodem request.

        """
        command = 'AT+commmmmmmmmand'
        self._mox.StubOutWithMock(self._at_transceiver,
                                  '_post_wardmodem_request')

        self._at_transceiver._post_wardmodem_request(command)

        self._mox.ReplayAll()
        self._at_transceiver._process_mm_at_command(command)
        self._mox.UnsetStubs()
        self._mox.VerifyAll()
class ATTransceiverPassThroughTestCase(ATTransceiverTestCase):
    """
    Test ATTransceiver class in the PASS_THROUGH mode.

    """

    def setUp(self):
        """
        Run the base fixture, then switch the transceiver to PASS_THROUGH
        mode.

        """
        super(ATTransceiverPassThroughTestCase, self).setUp()
        pass_through_mode = at_transceiver.ATTransceiverMode.PASS_THROUGH
        self._at_transceiver.mode = pass_through_mode

    def test_modem_at_command(self):
        """
        Test that an AT command coming from the physical modem is sent on the
        modem manager channel.

        """
        command = 'AT+commmmmmmmmand'

        self._mock_mm_channel.send(command)

        self._mox.ReplayAll()
        self._at_transceiver._process_modem_at_command(command)
        self._mox.VerifyAll()

    def test_mm_at_command(self):
        """
        Test that an AT command coming from modem manager is sent on the
        modem channel.

        """
        command = 'AT+commmmmmmmmand'

        self._mock_modem_channel.send(command)

        self._mox.ReplayAll()
        self._at_transceiver._process_mm_at_command(command)
        self._mox.VerifyAll()
class ATTransceiverSplitVerifyTestCase(ATTransceiverTestCase):
    """
    Test ATTransceiver class in the SPLIT_VERIFY mode.

    """

    def setUp(self):
        """
        Run the base fixture, then switch the transceiver to SPLIT_VERIFY
        mode.

        """
        super(ATTransceiverSplitVerifyTestCase, self).setUp()
        split_verify_mode = at_transceiver.ATTransceiverMode.SPLIT_VERIFY
        self._at_transceiver.mode = split_verify_mode

    def test_mm_at_command(self):
        """
        Test that an incoming modem manager command is multiplexed to
        wardmodem and physical modem.

        """
        command = 'AT+commmmmmmmmand'
        self._mox.StubOutWithMock(self._at_transceiver,
                                  '_post_wardmodem_request')

        # The two destinations may be served in either order.
        self._mock_modem_channel.send(command).InAnyOrder()
        self._at_transceiver._post_wardmodem_request(command).InAnyOrder()

        self._mox.ReplayAll()
        self._at_transceiver._process_mm_at_command(command)
        self._mox.UnsetStubs()
        self._mox.VerifyAll()

    def test_successful_single_at_response_modem_wardmodem(self):
        """
        Test the case when one AT response is received successfully.

        Here the response from the physical modem is processed first.

        """
        response = 'AT+commmmmmmmmand'

        self._mock_mm_channel.send(response)

        self._mox.ReplayAll()
        self._at_transceiver._process_modem_at_command(response)
        self._at_transceiver._process_wardmodem_at_command(response)
        self._mox.VerifyAll()

    def test_successful_single_at_response_wardmodem_modem(self):
        """
        Test the case when one AT response is received successfully.

        Here the response from wardmodem is processed first.

        """
        response = 'AT+commmmmmmmmand'
        timeout_task_id = 3

        self._mock_task_loop.post_task_after_delay(
                self._at_transceiver._modem_response_timed_out,
                mox.IgnoreArg()).AndReturn(timeout_task_id)
        self._mock_task_loop.cancel_posted_task(timeout_task_id)
        self._mock_mm_channel.send(response)

        self._mox.ReplayAll()
        self._at_transceiver._process_wardmodem_at_command(response)
        self._at_transceiver._process_modem_at_command(response)
        self._mox.VerifyAll()

    def test_mismatched_at_response(self):
        """
        Test the case when both responses arrive, but are not identical.

        """
        from_wardmodem = 'AT+wardmodem'
        from_modem = 'AT+modem'
        self._mox.StubOutWithMock(self._at_transceiver,
                                  '_report_verification_failure')

        self._at_transceiver._report_verification_failure(
                self._at_transceiver.VERIFICATION_FAILED_MISMATCH,
                from_modem,
                from_wardmodem)
        self._mock_mm_channel.send(from_wardmodem)

        self._mox.ReplayAll()
        self._at_transceiver._process_modem_at_command(from_modem)
        self._at_transceiver._process_wardmodem_at_command(from_wardmodem)
        self._mox.UnsetStubs()
        self._mox.VerifyAll()

    def test_modem_response_times_out(self):
        """
        Test the case when the physical modem fails to respond.

        """
        response = 'AT+commmmmmmmmand'
        timeout_task_id = 3
        self._mox.StubOutWithMock(self._at_transceiver,
                                  '_report_verification_failure')

        self._mock_task_loop.post_task_after_delay(
                self._at_transceiver._modem_response_timed_out,
                mox.IgnoreArg()).AndReturn(timeout_task_id)
        self._at_transceiver._report_verification_failure(
                self._at_transceiver.VERIFICATION_FAILED_TIME_OUT,
                None,
                response)
        self._mock_mm_channel.send(response)

        self._mox.ReplayAll()
        self._at_transceiver._process_wardmodem_at_command(response)
        # Invoke the timeout callback directly in place of the task loop.
        self._at_transceiver._modem_response_timed_out()
        self._mox.UnsetStubs()
        self._mox.VerifyAll()

    def test_multiple_successful_responses(self):
        """
        Test the case where two wardmodem responses are queued, and then two
        matching modem responses are received.

        """
        # (AT response, id returned for its timeout task)
        exchanges = (('AT+first', 3), ('AT+second', 4))

        for response, timeout_task_id in exchanges:
            self._mock_task_loop.post_task_after_delay(
                    self._at_transceiver._modem_response_timed_out,
                    mox.IgnoreArg()).AndReturn(timeout_task_id)
            self._mock_task_loop.cancel_posted_task(timeout_task_id)
            self._mock_mm_channel.send(response)

        self._mox.ReplayAll()
        for response, _ in exchanges:
            self._at_transceiver._process_wardmodem_at_command(response)
        for response, _ in exchanges:
            self._at_transceiver._process_modem_at_command(response)
        self._mox.VerifyAll()
if __name__ == '__main__':
    # Configure root logging at DEBUG level, then hand over to unittest.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()