| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This module provides the test utilities for audio tests using chameleon.""" |
| |
| # TODO (cychiang) Move test utilities from chameleon_audio_helpers |
| # to this module. |
| |
| import logging |
| import multiprocessing |
| import os |
| import time |
| from contextlib import contextmanager |
| |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.cros import constants |
| from autotest_lib.client.cros.audio import audio_analysis |
| from autotest_lib.client.cros.audio import audio_data |
| |
def check_audio_nodes(audio_facade, audio_nodes):
    """Checks the node selected by Cros device is correct.

    The comparison ignores ordering: both the current and the expected node
    lists are sorted before they are compared. Passing None as the expected
    output or input nodes skips the check for that direction.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected selected output and input nodes.

    @raises: error.TestFail if the nodes selected by Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes
    # Input nodes are checked before output nodes, so an input mismatch is
    # the one reported when both directions are wrong.
    if (in_audio_nodes is not None and
            sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) selected %s '
                'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
    if (out_audio_nodes is not None and
            sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) selected %s '
                'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
| |
| |
def check_plugged_nodes(audio_facade, audio_nodes):
    """Checks the nodes that are currently plugged on Cros device are correct.

    The comparison ignores ordering: both the current and the expected node
    lists are sorted before they are compared. Passing None as the expected
    output or input nodes skips the check for that direction.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected plugged output and input nodes.

    @raises: error.TestFail if the plugged nodes on Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes
    # Input nodes are checked before output nodes, so an input mismatch is
    # the one reported when both directions are wrong.
    if (in_audio_nodes is not None and
            sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) plugged %s '
                'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
    if (out_audio_nodes is not None and
            sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) plugged %s '
                'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
| |
| |
def bluetooth_nodes_plugged(audio_facade):
    """Tells whether bluetooth nodes are plugged in both directions.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @returns: True if 'BLUETOOTH' is among both the plugged output node
              types and the plugged input node types. False otherwise.

    """
    plugged_outputs, plugged_inputs = audio_facade.get_plugged_node_types()
    if 'BLUETOOTH' not in plugged_outputs:
        return False
    return 'BLUETOOTH' in plugged_inputs
| |
| |
def _get_board_name(host):
    """Extracts the board name from the host's board string.

    The string returned by host.get_board() is split on ':' and the second
    field is taken as the board name.

    @param host: The CrosHost object.

    @returns: The board name.

    """
    board_label = host.get_board()
    label_fields = board_label.split(':')
    return label_fields[1]
| |
| |
def correction_plug_unplug_for_audio(host, port):
    """Plugs/unplugs several times for Cros device to detect audio.

    For issue crbug.com/450101, Exynos HDMI driver has problem recognizing
    HDMI audio, while display can be detected. Do several plug/unplug and wait
    as a workaround. Note that port will be in unplugged state in the end if
    extra plug/unplug is needed.

    Boards outside the affected list are left untouched.

    @param host: A CrosHost object.
    @param port: A ChameleonVideoInput object.

    """
    board = _get_board_name(host)
    if board in ('peach_pit', 'peach_pi', 'daisy', 'daisy_spring',
                 'daisy_skate'):
        logging.info('Need extra plug/unplug on board %s', board)
        # range() rather than xrange() so the loop also runs on Python 3.
        for _ in range(3):
            port.plug()
            time.sleep(3)
            port.unplug()
            time.sleep(3)
| |
| |
def has_internal_speaker(host):
    """Tells whether the Cros device has an internal speaker.

    A device whose board type is 'CHROMEBOX' is reported as having no
    speaker, with the exception of board 'stumpy'.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal speaker. False otherwise.

    """
    board_name = _get_board_name(host)
    is_chromebox = host.get_board_type() == 'CHROMEBOX'
    if not is_chromebox or board_name == 'stumpy':
        return True
    logging.info('Board %s does not have speaker.', board_name)
    return False
| |
| |
def has_internal_microphone(host):
    """Tells whether the Cros device has an internal microphone.

    A device whose board type is 'CHROMEBOX' is reported as having no
    internal microphone.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal microphone. False otherwise.

    """
    board_name = _get_board_name(host)
    if host.get_board_type() != 'CHROMEBOX':
        return True
    logging.info('Board %s does not have internal microphone.', board_name)
    return False
| |
| |
def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50):
    """Performs the suspend/resume on Cros device.

    host.suspend is invoked from a daemon child process, while this process
    calls host.test_wait_for_sleep and then host.test_wait_for_resume.

    @param host: The CrosHost object.
    @param suspend_time_secs: Time in seconds to let Cros device suspend.
    @param resume_network_timeout_secs: Time in seconds to let Cros device
                                        resume and obtain network.

    """
    def _suspend_dut():
        """Calls the host method suspend."""
        host.suspend(suspend_time=suspend_time_secs)

    # Recorded before suspending; passed to test_wait_for_resume below.
    boot_id_before_suspend = host.get_boot_id()

    suspend_proc = multiprocessing.Process(target=_suspend_dut)
    suspend_proc.daemon = True
    logging.info("Suspending...")
    suspend_proc.start()

    sleep_timeout = suspend_time_secs / 3
    host.test_wait_for_sleep(sleep_timeout)
    logging.info("DUT suspended! Waiting to resume...")

    resume_timeout = suspend_time_secs + resume_network_timeout_secs
    host.test_wait_for_resume(boot_id_before_suspend, resume_timeout)
    logging.info("DUT resumed!")
| |
| |
def dump_cros_audio_logs(host, audio_facade, directory, suffix=''):
    """Dumps logs for audio debugging from Cros device.

    Three files are written into directory: the audio diagnostics dumped
    through audio_facade, a copy of /var/log/messages, and a copy of the
    multimedia xmlrpc server log.

    @param host: The CrosHost object.
    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param directory: The directory to dump logs.
    @param suffix: An optional suffix appended to each dumped file name as
                   '<name>.<suffix>'.

    """
    def _dump_path(base_name):
        """Builds the path of a dumped file inside directory.

        @param base_name: The file name.

        @returns: The file path with an optional suffix.

        """
        if suffix:
            dumped_name = '%s.%s' % (base_name, suffix)
        else:
            dumped_name = base_name
        return os.path.join(directory, dumped_name)

    audio_facade.dump_diagnostics(_dump_path('audio_diagnostics.txt'))
    host.get_file('/var/log/messages', _dump_path('messages'))
    host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE,
                  _dump_path('multimedia_xmlrpc_server.log'))
| |
| |
@contextmanager
def monitor_no_nodes_changed(audio_facade, callback=None):
    """Context manager to monitor nodes changed signal on Cros device.

    Counting of the NodesChanged signal is started on entry and stopped on
    exit, whether or not the body raised. A non-zero count on exit is treated
    as a test failure.

    E.g. with monitor_no_nodes_changed(audio_facade):
             do something on playback/recording

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param callback: The callback to call before raising TestFail
                     when there is unexpected NodesChanged signals.

    @raises: error.TestFail if there is NodesChanged signal on
             Cros device during the context. It is raised from the cleanup
             step, so it takes the place of any exception raised by the body.

    """
    def _fail_on_unexpected_signals(signal_count):
        """Logs, runs the callback if any, and raises TestFail.

        @param signal_count: The number of NodesChanged signals counted.

        """
        failure_text = 'Got %d unexpected NodesChanged signal' % signal_count
        logging.error(failure_text)
        if callback:
            callback()
        raise error.TestFail(failure_text)

    try:
        audio_facade.start_counting_signal('NodesChanged')
        yield
    finally:
        signal_count = audio_facade.stop_counting_signal()
        if signal_count:
            _fail_on_unexpected_signals(signal_count)
| |
| |
# Upper bound on the ratio of the second dominant frequency's coefficient to
# the first dominant frequency's coefficient in the spectrum. A ratio of 0.05
# is roughly -26dB, i.e. the second dominant frequency should have energy
# less than -26dB of the first dominant frequency.
DEFAULT_SECOND_PEAK_RATIO = 0.05

# A looser second peak ratio, to tolerate more for bluetooth audio using HSP.
HSP_SECOND_PEAK_RATIO = 0.2

# The maximum allowed deviation of estimated dominant frequency from golden
# frequency (presumably in Hz -- TODO confirm against audio_analysis).
DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5
| |
def check_recorded_frequency(
        golden_file, recorder,
        second_peak_ratio=DEFAULT_SECOND_PEAK_RATIO,
        frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD,
        ignore_frequencies=None, check_anomaly=False):
    """Checks if the recorded data contains sine tone of golden frequency.

    Every channel of the recorder that is mapped to a golden channel is
    analyzed independently. All problems found are collected and reported
    together in a single TestFail after all channels have been checked.

    @param golden_file: An AudioTestData object that serves as golden data.
    @param recorder: An AudioWidget used in the test to record data.
    @param second_peak_ratio: The test fails when the second dominant
                              frequency has coefficient larger than this
                              ratio of the coefficient of first dominant
                              frequency.
    @param frequency_diff_threshold: The maximum difference between estimated
                                     frequency of test signal and golden
                                     frequency. This value should be small for
                                     signal passed through line.
    @param ignore_frequencies: A list of frequencies to be ignored. The
                               component in the spectral with frequency too
                               close to the frequency in the list will be
                               ignored. The comparison of frequencies uses
                               frequency_diff_threshold as well.
    @param check_anomaly: True to check anomaly in the signal.

    @raises error.TestFail if the recorded data does not contain sine tone of
            golden frequency.

    """
    def should_be_ignored(frequency):
        """Checks if frequency is close to any frequency in ignore list.

        Only called when ignore_frequencies is non-empty.

        @param frequency: The frequency to be tested.

        @returns: True if the frequency should be ignored. False otherwise.

        """
        for ignore_frequency in ignore_frequencies:
            if (abs(frequency - ignore_frequency) <
                    frequency_diff_threshold):
                logging.debug('Ignore frequency: %s', frequency)
                return True
        return False

    data_format = recorder.data_format
    recorded_data = audio_data.AudioRawData(
            binary=recorder.get_binary(),
            channel=data_format['channel'],
            sample_format=data_format['sample_format'])

    errors = []

    for test_channel, golden_channel in enumerate(recorder.channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue

        signal = recorded_data.channel_data[test_channel]
        saturate_value = audio_data.get_maximum_value_from_sample_format(
                data_format['sample_format'])
        normalized_signal = audio_analysis.normalize_signal(
                signal, saturate_value)
        # Each entry of spectral is indexed below as (frequency, coefficient),
        # with entry 0 taken as the dominant component.
        spectral = audio_analysis.spectral_analysis(
                normalized_signal, data_format['rate'])

        if not spectral:
            errors.append(
                    'Channel %d: Can not find dominant frequency.' %
                    test_channel)
            # Nothing further can be checked on this channel; moving on
            # avoids an IndexError on spectral[0] below and lets the failure
            # be reported through TestFail with the other channels' errors.
            continue

        golden_frequency = golden_file.frequencies[golden_channel]
        logging.debug('Checking channel %s spectral %s against frequency %s',
                      test_channel, spectral, golden_frequency)

        dominant_frequency = spectral[0][0]

        if (abs(dominant_frequency - golden_frequency) >
                frequency_diff_threshold):
            errors.append(
                    'Channel %d: Dominant frequency %s is away from golden %s' %
                    (test_channel, dominant_frequency, golden_frequency))

        if check_anomaly:
            detected_anomaly = audio_analysis.anomaly_detection(
                    signal=normalized_signal,
                    rate=data_format['rate'],
                    freq=golden_frequency)
            if detected_anomaly:
                errors.append(
                        'Channel %d: Detect anomaly near these time: %s' %
                        (test_channel, detected_anomaly))
            else:
                logging.info(
                        'Channel %d: Quality is good as there is no anomaly',
                        test_channel)

        # Filter out the frequencies to be ignored.
        if ignore_frequencies:
            spectral = [x for x in spectral if not should_be_ignored(x[0])]

        if len(spectral) > 1:
            first_coeff = spectral[0][1]
            second_coeff = spectral[1][1]
            if second_coeff > first_coeff * second_peak_ratio:
                errors.append(
                        'Channel %d: Found large second dominant frequencies: '
                        '%s' % (test_channel, spectral))

    if errors:
        raise error.TestFail(', '.join(errors))