blob: 5b7e61c9394fa0b7b2c80489e0103fc0360f2cf5 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module provides the test utilities for audio tests using chameleon."""
# TODO (cychiang) Move test utilities from chameleon_audio_helpers
# to this module.
import logging
import multiprocessing
import os
import time
from contextlib import contextmanager

from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import constants
from autotest_lib.client.cros.audio import audio_analysis
from autotest_lib.client.cros.audio import audio_data
from autotest_lib.client.cros.audio import audio_helper
from autotest_lib.client.cros.audio import audio_quality_measurement
from autotest_lib.client.cros.chameleon import chameleon_audio_ids
# Mapping from a Cros port id defined in chameleon_audio_ids.CrosIds to the
# Cras node type defined in cras_utils.CRAS_NODE_TYPES.
CROS_TO_CRAS_NODE_TYPE = {
        chameleon_audio_ids.CrosIds.HDMI: 'HDMI',
        chameleon_audio_ids.CrosIds.HEADPHONE: 'HEADPHONE',
        chameleon_audio_ids.CrosIds.EXTERNAL_MIC: 'MIC',
        chameleon_audio_ids.CrosIds.SPEAKER: 'INTERNAL_SPEAKER',
        chameleon_audio_ids.CrosIds.INTERNAL_MIC: 'INTERNAL_MIC',
        chameleon_audio_ids.CrosIds.BLUETOOTH_HEADPHONE: 'BLUETOOTH',
        chameleon_audio_ids.CrosIds.BLUETOOTH_MIC: 'BLUETOOTH',
        chameleon_audio_ids.CrosIds.USBIN: 'USB',
        chameleon_audio_ids.CrosIds.USBOUT: 'USB',
}


def cros_port_id_to_cras_node_type(port_id):
    """Gets Cras node type from Cros port id.

    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @returns: A Cras node type defined in cras_utils.CRAS_NODE_TYPES.

    @raises: KeyError if port_id is not in the mapping.

    """
    return CROS_TO_CRAS_NODE_TYPE[port_id]
def check_output_port(audio_facade, port_id):
    """Checks selected output node on Cros device is correct for a port.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @raises: error.TestFail if the selected output node is not the node
             type mapped from port_id.

    """
    output_node_type = cros_port_id_to_cras_node_type(port_id)
    check_audio_nodes(audio_facade, ([output_node_type], None))
def check_input_port(audio_facade, port_id):
    """Checks selected input node on Cros device is correct for a port.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @raises: error.TestFail if the selected input node is not the node
             type mapped from port_id.

    """
    input_node_type = cros_port_id_to_cras_node_type(port_id)
    check_audio_nodes(audio_facade, (None, [input_node_type]))
def check_audio_nodes(audio_facade, audio_nodes):
    """Checks the node selected by Cros device is correct.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected selected output and input nodes. Either
                        element can be None to skip checking that direction.

    @raises: error.TestFail if the nodes selected by Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes
    # Compare as sorted lists so node ordering does not matter.
    if (in_audio_nodes is not None and
        sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) selected %s '
                'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
    if (out_audio_nodes is not None and
        sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) selected %s '
                'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
def check_plugged_nodes(audio_facade, audio_nodes):
    """Checks the nodes that are currently plugged on Cros device are correct.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected plugged output and input nodes. Either
                        element can be None to skip checking that direction.

    @raises: error.TestFail if the plugged nodes on Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes
    # Compare as sorted lists so node ordering does not matter.
    if (in_audio_nodes is not None and
        sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) plugged %s '
                'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
    if (out_audio_nodes is not None and
        sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) plugged %s '
                'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
def bluetooth_nodes_plugged(audio_facade):
    """Checks bluetooth nodes are plugged.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @returns: True if both bluetooth output and input nodes are plugged.
              False otherwise.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    return 'BLUETOOTH' in curr_out_nodes and 'BLUETOOTH' in curr_in_nodes
def _get_board_name(host):
"""Gets the board name.
@param host: The CrosHost object.
@returns: The board name.
return host.get_board().split(':')[1]
def has_internal_speaker(host):
    """Checks if the Cros device has speaker.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal speaker. False otherwise.

    """
    board_name = _get_board_name(host)
    # Chromeboxes (other than stumpy) have no internal speaker.
    if host.get_board_type() == 'CHROMEBOX' and board_name != 'stumpy':
        logging.info('Board %s does not have speaker.', board_name)
        return False
    return True
def has_internal_microphone(host):
    """Checks if the Cros device has internal microphone.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal microphone. False otherwise.

    """
    board_name = _get_board_name(host)
    # Chromeboxes have no internal microphone.
    if host.get_board_type() == 'CHROMEBOX':
        logging.info('Board %s does not have internal microphone.', board_name)
        return False
    return True
def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50):
    """Performs the suspend/resume on Cros device.

    @param host: The CrosHost object.
    @param suspend_time_secs: Time in seconds to let Cros device suspend.
    @param resume_network_timeout_secs: Time in seconds to let Cros device
                                        resume and obtain network.

    """
    def action_suspend():
        """Calls the host method suspend."""
        # NOTE(review): suspend call reconstructed from the inner docstring;
        # confirm the exact CrosHost.suspend signature.
        host.suspend(suspend_time=suspend_time_secs)

    boot_id = host.get_boot_id()
    # Suspend in a background process so this process can poll the DUT state.
    proc = multiprocessing.Process(target=action_suspend)
    logging.info("Suspending...")
    proc.daemon = True
    proc.start()
    host.test_wait_for_sleep(suspend_time_secs / 3)
    logging.info("DUT suspended! Waiting to resume...")
    host.test_wait_for_resume(
            boot_id, suspend_time_secs + resume_network_timeout_secs)
    logging.info("DUT resumed!")
def dump_cros_audio_logs(host, audio_facade, directory, suffix=''):
    """Dumps logs for audio debugging from Cros device.

    @param host: The CrosHost object.
    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param directory: The directory to dump logs.
    @param suffix: An optional suffix appended to each dumped file name.

    """
    def get_file_path(name):
        """Gets file path to dump logs.

        @param name: The file name.

        @returns: The file path with an optional suffix.

        """
        file_name = '%s.%s' % (name, suffix) if suffix else name
        file_path = os.path.join(directory, file_name)
        return file_path

    # NOTE(review): diagnostics dump reconstructed — confirm the audio
    # facade API name for dumping audio diagnostics.
    audio_facade.dump_diagnostics(get_file_path('audio_diagnostics.txt'))

    host.get_file('/var/log/messages', get_file_path('messages'))
@contextmanager
def monitor_no_nodes_changed(audio_facade, callback=None):
    """Context manager to monitor nodes changed signal on Cros device.

    Starts the counter in the beginning. Stops the counter in the end to make
    sure there is no NodesChanged signal during the try block.

    E.g. with monitor_no_nodes_changed(audio_facade):
             do something on playback/recording

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param callback: The callback to call before raising TestFail
                     when there is unexpected NodesChanged signals.

    @raises: error.TestFail if there is NodesChanged signal on
             Cros device during the context.

    """
    try:
        audio_facade.start_counting_signal('NodesChanged')
        yield
    finally:
        count = audio_facade.stop_counting_signal()
        if count:
            message = 'Got %d unexpected NodesChanged signal' % count
            logging.error(message)
            # Give the caller a chance to dump debug info before failing.
            if callback:
                callback(message)
            raise error.TestFail(message)
# The second dominant frequency should have energy less than -26dB of the
# first dominant frequency in the spectrum. (20 * log10(0.05) ~= -26dB)
DEFAULT_SECOND_PEAK_RATIO = 0.05

# Tolerate more noise for bluetooth audio using HSP.
HSP_SECOND_PEAK_RATIO = 0.2

# Tolerate more noise for speaker.
SPEAKER_SECOND_PEAK_RATIO = 0.1

# Tolerate more noise for internal microphone.
INTERNAL_MIC_SECOND_PEAK_RATIO = 0.2

# maximum tolerant noise level
DEFAULT_TOLERANT_NOISE_LEVEL = 0.01

# If relative error of two durations is less than 0.2,
# they will be considered equivalent.
DEFAULT_EQUIVALENT_THRESHOLD = 0.2


def get_second_peak_ratio(source_id, recorder_id, is_hsp=False):
    """Gets the second peak ratio suitable for use case.

    @param source_id: ID defined in chameleon_audio_ids for source widget.
    @param recorder_id: ID defined in chameleon_audio_ids for recorder widget.
    @param is_hsp: For bluetooth HSP use case.

    @returns: A float for proper second peak ratio to be used in
              check_recorded_frequency.

    """
    if is_hsp:
        return HSP_SECOND_PEAK_RATIO
    elif source_id == chameleon_audio_ids.CrosIds.SPEAKER:
        return SPEAKER_SECOND_PEAK_RATIO
    elif recorder_id == chameleon_audio_ids.CrosIds.INTERNAL_MIC:
        return INTERNAL_MIC_SECOND_PEAK_RATIO
    else:
        return DEFAULT_SECOND_PEAK_RATIO
# The deviation of estimated dominant frequency from golden frequency.
DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5


def check_recorded_frequency(
        golden_file, recorder,
        second_peak_ratio=0.05,
        frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD,
        ignore_frequencies=None, check_anomaly=False, check_artifacts=False,
        mute_durations=None, volume_changes=None,
        tolerant_noise_level=0.01):
    """Checks if the recorded data contains sine tone of golden frequency.

    @param golden_file: An AudioTestData object that serves as golden data.
    @param recorder: An AudioWidget used in the test to record data.
    @param second_peak_ratio: The test fails when the second dominant
                              frequency has coefficient larger than this
                              ratio of the coefficient of first dominant
                              frequency.
    @param frequency_diff_threshold: The maximum difference between estimated
                                     frequency of test signal and golden
                                     frequency. This value should be small for
                                     signal passed through line.
    @param ignore_frequencies: A list of frequencies to be ignored. The
                               component in the spectral with frequency too
                               close to the frequency in the list will be
                               ignored. The comparison of frequencies uses
                               frequency_diff_threshold as well.
    @param check_anomaly: True to check anomaly in the signal.
    @param check_artifacts: True to check artifacts in the signal.
    @param mute_durations: Each duration of mute in seconds in the signal.
    @param volume_changes: A list containing alternative -1 for decreasing
                           volume and +1 for increasing volume.
    @param tolerant_noise_level: The maximum noise level that can be tolerated.

    @returns: A list containing tuples of (dominant_frequency, coefficient) for
              valid channels. Coefficient can be a measure of signal magnitude
              on that dominant frequency. Invalid channels where golden_channel
              is None are ignored.

    @raises error.TestFail if the recorded data does not contain sine tone of
            golden frequency.

    """
    if not ignore_frequencies:
        ignore_frequencies = []

    # Also ignore harmonics of ignore frequencies.
    ignore_frequencies_harmonics = []
    for ignore_freq in ignore_frequencies:
        ignore_frequencies_harmonics += [ignore_freq * n for n in xrange(1, 4)]

    data_format = recorder.data_format
    recorded_data = audio_data.AudioRawData(
            binary=recorder.get_binary(),
            channel=data_format['channel'],
            sample_format=data_format['sample_format'])

    errors = []
    dominant_spectrals = []

    for test_channel, golden_channel in enumerate(recorder.channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue

        signal = recorded_data.channel_data[test_channel]
        saturate_value = audio_data.get_maximum_value_from_sample_format(
                data_format['sample_format'])
        logging.debug('Channel %d max signal: %f', test_channel, max(signal))
        normalized_signal = audio_analysis.normalize_signal(
                signal, saturate_value)
        logging.debug('saturate_value: %f', saturate_value)
        logging.debug('max signal after normalized: %f', max(normalized_signal))
        spectral = audio_analysis.spectral_analysis(
                normalized_signal, data_format['rate'])
        logging.debug('spectral: %s', spectral)

        if not spectral:
            errors.append(
                    'Channel %d: Can not find dominant frequency.' %
                    test_channel)
            # Without a spectrum there is nothing further to check on this
            # channel.
            continue

        golden_frequency = golden_file.frequencies[golden_channel]
        logging.debug('Checking channel %s spectral %s against frequency %s',
                      test_channel, spectral, golden_frequency)

        dominant_frequency = spectral[0][0]

        if (abs(dominant_frequency - golden_frequency) >
            frequency_diff_threshold):
            errors.append(
                    'Channel %d: Dominant frequency %s is away from golden %s' %
                    (test_channel, dominant_frequency, golden_frequency))

        if check_anomaly:
            detected_anomaly = audio_analysis.anomaly_detection(
                    signal=normalized_signal,
                    rate=data_format['rate'],
                    freq=golden_frequency)
            if detected_anomaly:
                errors.append(
                        'Channel %d: Detect anomaly near these time: %s' %
                        (test_channel, detected_anomaly))
            else:
                logging.info(
                        'Channel %d: Quality is good as there is no anomaly',
                        test_channel)

        if check_artifacts or mute_durations or volume_changes:
            result = audio_quality_measurement.quality_measurement(
                    normalized_signal,
                    data_format['rate'],
                    dominant_frequency=dominant_frequency)

            if check_artifacts:
                if len(result['artifacts']['noise_before_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects artifacts before playing near'
                            ' these time and duration: %s' %
                            (test_channel,
                             str(result['artifacts']['noise_before_playback'])))

                if len(result['artifacts']['noise_after_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects artifacts after playing near'
                            ' these time and duration: %s' %
                            (test_channel,
                             str(result['artifacts']['noise_after_playback'])))

            if mute_durations:
                delays = result['artifacts']['delay_during_playback']
                delay_durations = []
                for x in delays:
                    delay_durations.append(x[1])
                # 0.2: durations are equivalent when their relative error is
                # below this threshold.
                mute_matched, delay_matched = longest_common_subsequence(
                        mute_durations, delay_durations, 0.2)

                # updated delay list
                new_delays = [delays[i]
                              for i in xrange(len(delay_matched))
                              if not delay_matched[i]]

                result['artifacts']['delay_during_playback'] = new_delays

                unmatched_mutes = [mute_durations[i]
                                   for i in xrange(len(mute_matched))
                                   if not mute_matched[i]]

                if len(unmatched_mutes) > 0:
                    errors.append(
                            'Channel %d: Unmatched mute duration: %s' %
                            (test_channel, unmatched_mutes))

            if check_artifacts:
                if len(result['artifacts']['delay_during_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects delay during playing near'
                            ' these time and duration: %s' %
                            (test_channel,
                             str(result['artifacts']['delay_during_playback'])))

                if len(result['artifacts']['burst_during_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects burst/pop near these time: %s' %
                            (test_channel,
                             str(result['artifacts']['burst_during_playback'])))

                if result['equivalent_noise_level'] > tolerant_noise_level:
                    errors.append(
                            'Channel %d: noise level is higher than tolerant'
                            ' noise level: %f > %f' %
                            (test_channel,
                             result['equivalent_noise_level'],
                             tolerant_noise_level))

            if volume_changes:
                matched = True
                volume_changing = result['volume_changes']
                if len(volume_changing) != len(volume_changes):
                    matched = False
                else:
                    for i in xrange(len(volume_changing)):
                        if volume_changing[i][1] != volume_changes[i]:
                            matched = False
                            break
                if not matched:
                    errors.append(
                            'Channel %d: volume changing is not as expected, '
                            'found changing time and events are: %s while '
                            'expected changing events are %s' %
                            (test_channel, volume_changing, volume_changes))

        # Filter out the harmonics resulted from imperfect sin wave.
        # This list is different for different channels.
        harmonics = [dominant_frequency * n for n in xrange(2, 10)]

        def should_be_ignored(frequency):
            """Checks if frequency is close to any frequency in ignore list.

            @param frequency: The frequency to be tested.

            @returns: True if the frequency should be ignored. False otherwise.

            """
            for ignore_frequency in ignore_frequencies_harmonics + harmonics:
                if (abs(frequency - ignore_frequency) <
                    frequency_diff_threshold):
                    logging.debug('Ignore frequency: %s', frequency)
                    return True
            return False

        # Filter out the frequencies to be ignored.
        spectral = [x for x in spectral if not should_be_ignored(x[0])]

        if len(spectral) > 1:
            first_coeff = spectral[0][1]
            second_coeff = spectral[1][1]
            if second_coeff > first_coeff * second_peak_ratio:
                errors.append(
                        'Channel %d: Found large second dominant frequencies: '
                        '%s' % (test_channel, spectral))

        # Guard against the dominant component itself being filtered out.
        if spectral:
            dominant_spectrals.append(spectral[0])

    if errors:
        raise error.TestFail(', '.join(errors))

    return dominant_spectrals
def longest_common_subsequence(list1, list2, equivalent_threshold):
    """Finds longest common subsequence of list1 and list2

    Such as list1: [0.3, 0.4],
            list2: [0.001, 0.299, 0.002, 0.401, 0.001]
            equivalent_threshold: 0.01
    it will return matched1: [True, True],
                   matched2: [False, True, False, True, False]

    @param list1: a list of integer or float value
    @param list2: a list of integer or float value
    @param equivalent_threshold: two values are considered equivalent if their
                                 relative error is less than
                                 equivalent_threshold.

    @returns: a tuple of list (matched_1, matched_2) indicating each item
              of list1 and list2 are matched or not.

    """
    def _is_pair_matched(value1, value2):
        """Checks if value2 is within relative equivalent_threshold of value1.

        float() guards against integer division truncating the relative
        error to 0 under Python 2.
        """
        return abs(value1 - value2) / float(value1) < equivalent_threshold

    length1, length2 = len(list1), len(list2)
    # Build rows independently; a multiplied nested list ([[0] * n] * m)
    # would alias every row to the same list object and corrupt the table.
    matching = [[0] * (length2 + 1) for _ in range(length1 + 1)]
    # matching[i][j] is the maximum number of matched pairs for first i items
    # in list1 and first j items in list2.
    for i in range(length1):
        for j in range(length2):
            # Maximum matched pairs may be obtained without
            # i-th item in list1 or without j-th item in list2
            matching[i + 1][j + 1] = max(matching[i + 1][j],
                                         matching[i][j + 1])
            # If i-th item in list1 can be matched to j-th item in list2
            if _is_pair_matched(list1[i], list2[j]):
                matching[i + 1][j + 1] = max(matching[i + 1][j + 1],
                                             matching[i][j] + 1)

    # Backtracking which item in list1 and list2 are matched
    matched1 = [False] * length1
    matched2 = [False] * length2
    i, j = length1, length2
    while i > 0 and j > 0:
        # Maximum number is obtained by matching i-th item in list1
        # and j-th one in list2. The count alone can coincide with a
        # non-matching diagonal step, so verify the pair really matches.
        if (matching[i][j] == matching[i - 1][j - 1] + 1 and
            _is_pair_matched(list1[i - 1], list2[j - 1])):
            matched1[i - 1] = True
            matched2[j - 1] = True
            i, j = i - 1, j - 1
        elif matching[i][j] == matching[i - 1][j]:
            i -= 1
        else:
            j -= 1
    return (matched1, matched2)
def switch_to_hsp(audio_facade):
    """Switches to HSP profile.

    Selects bluetooth microphone and runs a recording process on Cros device.
    This triggers bluetooth profile be switched from A2DP to HSP.
    Note the user can call stop_recording on audio facade to stop the recording
    process, or let multimedia_xmlrpc_server terminates it in its cleanup.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    """
    audio_facade.set_chrome_active_node_type(None, 'BLUETOOTH')
    check_audio_nodes(audio_facade, (None, ['BLUETOOTH']))
    # NOTE(review): recording format reconstructed; only file_type,
    # sample_format and channel are visible in the original — confirm rate.
    audio_facade.start_recording(
            dict(file_type='raw', sample_format='S16_LE', channel=2,
                 rate=48000))
def compare_recorded_correlation(golden_file, recorder, parameters=None):
"""Checks recorded audio in an AudioInputWidget against a golden file.
Compares recorded data with golden data by cross correlation method.
Refer to audio_helper.compare_data for details of comparison.
@param golden_file: An AudioTestData object that serves as golden data.
@param recorder: An AudioInputWidget that has recorded some audio data.
@param parameters: A dict containing parameters for method.
"""'Comparing recorded data with golden file %s ...',
golden_file.get_binary(), golden_file.data_format,
recorder.get_binary(), recorder.data_format, recorder.channel_map,