blob: 3a0ca24ccb86c833b10f5c8faa0b94d4856a1cc3 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module provides the test utilities for audio tests using chameleon."""
# TODO (cychiang) Move test utilities from chameleon_audio_helpers
# to this module.
import logging
import multiprocessing
import os
import pprint
import re
from contextlib import contextmanager
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import constants
from import audio_analysis
from import audio_spec
from import audio_data
from import audio_helper
from import audio_quality_measurement
from autotest_lib.client.cros.chameleon import chameleon_audio_ids
chameleon_audio_ids.CrosIds.HDMI: 'HDMI',
chameleon_audio_ids.CrosIds.HEADPHONE: 'HEADPHONE',
chameleon_audio_ids.CrosIds.EXTERNAL_MIC: 'MIC',
chameleon_audio_ids.CrosIds.SPEAKER: 'INTERNAL_SPEAKER',
chameleon_audio_ids.CrosIds.INTERNAL_MIC: 'INTERNAL_MIC',
chameleon_audio_ids.CrosIds.BLUETOOTH_HEADPHONE: 'BLUETOOTH',
chameleon_audio_ids.CrosIds.BLUETOOTH_MIC: 'BLUETOOTH',
chameleon_audio_ids.CrosIds.USBIN: 'USB',
chameleon_audio_ids.CrosIds.USBOUT: 'USB',
def cros_port_id_to_cras_node_type(port_id):
"""Gets Cras node type from Cros port id.
@param port_id: A port id defined in chameleon_audio_ids.CrosIds.
@returns: A Cras node type defined in cras_utils.CRAS_NODE_TYPES.
def check_output_port(audio_facade, port_id):
    """Checks selected output node on Cros device is correct for a port.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @raises: error.TestFail if the selected output node is not the node type
             mapped from port_id (raised by check_audio_nodes).

    """
    output_node_type = cros_port_id_to_cras_node_type(port_id)
    # Only the output side is checked; None skips the input comparison.
    check_audio_nodes(audio_facade, ([output_node_type], None))
def check_input_port(audio_facade, port_id):
    """Checks selected input node on Cros device is correct for a port.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @raises: error.TestFail if the selected input node is not the node type
             mapped from port_id (raised by check_audio_nodes).

    """
    input_node_type = cros_port_id_to_cras_node_type(port_id)
    # Only the input side is checked; None skips the output comparison.
    check_audio_nodes(audio_facade, (None, [input_node_type]))
def check_audio_nodes(audio_facade, audio_nodes):
    """Checks the node selected by Cros device is correct.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected selected output and input nodes. Either
                        element may be None to skip checking that direction.

    @raises: error.TestFail if the nodes selected by Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes

    # Node lists are compared order-insensitively.
    if (in_audio_nodes is not None and
            sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) selected: %s '
                             'expected: %s' % (curr_in_nodes, in_audio_nodes))

    # Treat line-out node as headphone node in Chameleon test since some
    # Cros devices detect audio board as lineout. This actually makes sense
    # because 3.5mm audio jack is connected to LineIn port on Chameleon.
    if out_audio_nodes == ['HEADPHONE'] and curr_out_nodes == ['LINEOUT']:
        return

    if (out_audio_nodes is not None and
            sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) selected %s '
                             'expected: %s' % (curr_out_nodes, out_audio_nodes))
def check_plugged_nodes(audio_facade, audio_nodes):
    """Checks the nodes that are currently plugged on Cros device are correct.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected plugged output and input nodes. Either
                        element may be None to skip checking that direction.

    @raises: error.TestFail if the plugged nodes on Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes

    # Node lists are compared order-insensitively.
    if (in_audio_nodes is not None and
            sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) plugged: %s '
                             'expected: %s!' % (curr_in_nodes, in_audio_nodes))

    if (out_audio_nodes is not None and
            sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) plugged: %s '
                             'expected: %s!' % (curr_out_nodes,
                                                out_audio_nodes))
def bluetooth_nodes_plugged(audio_facade):
    """Checks bluetooth nodes are plugged.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @returns: True if 'BLUETOOTH' is among both the plugged output node types
              and the plugged input node types. False otherwise.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    return 'BLUETOOTH' in curr_out_nodes and 'BLUETOOTH' in curr_in_nodes
def get_board_name(host):
    """Gets the board name.

    @param host: The CrosHost object.

    @returns: The board name, i.e. the second ':'-separated field of
              host.get_board().

    """
    # NOTE(review): presumably get_board() returns a label of the form
    # 'board:<name>' - confirm against CrosHost.
    return host.get_board().split(':')[1]
def has_internal_speaker(host):
    """Checks if the Cros device has speaker.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal speaker. False otherwise.

    """
    board_name = get_board_name(host)
    if not audio_spec.has_internal_speaker(host.get_board_type(), board_name):
        logging.info('Board %s does not have speaker.', board_name)
        return False
    return True
def has_internal_microphone(host):
    """Checks if the Cros device has internal microphone.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal microphone. False otherwise.

    """
    # The board name is only needed for the log message; the spec lookup
    # itself is keyed on the board type.
    board_name = get_board_name(host)
    if not audio_spec.has_internal_microphone(host.get_board_type()):
        logging.info('Board %s does not have internal microphone.', board_name)
        return False
    return True
def has_headphone(host):
    """Checks if the Cros device has headphone.

    @param host: The CrosHost object.

    @returns: True if Cros device has headphone. False otherwise.

    """
    # The board name is only needed for the log message; the spec lookup
    # itself is keyed on the board type.
    board_name = get_board_name(host)
    if not audio_spec.has_headphone(host.get_board_type()):
        logging.info('Board %s does not have headphone.', board_name)
        return False
    return True
def has_hotwording(host):
    """Checks if the Cros device has hotwording.

    @param host: The CrosHost object.

    @returns: True if the board has hotwording. False otherwise.

    """
    board_name = get_board_name(host)
    model_name = host.get_platform()
    return audio_spec.has_hotwording(board_name, model_name)
def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50):
"""Performs the suspend/resume on Cros device.
@param suspend_time_secs: Time in seconds to let Cros device suspend.
@param resume_network_timeout_secs: Time in seconds to let Cros device
resume and obtain network.
def action_suspend():
"""Calls the host method suspend."""
boot_id = host.get_boot_id()
proc = multiprocessing.Process(target=action_suspend)"Suspending...")
proc.daemon = True
host.test_wait_for_sleep(suspend_time_secs / 3)"DUT suspended! Waiting to resume...")
boot_id, suspend_time_secs + resume_network_timeout_secs)"DUT resumed!")
def dump_cros_audio_logs(host, audio_facade, directory, suffix='',
"""Dumps logs for audio debugging from Cros device.
@param host: The CrosHost object.
@param audio_facade: A RemoteAudioFacade to access audio functions on
Cros device.
@param directory: The directory to dump logs.
def get_file_path(name):
"""Gets file path to dump logs.
@param name: The file name.
@returns: The file path with an optional suffix.
file_name = '%s.%s' % (name, suffix) if suffix else name
file_path = os.path.join(directory, file_name)
return file_path
host.get_file('/var/log/messages', get_file_path('messages'))
# Raising error if any warning messages in the audio diagnostics
if fail_if_warnings:
audio_logs = examine_audio_diagnostics(get_file_path(
if audio_logs != '':
raise error.TestFail(audio_logs)
def examine_audio_diagnostics(path):
    """Examines audio diagnostic content.

    Scans the file line by line for 'num_underruns: <number>' entries and
    collects a warning for every entry whose number is not zero.

    @param path: Path to audio diagnostic file.

    @returns: Warning messages or ''.

    """
    warning_msgs = []
    # Raw string avoids the invalid '\d' escape; '+' (rather than '*')
    # guarantees the captured group is a non-empty number so int() can not
    # fail on a line like 'num_underruns: ' with nothing after it.
    underrun_pattern = re.compile(r'num_underruns: (\d+)')

    with open(path) as f:
        # Line numbers in the warnings are 1-based.
        for line_number, line in enumerate(f, 1):
            # Check for number of underruns.
            search_result = underrun_pattern.search(line)
            if search_result:
                num_underruns = int(search_result.group(1))
                if num_underruns != 0:
                    warning_msgs.append(
                            'Found %d underrun at line %d: %s' % (
                                    num_underruns, line_number, line))

            # TODO(cychiang) add other check like maximum client reply delay.

    if warning_msgs:
        return ('Found issue in audio diganostics result : %s' %
                '\n'.join(warning_msgs))

    logging.info('audio_diagnostic result looks fine')
    return ''
def monitor_no_nodes_changed(audio_facade, callback=None):
"""Context manager to monitor nodes changed signal on Cros device.
Starts the counter in the beginning. Stops the counter in the end to make
sure there is no NodesChanged signal during the try block.
E.g. with monitor_no_nodes_changed(audio_facade):
do something on playback/recording
@param audio_facade: A RemoteAudioFacade to access audio functions on
Cros device.
@param callback: The callback to call before raising TestFail
when there are unexpected NodesChanged signals.
@raises: error.TestFail if there is NodesChanged signal on
Cros device during the context.
count = audio_facade.stop_counting_signal()
if count:
message = 'Got %d unexpected NodesChanged signal' % count
if callback:
raise error.TestFail(message)
# The second dominant frequency should have energy less than -26dB of the
# first dominant frequency in the spectrum.
# Tolerate more noise for bluetooth audio using HSP.
# Tolerate more noise for speaker.
# Tolerate more noise for internal microphone.
# maximum tolerant noise level
# If relative error of two durations is less than 0.2,
# they will be considered equivalent.
# The frequency at lower than _DC_FREQ_THRESHOLD should have coefficient
# smaller than _DC_COEFF_THRESHOLD.
def get_second_peak_ratio(source_id, recorder_id, is_hsp=False):
"""Gets the second peak ratio suitable for use case.
@param source_id: ID defined in chameleon_audio_ids for source widget.
@param recorder_id: ID defined in chameleon_audio_ids for recorder widget.
@param is_hsp: For bluetooth HSP use case.
@returns: A float for proper second peak ratio to be used in
if is_hsp:
elif source_id == chameleon_audio_ids.CrosIds.SPEAKER:
elif recorder_id == chameleon_audio_ids.CrosIds.INTERNAL_MIC:
# The deviation of estimated dominant frequency from golden frequency.
def check_recorded_frequency(
golden_file, recorder,
ignore_frequencies=None, check_anomaly=False, check_artifacts=False,
mute_durations=None, volume_changes=None,
"""Checks if the recorded data contains sine tone of golden frequency.
@param golden_file: An AudioTestData object that serves as golden data.
@param recorder: An AudioWidget used in the test to record data.
@param second_peak_ratio: The test fails when the second dominant
frequency has coefficient larger than this
ratio of the coefficient of first dominant
@param frequency_diff_threshold: The maximum difference between estimated
frequency of test signal and golden
frequency. This value should be small for
signal passed through line.
@param ignore_frequencies: A list of frequencies to be ignored. The
component in the spectral with frequency too
close to the frequency in the list will be
ignored. The comparison of frequencies uses
frequency_diff_threshold as well.
@param check_anomaly: True to check anomaly in the signal.
@param check_artifacts: True to check artifacts in the signal.
@param mute_durations: Each duration of mute in seconds in the signal.
@param volume_changes: A list containing alternative -1 for decreasing
volume and +1 for increasing volume.
@param tolerant_noise_level: The maximum noise level can be tolerated
@returns: A list containing tuples of (dominant_frequency, coefficient) for
valid channels. Coefficient can be a measure of signal magnitude
on that dominant frequency. Invalid channels where golden_channel
is None are ignored.
@raises error.TestFail if the recorded data does not contain sine tone of
golden frequency.
if not ignore_frequencies:
ignore_frequencies = []
# Also ignore harmonics of ignore frequencies.
ignore_frequencies_harmonics = []
for ignore_freq in ignore_frequencies:
ignore_frequencies_harmonics += [ignore_freq * n for n in xrange(1, 4)]
data_format = recorder.data_format
recorded_data = audio_data.AudioRawData(
errors = []
dominant_spectrals = []
for test_channel, golden_channel in enumerate(recorder.channel_map):
if golden_channel is None:'Skipped channel %d', test_channel)
signal = recorded_data.channel_data[test_channel]
saturate_value = audio_data.get_maximum_value_from_sample_format(
logging.debug('Channel %d max signal: %f', test_channel, max(signal))
normalized_signal = audio_analysis.normalize_signal(
signal, saturate_value)
logging.debug('saturate_value: %f', saturate_value)
logging.debug('max signal after normalized: %f', max(normalized_signal))
spectral = audio_analysis.spectral_analysis(
normalized_signal, data_format['rate'])
logging.debug('spectral: %s', spectral)
if not spectral:
'Channel %d: Can not find dominant frequency.' %
golden_frequency = golden_file.frequencies[golden_channel]
logging.debug('Checking channel %s spectral %s against frequency %s',
test_channel, spectral, golden_frequency)
dominant_frequency = spectral[0][0]
if (abs(dominant_frequency - golden_frequency) >
'Channel %d: Dominant frequency %s is away from golden %s' %
(test_channel, dominant_frequency, golden_frequency))
if check_anomaly:
detected_anomaly = audio_analysis.anomaly_detection(
if detected_anomaly:
'Channel %d: Detect anomaly near these time: %s' %
(test_channel, detected_anomaly))
'Channel %d: Quality is good as there is no anomaly',
if check_artifacts or mute_durations or volume_changes:
result = audio_quality_measurement.quality_measurement(
logging.debug('Quality measurement result:\n%s', pprint.pformat(result))
if check_artifacts:
if len(result['artifacts']['noise_before_playback']) > 0:
'Channel %d: Detects artifacts before playing near'
' these time and duration: %s' %
if len(result['artifacts']['noise_after_playback']) > 0:
'Channel %d: Detects artifacts after playing near'
' these time and duration: %s' %
if mute_durations:
delays = result['artifacts']['delay_during_playback']
delay_durations = []
for x in delays:
mute_matched, delay_matched = longest_common_subsequence(
# updated delay list
new_delays = [delays[i]
for i in delay_matched if not delay_matched[i]]
result['artifacts']['delay_during_playback'] = new_delays
unmatched_mutes = [mute_durations[i]
for i in mute_matched if not mute_matched[i]]
if len(unmatched_mutes) > 0:
'Channel %d: Unmatched mute duration: %s' %
(test_channel, unmatched_mutes))
if check_artifacts:
if len(result['artifacts']['delay_during_playback']) > 0:
'Channel %d: Detects delay during playing near'
' these time and duration: %s' %
if len(result['artifacts']['burst_during_playback']) > 0:
'Channel %d: Detects burst/pop near these time: %s' %
if result['equivalent_noise_level'] > tolerant_noise_level:
'Channel %d: noise level is higher than tolerant'
' noise level: %f > %f' %
if volume_changes:
matched = True
volume_changing = result['volume_changes']
if len(volume_changing) != len(volume_changes):
matched = False
for i in xrange(len(volume_changing)):
if volume_changing[i][1] != volume_changes[i]:
matched = False
if not matched:
'Channel %d: volume changing is not as expected, '
'found changing time and events are: %s while '
'expected changing events are %s'%
# Filter out the harmonics resulted from imperfect sin wave.
# This list is different for different channels.
harmonics = [dominant_frequency * n for n in xrange(2, 10)]
def should_be_ignored(frequency):
"""Checks if frequency is close to any frequency in ignore list.
The ignore list is harmonics of frequency to be ignored
(like power noise), plus harmonics of dominant frequencies,
plus DC.
@param frequency: The frequency to be tested.
@returns: True if the frequency should be ignored. False otherwise.
for ignore_frequency in (ignore_frequencies_harmonics + harmonics
+ [0.0]):
if (abs(frequency - ignore_frequency) <
logging.debug('Ignore frequency: %s', frequency)
return True
# Checks DC is small enough.
for freq, coeff in spectral:
'Channel %d: Found large DC coefficient: '
'(%f Hz, %f)' % (test_channel, freq, coeff))
# Filter out the frequencies to be ignored.
spectral_post_ignore = [
x for x in spectral if not should_be_ignored(x[0])]
if len(spectral_post_ignore) > 1:
first_coeff = spectral_post_ignore[0][1]
second_coeff = spectral_post_ignore[1][1]
if second_coeff > first_coeff * second_peak_ratio:
'Channel %d: Found large second dominant frequencies: '
'%s' % (test_channel, spectral_post_ignore))
if not spectral_post_ignore:
'Channel %d: No frequency left after removing unwanted '
'frequencies. Spectral: %s; After removing unwanted '
'frequencies: %s' %
(test_channel, spectral, spectral_post_ignore))
if errors:
raise error.TestFail(', '.join(errors))
return dominant_spectrals
def longest_common_subsequence(list1, list2, equivalent_threshold):
    """Finds longest common subsequence of list1 and list2.

    Such as list1: [0.3, 0.4],
            list2: [0.001, 0.299, 0.002, 0.401, 0.001]
            equivalent_threshold: 0.01
    it will return matched1: [True, True],
                   matched2: [False, True, False, True, False]

    @param list1: a list of integer or float value
    @param list2: a list of integer or float value
    @param equivalent_threshold: two values are considered equivalent if their
                                 relative error (relative to the value taken
                                 from list1) is less than
                                 equivalent_threshold.

    @returns: a tuple of list (matched_1, matched_2) indicating each item
              of list1 and list2 are matched or not.

    """
    def is_equivalent(value1, value2):
        """Checks value2 is within the relative threshold of value1."""
        if value1 == 0:
            # Relative error is undefined for zero; only an exact zero
            # matches it.
            return value2 == 0
        # Multiply instead of dividing: no ZeroDivisionError, no integer
        # division on int inputs, and a negative value1 can not flip the
        # sign of the comparison.
        return abs(value1 - value2) < equivalent_threshold * abs(value1)

    length1, length2 = len(list1), len(list2)

    # matching[i][j] is the maximum number of matched pairs for first i items
    # in list1 and first j items in list2. Each row must be its own list;
    # `[[0] * n] * m` would make every row alias the same list object.
    matching = [[0] * (length2 + 1) for _ in range(length1 + 1)]
    for i in range(length1):
        for j in range(length2):
            if is_equivalent(list1[i], list2[j]):
                # i-th item in list1 can be matched to j-th item in list2.
                matching[i + 1][j + 1] = matching[i][j] + 1
            else:
                # Maximum matched pairs is obtained without i-th item in
                # list1 or without j-th item in list2.
                matching[i + 1][j + 1] = max(matching[i + 1][j],
                                             matching[i][j + 1])

    # Backtracking which item in list1 and list2 are matched.
    matched1 = [False] * length1
    matched2 = [False] * length2
    i, j = length1, length2
    while i > 0 and j > 0:
        if is_equivalent(list1[i - 1], list2[j - 1]):
            # Maximum number is obtained by matching i-th item in list1
            # and j-th one in list2. The items themselves must be checked:
            # the counts alone can differ by one for a non-matching pair.
            matched1[i - 1] = True
            matched2[j - 1] = True
            i, j = i - 1, j - 1
        elif matching[i][j] == matching[i - 1][j]:
            i -= 1
        else:
            j -= 1
    return (matched1, matched2)
def switch_to_hsp(audio_facade):
"""Switches to HSP profile.
Selects bluetooth microphone and runs a recording process on Cros device.
This triggers bluetooth profile be switched from A2DP to HSP.
Note the user can call stop_recording on audio facade to stop the recording
process, or let multimedia_xmlrpc_server terminates it in its cleanup.
audio_facade.set_chrome_active_node_type(None, 'BLUETOOTH')
check_audio_nodes(audio_facade, (None, ['BLUETOOTH']))
dict(file_type='raw', sample_format='S16_LE', channel=2,
def compare_recorded_correlation(golden_file, recorder, parameters=None):
"""Checks recorded audio in an AudioInputWidget against a golden file.
Compares recorded data with golden data by cross correlation method.
Refer to audio_helper.compare_data for details of comparison.
@param golden_file: An AudioTestData object that serves as golden data.
@param recorder: An AudioInputWidget that has recorded some audio data.
@param parameters: A dict containing parameters for method.
"""'Comparing recorded data with golden file %s ...',
golden_file.get_binary(), golden_file.data_format,
recorder.get_binary(), recorder.data_format, recorder.channel_map,
def check_and_set_chrome_active_node_types(audio_facade, output_type=None,
                                           input_type=None):
    """Check the target types are available, and set them to be active nodes.

    @param audio_facade: An AudioFacadeNative or AudioFacadeAdapter object.
    @param output_type: An output node type defined in
                        cras_utils.CRAS_NODE_TYPES. None to skip.
    @param input_type: An input node type defined in
                       cras_utils.CRAS_NODE_TYPES. None to skip.

    @raises: error.TestError if the expected node type is missing. We use
             error.TestError here because usually this step is not the main
             purpose of the test, but a setup step.

    """
    output_types, input_types = audio_facade.get_plugged_node_types()
    logging.debug('Plugged types: output: %r, input: %r',
                  output_types, input_types)

    # A falsy target (None) skips the availability check for that direction.
    if output_type and output_type not in output_types:
        raise error.TestError(
                'Target output type %s not present' % output_type)
    if input_type and input_type not in input_types:
        raise error.TestError(
                'Target input type %s not present' % input_type)

    audio_facade.set_chrome_active_node_type(output_type, input_type)
def check_hp_or_lineout_plugged(audio_facade):
    """Checks whether line-out or headphone is plugged.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @returns: 'LINEOUT' if line-out node is plugged.
              'HEADPHONE' if headphone node is plugged.
              'LINEOUT' takes precedence when both are plugged.

    @raises: error.TestFail if the plugged nodes does not contain one of
             'LINEOUT' and 'HEADPHONE'.

    """
    # Checks whether line-out or headphone is detected.
    output_nodes, _ = audio_facade.get_plugged_node_types()
    if 'LINEOUT' in output_nodes:
        return 'LINEOUT'
    if 'HEADPHONE' in output_nodes:
        return 'HEADPHONE'
    raise error.TestFail('Can not detect line-out or headphone')