| # Copyright (c) 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for server/cros/network/rf_switch/rf_switch.py.""" |
| |
| import unittest |
| |
| from autotest_lib.client.common_lib.test_utils import mock |
| from autotest_lib.server.cros.network.rf_switch import rf_mocks |
| from autotest_lib.server.cros.network.rf_switch import rf_switch |
| from autotest_lib.server.cros.network.rf_switch import scpi |
| |
| |
class RfSwitchTestCases(unittest.TestCase):
    """Unit tests for RfSwitch Methods.

    Every test talks to an rf_mocks.RfSwitchMock whose socket 'send' and
    'recv' are stubbed through mock_god. A test first queues the commands it
    expects to be sent, then calls the method under test, and finally asks
    the god to check the playback.
    """

    _HOST = '1.2.3.4'

    def setUp(self):
        """Create the mock god and the mocked RF switch used by each test."""
        self.god = mock.mock_god(debug=False, fail_fast=True)
        self.mock_rf_switch = rf_mocks.RfSwitchMock(self._HOST)
        # Error code / message formatted into the stubbed error-check reply
        # by _populate_stack_for_cmd.
        self.code = 0
        self.msg = ''

    def tearDown(self):
        """Remove every stub installed through the mock god."""
        self.god.unstub_all()

    def _populate_stack_for_cmd(self, cmd):
        """Helper to add call stacks to verify in mock.

        Queues three expected 'send' calls, in order: the command itself,
        the SCPI error check and the wait command. 'recv' is stubbed to
        return '<self.code>, "<self.msg>"'.

        @param cmd: command to verify
        """
        switch_socket = self.mock_rf_switch.socket

        # monitor send for correct command.
        self.god.stub_function(switch_socket, 'send')
        switch_socket.send.expect_call('%s' % cmd)
        switch_socket.send.expect_call('%s\n' % scpi.Scpi.CMD_ERROR_CHECK)
        switch_socket.send.expect_call('%s\n' % rf_switch.RfSwitch._CMD_WAIT)

        error_response = '%d, "%s"' % (self.code, self.msg)
        self.god.stub_function_to_return(
                switch_socket, 'recv', error_response)

    def _populate_stack_for_query_commands(self, cmd, response):
        """Helper to add call stacks for query commands.

        Queues a single expected 'send' of the command and stubs 'recv' to
        return the given response.

        @param cmd: command to verify
        @param response: response to mock return
        """
        switch_socket = self.mock_rf_switch.socket
        self.god.stub_function(switch_socket, 'send')
        switch_socket.send.expect_call('%s' % cmd)

        self.god.stub_function_to_return(switch_socket, 'recv', response)

    def test_send_cmd_check_error(self):
        """Verify send_cmd_check_error."""
        test_command = 'This is a command\n'
        self._populate_stack_for_cmd(test_command)
        self.mock_rf_switch.send_cmd_check_error(test_command)
        self.god.check_playback()

    def test_send_cmd_check_error_throws_error(self):
        """Verify send_cmd_check_error throws error."""
        test_command = 'This is a command'
        code = 101
        msg = 'Error Message'

        # Only 'recv' is stubbed here; it reports a non-zero error code.
        self.god.stub_function_to_return(
                self.mock_rf_switch.socket, 'recv', '%d, "%s"' % (code, msg))
        with self.assertRaises(rf_switch.RfSwitchException):
            self.mock_rf_switch.send_cmd_check_error(test_command)

    def test_close_relays(self):
        """Verify close_relays."""
        relays = 'R1:R3'
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays))
        self.mock_rf_switch.close_relays(relays)
        self.god.check_playback()

    def test_relays_closed(self):
        """Verify relays_closed."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (
                rf_switch.RfSwitch._CMD_CHECK_RELAYS_CLOSED, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.relays_closed(relays)
        self.god.check_playback()

    def test_open_relays(self):
        """Verify open_relays."""
        relays = 'R1:R3'
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, relays))
        self.mock_rf_switch.open_relays(relays)
        self.god.check_playback()

    def test_open_all_relays(self):
        """Verify open_all_relays."""
        self._populate_stack_for_cmd(
                '%s\n' % rf_switch.RfSwitch._CMD_OPEN_ALL_RELAYS)
        self.mock_rf_switch.open_all_relays()
        self.god.check_playback()

    def test_set_verify_on(self):
        """Verify set_verify for on."""
        relays = 'R1:R3'
        status = True
        self._populate_stack_for_cmd(
                '%s 1,(@%s)\n' % (rf_switch.RfSwitch._CMD_SET_VERIFY, relays))
        self.mock_rf_switch.set_verify(relays, status)
        self.god.check_playback()

    def test_set_verify_off(self):
        """Verify set_verify for off."""
        relays = 'R1:R3'
        status = False
        self._populate_stack_for_cmd(
                '%s 0,(@%s)\n' % (rf_switch.RfSwitch._CMD_SET_VERIFY, relays))
        self.mock_rf_switch.set_verify(relays, status)
        self.god.check_playback()

    def test_get_verify(self):
        """Verify get_verify."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_GET_VERIFY, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify(relays)
        self.god.check_playback()

    def test_set_verify_inverted_true(self):
        """Verify set_verify_inverted sends INV when status is True."""
        relays = 'R1:R3'
        status = True
        cmd = '%s INV,(@%s)\n' % (
                rf_switch.RfSwitch._CMD_SET_VERIFY_INVERTED, relays)
        self._populate_stack_for_cmd(cmd)
        self.mock_rf_switch.set_verify_inverted(relays, status)
        self.god.check_playback()

    def test_set_verify_inverted_false(self):
        """Verify set_verify_inverted sends NORM when status is False."""
        relays = 'R1:R3'
        status = False
        cmd = '%s NORM,(@%s)\n' % (
                rf_switch.RfSwitch._CMD_SET_VERIFY_INVERTED, relays)
        self._populate_stack_for_cmd(cmd)
        self.mock_rf_switch.set_verify_inverted(relays, status)
        self.god.check_playback()

    def test_get_verify_inverted(self):
        """Verify get_verify_inverted."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (
                rf_switch.RfSwitch._CMD_GET_VERIFY_INVERTED, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify_inverted(relays)
        self.god.check_playback()

    def test_get_verify_state(self):
        """Verify get_verify_state."""
        relays = 'R1:R3'
        status = '1,0,1'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_GET_VERIFY_STATE, relays)
        self._populate_stack_for_query_commands(cmd, status)
        self.mock_rf_switch.get_verify_state(relays)
        self.god.check_playback()

    def test_busy(self):
        """Verify busy is true when the mocked response is '1'."""
        status = '1'
        cmd = '%s\n' % rf_switch.RfSwitch._CMD_CHECK_BUSY
        self._populate_stack_for_query_commands(cmd, status)
        busy = self.mock_rf_switch.busy
        self.god.check_playback()
        self.assertTrue(busy)

    def test_not_busy(self):
        """Verify busy is false when the mocked response is '0'."""
        status = '0'
        cmd = '%s\n' % rf_switch.RfSwitch._CMD_CHECK_BUSY
        self._populate_stack_for_query_commands(cmd, status)
        busy = self.mock_rf_switch.busy
        self.god.check_playback()
        self.assertFalse(busy)

    def test_wait(self):
        """Verify wait."""
        self.god.stub_function(self.mock_rf_switch.socket, 'send')
        self.mock_rf_switch.socket.send.expect_call(
                '%s\n' % rf_switch.RfSwitch._CMD_WAIT)
        self.mock_rf_switch.wait()
        self.god.check_playback()

    def test_get_attenuation(self):
        """Verify get_attenuation."""
        ap = 1
        attenuation = 100
        relays = rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS_SHORT[ap]
        return_relays = '1,1,0,1,1,0,0'
        cmd = '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CHECK_RELAYS_CLOSED,
                              relays)
        self._populate_stack_for_query_commands(cmd, return_relays)
        response = self.mock_rf_switch.get_attenuation(ap)
        self.god.check_playback()
        # assertEqual: the assertEquals alias is deprecated and was removed
        # in Python 3.12.
        self.assertEqual(response, attenuation,
                         'get_attenuation returned %s, did not match %s' % (
                                 response, attenuation))

    def test_set_attenuation(self):
        """Verify set_attenuation."""
        ap = 1
        ap_relays = 'k1_49,k1_50,k1_51,k1_52,k1_53,k1_54,R1_9'
        attenuation = 100
        relays_for_attenuation = 'k1_53,k1_52,k1_50,k1_49'

        # All of the AP's relays are opened first, then only the ones
        # matching the requested attenuation are closed.
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS, ap_relays))
        self._populate_stack_for_cmd('%s (@%s)\n' % (
                rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays_for_attenuation))
        self.mock_rf_switch.set_attenuation(ap, attenuation)
        self.god.check_playback()

    def test_set_attenuation_all(self):
        """Verify we can set same attenuation to all."""
        # range() instead of the Python 2 only xrange(); enclosure numbers
        # are 1-based keys of _AP_ATTENUATOR_RELAYS.
        enclosures = range(1, rf_switch.RfSwitch._MAX_ENCLOSURE + 1)

        # 0 should close all relays
        for enclosure in enclosures:
            ap_relays = rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS[enclosure]
            relays = ','.join(ap_relays)
            reverse_relays = ','.join(ap_relays[::-1])
            self._populate_stack_for_cmd(
                    '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS,
                                    relays))
            self._populate_stack_for_cmd('%s (@%s)\n' % (
                    rf_switch.RfSwitch._CMD_CLOSE_RELAYS, reverse_relays))
        # A single call with ap=0 is expected to cover every enclosure
        # queued above.
        self.mock_rf_switch.set_attenuation(0, 0)

        # 127 should open all (close none)
        for enclosure in enclosures:
            relays = ','.join(
                    rf_switch.RfSwitch._AP_ATTENUATOR_RELAYS[enclosure])
            self._populate_stack_for_cmd(
                    '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_OPEN_RELAYS,
                                    relays))
            self._populate_stack_for_cmd('%s (@)\n' % (
                    rf_switch.RfSwitch._CMD_CLOSE_RELAYS))
        self.mock_rf_switch.set_attenuation(0, 127)
        self.god.check_playback()

    def test_set_attenuation_raise_error(self):
        """Verify set_attenuation will raise error."""
        # Both an ap of 5 and a negative ap must be rejected.
        with self.assertRaises(ValueError):
            self.mock_rf_switch.set_attenuation(5, 100)
        with self.assertRaises(ValueError):
            self.mock_rf_switch.set_attenuation(-1, 100)

    def test_connect_ap_client(self):
        """Verify connect_ap_client sets correct relays."""
        relays = 'k1_1,k1_25'
        ap = 1
        client = 1
        self._populate_stack_for_cmd(
                '%s (@%s)\n' % (rf_switch.RfSwitch._CMD_CLOSE_RELAYS, relays))
        self.mock_rf_switch.connect_ap_client(ap, client)
        self.god.check_playback()
| |
# Run the test cases when this file is executed directly.
if __name__ == '__main__':
    unittest.main()