| # -*- coding: utf-8 -*- |
| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| # DESCRIPTION : |
| # |
| # This is a factory test to test the audio. Operator will test both record and |
| # playback for headset and built-in audio. Recordings are played back for |
| # confirmation. An additional pre-recorded sample is played to confirm speakers |
| # operate independently |
| |
| |
| import gtk |
| import logging |
| import os |
| import re |
| import sys |
| |
| from autotest_lib.client.bin import test |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib import utils |
| from autotest_lib.client.cros import factory_setup_modules |
| from cros.factory.test import factory |
| from cros.factory.test import ui as ful |
| |
| |
# Values handed to ful.make_label() as its size= argument: the large one is
# used for a sub-test title, the small one for a per-sample status label.
_LABEL_BIG_SIZE = (280, 60)
_LABEL_STATUS_SIZE = (140, 30)

# Bilingual prompt shown above the list of samples on the top-level screen.
_LABEL_START_STR = ('hit SPACE to start each audio test\n'
                    '按空白键开始各项声音测试\n\n')
_LABEL_RESPONSE_STR = ful.USER_PASS_FAIL_SELECT_STR + '\n'

# Titles of the sub-tests, in the order they are presented to the operator.
_SAMPLE_LIST = [
    'Headset Audio Test',
    'Built-in Audio Test',
]

# When True, any stdout/stderr produced by a finished background audio
# command is reported as an error.CmdError.
_VERBOSE = False

# Add -D hw:0,0 since default argument does not work properly.
# See crosbug.com/p/12330
# Both templates take the path of the wav file as their single %s argument.
_CMD_PLAY_AUDIO = 'aplay -D hw:0,0 %s'
_CMD_RECORD_AUDIO = 'arecord -D hw:0,0 -f dat -t wav %s'
| |
| |
class factory_Audio(test.test):
    """Operator-driven factory test for audio record and playback.

    Every title in _SAMPLE_LIST is run as one sub-test.  The top-level screen
    lists the sub-tests with their status; SPACE opens the instruction screen
    of the current sub-test.  While a sub-test is shown:

      * holding 'r' records to rec.wav, releasing it plays the recording;
      * holding 'p' (or left/right Shift in left/right mode) plays the
        canned sample(s), releasing it stops the playback;
      * Return marks the sub-test PASSED, Tab marks it FAILED;
      * 'Q' leaves the gtk main loop immediately.

    Audio commands run as autotest background jobs (utils.BgJob); at most one
    is tracked at a time in self._bg_job.
    """
    version = 1

    def audio_subtest_widget(self, name):
        """Build the instruction screen for one sub-test and display it.

        The new screen replaces the top-level list inside self._test_widget
        and is remembered in self._subtest_widget.

        Args:
          name: Sub-test title.  A title containing 'Headset' asks the
              operator to connect the headset; any other title asks for it
              to be removed.
        """
        title_box = gtk.EventBox()
        title_box.modify_bg(gtk.STATE_NORMAL, ful.LABEL_COLORS[ful.ACTIVE])
        title_box.add(ful.make_label(name, size=_LABEL_BIG_SIZE,
                                     fg=ful.BLACK))

        if 'Headset' in name:
            headset_str = 'Connect headset to device\n将耳机接上音源孔'
        else:
            headset_str = 'Remove headset from device\n将耳机移开音源孔'

        instruction = ['Press & hold \'r\' to record',
                       '压住 \'r\' 键开始录音',
                       '[Playback will follow]',
                       '[之后会重播录到的声音]',
                       '']
        if self._test_left_right:
            instruction.extend([
                'Press & hold left-Shift to play from left channel',
                '压住 左Shift 键,从左声道播放范例',
                'Press & hold right-Shift to play from right channel',
                '压住 右Shift 键,从右声道播放范例'])
        else:
            instruction.extend(['Press & hold \'p\' to play sample',
                                '压住 \'p\' 键播放范例'])

        vbox = gtk.VBox()
        vbox.pack_start(title_box)
        vbox.pack_start(ful.make_vsep(3), False, False)
        vbox.pack_start(ful.make_label(headset_str, fg=ful.WHITE))
        vbox.pack_start(ful.make_vsep(3), False, False)
        vbox.pack_start(ful.make_label('\n'.join(instruction)))
        vbox.pack_start(ful.make_vsep(3), False, False)
        vbox.pack_start(ful.make_label(ful.USER_PASS_FAIL_SELECT_STR,
                                       fg=ful.WHITE))

        # Need event box to effect bg color.
        background = gtk.EventBox()
        background.modify_bg(gtk.STATE_NORMAL, ful.BLACK)
        background.add(vbox)

        self._subtest_widget = background

        self._test_widget.remove(self._top_level_test_list)
        self._test_widget.add(self._subtest_widget)
        self._test_widget.show_all()

    def close_bgjob(self, sample_name):
        """Stop the tracked background audio job, if there is one.

        Args:
          sample_name: Title of the current sub-test; only used as the
              command field of the CmdError raised in verbose mode.

        Raises:
          error.CmdError: _VERBOSE is set and the job wrote anything to
              stdout or stderr.
        """
        job = self._bg_job
        if job:
            utils.nuke_subprocess(job.sp)
            utils.join_bg_jobs([job], timeout=1)
            result = job.result
            if _VERBOSE and (result.stdout or result.stderr):
                raise error.CmdError(
                    sample_name, result,
                    'stdout: %s\nstderr: %s' % (result.stdout, result.stderr))
        self._bg_job = None

    def goto_next_sample(self):
        """Make the next queued sub-test current, or quit when none is left.

        The new current sub-test is marked ful.ACTIVE in the status map.
        """
        if not self._sample_queue:
            gtk.main_quit()
            return
        self._current_sample = self._sample_queue.pop()
        self._status_map[self._current_sample] = ful.ACTIVE

    def cleanup_sample(self):
        """Leave the current sub-test screen and advance to the next one."""
        factory.log('Inside cleanup_sample')
        # The operator may judge the sub-test while still holding a
        # record/play key.  The matching key release arrives after the
        # sub-test screen is gone and is ignored, so stop the job and clear
        # the busy flag here instead of leaving them dangling.
        self.close_bgjob(self._current_sample)
        self._active = False

        self._test_widget.remove(self._subtest_widget)
        self._subtest_widget = None
        self._test_widget.add(self._top_level_test_list)
        self._test_widget.show_all()
        self.goto_next_sample()

    def key_press_callback(self, widget, event):
        """Handle a key press: open a sub-test or start recording/playback.

        On the top-level screen only SPACE is honoured; it opens the current
        sub-test.  Inside a sub-test, and only when no key-driven action is
        already active, any running job is stopped and 'r' starts a
        recording while 'p' / left-Shift / right-Shift start playback of the
        configured sample.

        Args:
          widget: Widget that received the event (unused).
          event: Key event; only its keyval is inspected.

        Returns:
          True, always.
        """
        name = self._current_sample
        if self._subtest_widget is None:
            # Releases are ignored outside a sub-test, so a job started from
            # the top-level screen could never be stopped.  Only allow
            # entering the sub-test from here.
            if event.keyval == gtk.keysyms.space:
                self.audio_subtest_widget(name)
        # Make sure we are not already recording. We can get repeated events.
        elif not self._active:
            self.close_bgjob(name)
            cmd = None
            if event.keyval == ord('r'):
                # Record via mic.
                if os.path.isfile('rec.wav'):
                    os.unlink('rec.wav')
                cmd = _CMD_RECORD_AUDIO % 'rec.wav'
            elif self._test_left_right:
                if event.keyval == gtk.keysyms.Shift_L:
                    cmd = _CMD_PLAY_AUDIO % self._left_audio_sample_path
                elif event.keyval == gtk.keysyms.Shift_R:
                    cmd = _CMD_PLAY_AUDIO % self._right_audio_sample_path
            elif event.keyval == ord('p'):
                # Playback canned audio.
                cmd = _CMD_PLAY_AUDIO % self._audio_sample_path
            if cmd:
                self._active = True
                self._bg_job = utils.BgJob(cmd, stderr_level=logging.DEBUG)
                factory.log("cmd: " + cmd)
        self._test_widget.queue_draw()
        return True

    def key_release_callback(self, widget, event):
        """Handle a key release inside a sub-test.

        Tab fails and Return passes the current sub-test, 'Q' quits the main
        loop, releasing 'r' stops the recording and plays rec.wav back, and
        releasing the playback key(s) stops the sample playback.

        Args:
          widget: Widget that received the event (unused).
          event: Key event; only its keyval is inspected.

        Returns:
          True, always.
        """
        # Make sure we capture more advanced key events only when
        # entered a subtest.
        if self._subtest_widget is None:
            return True
        name = self._current_sample
        keyval = event.keyval
        if keyval == gtk.keysyms.Tab:
            self._status_map[name] = ful.FAILED
            self.cleanup_sample()
        elif keyval == gtk.keysyms.Return:
            self._status_map[name] = ful.PASSED
            self.cleanup_sample()
        elif keyval == ord('Q'):
            gtk.main_quit()
        elif keyval == ord('r'):
            self.close_bgjob(name)
            cmd = _CMD_PLAY_AUDIO % 'rec.wav'
            self._bg_job = utils.BgJob(cmd, stderr_level=logging.DEBUG)
            factory.log("cmd: " + cmd)
            # Clear active recording state.
            self._active = False
        else:
            if self._test_left_right:
                stop_playing = keyval in (gtk.keysyms.Shift_L,
                                          gtk.keysyms.Shift_R)
            else:
                stop_playing = keyval == ord('p')
            if stop_playing:
                self.close_bgjob(name)
                # Clear active playing state.
                self._active = False
        self._test_widget.queue_draw()
        return True

    def label_status_expose(self, widget, event, label=None, name=None):
        """Refresh a status label from the status map on an expose event.

        Args:
          widget: Widget that received the expose event (unused).
          event: The expose event (unused).
          label: Label to update; nothing is drawn when it is None.
          name: Sub-test title used as the key into self._status_map.
        """
        status = self._status_map[name]
        if label:
            label.set_text(status)
            label.modify_fg(gtk.STATE_NORMAL, ful.LABEL_COLORS[status])

    def make_sample_label_box(self, name):
        """Create the '<name> : <status>' row shown on the top-level screen.

        Args:
          name: Sub-test title.

        Returns:
          A gtk.EventBox holding the row; its expose handler keeps the
          status label in sync with self._status_map.
        """
        row = gtk.EventBox()
        row.modify_bg(gtk.STATE_NORMAL, ful.BLACK)
        label_status = ful.make_label(ful.UNTESTED, size=_LABEL_STATUS_SIZE,
                                      alignment=(0, 0.5),
                                      fg=ful.LABEL_COLORS['UNTESTED'])

        def refresh_status(*args):
            # Forward gtk's positional arguments and bind this row's label
            # and sub-test title.
            return self.label_status_expose(*args, name=name,
                                            label=label_status)

        # Note that expose callback must not tie to the object we are going
        # to modify. Or it'll cause infinite event loop.
        row.connect('expose_event', refresh_status)

        label_en = ful.make_label(name, alignment=(1, 0.5))
        label_sep = ful.make_label(' : ', alignment=(0.5, 0.5))
        hbox = gtk.HBox()
        # pack_end order: status first, then separator, then the title.
        for child in (label_status, label_sep, label_en):
            hbox.pack_end(child, False, False)
        row.add(hbox)
        return row

    def register_callbacks(self, window):
        """Hook the key press/release handlers up to the test window.

        Args:
          window: gtk window handed over by ful.run_test_widget.
        """
        window.connect('key-press-event', self.key_press_callback)
        window.add_events(gtk.gdk.KEY_PRESS_MASK)
        window.connect('key-release-event', self.key_release_callback)
        window.add_events(gtk.gdk.KEY_RELEASE_MASK)

    def run_once(self, audio_sample_path=None, audio_init_volume=None,
                 amixer_init_cmd=None, left_right_audio_sample_pair=None):
        """Run the interactive audio test.

        Args:
          audio_sample_path: Sample file for the 'p' playback; looked up
              with utils.locate_file under self.job.autodir.  Used only
              when left_right_audio_sample_pair is None.
          audio_init_volume: If set, Master volume in percent applied with
              amixer before the test starts.
          amixer_init_cmd: If set, extra arguments for one additional
              'amixer -c 0' invocation before the test starts.
          left_right_audio_sample_pair: If not None, a (left, right) pair of
              sample files; playback is then driven by left/right Shift
              instead of 'p'.

        Raises:
          error.TestFail: A required sample is missing or malformed, or at
              least one sub-test did not end up PASSED.
        """
        factory.log('%s run_once' % self.__class__)

        # Change initial volume.
        if audio_init_volume:
            os.system("amixer -c 0 sset Master %d%%" % audio_init_volume)
        # Allow extra amixer command for init.
        if amixer_init_cmd:
            os.system("amixer -c 0 %s" % amixer_init_cmd)

        # Write recordings in tmpdir.
        os.chdir(self.tmpdir)

        self._bg_job = None

        self._test_left_right = left_right_audio_sample_pair is not None
        if self._test_left_right:
            pair_error = (
                'ERROR: left_right_audio_sample_pair should be a pair of '
                'audio sample for left and right channel test.')
            try:
                left, right = left_right_audio_sample_pair
            except (TypeError, ValueError):
                # Not a two-element sequence: report it as a test failure
                # rather than as a bare unpacking error.
                raise error.TestFail(pair_error)
            self._left_audio_sample_path = utils.locate_file(
                left, base_dir=self.job.autodir)
            self._right_audio_sample_path = utils.locate_file(
                right, base_dir=self.job.autodir)
            if (self._left_audio_sample_path is None or
                self._right_audio_sample_path is None):
                raise error.TestFail(pair_error)
        else:
            self._audio_sample_path = utils.locate_file(
                audio_sample_path, base_dir=self.job.autodir)
            if self._audio_sample_path is None:
                raise error.TestFail('ERROR: must provide an audio sample '
                                     'via audio_sample_path.')

        # The queue is consumed with pop(), so store it reversed to present
        # the sub-tests in _SAMPLE_LIST order.
        self._sample_queue = list(reversed(_SAMPLE_LIST))
        self._status_map = dict((n, ful.UNTESTED) for n in _SAMPLE_LIST)
        # Ensure that we don't try to handle multiple overlapping
        # keypress actions. Make a note of when we are currently busy
        # and refuse events during that time.
        self._active = False

        prompt_label = ful.make_label(_LABEL_START_STR, alignment=(0.5, 0.5))

        self._top_level_test_list = gtk.VBox()
        self._top_level_test_list.pack_start(prompt_label, False, False)

        for name in _SAMPLE_LIST:
            label_box = self.make_sample_label_box(name)
            self._top_level_test_list.pack_start(label_box, False, False)

        self._test_widget = gtk.EventBox()
        self._test_widget.modify_bg(gtk.STATE_NORMAL, ful.BLACK)
        self._test_widget.add(self._top_level_test_list)

        self._subtest_widget = None

        self.goto_next_sample()

        ful.run_test_widget(self.job, self._test_widget,
            window_registration_callback=self.register_callbacks)

        # The UI can be left (e.g. with 'Q') while a record/playback job is
        # still running; do not leave it behind.
        self.close_bgjob(self._current_sample)

        # Compare by value; report failures in presentation order.
        failed = [name for name in _SAMPLE_LIST
                  if self._status_map[name] != ful.PASSED]
        if failed:
            raise error.TestFail('some samples failed (%s)' %
                                 ', '.join(failed))

        factory.log('%s run_once finished' % self.__class__)