| #!/usr/bin/env python |
| |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import at_transceiver |
| |
| import logging |
| import mox |
| import os |
| import unittest |
| |
| import at_channel |
| import modem_configuration |
| import task_loop |
| import wardmodem_exceptions as wme |
| |
| class ATTransceiverTestCase(unittest.TestCase): |
| """ |
| Base test fixture for ATTransceiver class. |
| |
| """ |
| class TestMachine(object): |
| """ Stub test machine used by tests below. """ |
| def test_function(self, _): |
| """ |
| A stub StateMachine API function. |
| |
| wardmodem calls will be placed to this function. |
| |
| @param _: Ignored. |
| |
| """ |
| pass |
| |
| |
| # Needed in a test machine. |
| def get_well_known_name(self): |
| """ Get the well known name of this machine as str. """ |
| return "test_machine" |
| |
| |
| def setUp(self): |
| self._mox = mox.Mox() |
| |
| # Create a temporary pty pair for the ATTransceiver constructor |
| master, slave = os.openpty() |
| |
| self._modem_conf = modem_configuration.ModemConfiguration() |
| self._at_transceiver = at_transceiver.ATTransceiver(slave, |
| self._modem_conf, |
| slave) |
| |
| # Now replace internal objects in _at_transceiver with mocks |
| self._at_transceiver._modem_response_timeout_milliseconds = 0 |
| self._mock_modem_channel = self._mox.CreateMock(at_channel.ATChannel) |
| self._at_transceiver._modem_channel = self._mock_modem_channel |
| self._mock_mm_channel = self._mox.CreateMock(at_channel.ATChannel) |
| self._at_transceiver._mm_channel = self._mock_mm_channel |
| self._mock_task_loop = self._mox.CreateMock(task_loop.TaskLoop) |
| self._at_transceiver._task_loop = self._mock_task_loop |
| |
| # Also empty out the internal maps, so that actual loaded configuration |
| # does not interfere with the test. |
| self._at_transceiver._at_to_wm_action_map = {} |
| self._at_transceiver._wm_response_to_at_map = {} |
| |
| |
| class ATTransceiverCommonTestCase(ATTransceiverTestCase): |
| """ |
| Tests common to all three modes of ATTransceiver. |
| |
| """ |
| |
| def test_successful_mode_selection(self): |
| """ |
| Test that all modes can be selected, when both channels are provided. |
| |
| """ |
| self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM |
| self.assertEqual(self._at_transceiver.mode, |
| at_transceiver.ATTransceiverMode.WARDMODEM) |
| self._at_transceiver.mode = ( |
| at_transceiver.ATTransceiverMode.PASS_THROUGH) |
| self.assertEqual(self._at_transceiver.mode, |
| at_transceiver.ATTransceiverMode.PASS_THROUGH) |
| self._at_transceiver.mode = ( |
| at_transceiver.ATTransceiverMode.SPLIT_VERIFY) |
| self.assertEqual(self._at_transceiver.mode, |
| at_transceiver.ATTransceiverMode.SPLIT_VERIFY) |
| |
| def test_unsuccessful_mode_selection(self): |
| """ |
| Test that only WARDMODEM mode can be selected if the modem channel is |
| missing. |
| |
| """ |
| self._at_transceiver._modem_channel = None |
| self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM |
| self.assertEqual(self._at_transceiver.mode, |
| at_transceiver.ATTransceiverMode.WARDMODEM) |
| self._at_transceiver.mode = ( |
| at_transceiver.ATTransceiverMode.PASS_THROUGH) |
| self.assertEqual(self._at_transceiver.mode, |
| at_transceiver.ATTransceiverMode.WARDMODEM) |
| self._at_transceiver.mode = ( |
| at_transceiver.ATTransceiverMode.SPLIT_VERIFY) |
| self.assertEqual(self._at_transceiver.mode, |
| at_transceiver.ATTransceiverMode.WARDMODEM) |
| |
| |
| def test_update_at_to_wm_action_map(self): |
| """ |
| Test that _at_to_wm_action_map is updated correctly under different |
| scenarios. |
| |
| """ |
| # The diffs if this test fails can be rather long. |
| self.maxDiff = None |
| self._at_transceiver._at_to_wm_action_map = {} |
| |
| # Test initialization |
| raw_map = {'AT1=': ('STATE_MACHINE1', 'function1'), |
| 'AT2=1,2': ('STATE_MACHINE2', 'function2'), |
| 'AT3=*,care,do': ('STATE_MACHINE3', 'function3', (0, 1)), |
| 'AT4?': ('STATE_MACHINE4', 'function4'), |
| 'AT5=': ('STATE_MACHINE5', 'function5', ()), |
| 'AT5=*': ('STATE_MACHINE6', 'function6')} |
| parsed_map = {'AT1=': {(): ('STATE_MACHINE1', 'function1', ())}, |
| 'AT2=': {('1','2'): ('STATE_MACHINE2', 'function2', ())}, |
| 'AT3=': {('*','care','do'): ('STATE_MACHINE3', |
| 'function3', (0, 1))}, |
| 'AT4?': {(): ('STATE_MACHINE4', 'function4', ())}, |
| 'AT5=': {(): ('STATE_MACHINE5', 'function5', ()), |
| ('*',): ('STATE_MACHINE6', 'function6', ())}} |
| |
| self._at_transceiver._update_at_to_wm_action_map(raw_map) |
| self.assertEqual(parsed_map, self._at_transceiver._at_to_wm_action_map) |
| |
| # Test update |
| raw_good_update = {'AT1=': ('STATE_MACHINE7', 'function7'), |
| 'AT5=2': ('STATE_MACHINE8', 'function8', 0), |
| 'AT6?': ('STATE_MACHINE9', 'function9')} |
| parsed_map = {'AT1=': {(): ('STATE_MACHINE7', 'function7', ())}, |
| 'AT2=': {('1','2'): ('STATE_MACHINE2', 'function2', ())}, |
| 'AT3=': {('*','care','do'): ('STATE_MACHINE3', |
| 'function3', (0, 1))}, |
| 'AT4?': {(): ('STATE_MACHINE4', 'function4', ())}, |
| 'AT5=': {(): ('STATE_MACHINE5', 'function5', ()), |
| ('*',): ('STATE_MACHINE6', 'function6', ()), |
| ('2',): ('STATE_MACHINE8', 'function8', (0,))}, |
| 'AT6?': {(): ('STATE_MACHINE9', 'function9', ())}} |
| self._at_transceiver._update_at_to_wm_action_map(raw_good_update) |
| self.assertEqual(parsed_map, self._at_transceiver._at_to_wm_action_map) |
| |
| |
| def test_find_wardmodem_action_for_at(self): |
| """ |
| Setup _at_to_wm_action_map in the test and then test whether we can find |
| actions for AT commands off of that map. |
| |
| """ |
| raw_map = {'AT1=': ('STATE_MACHINE1', 'function1'), |
| 'AT2=1,2': ('STATE_MACHINE2', 'function2'), |
| 'AT3=*,b,c': ('STATE_MACHINE3', 'function3', (0, 1)), |
| 'AT4?': ('STATE_MACHINE4', 'function4'), |
| 'AT5=': ('STATE_MACHINE5', 'function5', ()), |
| 'AT5=*': ('STATE_MACHINE6', 'function6')} |
| self._at_transceiver._update_at_to_wm_action_map(raw_map) |
| |
| self.assertEqual( |
| ('STATE_MACHINE1', 'function1', ()), |
| self._at_transceiver._find_wardmodem_action_for_at('AT1=')) |
| self.assertEqual( |
| ('STATE_MACHINE2', 'function2', ()), |
| self._at_transceiver._find_wardmodem_action_for_at('AT2=1,2')) |
| self.assertEqual( |
| ('STATE_MACHINE3', 'function3', ('a','b')), |
| self._at_transceiver._find_wardmodem_action_for_at('AT3=a,b,c')) |
| self.assertEqual( |
| ('STATE_MACHINE3', 'function3', ('','b')), |
| self._at_transceiver._find_wardmodem_action_for_at('AT3=,b,c')) |
| self.assertEqual( |
| ('STATE_MACHINE5', 'function5', ()), |
| self._at_transceiver._find_wardmodem_action_for_at('AT5=')) |
| self.assertEqual( |
| ('STATE_MACHINE6', 'function6', ()), |
| self._at_transceiver._find_wardmodem_action_for_at('AT5=s')) |
| # Unsuccessful cases |
| self.assertRaises( |
| wme.ATTransceiverException, |
| self._at_transceiver._find_wardmodem_action_for_at, |
| 'DOESNOTEXIST') |
| |
| |
| def test_find_wardmodem_action_for_at_returns_fallback(self): |
| """ |
| Test that when a fallback machine is setup, and unmatched AT command is |
| forwarded to this machine. |
| |
| """ |
| mock_test_machine = self._mox.CreateMock(self.TestMachine) |
| mock_test_machine.get_well_known_name().MultipleTimes().AndReturn( |
| 'FALLBACK_MACHINE') |
| self._mox.ReplayAll() |
| self._at_transceiver.register_state_machine(mock_test_machine) |
| self._at_transceiver.register_fallback_state_machine( |
| mock_test_machine.get_well_known_name(), |
| 'act_on') |
| self.assertEqual( |
| ('FALLBACK_MACHINE', 'act_on', ('DOESNOTEXIST',)), |
| self._at_transceiver._find_wardmodem_action_for_at( |
| 'DOESNOTEXIST')) |
| self._mox.VerifyAll() |
| |
| |
| def test_post_wardmodem_request(self): |
| """ |
| Test that a wardmodem request can be posted successfully end-to-end. |
| |
| """ |
| raw_map = {'AT=*': ('TestMachine', 'test_function', 0)} |
| arg = 'fake_arg' |
| command = 'AT=' + arg |
| mock_test_machine = self._mox.CreateMock(self.TestMachine) |
| self._at_transceiver._update_at_to_wm_action_map(raw_map) |
| mock_test_machine.get_well_known_name().AndReturn('TestMachine') |
| self._mock_task_loop.post_task( |
| self._at_transceiver._execute_state_machine_function, |
| command, mox.IgnoreArg(), mock_test_machine.test_function, |
| arg) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver.register_state_machine(mock_test_machine) |
| self._at_transceiver._post_wardmodem_request(command) |
| self._mox.VerifyAll() |
| |
| |
| def test_update_wm_response_to_at_map(self): |
| """ |
| Test that the wm_response_to_at_map is correctly updated. |
| |
| """ |
| raw_map = {'some_function': 'AT=some_function', |
| 'some_other_function': 'AT=some_other_function'} |
| self._at_transceiver._update_wm_response_to_at_map(raw_map) |
| self.assertEqual(raw_map, |
| self._at_transceiver._wm_response_to_at_map) |
| |
| raw_map = {'some_other_function': 'AT=overwritten_function', |
| 'some_new_function': 'AT=this_is_new_too'} |
| updated_map = {'some_function': 'AT=some_function', |
| 'some_other_function': 'AT=overwritten_function', |
| 'some_new_function': 'AT=this_is_new_too'} |
| self._at_transceiver._update_wm_response_to_at_map(raw_map) |
| self.assertEqual(updated_map, |
| self._at_transceiver._wm_response_to_at_map) |
| |
| |
| def test_construct_at_response(self): |
| """ |
| Test that construct_at_response correctly replaces by actual arguments. |
| |
| """ |
| self.assertEqual( |
| 'AT=arg1,some,arg2', |
| self._at_transceiver._construct_at_response( |
| 'AT=*,some,*', 'arg1','arg2')) |
| self.assertEqual( |
| 'AT=1,some,thing', |
| self._at_transceiver._construct_at_response( |
| 'AT=*,some,thing', 1)) |
| self.assertEqual( |
| 'AT=some,other,thing', |
| self._at_transceiver._construct_at_response( |
| 'AT=some,other,thing')) |
| self.assertEqual( |
| 'AT=needsnone', |
| self._at_transceiver._construct_at_response( |
| 'AT=needsnone', 'butonegiven')) |
| # Unsuccessful cases |
| self.assertRaises( |
| wme.ATTransceiverException, |
| self._at_transceiver._construct_at_response, |
| 'AT=*,needstwo,*', 'onlyonegiven') |
| |
| |
| def test_process_wardmodem_response(self): |
| """ |
| A basic test for process_wardmodem_response. |
| |
| """ |
| self._mox.StubOutWithMock(self._at_transceiver, |
| '_process_wardmodem_at_command') |
| raw_map = {'func1': 'AT=*,given,*', |
| 'func2': 'AT=nothing,needed'} |
| self._at_transceiver._update_wm_response_to_at_map(raw_map) |
| |
| self._at_transceiver._process_wardmodem_at_command('AT=a,given,2') |
| self._at_transceiver._process_wardmodem_at_command('AT=nothing,needed') |
| |
| self._mox.ReplayAll() |
| self._at_transceiver.process_wardmodem_response('func1','a',2) |
| self._at_transceiver.process_wardmodem_response('func2') |
| self._mox.UnsetStubs() |
| self._mox.VerifyAll() |
| |
| |
| class ATTransceiverWardModemTestCase(ATTransceiverTestCase): |
| """ |
| Test ATTransceiver class in the WARDMODEM mode. |
| |
| """ |
| |
| def setUp(self): |
| super(ATTransceiverWardModemTestCase, self).setUp() |
| self._at_transceiver.mode = at_transceiver.ATTransceiverMode.WARDMODEM |
| |
| |
| def test_wardmodem_at_command(self): |
| """ |
| Test the case when AT command is received from wardmodem. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| self._mock_mm_channel.send(at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_wardmodem_at_command(at_command) |
| self._mox.VerifyAll() |
| |
| |
| def test_mm_at_command(self): |
| """ |
| Test the case when AT command is received from modem manager. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| self._mox.StubOutWithMock(self._at_transceiver, |
| '_post_wardmodem_request') |
| |
| self._at_transceiver._post_wardmodem_request(at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_mm_at_command(at_command) |
| self._mox.UnsetStubs() |
| self._mox.VerifyAll() |
| |
| |
| class ATTransceiverPassThroughTestCase(ATTransceiverTestCase): |
| """ |
| Test ATTransceiver class in the PASS_THROUGH mode. |
| |
| """ |
| |
| def setUp(self): |
| super(ATTransceiverPassThroughTestCase, self).setUp() |
| self._at_transceiver.mode = ( |
| at_transceiver.ATTransceiverMode.PASS_THROUGH) |
| |
| |
| def test_modem_at_command(self): |
| """ |
| Test the case when AT command received from physical modem. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| self._mock_mm_channel.send(at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_modem_at_command(at_command) |
| self._mox.VerifyAll() |
| |
| |
| def test_mm_at_command(self): |
| """ |
| Test the case when AT command is received from modem manager. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| self._mock_modem_channel.send(at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_mm_at_command(at_command) |
| self._mox.VerifyAll() |
| |
| |
| class ATTransceiverSplitVerifyTestCase(ATTransceiverTestCase): |
| """ |
| Test ATTransceiver class in the SPLIT_VERIFY mode. |
| |
| """ |
| |
| def setUp(self): |
| super(ATTransceiverSplitVerifyTestCase, self).setUp() |
| self._at_transceiver.mode = ( |
| at_transceiver.ATTransceiverMode.SPLIT_VERIFY) |
| |
| |
| def test_mm_at_command(self): |
| """ |
| Test that that incoming modem manager command is multiplexed to |
| wardmodem and physical modem. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| self._mox.StubOutWithMock(self._at_transceiver, |
| '_post_wardmodem_request') |
| self._mock_modem_channel.send(at_command).InAnyOrder() |
| self._at_transceiver._post_wardmodem_request(at_command).InAnyOrder() |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_mm_at_command(at_command) |
| self._mox.UnsetStubs() |
| self._mox.VerifyAll() |
| |
| |
| def test_successful_single_at_response_modem_wardmodem(self): |
| """ |
| Test the case when one AT response is received successfully. |
| In this case, physical modem command comes first. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| self._mock_mm_channel.send(at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_modem_at_command(at_command) |
| self._at_transceiver._process_wardmodem_at_command(at_command) |
| self._mox.VerifyAll() |
| |
| |
| def test_successful_single_at_response_wardmodem_modem(self): |
| """ |
| Test the case when one AT response is received successfully. |
| In this case, wardmodem command comes first. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| task_id = 3 |
| self._mock_task_loop.post_task_after_delay( |
| self._at_transceiver._modem_response_timed_out, |
| mox.IgnoreArg()).AndReturn(task_id) |
| self._mock_task_loop.cancel_posted_task(task_id) |
| self._mock_mm_channel.send(at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_wardmodem_at_command(at_command) |
| self._at_transceiver._process_modem_at_command(at_command) |
| self._mox.VerifyAll() |
| |
| def test_mismatched_at_response(self): |
| """ |
| Test the case when both responses arrive, but are not identical. |
| |
| """ |
| wardmodem_command = 'AT+wardmodem' |
| modem_command = 'AT+modem' |
| self._mox.StubOutWithMock(self._at_transceiver, |
| '_report_verification_failure') |
| self._at_transceiver._report_verification_failure( |
| self._at_transceiver.VERIFICATION_FAILED_MISMATCH, |
| modem_command, |
| wardmodem_command) |
| self._mock_mm_channel.send(wardmodem_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_modem_at_command(modem_command) |
| self._at_transceiver._process_wardmodem_at_command(wardmodem_command) |
| self._mox.UnsetStubs() |
| self._mox.VerifyAll() |
| |
| |
| def test_modem_response_times_out(self): |
| """ |
| Test the case when the physical modem fails to respond. |
| |
| """ |
| at_command = 'AT+commmmmmmmmand' |
| task_id = 3 |
| self._mox.StubOutWithMock(self._at_transceiver, |
| '_report_verification_failure') |
| |
| self._mock_task_loop.post_task_after_delay( |
| self._at_transceiver._modem_response_timed_out, |
| mox.IgnoreArg()).AndReturn(task_id) |
| self._at_transceiver._report_verification_failure( |
| self._at_transceiver.VERIFICATION_FAILED_TIME_OUT, |
| None, |
| at_command) |
| self._mock_mm_channel.send(at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_wardmodem_at_command(at_command) |
| self._at_transceiver._modem_response_timed_out() |
| self._mox.UnsetStubs() |
| self._mox.VerifyAll() |
| |
| |
| def test_multiple_successful_responses(self): |
| """ |
| Test the case two wardmodem responses are queued, and then two matching |
| modem responses are received. |
| |
| """ |
| first_at_command = 'AT+first' |
| second_at_command = 'AT+second' |
| first_task_id = 3 |
| second_task_id = 4 |
| |
| self._mock_task_loop.post_task_after_delay( |
| self._at_transceiver._modem_response_timed_out, |
| mox.IgnoreArg()).AndReturn(first_task_id) |
| self._mock_task_loop.cancel_posted_task(first_task_id) |
| self._mock_mm_channel.send(first_at_command) |
| self._mock_task_loop.post_task_after_delay( |
| self._at_transceiver._modem_response_timed_out, |
| mox.IgnoreArg()).AndReturn(second_task_id) |
| self._mock_task_loop.cancel_posted_task(second_task_id) |
| self._mock_mm_channel.send(second_at_command) |
| |
| self._mox.ReplayAll() |
| self._at_transceiver._process_wardmodem_at_command(first_at_command) |
| self._at_transceiver._process_wardmodem_at_command(second_at_command) |
| self._at_transceiver._process_modem_at_command(first_at_command) |
| self._at_transceiver._process_modem_at_command(second_at_command) |
| self._mox.VerifyAll() |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.DEBUG) |
| unittest.main() |