| # -*- coding: utf-8 -*- |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import datetime |
| import gtk |
| import os |
| import pprint |
| import re |
| import tempfile |
| import time |
| import StringIO |
| import subprocess |
| |
| from autotest_lib.client.cros import factory_setup_modules |
| from cros.factory.event_log import EventLog |
| from cros.factory.test import factory |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros.audio import audio_helper |
| from autotest_lib.client.cros.camera.camera_preview import CameraPreview |
| from cros.factory.test import state_machine |
| from cros.factory.test.media_util import MediaMonitor |
| from cros.factory.test.media_util import MountedMedia |
| from autotest_lib.client.cros.i2c import usb_to_i2c |
| from autotest_lib.client.cros.rf.config import PluggableConfig |
| from autotest_lib.client.common_lib import utils |
| |
# Operator-facing prompts.  Each message carries the English text followed by
# its Chinese translation; adjacent literals are joined by the compiler.

# Shown while waiting for the USB stick that carries the configuration.
_MESSAGE_USB = (
    'Please insert the usb stick, adapter board and golden reference.\n'
    '请插入USB随身碟, 测试板, 良品 以读取测试参数\n')

# Shown between two test cycles.
_MESSAGE_PREPARE_PANEL = (
    'Please prepare the next AB panel.\n'
    'Be sure all the connector is connected.\n'
    'Press ENTER to execute connectivity test.\n'
    '请连接下一块AB Panel\n'
    '备妥后按ENTER执行测试\n')

# Hint for the serial number entry.
_MESSAGE_ENTER_SN_HINT = (
    'Please scan SN on LCD.\n'
    '请扫描本体上S/N:')

# Key hint for the camera preview check.
_MESSAGE_CAMERA_CHECK = (
    'hit TAB to pass and ENTER to fail.\n'
    '成功请按 TAB, 错误请按 ENTER.\n')

# Shown while the components are being probed.
_MESSAGE_PROBING = (
    'Probing components...\n'
    '测试中...\n')

# Shown while the audio test is running.
_MESSAGE_PLAYING_AUDIO = (
    'Testing DMIC...\n'
    '测试麦克风中...请保持安静\n')

# Key hint for the display check.
_MESSAGE_DISPLAY = (
    'Hit TAB if DUT have something displayed.\n'
    'Otherwise, hit ENTER.\n'
    '成功在测试品看到画面请按 TAB, 否则请按 ENTER.\n')

# Shown while the logs are being written.
_MESSAGE_WRITING_LOGS = (
    'Writing logs...\n'
    '纪录中...\n')

# Header of the result page.
_MESSAGE_RESULT_TAB = (
    'Results are listed below.\n'
    'Remove the panel after all power is turned off..\n'
    '测试结果显示如下\n'
    '确认电源皆断电后, 将Panel移除\n')
| |
# Serial number filled into the entry when TAB is pressed in the serial
# number input (see on_sn_keypress).
_TEST_SN_NUMBER = 'TEST-SN-NUMBER'

# Foreground color used for the "writing logs" message.
COLOR_MAGENTA = gtk.gdk.color_parse('magenta1')

# Shell command template for the backlight; %s is replaced by the brightness
# value to write.
BRIGHTNESS_CONTROL_CMD = (
    'echo %s > /sys/class/backlight/intel_backlight/brightness')
| |
# Regular expressions to match audiofuntest messages.  Raw strings keep the
# regex escapes (\s) from being interpreted as string escape sequences.
# Matches a line that begins with "Stop".
_AUDIOFUNTEST_STOP_RE = re.compile(r'^Stop')
# Captures everything after "rate = " up to the end of the line.
_AUDIOFUNTEST_SUCCESS_RATE_RE = re.compile(r'.*rate\s=\s(.*)$')
| |
| |
class factory_Connector(state_machine.FactoryStateMachine):
    """Connectivity test for AB panels, driven by a state machine.

    run_once() registers two fixed states: waiting for a USB stick and
    "prepare panel".  When a stick is inserted, its configuration file is
    read and setup_tests() registers the remaining states in order: serial
    number input, the tests enabled by the configuration (display, camera,
    probing, audio), log writing and the result page.  The result page leads
    back to "prepare panel" for the next unit.
    """
    version = 8

    def setup_tests(self):
        """Registers the test states described by the loaded configuration.

        Called from on_usb_insert() after self.config has been read from the
        external media device (USB/SD).  Optional tests are only registered
        when their key ('display', 'camera', 'probing', 'audio') is present
        in the configuration.
        """
        # Execute board specific commands.
        if 'board_specific' in self.config:
            self.run_cmds(self.config['board_specific'])

        self.fixture_config = self.config.get('fixture_control', None)

        # Setup serial number input widget.
        self.sn_input_widget = self.make_serial_number_widget(
            message=_MESSAGE_ENTER_SN_HINT,
            default_validate_regex=self.config['sn_regex'],
            on_complete=self.on_sn_complete,
            on_keypress=self.on_sn_keypress,
            generate_status_row=True)
        self.register_state(self.sn_input_widget)

        if 'display' in self.config:
            self.setup_display_test()
            self.register_state(
                self.display_widget, None, self.perform_display_test)

        if 'camera' in self.config:
            self.setup_camera_preview()
            self.register_state(self.preview_widget, None,
                                self.perform_camera_preview)

        if 'probing' in self.config:
            self.setup_probing()
            self.probing_widget = self.make_decision_widget(
                message=_MESSAGE_PROBING, key_action_mapping={})
            self.register_state(self.probing_widget, None,
                                self.perform_probing)

        if 'audio' in self.config:
            self.setup_audio()
            self.audio_widget = self.make_decision_widget(
                message=_MESSAGE_PLAYING_AUDIO, key_action_mapping={})
            self.register_state(self.audio_widget, None,
                                self.perform_audio)

        self.writing_widget = self.make_decision_widget(
            message=_MESSAGE_WRITING_LOGS,
            key_action_mapping={},
            fg_color=COLOR_MAGENTA)
        self.register_state(self.writing_widget, None, self.write_log)

        # Setup result tab.
        self.result_widget = self.make_result_widget(
            _MESSAGE_RESULT_TAB,
            key_action_mapping={
                gtk.keysyms.Return: (self.advance_state, [])})
        # After this state (result), go back to "prepare panel" state.
        self.register_state(self.result_widget, None, None,
                            self._STATE_PREPARE_PANEL)

        # Execute commands for special SKU.
        if 'pre_setup' in self.config:
            self.run_cmds(self.config['pre_setup'])

    def setup_camera_preview(self):
        """Creates the camera preview widget from the 'camera' config.

        Reads DEVICE_INDEX, WIDTH and HEIGHT from the camera configuration
        and adds a 'camera' status row that is checked in the final result.
        """
        self.camera_config = self.config['camera']
        factory.log('Camera config is [%s]' % self.camera_config)
        # Prepare the status row.
        self._status_rows.append(('camera', 'Camera Preview', True))
        self._results_to_check.append('camera')
        # Setup preview widget.
        self.camera_preview = CameraPreview(
            key_action_mapping={
                gtk.keysyms.Tab: self.pass_preview_test,
                gtk.keysyms.Return: self.fail_preview_test},
            msg=_MESSAGE_CAMERA_CHECK,
            device_index=int(self.camera_config['DEVICE_INDEX']),
            width=int(self.camera_config['WIDTH']),
            height=int(self.camera_config['HEIGHT']))
        self.preview_widget = self.camera_preview.widget

    def pass_preview_test(self):
        """Stops capturing, records the camera test as passed and advances."""
        self.camera_preview.capture_stop()
        factory.log('Preview passed.')
        self.update_status('camera', True)
        self.advance_state()

    def fail_preview_test(self):
        """Stops capturing, records the camera test as failed and advances."""
        self.camera_preview.capture_stop()
        factory.log('Preview failed.')
        self.update_status('camera', False)
        self.advance_state()

    def perform_camera_preview(self):
        """Starts the camera test when its state is entered.

        With IS_AUTO set in the camera config, a single frame is captured
        and the test passes without operator input.  Otherwise the preview is
        started and the operator decides with TAB / ENTER.  An IOError while
        accessing the camera fails the test.
        """
        try:
            if self.camera_config['IS_AUTO']:
                self.camera_preview.init_device(start_capture=False)
                # NOTE(review): neither the returned status nor the frame is
                # inspected; only an IOError makes the automatic mode fail.
                _ret, _frame = self.camera_preview.capture_single()
                self.pass_preview_test()
            else:
                self.camera_preview.init_device()
        except IOError as e:
            factory.log('Cannot get cammera - %s' % e)
            self.fail_preview_test()

    def setup_probing(self):
        """Prepares internal or external probing from the 'probing' config."""
        self.probing_config = self.config['probing']
        factory.log('Probing config is [%s]' % self.probing_config)
        if self.is_internal:
            self.setup_internal_probing()
        else:
            self.setup_external_probing()

    def setup_internal_probing(self):
        """Adds one status row per entry of the probing 'items' mapping."""
        # Prepare the status row.
        self.probe_list = self.probing_config['items']
        for test_name in sorted(self.probe_list):
            self._status_rows.append((test_name, test_name, True))
            self._results_to_check.append(test_name)

    def setup_external_probing(self):
        """Adds one status row per entry of the probing 'i2c_list'.

        Each entry is a 3-tuple of (test name, label, I2C port expression).
        """
        self.probe_list = self.probing_config['i2c_list']
        self.chipset = self.probing_config['chipset']
        # Prepare the status row.
        for test_name, test_label, _ in self.probe_list:
            self._status_rows.append((test_name, test_label, True))
            self._results_to_check.append(test_name)

    def setup_audio(self):
        """Loads the audio test parameters and adds the 'audio' status row.

        The keys 'freq', 'repeat', 'tolerance', 'duration' and 'audiofuntest'
        are all required in the 'audio' configuration.
        """
        self.audio_config = self.config['audio']
        factory.log('Audio config is [%s]' % self.audio_config)
        self.play_freq = self.audio_config['freq']
        self.play_repeat = self.audio_config['repeat']
        self.play_tolerance = self.audio_config['tolerance']
        self.play_duration = self.audio_config['duration']
        self.use_audiofuntest = self.audio_config['audiofuntest']
        # Prepare the status row.
        self._status_rows.append(('audio', 'Audio', False))
        self._results_to_check.append('audio')

    def perform_probing(self):
        """Runs the configured probing and advances to the next state."""
        if self.is_internal:
            self.perform_internal_probing()
        else:
            self.perform_external_probing()
        self.advance_state()

    def perform_internal_probing(self):
        """Runs the commands of every probe item and records the results.

        An item passes only if all of its commands match their expected
        output.  The remaining commands of an item still run after a failure.
        """
        for test_name in sorted(self.probe_list):
            commands = self.probe_list[test_name]
            self.update_status(test_name, True)
            for cmd, expect_re in commands:
                if not self.run_cmd(cmd, expect_re):
                    self.update_status(test_name, False)

    def perform_external_probing(self):
        """Probes every configured I2C port through the USB-to-I2C bridge.

        Any exception is logged and ends the probing early; items that were
        not reached keep whatever status they had before.
        """
        try:
            controller = usb_to_i2c.create_i2c_controller(self.chipset)
            # Turn on the led light to indicate test in progress.
            # http://www.nxp.com/documents/data_sheet/SC18IM700.pdf
            controller.send_and_check_status(
                98, [0x11, 0x97, 0x80, 0x00, 0x40, 0xE1])
            # Probe the peripherals.
            for test_name, _, i2c_port in self.probe_list:
                # SECURITY NOTE(review): eval() executes text taken from the
                # configuration file on the USB stick.  Kept because the
                # accepted port syntax is not visible here; replace it with a
                # strict parser (e.g. int(i2c_port, 0)) once the format is
                # confirmed.
                bus_status = controller.send_and_check_status(
                    eval(i2c_port), [])
                factory.log("[%s] at port [%s] tested, returned %d" % (
                    test_name, i2c_port, bus_status))
                self.update_status(
                    test_name,
                    bus_status == usb_to_i2c.I2CController.I2C_OK)
        except Exception as e:
            factory.log('Exception - %s' % e)

    def perform_audio(self):
        """Runs the audio loopback test up to play_repeat times.

        The test passes as soon as one attempt reports no error.  Every
        attempt is written to the log buffer and to the event log.
        """
        # Repeat several times, if one of them is succeed, mark as a PASS.
        self.update_status('audio', 'FAILED - freq not match')
        for i in range(self.play_repeat):
            error_ret = self.audio_loopback(
                test_freq=self.play_freq,
                tolerance=self.play_tolerance,
                loop_duration=self.play_duration,
                audiofuntest=self.use_audiofuntest)
            self.log_to_file.write('On %d audio loopback, got : %s\n' % (
                i, pprint.pformat(error_ret)))
            self._event_log.Log(
                'connectivity_audio', ab_serial_number=self.serial_number,
                repeated=i, audio_result=error_ret)
            if not error_ret:
                self.update_status('audio', 'PASSED')
                break
            self.update_status('audio', 'FAILED')

        self.advance_state()

    def set_brightness(self, brightness):
        """Writes the given value to the backlight brightness file.

        Args:
            brightness: value substituted into BRIGHTNESS_CONTROL_CMD.

        Raises:
            error.TestNAError: if the command fails or prints any output.
        """
        if not self.run_cmd(BRIGHTNESS_CONTROL_CMD % brightness, '^$'):
            raise error.TestNAError('Failed to set brightness')

    def setup_display_test(self):
        """Creates the display decision widget from the 'display' config.

        The optional 'brightness_control' value is applied while the display
        test is shown; without it the brightness is left untouched.
        """
        # TODO(itspeter): setup the external display related test.
        self.display_config = self.config['display']
        if 'brightness_control' in self.display_config:
            self.brightness_control = int(
                self.display_config['brightness_control'])
        else:
            self.brightness_control = None
        # Prepare the status row.
        self._status_rows.append(('display', 'Display panel', True))
        self._results_to_check.append('display')

        self.display_widget = self.make_decision_widget(
            message=_MESSAGE_DISPLAY,
            key_action_mapping={
                gtk.keysyms.Return: (self.end_display_test, [False]),
                gtk.keysyms.Tab: (self.end_display_test, [True])})

    def perform_display_test(self):
        """Applies the configured brightness when the display state starts."""
        if self.brightness_control:
            self.set_brightness(self.brightness_control)

    def end_display_test(self, result):
        """Records the operator's display verdict and advances.

        Args:
            result: True if the operator saw a picture, False otherwise.
        """
        self.update_status('display', result)
        if self.brightness_control:
            # Turn the backlight off again once the check is done.
            self.set_brightness(0)
        self.advance_state()

    def on_usb_insert(self, dev_path):
        """Loads the configuration when a USB stick is inserted.

        Only acts while the state machine is waiting for the stick; later
        insert events are ignored.

        Args:
            dev_path: device path of the inserted media.
        """
        if self.current_state == self._STATE_WAIT_USB:
            self.dev_path = dev_path
            with MountedMedia(dev_path, 1) as config_dir:
                # Load configuration.
                config_path = os.path.join(config_dir, self.config_file)
                self.config = self.base_config.Read(config_path)
                # The mere presence of the key selects internal probing; its
                # value is not looked at.
                self.is_internal = 'is_internal' in self.config
                # Configure the items.
                factory.log('Config loaded. Setup the UI...')
                self.setup_tests()
                self.advance_state()

    def on_usb_remove(self, dev_path):
        """Rejects removal of the USB stick once testing has started.

        Args:
            dev_path: device path of the removed media (unused).

        Raises:
            error.TestNAError: if the state machine is past the USB wait
                state.
        """
        if self.current_state != self._STATE_WAIT_USB:
            raise error.TestNAError('USB removal is not allowed during test')

    def on_sn_keypress(self, entry, key):
        """Fills in the test serial number when TAB is pressed.

        Args:
            entry: the serial number entry widget.
            key: the key press event.

        Return:
            True if the key was handled here, False otherwise.
        """
        if key.keyval == gtk.keysyms.Tab:
            entry.set_text(_TEST_SN_NUMBER)
            return True
        return False

    def on_sn_complete(self, serial_number):
        """Records the scanned serial number and starts the test cycle.

        Args:
            serial_number: the serial number entered by the operator.
        """
        self.serial_number = serial_number
        self._event_log.Log(
            'connectivity_start', ab_serial_number=self.serial_number)
        self.log_to_file.write('Serial_number : %s\n' % serial_number)
        self.log_to_file.write('Started at : %s\n' % datetime.datetime.now())
        self.update_status('sn', serial_number)
        self.advance_state()

    def run_cmds(self, cmd_tuples):
        """Executes consecutive commands.

        The individual results are logged by run_cmd() but not returned.

        Args:
            cmd_tuples: a list of tuple in following format:
                (command, expected regular expression)
        """
        for cmd, expect_re in cmd_tuples:
            self.run_cmd(cmd, expect_re)

    def run_cmd(self, cmd, expect_result):
        """Runs a command line and compare with expect_result.

        Exceptions raised by the command are logged and reported as a
        failure instead of being propagated.

        Args:
            cmd: The command to execute.
            expect_result: Expected result in regular expression.

        Return:
            True if the result matches.
        """
        factory.log('Running command [%s]' % cmd)
        ret = exception_str = ''
        try:
            ret = utils.system_output(cmd)
            factory.log('Command returns [%s, expecting %s]' % (
                ret, expect_result))
            if re.search(expect_result, ret, re.MULTILINE):
                self._event_log.Log(
                    'connectivity_command',
                    ab_serial_number=self.serial_number,
                    status='PASSED', command=cmd,
                    expecting=expect_result, ret=ret)
                return True
        except Exception as e:
            exception_str = '%s' % e
            factory.log('Command failed with exception - %s' % exception_str)
        # Reached on a mismatch (exception_str is empty) or on an exception.
        self._event_log.Log(
            'connectivity_command', ab_serial_number=self.serial_number,
            status='FAILED', exception=exception_str,
            command=cmd, expecting=expect_result, ret=ret)
        return False

    def write_log(self):
        """Finalizes the results and stores the log on the USB stick.

        Raises:
            error.TestNAError: if the log cannot be written to the stick.
        """
        self.generate_final_result()

        # Executes additional commands to external fixture.
        if self.fixture_config and 'ending' in self.fixture_config:
            self.run_cmds(self.fixture_config['ending'])

        self._event_log.Log(
            'connectivity_end', ab_serial_number=self.serial_number,
            result=self.display_dict)
        self.log_to_file.write('Result in summary:\n%s\n' %
                               pprint.pformat(self.display_dict))
        try:
            self.write_to_usb(self.serial_number + '.txt',
                              self.log_to_file.getvalue())
        except Exception as e:
            raise error.TestNAError(
                'Unable to save current log to USB stick - %s' % e)
        # Switch to result tab.
        self.advance_state()

    def write_to_usb(self, filename, content):
        """Appends content to a file on the USB stick.

        Args:
            filename: name of the file relative to the mounted media.
            content: text to append.
        """
        with MountedMedia(self.dev_path, 1) as mount_dir:
            with open(os.path.join(mount_dir, filename), 'a') as f:
                f.write(content)
        factory.log('Log wrote with filename[ %s ].' % filename)

    def prepare_for_next_test(self):
        """Prepares for the next test cycle."""
        if self.fixture_config and 'beginning' in self.fixture_config:
            # Executes additional commands to external fixture.
            self.run_cmds(self.fixture_config['beginning'])
        self.reset_data_for_next_test()
        self.advance_state()

    def reset_data_for_next_test(self):
        """Resets internal data for the next testing cycle."""
        self.reset_status_rows()
        # Start a fresh in-memory log for the next unit.
        self.log_to_file = StringIO.StringIO()
        self.sn_input_widget.get_entry().set_text('')
        factory.log('Data reseted.')

    def audio_loopback(self, test_freq=1000, loop_duration=1,
                       tolerance=100, audiofuntest=True):
        """Tests digital mic function.

        Args:
            test_freq: the frequency to play and test.
            loop_duration: the duration in seconds to record.
            tolerance: the accepted deviation between the played and the
                recorded frequency; only used without audiofuntest.
            audiofuntest: choose whether testing with audiofuntest
        Return:
            List of error messages generated during test.
        """
        if audiofuntest:
            return self.start_audiofuntest()
        return self.start_audioloop(
            test_freq=test_freq,
            tolerance=tolerance, loop_duration=loop_duration)

    def start_audiofuntest(self):
        """Runs audiofuntest, the more reliable audio check.

        The tool's stderr is read line by line: lines reporting a rate
        update the success rate, a line starting with "Stop" ends the test.
        The test passes when the last reported rate is above 50.

        Return:
            List of error messages; empty if the test passed.

        Raises:
            error.TestError: if the audiofuntest binary is missing or not
                executable.
        """
        factory.log('Start audiofuntest')
        errors = []
        audiofuntest_path = os.path.join(self.autodir, 'deps',
                                         'test_tones', 'src', 'audiofuntest')
        if not (os.path.exists(audiofuntest_path) and
                os.access(audiofuntest_path, os.X_OK)):
            raise error.TestError(
                '%s is not an executable' % audiofuntest_path)

        sub_proc = subprocess.Popen([audiofuntest_path, '-r', '48000'],
                                    stderr=subprocess.PIPE)
        success_rate = 0.0
        stop_seen = False
        try:
            while True:
                line = sub_proc.stderr.readline()
                if not line:
                    # EOF: the tool exited without printing "Stop".  Leave
                    # the loop instead of spinning on empty reads forever.
                    break
                factory.log(line)
                m = _AUDIOFUNTEST_SUCCESS_RATE_RE.match(line)
                if m:
                    success_rate = float(m.group(1))
                    factory.log('success_rate = %f' % success_rate)

                if _AUDIOFUNTEST_STOP_RE.match(line):
                    stop_seen = True
                    factory.log(line)
                    break
        finally:
            # Do not leave the child process or its pipe behind.
            if sub_proc.poll() is None:
                sub_proc.terminate()
            sub_proc.wait()
            sub_proc.stderr.close()

        # Always refresh the stored verdict so that a result from a previous
        # run can never be mistaken for the current one.
        self._audioresult = stop_seen and success_rate > 50.0
        if not stop_seen:
            errors.append('audiofuntest exited without reporting a result\n')
        elif not self._audioresult:
            errors.append('Success rate is too low: %.1f\n' %
                          success_rate)
        return errors

    def start_audioloop(self, test_freq=1000, loop_duration=1, tolerance=100):
        """Plays a sine tone and checks the recorded frequency.

        Fallback for setups where audiofuntest does not work.

        Args:
            test_freq: the frequency to play and test.
            loop_duration: the duration in seconds to play and record.
            tolerance: the accepted deviation between the played and the
                recorded frequency.

        Return:
            List of error messages generated during test.
        """
        factory.log('Start audioloop')
        errors = []
        audio_helper.set_mixer_controls(
            [{'name': '"Digital-Mic Capture Switch"',
              'value': 'on'},
             {'name': '"Digital-Mic Capture Volume"',
              'value': '100,100'},
             {'name': '"Speaker Playback Volume"',
              'value': '100,100'}])

        # Callbacks for sound playback and record result check.
        def playback_sine():
            cmd = '%s -n -d synth %d sine %d' % (audio_helper.SOX_PATH,
                                                 loop_duration, test_freq)
            utils.system(cmd)

        def check_loop_output(sox_output):
            freq = audio_helper.get_rough_freq(sox_output)
            factory.log('Got freq %d' % freq)
            if abs(freq - test_freq) > tolerance:
                errors.append('Frequency not match, expect %d but got %d' %
                              (test_freq, freq))

        def record_callback(filename):
            rec_cmd = 'arecord -D hw:0,0 -d %d -f dat' % loop_duration
            audio_helper.record_sample(filename, rec_cmd)

        with tempfile.NamedTemporaryFile(mode='w+t') as noise_file:
            # Record a sample before any tone is played; it is handed to the
            # loopback test as the noise file.
            audio_helper.record_sample(noise_file.name)
            audio_helper.loopback_test_channels(noise_file.name,
                                                self.resultsdir,
                                                lambda ch: playback_sine(),
                                                check_loop_output,
                                                record_callback=record_callback)
        return errors

    def run_once(self, config_file):
        """Entry point: sets up the fixed states and starts the machine.

        Args:
            config_file: name of the configuration file to read from the
                inserted USB stick.
        """
        # Initial EventLog.
        self._event_log = EventLog.ForAutoTest()
        # Placeholder used in event logs until a serial number is scanned.
        self.serial_number = 'Initialize'

        factory.log('%s run_once' % self.__class__)
        # Display dual screen if external display is connected.
        self.run_cmd('xrandr --auto', '')
        # Disable power management.
        self.run_cmd('stop powerd', 'powerd stop')
        # Initialize variables.
        self.config_file = config_file
        self.base_config = PluggableConfig({})

        # Set up the USB prompt widgets.  No key is bound here: the state is
        # left through on_usb_insert().
        self.usb_prompt_widget = self.make_decision_widget(
            message=_MESSAGE_USB, key_action_mapping={})

        self.prepare_panel_widget = self.make_decision_widget(
            message=_MESSAGE_PREPARE_PANEL,
            key_action_mapping={
                gtk.keysyms.Return: (self.prepare_for_next_test, [])})
        # States after "prepare panel" will be configured in setup_test, after
        # configuration on external media (USB/SD) is loaded.
        self._STATE_WAIT_USB = self.register_state(self.usb_prompt_widget)
        self._STATE_PREPARE_PANEL = self.register_state(
            self.prepare_panel_widget, None)
        # Setup the usb monitor.
        monitor = MediaMonitor()
        monitor.Start(on_insert=self.on_usb_insert,
                      on_remove=self.on_usb_remove)
        self.start_state_machine(self._STATE_WAIT_USB)