| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import copy, logging, os, pprint, re, threading, time, urllib |
| |
| from autotest_lib.client.bin import site_ui_test |
| from autotest_lib.client.common_lib import error, site_httpd, site_ui, utils |
| |
# HTML templates.

# Stylesheet text that the request handlers substitute into
# _HTML_HEADER_TMPL for every generated page.
_STATIC_CSS ='''
form {
margin: 0;
}
td {
border-width: 1px;
border-style: solid;
}
th {
border-width: 1px;
border-style: solid;
}
.control {
background-color: yellow;
}
.completion {
background-color: #999999;
border-width: 1px;
border-style: dashed;
}
'''

# Per-result CSS stanza; the %s is the id of the table cell to colour
# (get_result_css() fills it with the test key).
_RESULT_PASS_CSS ='''
td#%s {
background-color: green;
}
'''

# Same as _RESULT_PASS_CSS, but for a test the user marked as failed.
_RESULT_FAIL_CSS ='''
td#%s {
background-color: red;
}
'''

# Page prologue; the single %s receives the CSS text for the page.
_HTML_HEADER_TMPL = '''<html>
<head><title>AudioVideo_PlaybackRecordSemiAuto Test</title>
<style type="text/css"> <!--
%s
-->
</style>
</head>
<body>'''
# Page epilogue matching _HTML_HEADER_TMPL.
_HTML_FOOTER = '''
</body>
</html>'''
| |
# Instruction text shown at the top of the device list page.
_DEVICE_LIST_INSTRUCTIONS = '''<p>
This is the device list test. There should be an entry for each hardware
device on the system. The test is a pass if every expected hardware device
is listed.
'''

# Wrapper emitted around every device table.
_DEVICE_LIST_START = '<div class="device_table">'
_DEVICE_LIST_END = '</div>'

# Headings written before the playback and capture sections of the list page.
_PLAYBACK_SECTION_LABEL = '''Playback Devices'''
_RECORD_SECTION_LABEL = '''Capture Devices'''

# Opening of one device table, including the column header row.
_DEVICE_SECTION_START = '''<tr><td><table class="device_section">
<tr>
<th>Name</th>
<th>Index</th>
<th>Channels</th>
<th>Is Hardware</th>
<th>Default Format</th>
<th>Default Sample Rate</th>
<th>Ports</th>
</tr>
'''
_DEVICE_SECTION_END = '</table></td></tr>'
# One table row per device; formatted with a parsed device dictionary, so the
# keys named here must be present in that dictionary.
_DEVICE_SECTION_ENTRY_TMPL = '''<tr>
<td>%(name)s</td>
<td>%(index)d</td>
<td>%(channels)d</td>
<td>%(is_hardware)d</td>
<td>%(sample_format)s</td>
<td>%(sample_rate)s</td>
<td>%(ports)s</td>
</tr>
'''

# NOTE(review): not referenced anywhere in the visible part of this file.
_DEVICE_LIST_TEST = '''
<tr><td><table> <tr>
<td>Device List Looks Correct?</td>
<td class="action" id="summary"><a href="?test=summary&result=pass">pass</a>
<a href="?test=summary&result=fail">fail</a>
</td>
</tr></table></td></tr>
'''

# Table listing every test on the control page.
_TEST_CONTROL_START = '<table><th>Test description</th><th>Invoke Link</th>'
_TEST_CONTROL_END = '</table>'

# One control-table row: (description, cell id, invoke URL).
_TEST_CONTROL_ITEM = '''
<tr><td>%s</td><td id="%s" class="control"><a href="%s">invoke</a></td></tr>
'''

# Pass/fail banner on a test page: (pass URL, fail URL).
_TEST_RESULT = '''<h1 class="completion">
Test Result: <a href="%s">PASS</a> <a href="%s">FAIL</a>
</h1>
'''
# Banner on the control page; its link targets the 'done' URL that
# run_once() waits on.
_TEST_COMPLETE = '''<h1 class="completion">
End Test: <a href="done">DONE</a></h1>
'''
| |
# Instruction text for the volume calibration page.
_VOLUME_INSTRUCTIONS = '''<p>
This is a volume calibration test. For the hardware device listed below, a
1000Hz test tone will be played, starting from 0 volume, ramping up until the
volume to be used by the rest of the tests on this device.

<b>IF THIS CAUSES DISCOMFORT TO THE LISTENER, DO NOT CONTINUE WITH THE OTHER
PLAYBACK TESTS</b>
'''

# Formatted with a parsed device dictionary (needs the 'channels' key).
_VOLUME_TEST_DETAILS = '''<p>
Playback will be on %(channels)d channels, and run for 5 seconds.

<p>
After the whole test is completed, three quick 1000Hz pulses will be played.
'''

# Instruction text for the playback (tone) test page.
_PLAYBACK_INSTRUCTIONS = '''<p>
This is a playback test. For the hardware device listed below, the following
tests sequence will be done once for evey channel configuration listed
at the end of the page:
<ol>
<ol>
<li>10 tone test for frequences (HZ): 30, 50, 100, 250, 500, 1000, 5000,
10000, 15000, 20000
<li>14 test tones, following A# Harmonic Minor Scale, up and down.
</ol>
</ol>

<p>
After the whole test is completed, three quick 1000Hz pulses will be played.

<p>
The test is a pass if every tone can be heard.
'''

# Instruction text for the record test page.
# NOTE(review): the step list below puts the 500Hz playback tone before the
# recording step, while record_playback_sample() records first and then plays
# the 500Hz tone -- confirm the intended wording before editing this string.
_RECORD_INSTRUCTIONS = '''<p>
This is a record test. For the hardware device listed below, a record
test will be done. A sample will be recorded from the given device
and port, and then played back on the first port of the first playback
device listed.

<p>
Different amplification settings for the mic will be used. Playback
volume will be set at the default test volume used during the tone tests.
The following tests will be done.

<p>
At maximum hardware input amplificaiton, for each channel of the device:
<ol>
<li>A short 1000Hz tone will be played to signal the start of recording.
<li>A 500Hz tone will be played to signal the start of playback.
<li>A 1.5 second sample will be recorded.
</ol>

<p>
This test sequence will also be repeated with all channels enabled with
the mic input amplification set at:
<ol>
<li>Maximum hardware amplificaiton.
<li>50% hardware amplificaiton.
<li>input muted.
</ol>

<p>
After the whole test is completed, three quick 1000Hz pulses will be played.

<p>
The test is a pass if the recording can be heard during playback with
reasonable clarity and volume for each non-muted setting. The last test,
with the input muted, should yield nothing at playback.
'''
| |
| # Names for various test webpages. |
| _CONTROL_ENDPOINT = 'control' |
| _LIST_ENDPOINT = 'list' |
| _VOLUME_ENDPOINT = 'volume' |
| _PLAYBACK_ENDPOINT = 'playback' |
| _RECORD_ENDPOINT = 'record' |
| |
| |
| # Configuration for the test program invocation. |
| _TONE_LENGTH_SEC = 0.3 |
| |
| _PACMD_PATH = '/usr/bin/pacmd' |
| _PACAT_PATH = '/usr/bin/pacat' |
| |
| # Regexps for parsing device stanzas. |
| _COUNT_RE = re.compile('>>> (\d+) (source|sink)\(s\) available.') |
| _STANZA_START_RE = re.compile(' (\*| ) index: (\d+)') |
| _NAME_RE = re.compile('\tname:\s+<(.+)>') |
| _FLAGS_RE = re.compile('\tflags:\s+(.+)') |
| _MAX_VOLUME_RE = re.compile('\tvolume steps:\s+(\d+)') |
| _BASE_VOLUME_RE = re.compile('\tbase volume:\s+(\d+)%') |
| _SAMPLE_SPEC_RE = re.compile('\tsample spec:\s+(\S+) (\d+)ch (\d+)Hz') |
| _CHANNEL_MAP_RE = re.compile('\tchannel map:\s+(.+)') |
| _TOP_LEVEL_RE = re.compile('\t\S') |
| _PORTS_RE = re.compile('\tports:') |
| _PORT_SPEC_RE = re.compile('\t\t(\S+): .* \(priority \d+\)') |
| |
| |
class ToneThread(threading.Thread):
    """Background thread that plays one test-tone configuration.

    The thread simply forwards the stored configuration to the
    run_test_tones() method of the audio object it was given.
    """

    def __init__(self, audio, config):
        super(ToneThread, self).__init__()
        self.audio, self.config = audio, config

    def run(self):
        player = self.audio
        player.run_test_tones(self.config)
| |
| |
class VolumeChangeThread(threading.Thread):
    """Background thread that ramps a source/sink volume linearly."""

    # Seconds to sleep between two consecutive volume updates.
    _WAKE_INTERVAL_SEC = 0.02
    # The ramp finishes this many seconds before the end of the period so
    # that the tail of the playback is heard at end_volume.
    _FINAL_HOLD_SEC = 0.1

    def __init__(self, audio, type, index, start_volume, end_volume, period):
        """Changes the volume to end_volume over period seconds.

        Volume is updated roughly every _WAKE_INTERVAL_SEC seconds, with a
        target of reaching end_volume for the last _FINAL_HOLD_SEC seconds
        of the period.

        Args:
          audio: An instance of the audio object; must provide
              do_set_volume(type, index, volume).
          type: Either "source" or "sink".
          index: The index value of the specific source or sink to use.
          start_volume: An integer specifying the start volume.
          end_volume: An integer specifying the stop volume.
          period: The period, in seconds, over which to adjust the volume
              from start_volume to end_volume.
        """
        threading.Thread.__init__(self)
        self.audio = audio
        self.type = type
        self.index = index
        self.start_volume = start_volume
        self.end_volume = end_volume
        self.period = period

    def _volume_at(self, elapsed):
        """Returns the integer volume to apply `elapsed` seconds into the ramp.

        The ramp spans period - _FINAL_HOLD_SEC seconds, so the volume
        arrives at end_volume exactly when the hold interval starts instead
        of jumping there from a lower value.
        """
        ramp = self.period - self._FINAL_HOLD_SEC
        if ramp <= 0 or elapsed >= ramp:
            return self.end_volume
        if elapsed <= 0:
            return self.start_volume
        delta = self.end_volume - self.start_volume
        return int(self.start_volume + delta * elapsed / ramp)

    def run(self):
        """Steps the volume until the ramp is over, then pins end_volume."""
        start = time.time()
        ramp_end = start + self.period - self._FINAL_HOLD_SEC
        now = start
        while now < ramp_end:
            self.audio.do_set_volume(self.type, self.index,
                                     self._volume_at(now - start))
            time.sleep(self._WAKE_INTERVAL_SEC)
            now = time.time()
        # Always finish on the exact target, whatever the last step computed.
        self.audio.do_set_volume(self.type, self.index, self.end_volume)
| |
| |
| class audiovideo_PlaybackRecordSemiAuto(site_ui_test.UITest): |
| version = 1 |
| preserve_srcdir = True |
| crash_handling_enabled = False |
| |
def default_tone_config(self):
    """Returns a new dictionary describing the default test tone.

    A fresh dictionary is built on every call, so callers may modify the
    result freely.
    """
    return dict(type='tone',
                frequency=1000,
                tone_length_sec=0.5,
                channels=2,
                active_channel=None)
| |
| |
def pacmd(self, cmd):
    """Builds the shell line that runs cmd with access to PulseAudio.

    The command is run as the chronos user through su, with
    PULSE_RUNTIME_PATH pointing at /var/run/pulse.

    Args:
      cmd: The shell command to wrap.

    Returns:
      The wrapped command string; nothing is executed here.
    """
    as_chronos = 'su chronos -c "%s"' % cmd
    return 'PULSE_RUNTIME_PATH=/var/run/pulse ' + as_chronos
| |
| |
def setup(self):
    """Rebuilds the test's helper programs from a clean tree in srcdir."""
    os.chdir(self.srcdir)
    for make_cmd in ('make clean', 'make'):
        utils.system(make_cmd)
| |
| |
def initialize(self, creds='$default'):
    """Enumerates devices, checks the tone binary and starts the server.

    Args:
      creds: Passed through unchanged to site_ui_test.UITest.initialize().

    Raises:
      error.TestError: if the test_tones program is missing from srcdir or
          is not executable.
    """
    self._playback_devices = self.enumerate_playback_devices()
    self._record_devices = self.enumerate_record_devices()

    tones_binary = os.path.join(self.srcdir, "test_tones")
    self._test_tones_path = tones_binary
    if (not os.path.exists(tones_binary) or
        not os.access(tones_binary, os.X_OK)):
        raise error.TestError('%s is not an executable' % tones_binary)

    self._pp = pprint.PrettyPrinter()
    for devices in (self._playback_devices, self._record_devices):
        logging.info(self._pp.pformat(devices))

    # State shared with the request handlers.
    self._running_test = None
    self._results = {}

    # Bring up the local web server before the UI is initialized.
    self._server_root = 'http://localhost:8000/'
    listener = site_httpd.HTTPListener(port=8000, docroot=self.bindir)
    self._testServer = listener
    listener.run()
    site_ui_test.UITest.initialize(self, creds)
| |
| |
def cleanup(self):
    """Stops the local test server, then runs the base-class cleanup.

    The base-class cleanup runs even if stopping the server raises, and a
    missing server (initialize() failed before creating it) is tolerated so
    that the original failure is not hidden behind an AttributeError.
    """
    try:
        server = getattr(self, '_testServer', None)
        if server is not None:
            server.stop()
    finally:
        site_ui_test.UITest.cleanup(self)
| |
| |
def run_once(self, timeout=10000):
    """Serves the test pages and waits for the user to finish all tests.

    Registers one URL handler per test page, opens a Chrome session on the
    control page and blocks until the '/done' URL is requested or the
    timeout expires.

    Args:
      timeout: Value passed to the wait on the '/done' latch.

    Raises:
      error.TestFail: on timeout, if the number of recorded results differs
          from expected_num_tests(), or if any recorded result is not
          'pass'.
    """
    handlers = (
        (_CONTROL_ENDPOINT, self.handle_control),
        (_LIST_ENDPOINT, self.handle_list),
        (_VOLUME_ENDPOINT, self.handle_volume),
        (_PLAYBACK_ENDPOINT, self.handle_playback),
        (_RECORD_ENDPOINT, self.handle_record),
    )
    for endpoint, handler in handlers:
        # Bind the handler as a default argument so each lambda keeps its
        # own handler; the (server, form) parameter names are preserved.
        self._testServer.add_url_handler(
            '/%s' % endpoint,
            lambda server, form, h=handler: h(server, form))

    latch = self._testServer.add_wait_url('/done')
    # Defined before the try block so that the finally clause never hits an
    # unbound name when ChromeSession() itself raises.
    session = None
    try:
        session = site_ui.ChromeSession(
            self._server_root + _CONTROL_ENDPOINT)
        logging.debug('Chrome session started.')

        latch.wait(timeout)
        if not latch.is_set():
            raise error.TestFail('Timeout.')

        expected_num_tests = self.expected_num_tests()
        finished_tests = len(self._results)
        results = self._pp.pformat(self._results)
        if finished_tests != expected_num_tests:
            raise error.TestFail(
                'Expected %d test results, found %d. Results %s' % (
                    expected_num_tests, finished_tests, results))

        logging.info('result = ' + results)
        # TODO(ajwong): YOU NEED TO MAKE THIS ITERATION CORRECT.
        failed_tests = [(key, value)
                        for key, value in self._results.items()
                        if value != 'pass']

        if failed_tests:
            raise error.TestFail(
                'User indicated test failure(s). Failed: %s' %
                self._pp.pformat(failed_tests))
    finally:
        if session is not None:
            session.close()
| |
| |
def get_pass_fail_div(self, endpoint, dict):
    """Generates HTML for the pass/fail links that finish a test case.

    Args:
      endpoint: URL path both links point at.
      dict: Mapping of query parameters identifying the test.  It is not
          modified; a 'result' parameter is added to a private copy.

    Returns:
      _TEST_RESULT filled in with the pass URL and the fail URL.
    """
    urls = []
    for outcome in ('pass', 'fail'):
        # Work on a copy so the caller's mapping is not left carrying a
        # stray 'result' entry.
        query = {}
        query.update(dict)
        query['result'] = outcome
        urls.append('%s?%s' % (endpoint, urllib.urlencode(query)))
    return _TEST_RESULT % tuple(urls)
| |
| |
def handle_list(self, server, args):
    """Serves the device list test page.

    Waits for any running test, then writes a page containing the
    pass/fail links, the instructions, and one table each for the hardware
    playback and capture devices that were enumerated.
    """
    self.wait_for_current_test()

    write = server.wfile.write
    write(_HTML_HEADER_TMPL % _STATIC_CSS)

    result_params = { 'test': _LIST_ENDPOINT, 'device': 0, 'port': 0 }
    write(self.get_pass_fail_div(_CONTROL_ENDPOINT, result_params))

    write(_DEVICE_LIST_INSTRUCTIONS)

    # Device summary: playback section first, then capture.
    write(_DEVICE_LIST_START)
    sections = ((_PLAYBACK_SECTION_LABEL, self._playback_devices),
                (_RECORD_SECTION_LABEL, self._record_devices))
    for label, devices in sections:
        write(label)
        write(_DEVICE_SECTION_START)
        for device in devices['info']:
            if device['is_hardware']:
                write(_DEVICE_SECTION_ENTRY_TMPL % device)
        write(_DEVICE_SECTION_END)
    write(_DEVICE_LIST_END)

    write(_HTML_FOOTER)
| |
| |
def handle_volume(self, server, args):
    """Serves the volume calibration page and starts the calibration.

    The calibration is kept separate from the playback test as a safety
    measure: it should be run first to make sure the test volume is not
    dangerous to the listener or the equipment.

    Args:
      server: Request handler whose wfile receives the page.
      args: Parsed CGI arguments carrying 'device' and 'port'.
    """
    self.wait_for_current_test()

    device_num, port_num, device, port = self.get_device_info(
        args, self._playback_devices)

    write = server.wfile.write
    write(_HTML_HEADER_TMPL % _STATIC_CSS)

    result_params = {
        'test': _VOLUME_ENDPOINT,
        'device': device_num,
        'port': port_num
    }
    write(self.get_pass_fail_div(_CONTROL_ENDPOINT, result_params))
    write(_VOLUME_INSTRUCTIONS)

    self.render_single_device_summary(server, device)

    write(_VOLUME_TEST_DETAILS % device)
    if 'channel_map' in device:
        write('<p>Channels are: %s' %
              self._pp.pformat(device['channel_map']))

    write(_HTML_FOOTER)

    # The page is complete; run the calibration in the background.
    worker = threading.Thread(target=self.do_volume_test,
                              args=(device, port))
    self._running_test = worker
    worker.start()
| |
| |
def handle_playback(self, server, args):
    """Serves the playback test page and starts the tone playback.

    Args:
      server: Request handler whose wfile receives the page.
      args: Parsed CGI arguments carrying 'device' and 'port'.
    """
    self.wait_for_current_test()

    device_num, port_num, device, port = self.get_device_info(
        args, self._playback_devices)

    write = server.wfile.write
    write(_HTML_HEADER_TMPL % _STATIC_CSS)

    result_params = {
        'test': _PLAYBACK_ENDPOINT,
        'device': device_num,
        'port': port_num
    }
    write(self.get_pass_fail_div(_CONTROL_ENDPOINT, result_params))
    write(_PLAYBACK_INSTRUCTIONS)

    self.render_single_device_summary(server, device)
    self.render_channel_test_order(server, device, port)

    write(_HTML_FOOTER)

    # Wait once more before handing over to the background worker.
    self.wait_for_current_test()
    worker = threading.Thread(target=self.do_playback_test,
                              args=(device, port))
    self._running_test = worker
    worker.start()
| |
| |
def handle_record(self, server, args):
    """Serves the record test page and starts the record test.

    Args:
      server: Request handler whose wfile receives the page.
      args: Parsed CGI arguments carrying 'device' and 'port'.
    """
    self.wait_for_current_test()

    device_num, port_num, device, port = self.get_device_info(
        args, self._record_devices)

    write = server.wfile.write
    write(_HTML_HEADER_TMPL % _STATIC_CSS)

    result_params = {
        'test': _RECORD_ENDPOINT,
        'device': device_num,
        'port': port_num
    }
    write(self.get_pass_fail_div(_CONTROL_ENDPOINT, result_params))
    write(_RECORD_INSTRUCTIONS)

    self.render_single_device_summary(server, device)
    self.render_channel_test_order(server, device, port)

    write(_HTML_FOOTER)

    # Wait once more before handing over to the background worker.
    self.wait_for_current_test()
    worker = threading.Thread(target=self.do_record_test,
                              args=(device, port))
    self._running_test = worker
    worker.start()
| |
| |
def expected_num_tests(self):
    """Returns how many test results a complete run must produce.

    One result for the device list, two (volume calibration and tone
    playback) per port of each hardware playback device, and one per port
    of each hardware record device.  A device without listed ports counts
    as having a single port.
    """
    total = 1  # The device list test.

    for device in self._playback_devices['info']:
        if device['is_hardware']:
            total += 2 * max(len(device['ports']), 1)

    for device in self._record_devices['info']:
        if device['is_hardware']:
            total += max(len(device['ports']), 1)

    return total
| |
| |
def handle_control(self, server, args):
    """Serves the control page listing every test with its invoke link.

    Any result carried in args is recorded first, so the cell of a test
    that was just finished is already coloured when the page is rendered.
    """
    self.add_results(args)

    write = server.wfile.write
    page_css = '%s%s' % (_STATIC_CSS, self.get_result_css(self._results))
    write(_HTML_HEADER_TMPL % page_css)
    write(_TEST_COMPLETE)

    # Table of tests: device list, then playback devices, then record.
    write(_TEST_CONTROL_START)
    write(_TEST_CONTROL_ITEM % (
        'Device List',
        self.get_test_key(_LIST_ENDPOINT),
        _LIST_ENDPOINT))

    for device_num, device in enumerate(self._playback_devices['info']):
        if not device['is_hardware']:
            continue
        if len(device['ports']) > 0:
            for port_num, _ in enumerate(device['ports']):
                write(self.get_volume_item(device_num, port_num))
                write(self.get_playback_item(device_num, port_num))
        else:
            write(self.get_volume_item(device_num))
            write(self.get_playback_item(device_num))

    for device_num, device in enumerate(self._record_devices['info']):
        if not device['is_hardware']:
            continue
        if len(device['ports']) > 0:
            for port_num, _ in enumerate(device['ports']):
                write(self.get_record_item(device_num, port_num))
        else:
            write(self.get_record_item(device_num))
    write(_TEST_CONTROL_END)

    write(_HTML_FOOTER)
| |
| |
def render_single_device_summary(self, server, device):
    """Writes an HTML table describing one device to the response.

    Args:
      server: Request handler whose wfile receives the HTML.
      device: Parsed device dictionary used to fill the table row.
    """
    emit = server.wfile.write
    emit(_DEVICE_LIST_START)
    emit(_DEVICE_SECTION_START)
    row = _DEVICE_SECTION_ENTRY_TMPL % device
    emit(row)
    emit(_DEVICE_SECTION_END)
    emit(_DEVICE_LIST_END)
| |
| |
def render_channel_test_order(self, server, device, port):
    """Writes the active port and the channel test order as HTML.

    Args:
      server: Request handler whose wfile receives the HTML.
      device: Parsed device dictionary; 'channels' is required and
          'channel_map' is used for channel names when present.
      port: Name of the port in use, or None for the default port.
    """
    write = server.wfile.write
    if port is not None:
        write('<p>Active port on device: %s' % port)
    else:
        write('<p>Use default (only) port.')

    write('<p>Channels will be tested in this order:<ol>')
    channel_map = device['channel_map'] if 'channel_map' in device else []
    for channel in range(device['channels']):
        # Fall back to the channel number when the map has no name for it,
        # rather than failing with an IndexError.
        if channel < len(channel_map):
            write('<li>%s' % channel_map[channel])
        else:
            write('<li>%d' % channel)

    write('<li>All channels')
    write('</ol>')
| |
| |
def get_device_info(self, args, devices):
    """Translates the 'device' and 'port' CGI parameters into device data.

    Both parameters are read as the first element of their value list.
    The literal string 'None' means "not given": the device then defaults
    to index 0 and the port to None.

    Args:
      args: Parsed CGI arguments; must contain 'device' and 'port'.
      devices: Device dictionary whose 'info' list is indexed by the
          device number.

    Returns:
      (device_num, port_num, device_info, port_info), where port_info is
      None when port_num is None or negative.
    """
    raw_device = args['device'][0]
    raw_port = args['port'][0]

    # Default to the first device if none is given.
    device_num = 0 if raw_device == 'None' else int(raw_device)
    port_num = None if raw_port == 'None' else int(raw_port)

    device = devices['info'][device_num]
    if port_num is None or port_num < 0:
        port = None
    else:
        port = device['ports'][port_num]
    return (device_num, port_num, device, port)
| |
| |
def get_result_css(self, results):
    """Builds the CSS that colours invoke cells by their pass/fail result.

    Args:
      results: Mapping of test key to result string.  Only 'pass' and
          'fail' produce a stanza; other values are ignored.

    Returns:
      The generated stanzas joined by newlines.
    """
    templates = {'pass': _RESULT_PASS_CSS, 'fail': _RESULT_FAIL_CSS}
    stanzas = []
    for key, outcome in results.items():
        template = templates.get(outcome)
        if template is not None:
            stanzas.append(template % key)
    return '\n'.join(stanzas)
| |
| |
| def get_test_key(self, test, device=None, port=None): |
| """Generate a string represeting the test case.""" |
| return '%s-%s-%s' % (test, device, port) |
| |
| |
def add_results(self, args):
    """Records the test result carried by the CGI arguments, if any.

    Nothing happens unless args has a 'test' entry.  Otherwise the first
    values of 'test', 'device' and 'port' form the test key, and the first
    value of 'result' is stored under it.
    """
    if 'test' in args:
        fields = [args[name][0] for name in ('test', 'device', 'port')]
        key = self.get_test_key(*fields)
        self._results[key] = args['result'][0]
| |
| |
def wait_for_current_test(self):
    """Blocks until the running test thread, if there is one, finishes.

    Used to prevent multiple tests from running at once.
    """
    running = self._running_test
    if running is None:
        return
    running.join()
| |
| |
def get_volume_item(self, device_num, port_num=None):
    """Generates the control-table row for a volume calibration test.

    Args:
      device_num: Index into the enumerated playback devices.
      port_num: Index of the port on that device, or None.
    """
    playback_devices = self._playback_devices['info']
    return self.get_test_item(_VOLUME_ENDPOINT,
                              playback_devices[device_num],
                              device_num, port_num)
| |
| |
def get_playback_item(self, device_num, port_num=None):
    """Generates the control-table row for a playback test.

    Args:
      device_num: Index into the enumerated playback devices.
      port_num: Index of the port on that device, or None.
    """
    playback_devices = self._playback_devices['info']
    return self.get_test_item(_PLAYBACK_ENDPOINT,
                              playback_devices[device_num],
                              device_num, port_num)
| |
| |
def get_record_item(self, device_num, port_num=None):
    """Generates the control-table row for a record test.

    Args:
      device_num: Index into the enumerated record devices.
      port_num: Index of the port on that device, or None.
    """
    record_devices = self._record_devices['info']
    return self.get_test_item(_RECORD_ENDPOINT,
                              record_devices[device_num],
                              device_num, port_num)
| |
| |
def get_test_item(self, endpoint, device, device_num, port_num=None):
    """Builds one control-table row for invoking a test.

    Args:
      endpoint: Name of the test page, also used in the description.
      device: Parsed device dictionary ('name' and 'ports' are read).
      device_num: Device index placed in the invoke URL and the cell id.
      port_num: Port index, or None when the device has a single port.

    Returns:
      _TEST_CONTROL_ITEM filled with the description, the test key and
      the invoke URL.
    """
    if port_num is None:
        description = '%s on %s, only port' % (endpoint, device['name'])
    else:
        description = '%s on %s, port %s' % (
            endpoint, device['name'], device['ports'][port_num])

    query = urllib.urlencode({ 'device': device_num, 'port': port_num })
    invoke_url = '%s?%s' % (endpoint, query)
    cell_id = self.get_test_key(endpoint, device_num, port_num)
    return _TEST_CONTROL_ITEM % (description, cell_id, invoke_url)
| |
| |
def add_port(self, port_list, line):
    """Appends the port name found on line to port_list.

    Lines that do not match _PORT_SPEC_RE are ignored.
    """
    found = _PORT_SPEC_RE.match(line)
    if found:
        port_list.append(found.group(1))
| |
| |
def merge_sinkinfo_line(self, current_sink, line):
    """Stores any attribute recognised on line into current_sink.

    The line is tried against each attribute pattern in turn; a matching
    pattern sets the corresponding key(s) of current_sink.  Lines that
    match nothing leave the dictionary untouched.
    """
    name = _NAME_RE.match(line)
    if name:
        current_sink['name'] = name.group(1)

    flags_found = _FLAGS_RE.match(line)
    if flags_found:
        flags = flags_found.group(1)
        current_sink['is_hardware'] = 'HARDWARE' in flags
        current_sink['can_mute'] = 'HW_MUTE_CTRL' in flags

    max_volume = _MAX_VOLUME_RE.match(line)
    if max_volume:
        current_sink['max_volume'] = int(max_volume.group(1))

    base_volume = _BASE_VOLUME_RE.match(line)
    if base_volume:
        current_sink['base_volume_percent'] = int(base_volume.group(1))

    spec = _SAMPLE_SPEC_RE.match(line)
    if spec:
        sample_format, channels, rate = spec.groups()
        current_sink['sample_format'] = sample_format
        current_sink['channels'] = int(channels)
        current_sink['sample_rate'] = int(rate)

    mapping = _CHANNEL_MAP_RE.match(line)
    if mapping:
        current_sink['channel_map'] = mapping.group(1).split(',')
| |
| |
def parse_device_info(self, device_info_output):
    """Parses the output of a pacmd list-sources or list-sinks call.

    Returns:
      A dictionary with an 'info' list holding one dictionary per device
      stanza, plus 'num_devices' when the count line was present.
    """
    devices = []
    device_info = { 'info' : devices }
    current = None
    in_ports = False
    for line in device_info_output.split('\n'):
        # A top-level attribute ends the list of ports.
        if in_ports and _TOP_LEVEL_RE.match(line):
            in_ports = False

        # Number of devices announced by pacmd.
        count = _COUNT_RE.match(line)
        if count:
            device_info['num_devices'] = int(count.group(1))

        # Start of a new device stanza.
        stanza = _STANZA_START_RE.match(line)
        if stanza:
            current = {'index': int(stanza.group(2)), 'ports': []}
            devices.append(current)

        if current is None:
            continue

        if _PORTS_RE.match(line):
            # The following lines describe ports until the next top-level
            # attribute.
            in_ports = True
        elif in_ports:
            self.add_port(current['ports'], line)
        else:
            self.merge_sinkinfo_line(current, line)
    return device_info
| |
| |
def enumerate_playback_devices(self):
    """Queries pulseaudio for all available sinks.

    Returns:
      A dictionary with the number of devices found, and the
      parsed output of the pacmd call.

    Raises:
      error.TestError: if the number of parsed device stanzas differs from
          the count reported by pacmd.
    """
    device_info = self.parse_device_info(self.do_pacmd('list-sinks'))
    announced = device_info['num_devices']
    parsed = len(device_info['info'])
    if announced != parsed:
        raise error.TestError('Expected %d devices, parsed %d' %
                              (announced, parsed))
    return device_info
| |
| |
def enumerate_record_devices(self):
    """Queries pulseaudio for all available sources.

    Returns:
      A dictionary with the number of devices found, and the
      parsed output of the pacmd call.

    Raises:
      error.TestError: if the number of parsed device stanzas differs from
          the count reported by pacmd.
    """
    device_info = self.parse_device_info(self.do_pacmd('list-sources'))
    announced = device_info['num_devices']
    parsed = len(device_info['info'])
    if announced != parsed:
        raise error.TestError('Expected %d devices, parsed %d' %
                              (announced, parsed))
    return device_info
| |
| |
def set_default_device_and_port(self, type, device, port):
    """Makes device (and optionally port) the Pulseaudio default.

    Args:
      type: Either 'sink' or 'source'
      device: A dictionary with the parsed device information.
      port: The name of the device port to use. Use None for the default.
    """
    index = device['index']
    self.do_pacmd('set-default-%s %d' % (type, index))
    logging.info('* Testing device %d (%s)' % (index, device['name']))

    if port is None:
        return
    self.do_pacmd('set-%s-port %d %s' % (type, index, port))
    logging.info('-- setting port %s' % port)
| |
| |
def do_signal_test_end(self):
    """Plays three short 1000Hz tones to signal that a test case is done.

    Playback is done on whatever the current default device is.
    """
    config = self.default_tone_config()
    config['tone_length_sec'] = 0.3
    for _ in range(3):
        self.play_tone(config, 1000)
| |
| |
def do_record_test(self, device, port):
    """Performs a record test for the given device and port.

    The default playback device is set to the first device returned by
    enumerate_playback_devices(), using its first port (if it has any),
    unmuted, and set to the test volume.

    For each channel on the given device and port, a sample is recorded
    and played back at max source volume.  Then once again with all
    channels enabled.

    Next, a sample is taken at base volume (no amplification) if the
    device reports a base volume, and played back.

    This is followed by a sample at half amplification, and again with
    the record device muted (or at volume 0 if it cannot be muted).

    Args:
      device: device info dictionary gotten from
          enumerate_record_devices()
      port: String with the name of the port to use on the device.
          Can be None if the device does not have multiple ports.
    """
    # Configure the playback device to something normal.
    playback_device = self._playback_devices['info'][0]
    playback_volume = self.get_test_volume(playback_device)
    playback_port = None
    if len(playback_device['ports']):
        playback_port = playback_device['ports'][0]
    self.set_default_device_and_port('sink', playback_device,
                                     playback_port)
    self.do_set_mute('sink', playback_device['index'], False)
    self.do_set_volume('sink', playback_device['index'], playback_volume)

    # Set record device.
    self.set_default_device_and_port('source', device, port)

    # Set to max hardware amplification volume.
    self.do_set_volume('source', device['index'], device['max_volume'])
    self.do_set_mute('source', device['index'], False)

    # Record from each channel, then from all channels.
    # NOTE(review): 'channel_map' is indexed unconditionally here, whereas
    # the page rendering code treats it as optional -- confirm every record
    # device reports a channel map.
    for channel in xrange(0, device['channels']):
        logging.info('-- record max vol channel %s' %
                     device['channel_map'][channel])
        self.record_playback_sample(device, channel)

    # Try recording at max, un-amped, 50% amp, and mute volumes.
    logging.info('-- record max vol all channels')
    self.record_playback_sample(device, None)

    # If there's no base_volume_percent, then guess that
    # half-amplication is just 1/2 the max_volume.
    half_amp_volume = device['max_volume'] / 2.0
    if device.has_key('base_volume_percent'):
        base_volume = (device['max_volume'] *
                       device['base_volume_percent'] / 100.0)
        # Halfway between the unamplified level and the maximum.
        half_amp_volume = (base_volume +
                           (device['max_volume'] - base_volume) / 2)
        logging.info('-- record unamplified all channels')
        self.do_set_volume('source', device['index'], base_volume)
        self.record_playback_sample(device, None)
    else:
        logging.info('[Driver does to export unamplified volume level. '
                     'Skipping test.]')

    # NOTE(review): half_amp_volume (and base_volume above) are floats
    # here -- confirm do_set_volume() accepts non-integer volumes.
    logging.info('-- record half-amp volume all channels')
    self.do_set_volume('source', device['index'], half_amp_volume)
    self.record_playback_sample(device, None)

    logging.info('-- record muted all channels')
    if device['can_mute']:
        self.do_set_mute('source', device['index'], True)
        self.do_set_volume('source', device['index'], device['max_volume'])
    else:
        logging.info('[No hardware mute. Setting volume to 0.]')
        self.do_set_volume('source', device['index'], 0)
    self.record_playback_sample(device, None)

    self.do_signal_test_end()
| |
| |
def record_playback_sample(self, device, channel, duration=1.5):
    """Records a sample from the default input device and plays it back.

    A 1000Hz tone signals the start of recording and a 500Hz tone the
    start of playback.  The temporary sample file is removed afterwards.

    Args:
      device: device info dictionary gotten from
          enumerate_record_devices()
      channel: Which channel to record from. "None" to specify all.
      duration: How long to record in seconds.

    Raises:
      error.CmdError: if the recording command wrote anything to stdout
          or stderr.
    """
    # Pick the file name before entering the try block: if this fails
    # there is nothing to clean up, and the finally clause below must not
    # trip over an unbound name and hide the real error.
    #
    # NOTE(review): os.tmpnam() is documented to return an absolute path,
    # in which case os.path.join() discards self.tmpdir; it is also
    # race-prone and gone in Python 3.  Left as is because the playback
    # below runs as another user and may depend on where the file ends up;
    # confirm before switching to the tempfile module.
    tmpfile = os.path.join(self.tmpdir, os.tmpnam())
    try:
        # Record a sample.
        record_args = ''
        if channel is not None:
            record_args = ('--channels 1 --channel-map %s' %
                           device['channel_map'][channel])
        # NOTE(review): unlike the playback command below, this command is
        # not wrapped with self.pacmd() -- confirm that is intended.
        cmd = '%s -r %s %s' % (_PACAT_PATH, record_args, tmpfile)
        logging.info('running %s' % cmd)

        signal_config = self.default_tone_config()
        signal_config['tone_length_sec'] = 0.3
        self.play_tone(signal_config, 1000)  # Signal record start.
        logging.info('Record now (%fs)' % duration)
        job = utils.BgJob(cmd)
        time.sleep(duration)
        utils.nuke_subprocess(job.sp)

        # Job should be dead already, so join with a very short timeout.
        utils.join_bg_jobs([job], timeout=1)
        result = job.result
        if result.stdout or result.stderr:
            raise error.CmdError(
                cmd, result,
                'stdout: %s\nstderr: %s' % (result.stdout, result.stderr))

        # Playback the sample.
        self.play_tone(signal_config, 500)  # Signal playback start.
        cmd = '%s -p %s %s' % (_PACAT_PATH, record_args, tmpfile)
        logging.info('Playing back sample')
        utils.system(self.pacmd(cmd))

        # TODO(ajwong): Try analyzing the sample using sox stats.
        # Example command:
        #
        #   sox -c $channel -r $rate -e $format -b $bit $tmpfile -n stat
        #
        # Then look at the "RMS amplitude" in the output and make sure
        # it's above some sane level.
        #
        # Optionally, we can denoise it first with something like.
        #
        #   sox $tmpfile -n trim 0 1 noiseprof |
        #   sox $tmpfile reduced-$tmpfile.wav noisered
        #
        # To try and make sure we aren't picking up bad noise.  Then run
        # the stats on the filtered file.
    finally:
        if os.path.isfile(tmpfile):
            os.unlink(tmpfile)
| |
| |
def do_volume_test(self, device, port):
    """Selects the device/port as default sink and calibrates its volume.

    Args:
        device: device info dictionary, as obtained from
            enumerate_playback_devices()
        port: Name of the port to select on the device, or None when the
            device does not have multiple ports.
    """
    # Route playback to the device under test before making any sound.
    self.set_default_device_and_port('sink', device, port)

    logging.info('-- volume calibration all channels')
    self.do_volume_calibration_test(device)
| |
| |
def do_playback_test(self, device, port):
    """Runs the tone and scale tests on the given device.

    Each channel is exercised on its own first (tone test followed by
    scale test), then both tests are repeated once with all channels
    active, and finally the end-of-test signal is issued.

    Args:
        device: device info dictionary, as obtained from
            enumerate_playback_devices()
        port: Name of the port to select on the device, or None when the
            device does not have multiple ports.
    """
    self.set_default_device_and_port('sink', device, port)

    # TODO(ajwong): chord test sounds terrible. fix & readd.
    for channel in range(device['channels']):
        channel_name = device['channel_map'][channel]
        logging.info('-- playback channel %s' % channel_name)
        self.do_tone_test(device, channel)
        self.do_scale_test(device, channel)

    # One more pass with every channel enabled.
    logging.info('-- playback all channels')
    self.do_tone_test(device)
    self.do_scale_test(device)

    self.do_signal_test_end()
| |
| |
def get_test_volume(self, device):
    """Attempts to guess at a good playback test volume.

    The guess is half of the device's maximum volume, scaled further by
    device['base_volume_percent'] when the device reports that value.

    Args:
        device: device info dictionary gotten from
            enumerate_playback_devices().  Must contain 'max_volume';
            'base_volume_percent' is optional.

    Returns:
        The test volume as a float.
    """
    # TODO(ajwong): What is a good test volume?  50% of max default is
    # pretty arbitrary.
    test_volume = device['max_volume'] / 2.0
    # 'in' is the supported membership test; dict.has_key() is deprecated
    # and does not exist on newer interpreters.
    if 'base_volume_percent' in device:
        test_volume *= device['base_volume_percent'] / 100.0
    return test_volume
| |
| |
def do_volume_calibration_test(self, device):
    """Plays a 5-second test tone while the sink volume is changed.

    The sink is silenced first.  A ToneThread then plays the default
    tone for 5 seconds while a VolumeChangeThread is handed the range
    from 0 to get_test_volume(device) and the same 5-second duration.
    Once both threads have finished, the end-of-test signal is issued.

    Args:
        device: device info dictionary, as obtained from
            enumerate_playback_devices()
    """
    ramp_seconds = 5
    tone_config = self.default_tone_config()
    tone_config['tone_length_sec'] = ramp_seconds

    # Start from silence.
    self.do_set_volume('sink', device['index'], 0)

    # TODO(ajwong): What is a good test volume?  50% of max default is
    # pretty arbitrary.
    target_volume = self.get_test_volume(device)

    player = ToneThread(self, tone_config)
    ramper = VolumeChangeThread(self, 'sink', device['index'],
                                0, target_volume, ramp_seconds)

    # The volume thread is started ahead of the tone thread; the tone
    # thread is waited for first.
    ramper.start()
    player.start()
    for worker in (player, ramper):
        worker.join()

    self.do_signal_test_end()
| |
| |
def do_pacmd(self, command):
    """Invokes pacmd with the given arguments and returns its output.

    The full command line is logged before it is run.

    Args:
        command: String with the pacmd sub-command and its arguments.

    Returns:
        Whatever utils.system_output() returns for the command.
    """
    pacmd_line = '%s %s' % (_PACMD_PATH, command)
    logging.info(pacmd_line)
    wrapped_line = self.pacmd(pacmd_line)
    return utils.system_output(wrapped_line, retain_output=True)
| |
| |
def do_set_volume(self, type, index, new_volume):
    """Sets the volume of a pulse audio source or sink through pacmd.

    Args:
        type: 'source' or 'sink'
        index: integer index of the pulse audio source or sink.
        new_volume: volume to set on the device; it is formatted as an
            integer in the pacmd command.
    """
    pacmd_args = 'set-%s-volume %d %d' % (type, index, new_volume)
    self.do_pacmd(pacmd_args)
| |
| |
def do_set_mute(self, type, index, should_mute):
    """Mutes or unmutes a pulse audio source or sink through pacmd.

    Args:
        type: 'source' or 'sink'
        index: integer index of the pulse audio source or sink.
        should_mute: boolean saying if the device should be muted.
    """
    # pacmd is given the mute state as 1 (muted) or 0 (unmuted).
    mute_val = 1 if should_mute else 0
    pacmd_args = 'set-%s-mute %d %d' % (type, index, mute_val)
    self.do_pacmd(pacmd_args)
| |
| |
def play_tone(self, base_config, frequency):
    """Convenience function to play a test tone at a given frequency.

    The given configuration is left untouched: a shallow copy of it is
    made, the frequency is set on the copy, and the copy is handed to
    run_test_tones().

    Args:
        base_config: test tone configuration dictionary to start from.
        frequency: frequency of the tone to play, stored under the
            'frequency' key of the copied configuration.
    """
    tone_config = copy.copy(base_config)
    tone_config['frequency'] = frequency
    self.run_test_tones(tone_config)
| |
| |
def do_tone_test(self, device, active_channel=None):
    """Plays 10 test tones, in ascending order from 30Hz to 20000Hz.

    Args:
        device: device info dictionary, gotten from
            enumerate_playback_devices().  Not read by this method.
        active_channel: integer identifying the channel to output test on.
            If None, all channels are active.
    """
    tone_config = self.default_tone_config()
    tone_config['active_channel'] = active_channel

    frequencies = (
        30, 50, 100,                  # low tones
        250, 500, 1000,               # mid tones
        5000, 10000, 15000, 20000,    # high tones
    )
    for frequency in frequencies:
        self.play_tone(tone_config, frequency)
| |
| |
def do_scale_test(self, device, active_channel=None):
    """Runs the tone generator in 'scale' mode.

    Starts from the default tone configuration, switches its type to
    'scale' and hands it to run_test_tones(), which logs this mode as
    the A# harmonic minor scale.

    Args:
        device: device info description, gotten from
            enumerate_playback_devices().  Not read by this method.
        active_channel: integer identifying the channel to output test on.
            If None, all channels are active.
    """
    scale_config = self.default_tone_config()
    scale_config.update({'active_channel': active_channel,
                         'type': 'scale'})
    self.run_test_tones(scale_config)
| |
| |
def do_chord_test(self):
    """Starts 4 threads to play 4 test tones in parallel.

    Every tone gets its own default tone configuration with only the
    frequency changed.  All threads are started before any is joined.
    """
    chord_frequencies = (466.16, 554.37, 698.46, 932.33)

    players = []
    for frequency in chord_frequencies:
        tone_config = self.default_tone_config()
        tone_config['frequency'] = frequency
        players.append(ToneThread(self, tone_config))

    for player in players:
        player.start()

    for player in players:
        player.join()
| |
| |
def run_test_tones(self, args):
    """Runs the tone generator executable.

    The executable is always self._test_tones_path; an 'exec' entry in
    args, if any, is ignored.  The dictionary passed in is not modified.

    Args:
        args: A hash listing the parameters for test_tones.
            Required keys:
                type - 'scale' or 'tone'
                frequency - float with frequency in Hz.
                tone_length_sec - float with length of test tone in secs.
                channels - number of channels in output device.

            Optional keys:
                active_channel - integer to select channel for playback.
                                 Missing or None means playback on all
                                 channels.
    """
    # Work on a private copy so that filling in the executable does not
    # leak into the caller's configuration dictionary.
    params = dict(args)
    params['exec'] = self._test_tones_path
    cmd = ('%(exec)s '
           '-t %(type)s -h %(frequency)f -l %(tone_length_sec)f '
           '-c %(channels)d' % params)

    # 'active_channel' is optional, so do not insist on the key.
    active_channel = params.get('active_channel')
    if active_channel is not None:
        cmd += ' -a %s' % active_channel

    if params['type'] == 'tone':
        logging.info('[tone %dHz]' % params['frequency'])
    elif params['type'] == 'scale':
        logging.info('[A# harmonic minor scale]')
    utils.system(self.pacmd(cmd))