# Lint as: python2, python3
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module provides the test utilities for audio tests using chameleon."""

# TODO (cychiang) Move test utilities from chameleon_audio_helpers
# to this module.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import multiprocessing
import os
import pprint
import re
from contextlib import contextmanager

from autotest_lib.client.common_lib import error
from autotest_lib.client.bin import utils
from autotest_lib.client.cros import constants
from autotest_lib.client.cros.audio import audio_analysis
from autotest_lib.client.cros.audio import audio_spec
from autotest_lib.client.cros.audio import audio_data
from autotest_lib.client.cros.audio import audio_helper
from autotest_lib.client.cros.audio import audio_quality_measurement
from autotest_lib.client.cros.chameleon import chameleon_audio_ids
from six.moves import range

# Maps a Cros port id (chameleon_audio_ids.CrosIds) to the Cras node type
# string expected for that port. Several port ids share one node type: both
# bluetooth ids map to 'BLUETOOTH' and both USB ids map to 'USB', so the
# mapping is not invertible.
CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES = {
        chameleon_audio_ids.CrosIds.HDMI: 'HDMI',
        chameleon_audio_ids.CrosIds.HEADPHONE: 'HEADPHONE',
        chameleon_audio_ids.CrosIds.EXTERNAL_MIC: 'MIC',
        chameleon_audio_ids.CrosIds.SPEAKER: 'INTERNAL_SPEAKER',
        chameleon_audio_ids.CrosIds.INTERNAL_MIC: 'INTERNAL_MIC',
        chameleon_audio_ids.CrosIds.BLUETOOTH_HEADPHONE: 'BLUETOOTH',
        chameleon_audio_ids.CrosIds.BLUETOOTH_MIC: 'BLUETOOTH',
        chameleon_audio_ids.CrosIds.USBIN: 'USB',
        chameleon_audio_ids.CrosIds.USBOUT: 'USB',
}


def cros_port_id_to_cras_node_type(port_id):
    """Translates a Cros port id into the matching Cras node type.

    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @returns: The Cras node type string registered for port_id in
              CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES.

    @raises: KeyError if port_id has no entry in the mapping.

    """
    node_type = CHAMELEON_AUDIO_IDS_TO_CRAS_NODE_TYPES[port_id]
    return node_type


def check_output_port(audio_facade, port_id):
    """Verifies the selected output node on Cros device matches a port.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @raises: error.TestFail (from check_audio_nodes) if the selected output
             node is not the one expected for port_id.

    """
    expected_outputs = [cros_port_id_to_cras_node_type(port_id)]
    # None for the input side means "do not check input nodes".
    check_audio_nodes(audio_facade, (expected_outputs, None))


def check_input_port(audio_facade, port_id):
    """Verifies the selected input node on Cros device matches a port.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param port_id: A port id defined in chameleon_audio_ids.CrosIds.

    @raises: error.TestFail (from check_audio_nodes) if the selected input
             node is not the one expected for port_id.

    """
    expected_inputs = [cros_port_id_to_cras_node_type(port_id)]
    # None for the output side means "do not check output nodes".
    check_audio_nodes(audio_facade, (None, expected_inputs))


def check_audio_nodes(audio_facade, audio_nodes):
    """Checks the node selected by Cros device is correct.

    Input nodes are checked first, then output nodes. Either side of
    audio_nodes may be None to skip the check for that direction. The
    comparison ignores ordering.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected selected output and input nodes.

    @raises: error.TestFail if the nodes selected by Cros device are not expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes
    if (in_audio_nodes is not None
                and sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail(
                'Wrong input node(s) selected: %s '
                'expected: %s' % (str(curr_in_nodes), str(in_audio_nodes)))

    # Treat line-out node as headphone node in Chameleon test since some
    # Cros devices detect audio board as lineout. This actually makes sense
    # because 3.5mm audio jack is connected to LineIn port on Chameleon.
    if out_audio_nodes == ['HEADPHONE'] and curr_out_nodes == ['LINEOUT']:
        return

    if (out_audio_nodes is not None
                and sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail(
                'Wrong output node(s) selected %s '
                'expected: %s' % (str(curr_out_nodes), str(out_audio_nodes)))


def check_plugged_nodes_contain(audio_facade, audio_nodes):
    """Checks the nodes needed to be plugged on Cros device are plugged.

    Unlike check_plugged_nodes, this only requires the expected nodes to be
    a subset of the plugged nodes; extra plugged nodes are accepted. Either
    side of audio_nodes may be None to skip that direction.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected plugged output and input nodes.

    @raises: error.TestFail if the plugged nodes on Cros device are not plugged.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes
    if in_audio_nodes is not None:
        if any(node not in curr_in_nodes for node in in_audio_nodes):
            raise error.TestFail('Wrong input node(s) plugged: %s '
                                 'expected %s to be plugged!' %
                                 (str(curr_in_nodes), str(in_audio_nodes)))
    if out_audio_nodes is not None:
        if any(node not in curr_out_nodes for node in out_audio_nodes):
            raise error.TestFail(
                    'Wrong output node(s) plugged: %s '
                    'expected %s to be plugged!' % (str(curr_out_nodes),
                                                    str(out_audio_nodes)))


def check_plugged_nodes(audio_facade, audio_nodes):
    """Checks the nodes that are currently plugged on Cros device are correct.

    The plugged nodes must match the expected nodes exactly, ignoring
    ordering. Either side of audio_nodes may be None to skip that direction.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected plugged output and input nodes.

    @raises: error.TestFail if the plugged nodes on Cros device are not expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes
    if (in_audio_nodes is not None
                and sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) plugged: %s '
                             'expected: %s!' % (str(sorted(curr_in_nodes)),
                                                str(sorted(in_audio_nodes))))
    if (out_audio_nodes is not None
                and sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) plugged: %s '
                             'expected: %s!' % (str(sorted(curr_out_nodes)),
                                                str(sorted(out_audio_nodes))))


def bluetooth_nodes_plugged(audio_facade):
    """Tells whether bluetooth nodes are plugged in both directions.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @returns: True if 'BLUETOOTH' is among both the plugged output nodes and
              the plugged input nodes. False otherwise.

    """
    plugged_outputs, plugged_inputs = audio_facade.get_plugged_node_types()
    if 'BLUETOOTH' not in plugged_outputs:
        return False
    return 'BLUETOOTH' in plugged_inputs


def get_board_name(host):
    """Extracts the board name from the host's board string.

    The value of host.get_board() is split on ':' and the second field is
    used as the board name.

    @param host: The CrosHost object.

    @returns: The board name.

    """
    board_fields = host.get_board().split(':')
    return board_fields[1]


def has_internal_speaker(host):
    """Tells whether the Cros device has an internal speaker.

    The answer comes from audio_spec, keyed by board type and board name.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal speaker. False otherwise.

    """
    board_name = get_board_name(host)
    speaker_present = audio_spec.has_internal_speaker(host.get_board_type(),
                                                      board_name)
    if speaker_present:
        return True
    logging.info('Board %s does not have speaker.', board_name)
    return False


def has_internal_microphone(host):
    """Tells whether the Cros device has an internal microphone.

    The answer comes from audio_spec, keyed by board type only; the board
    name is used just for the log message.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal microphone. False otherwise.

    """
    board_name = get_board_name(host)
    mic_present = audio_spec.has_internal_microphone(host.get_board_type())
    if mic_present:
        return True
    logging.info('Board %s does not have internal microphone.', board_name)
    return False


def has_audio_jack(host):
    """Tells whether the Cros device has a 3.5mm audio jack.

    The answer comes from audio_spec, keyed by board name and board type.

    @param host: The CrosHost object.

    @returns: True if Cros device has it. False otherwise.

    """
    board_name = get_board_name(host)
    jack_present = audio_spec.has_audio_jack(board_name,
                                             host.get_board_type())
    if jack_present:
        return True
    logging.info('Board %s does not have a audio jack.', board_name)
    return False


def has_hotwording(host):
    """Tells whether the Cros device has hotwording.

    The answer comes from audio_spec, keyed by board name and by the model
    name reported by host.get_platform().

    @param host: The CrosHost object.

    @returns: True if the board has hotwording. False otherwise.

    """
    return audio_spec.has_hotwording(get_board_name(host),
                                     host.get_platform())


def has_echo_reference(host):
    """Tells whether the Cros device has echo reference.

    The answer comes from audio_spec, keyed by board name.

    @param host: The CrosHost object.

    @returns: True if the board has echo reference. False otherwise.

    """
    board_name = get_board_name(host)
    return audio_spec.has_echo_reference(board_name)

def suspend_resume(host, suspend_time_secs=30, resume_network_timeout_secs=60):
    """Performs the suspend/resume on Cros device.

    The suspend request is issued from a separate daemon process while this
    function waits first for the device to go to sleep and then for it to
    resume.

    @param host: The CrosHost object.
    @param suspend_time_secs: Time in seconds to let Cros device suspend.
    @param resume_network_timeout_secs: Time in seconds to let Cros device
                                        resume and obtain network.
    """

    def action_suspend():
        """Calls the host method suspend."""
        host.suspend(suspend_time=suspend_time_secs)

    # Capture the boot id before suspending; it is handed to
    # test_wait_for_resume below.
    boot_id = host.get_boot_id()
    # Run host.suspend in a child process so this function can monitor the
    # sleep/resume transitions concurrently.
    proc = multiprocessing.Process(target=action_suspend)
    logging.info("Suspending...")
    # Daemon process: it is not joined here.
    proc.daemon = True
    proc.start()
    host.test_wait_for_sleep(suspend_time_secs)
    logging.info("DUT suspended! Waiting to resume...")
    # The resume wait covers the suspend duration plus the network timeout.
    host.test_wait_for_resume(boot_id,
                              suspend_time_secs + resume_network_timeout_secs)
    logging.info("DUT resumed!")


def suspend_resume_and_verify(host,
                              factory,
                              suspend_time_secs=30,
                              resume_network_timeout_secs=50,
                              rpc_reconnect_timeout=60):
    """Suspends/resumes the Cros device, then waits for factory to be ready.

    @param host: The CrosHost object.
    @param factory: An object whose ready() method is polled until it
                    reports that the multimedia server has reconnected.
    @param suspend_time_secs: Time in seconds to let Cros device suspend.
    @param resume_network_timeout_secs: Time in seconds to let Cros device
                                        resume and obtain network.
    @param rpc_reconnect_timeout: Time in seconds to wait for multimedia
                                  server to reconnect.
    """
    # Step 1: the suspend/resume cycle itself.
    suspend_resume(
            host,
            suspend_time_secs=suspend_time_secs,
            resume_network_timeout_secs=resume_network_timeout_secs)

    # Step 2: block until the multimedia server connection is back.
    utils.poll_for_condition(
            condition=factory.ready,
            timeout=rpc_reconnect_timeout,
            desc='multimedia server reconnect')


def dump_cros_audio_logs(host,
                         audio_facade,
                         directory,
                         suffix='',
                         fail_if_warnings=False):
    """Collects audio debugging logs from Cros device into a directory.

    Three files are written: the audio diagnostics dump, /var/log/messages
    and the multimedia xmlrpc server log.

    @param host: The CrosHost object.
    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param directory: The directory to dump logs.
    @param suffix: Optional suffix appended to each file name as
                   '<name>.<suffix>'.
    @param fail_if_warnings: True to examine the dumped audio diagnostics
                             and fail if it contains warnings.

    @raises: error.TestFail if fail_if_warnings is set and
             examine_audio_diagnostics reports any issue.

    """

    def get_file_path(name):
        """Builds the destination path for a log file.

        @param name: The file name.

        @returns: The file path with an optional suffix.

        """
        if suffix:
            name = '%s.%s' % (name, suffix)
        return os.path.join(directory, name)

    diagnostics_path = get_file_path('audio_diagnostics.txt')
    audio_facade.dump_diagnostics(diagnostics_path)

    host.get_file('/var/log/messages', get_file_path('messages'))
    host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE,
                  get_file_path('multimedia_xmlrpc_server.log'))

    if not fail_if_warnings:
        return

    # Any warning message found in the audio diagnostics fails the test.
    audio_logs = examine_audio_diagnostics(diagnostics_path)
    if audio_logs != '':
        raise error.TestFail(audio_logs)


def examine_audio_diagnostics(path):
    """Examines audio diagnostic content.

    Scans the file line by line for 'num_underruns: <number>' and collects a
    warning for every line that reports a non-zero number of underruns.

    @param path: Path to audio diagnostic file.

    @returns: Warning messages or ''.

    """
    warning_msgs = []

    # Raw string so that '\d' is a regex escape and not an (invalid) string
    # escape. At least one digit is required so int() never receives ''.
    underrun_pattern = re.compile(r'num_underruns: (\d+)')

    with open(path) as f:
        # Line numbers reported in warnings are 1-based.
        for line_number, line in enumerate(f, 1):

            # Check for number of underruns.
            search_result = underrun_pattern.search(line)
            if search_result:
                num_underruns = int(search_result.group(1))
                if num_underruns != 0:
                    warning_msgs.append('Found %d underrun at line %d: %s' %
                                        (num_underruns, line_number, line))

            # TODO(cychiang) add other check like maximum client reply delay.

    if warning_msgs:
        return ('Found issue in audio diganostics result : %s' %
                '\n'.join(warning_msgs))

    logging.info('audio_diagnostic result looks fine')
    return ''


@contextmanager
def monitor_no_nodes_changed(audio_facade, callback=None):
    """Context manager to monitor nodes changed signal on Cros device.

    Starts the counter in the beginning. Stops the counter in the end to make
    sure there is no NodesChanged signal during the with block.

    E.g. with monitor_no_nodes_changed(audio_facade):
             do something on playback/recording

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param callback: Optional callable invoked with no arguments before
                     raising TestFail when there are unexpected NodesChanged
                     signals.

    @raises: error.TestFail if there is NodesChanged signal on
             Cros device during the context.

    """
    try:
        # Starting the counter is inside the try block, so the finally
        # clause below still calls stop_counting_signal if this call raises.
        audio_facade.start_counting_signal('NodesChanged')
        yield
    finally:
        count = audio_facade.stop_counting_signal()
        if count:
            message = 'Got %d unexpected NodesChanged signal' % count
            logging.error(message)
            if callback:
                callback()
            # NOTE(review): raising inside finally supersedes any exception
            # that was propagating out of the with block.
            raise error.TestFail(message)


# The second dominant frequency should have energy less than -26dB of the
# first dominant frequency in the spectrum.
_DEFAULT_SECOND_PEAK_RATIO = 0.05

# Tolerate more noise for bluetooth audio using HSP.
_HSP_SECOND_PEAK_RATIO = 0.2

# Tolerate more noise for speaker.
_SPEAKER_SECOND_PEAK_RATIO = 0.1

# Tolerate more noise for internal microphone.
_INTERNAL_MIC_SECOND_PEAK_RATIO = 0.2

# Maximum noise level that can be tolerated. check_recorded_frequency
# compares the measured 'equivalent_noise_level' against this value by
# default.
DEFAULT_TOLERANT_NOISE_LEVEL = 0.01

# If the relative error of two durations is less than 0.2,
# they will be considered equivalent.
DEFAULT_EQUIVALENT_THRESHOLD = 0.2

# A spectral component with frequency lower than _DC_FREQ_THRESHOLD is
# treated as DC and should have a coefficient no larger than
# _DC_COEFF_THRESHOLD.
_DC_FREQ_THRESHOLD = 0.001
_DC_COEFF_THRESHOLD = 0.01


def get_second_peak_ratio(source_id, recorder_id, is_hsp=False):
    """Picks the second peak ratio that fits the use case.

    Precedence: bluetooth HSP first, then speaker as source, then internal
    microphone as recorder, and finally the default ratio.

    @param source_id: ID defined in chameleon_audio_ids for source widget.
    @param recorder_id: ID defined in chameleon_audio_ids for recorder widget.
    @param is_hsp: For bluetooth HSP use case.

    @returns: A float for proper second peak ratio to be used in
              check_recorded_frequency.
    """
    cros_ids = chameleon_audio_ids.CrosIds
    if is_hsp:
        return _HSP_SECOND_PEAK_RATIO
    if source_id == cros_ids.SPEAKER:
        return _SPEAKER_SECOND_PEAK_RATIO
    if recorder_id == cros_ids.INTERNAL_MIC:
        return _INTERNAL_MIC_SECOND_PEAK_RATIO
    return _DEFAULT_SECOND_PEAK_RATIO


# The maximum allowed deviation of the estimated dominant frequency from the
# golden frequency (presumably in Hz -- confirm against audio_analysis).
DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5


def check_recorded_frequency(
        golden_file,
        recorder,
        second_peak_ratio=_DEFAULT_SECOND_PEAK_RATIO,
        frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD,
        ignore_frequencies=None,
        check_anomaly=False,
        check_artifacts=False,
        mute_durations=None,
        volume_changes=None,
        tolerant_noise_level=DEFAULT_TOLERANT_NOISE_LEVEL):
    """Checks if the recorded data contains sine tone of golden frequency.

    Every channel of the recorder that maps to a golden channel is analyzed
    independently. All problems found are accumulated and reported together
    in a single TestFail after every channel has been examined.

    @param golden_file: An AudioTestData object that serves as golden data.
    @param recorder: An AudioWidget used in the test to record data.
    @param second_peak_ratio: The test fails when the second dominant
                              frequency has coefficient larger than this
                              ratio of the coefficient of first dominant
                              frequency.
    @param frequency_diff_threshold: The maximum difference between estimated
                                     frequency of test signal and golden
                                     frequency. This value should be small for
                                     signal passed through line.
    @param ignore_frequencies: A list of frequencies to be ignored. The
                               component in the spectral with frequency too
                               close to the frequency in the list will be
                               ignored. The comparison of frequencies uses
                               frequency_diff_threshold as well.
    @param check_anomaly: True to check anomaly in the signal.
    @param check_artifacts: True to check artifacts in the signal.
    @param mute_durations: Each duration of mute in seconds in the signal.
    @param volume_changes: A list containing alternative -1 for decreasing
                           volume and +1 for increasing volume.
    @param tolerant_noise_level: The maximum noise level can be tolerated

    @returns: A list containing tuples of (dominant_frequency, coefficient) for
              valid channels. Coefficient can be a measure of signal magnitude
              on that dominant frequency. Invalid channels where golden_channel
              is None are ignored.

    @raises error.TestFail if the recorded data does not contain sine tone of
            golden frequency.

    """
    if not ignore_frequencies:
        ignore_frequencies = []

    # Also ignore harmonics of ignore frequencies.
    ignore_frequencies_harmonics = []
    for ignore_freq in ignore_frequencies:
        ignore_frequencies_harmonics += [ignore_freq * n for n in range(1, 4)]

    data_format = recorder.data_format
    recorded_data = audio_data.AudioRawData(
            binary=recorder.get_binary(),
            channel=data_format['channel'],
            sample_format=data_format['sample_format'])

    errors = []
    dominant_spectrals = []

    for test_channel, golden_channel in enumerate(recorder.channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue

        signal = recorded_data.channel_data[test_channel]
        saturate_value = audio_data.get_maximum_value_from_sample_format(
                data_format['sample_format'])
        logging.debug('Channel %d max signal: %f', test_channel, max(signal))
        normalized_signal = audio_analysis.normalize_signal(
                signal, saturate_value)
        logging.debug('saturate_value: %f', saturate_value)
        logging.debug('max signal after normalized: %f',
                      max(normalized_signal))
        spectral = audio_analysis.spectral_analysis(normalized_signal,
                                                    data_format['rate'])
        logging.debug('spectral: %s', spectral)

        if not spectral:
            errors.append('Channel %d: Can not find dominant frequency.' %
                          test_channel)
            # Nothing else can be checked on this channel without a
            # spectrum; indexing spectral[0] below would raise IndexError.
            continue

        golden_frequency = golden_file.frequencies[golden_channel]
        logging.debug('Checking channel %s spectral %s against frequency %s',
                      test_channel, spectral, golden_frequency)

        dominant_frequency = spectral[0][0]

        if (abs(dominant_frequency - golden_frequency) >
                    frequency_diff_threshold):
            errors.append(
                    'Channel %d: Dominant frequency %s is away from golden %s'
                    % (test_channel, dominant_frequency, golden_frequency))

        if check_anomaly:
            detected_anomaly = audio_analysis.anomaly_detection(
                    signal=normalized_signal,
                    rate=data_format['rate'],
                    freq=golden_frequency)
            if detected_anomaly:
                errors.append('Channel %d: Detect anomaly near these time: %s'
                              % (test_channel, detected_anomaly))
            else:
                logging.info(
                        'Channel %d: Quality is good as there is no anomaly',
                        test_channel)

        if check_artifacts or mute_durations or volume_changes:
            result = audio_quality_measurement.quality_measurement(
                    normalized_signal,
                    data_format['rate'],
                    dominant_frequency=dominant_frequency)
            logging.debug('Quality measurement result:\n%s',
                          pprint.pformat(result))
            artifacts = result['artifacts']
            if check_artifacts:
                if len(artifacts['noise_before_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects artifacts before playing near'
                            ' these time and duration: %s' %
                            (test_channel,
                             str(artifacts['noise_before_playback'])))

                if len(artifacts['noise_after_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects artifacts after playing near'
                            ' these time and duration: %s' %
                            (test_channel,
                             str(artifacts['noise_after_playback'])))

            if mute_durations:
                delays = artifacts['delay_during_playback']
                delay_durations = [delay[1] for delay in delays]
                mute_matched, delay_matched = longest_common_subsequence(
                        mute_durations, delay_durations,
                        DEFAULT_EQUIVALENT_THRESHOLD)

                # Keep only the delays that were not explained by an
                # expected mute. The matched lists hold one boolean per
                # item, so pair them with the items instead of using the
                # booleans as indices.
                new_delays = [
                        delay
                        for delay, matched in zip(delays, delay_matched)
                        if not matched
                ]

                artifacts['delay_during_playback'] = new_delays

                unmatched_mutes = [
                        mute
                        for mute, matched in zip(mute_durations, mute_matched)
                        if not matched
                ]

                if len(unmatched_mutes) > 0:
                    errors.append('Channel %d: Unmatched mute duration: %s' %
                                  (test_channel, unmatched_mutes))

            # This second check_artifacts section runs after the mute
            # handling so that expected mutes are not reported as delays.
            if check_artifacts:
                if len(artifacts['delay_during_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects delay during playing near'
                            ' these time and duration: %s' %
                            (test_channel,
                             artifacts['delay_during_playback']))

                if len(artifacts['burst_during_playback']) > 0:
                    errors.append(
                            'Channel %d: Detects burst/pop near these time: %s'
                            % (test_channel,
                               artifacts['burst_during_playback']))

                if result['equivalent_noise_level'] > tolerant_noise_level:
                    errors.append(
                            'Channel %d: noise level is higher than tolerant'
                            ' noise level: %f > %f' %
                            (test_channel, result['equivalent_noise_level'],
                             tolerant_noise_level))

            if volume_changes:
                volume_changing = result['volume_changes']
                # Each detected entry is compared by its second field (the
                # event) against the expected event at the same position.
                matched = (len(volume_changing) == len(volume_changes)
                           and all(found[1] == expected
                                   for found, expected in zip(
                                           volume_changing, volume_changes)))
                if not matched:
                    errors.append(
                            'Channel %d: volume changing is not as expected, '
                            'found changing time and events are: %s while '
                            'expected changing events are %s' %
                            (test_channel, volume_changing, volume_changes))

        # Filter out the harmonics resulted from imperfect sin wave.
        # This list is different for different channels.
        harmonics = [dominant_frequency * n for n in range(2, 10)]

        def should_be_ignored(frequency):
            """Checks if frequency is close to any frequency in ignore list.

            The ignore list is harmonics of frequency to be ignored
            (like power noise), plus harmonics of dominant frequencies,
            plus DC.

            @param frequency: The frequency to be tested.

            @returns: True if the frequency should be ignored. False otherwise.

            """
            for ignore_frequency in (
                    ignore_frequencies_harmonics + harmonics + [0.0]):
                if (abs(frequency - ignore_frequency) <
                            frequency_diff_threshold):
                    logging.debug('Ignore frequency: %s', frequency)
                    return True
            return False

        # Checks DC is small enough.
        for freq, coeff in spectral:
            if freq < _DC_FREQ_THRESHOLD and coeff > _DC_COEFF_THRESHOLD:
                errors.append('Channel %d: Found large DC coefficient: '
                              '(%f Hz, %f)' % (test_channel, freq, coeff))

        # Filter out the frequencies to be ignored.
        spectral_post_ignore = [
                x for x in spectral if not should_be_ignored(x[0])
        ]

        if len(spectral_post_ignore) > 1:
            first_coeff = spectral_post_ignore[0][1]
            second_coeff = spectral_post_ignore[1][1]
            if second_coeff > first_coeff * second_peak_ratio:
                errors.append(
                        'Channel %d: Found large second dominant frequencies: '
                        '%s' % (test_channel, spectral_post_ignore))

        if not spectral_post_ignore:
            errors.append(
                    'Channel %d: No frequency left after removing unwanted '
                    'frequencies. Spectral: %s; After removing unwanted '
                    'frequencies: %s' % (test_channel, spectral,
                                         spectral_post_ignore))

        else:
            dominant_spectrals.append(spectral_post_ignore[0])

    if errors:
        raise error.TestFail(', '.join(errors))

    return dominant_spectrals


def longest_common_subsequence(list1, list2, equivalent_threshold):
    """Finds longest common subsequence of list1 and list2

    Such as list1: [0.3, 0.4],
            list2: [0.001, 0.299, 0.002, 0.401, 0.001]
            equivalent_threshold: 0.01
    it will return matched1: [True, True],
                   matched2: [False, True, False, True, False]

    @param list1: a list of integer or float value
    @param list2: a list of integer or float value
    @param equivalent_threshold: two values are considered equivalent if their
                                 relative error is less than
                                 equivalent_threshold. The relative error is
                                 measured against the item from list1.

    @returns: a tuple of list (matched_1, matched_2) indicating each item
              of list1 and list2 are matched or not.

    """

    def is_equivalent(reference, value):
        """Checks if value is within relative error of reference.

        @param reference: The item from list1.
        @param value: The item from list2.

        @returns: True if the two values are considered equivalent.

        """
        if reference == 0:
            # Relative error is undefined for a zero reference; only an
            # exact zero is treated as equivalent.
            return value == 0
        relative_error = abs(reference - value) / abs(reference)
        return relative_error < equivalent_threshold

    length1, length2 = len(list1), len(list2)
    # matching[i][j] is the maximum number of matched pairs for first i items
    # in list1 and first j items in list2. Every row must be a distinct list;
    # multiplying the outer list would alias one row (length1 + 1) times.
    matching = [[0] * (length2 + 1) for _ in range(length1 + 1)]
    for i, item1 in enumerate(list1):
        for j, item2 in enumerate(list2):
            if is_equivalent(item1, item2):
                # i-th item in list1 can be matched to j-th item in list2.
                matching[i + 1][j + 1] = matching[i][j] + 1
            else:
                # Maximum matched pairs is obtained without
                # i-th item in list1 or without j-th item in list2.
                matching[i + 1][j + 1] = max(matching[i + 1][j],
                                             matching[i][j + 1])

    # Backtracking which item in list1 and list2 are matched
    matched1 = [False] * length1
    matched2 = [False] * length2
    i, j = length1, length2
    while i > 0 and j > 0:
        # Only pair items that are really equivalent; comparing the counts
        # alone could pair two items that do not match each other.
        if is_equivalent(list1[i - 1], list2[j - 1]):
            matched1[i - 1] = True
            matched2[j - 1] = True
            i, j = i - 1, j - 1
        elif matching[i][j] == matching[i - 1][j]:
            i -= 1
        else:
            j -= 1
    return (matched1, matched2)


def switch_to_hsp(audio_facade):
    """Switches bluetooth to HSP profile.

    Selects bluetooth microphone and runs a recording process on Cros device.
    This triggers bluetooth profile be switched from A2DP to HSP.
    Note the user can call stop_recording on audio facade to stop the recording
    process, or let multimedia_xmlrpc_server terminates it in its cleanup.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @raises: error.TestFail (from check_audio_nodes) if the bluetooth input
             node does not become the selected input node.

    """
    audio_facade.set_chrome_active_node_type(None, 'BLUETOOTH')
    check_audio_nodes(audio_facade, (None, ['BLUETOOTH']))

    recording_format = {
            'file_type': 'raw',
            'sample_format': 'S16_LE',
            'channel': 2,
            'rate': 48000,
    }
    audio_facade.start_recording(recording_format)


def compare_recorded_correlation(golden_file, recorder, parameters=None):
    """Compares audio recorded by an AudioInputWidget with a golden file.

    The comparison uses the cross correlation method and is delegated to
    audio_helper.compare_data_correlation; refer to that function for the
    details.

    @param golden_file: An AudioTestData object that serves as golden data.
    @param recorder: An AudioInputWidget that has recorded some audio data.
    @param parameters: A dict containing parameters for method.

    """
    logging.info('Comparing recorded data with golden file %s ...',
                 golden_file.path)

    golden_binary = golden_file.get_binary()
    golden_format = golden_file.data_format
    recorded_binary = recorder.get_binary()
    recorded_format = recorder.data_format
    channel_map = recorder.channel_map

    audio_helper.compare_data_correlation(golden_binary, golden_format,
                                          recorded_binary, recorded_format,
                                          channel_map, parameters)


def check_and_set_chrome_active_node_types(audio_facade,
                                           output_type=None,
                                           input_type=None):
    """Verifies the target node types are plugged, then makes them active.

    @param audio_facade: An AudioFacadeNative or AudioFacadeAdapter object.
    @param output_type: An output node type defined in
                        cras_utils.CRAS_NODE_TYPES. None to skip.
    @param input_type: An input node type defined in
                       cras_utils.CRAS_NODE_TYPES. None to skip.

    @raises: error.TestError if the expected node type is missing. We use
             error.TestError here because usually this step is not the main
             purpose of the test, but a setup step.

    """
    plugged_outputs, plugged_inputs = audio_facade.get_plugged_node_types()

    if output_type:
        if output_type not in plugged_outputs:
            raise error.TestError('Target output type %s not present in %r' %
                                  (output_type, plugged_outputs))

    if input_type:
        if input_type not in plugged_inputs:
            raise error.TestError('Target input type %s not present in %r' %
                                  (input_type, plugged_inputs))

    audio_facade.set_chrome_active_node_type(output_type, input_type)


def check_hp_or_lineout_plugged(audio_facade):
    """Reports which of line-out or headphone is plugged.

    Line-out takes precedence when both are present.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @returns: 'LINEOUT' if line-out node is plugged.
              'HEADPHONE' if headphone node is plugged.

    @raises: error.TestFail if the plugged nodes does not contain one of
             'LINEOUT' and 'HEADPHONE'.

    """
    output_nodes, _ = audio_facade.get_plugged_node_types()

    # Candidates are probed in priority order.
    for candidate in ('LINEOUT', 'HEADPHONE'):
        if candidate in output_nodes:
            return candidate

    raise error.TestFail(
            'No line-out or headphone in plugged nodes:%r.'
            'Please check the audio cable or jack plugger.' % output_nodes)


def get_internal_mic_node(host):
    """Looks up the expected internal microphone node for a host.

    The lookup is delegated to audio_spec, keyed by the host's board type,
    board name, model (host.get_platform()) and device sku.

    @param host: The CrosHost object.

    @returns: The name of the expected internal microphone nodes.
    """
    return audio_spec.get_internal_mic_node(
            host.get_board_type(),
            get_board_name(host),
            host.get_platform(),
            host.host_info_store.get().device_sku)


def get_plugged_internal_mics(host):
    """Looks up all the plugged internal microphone nodes for a host.

    The lookup is delegated to audio_spec, keyed by the host's board type,
    board name, model (host.get_platform()) and device sku.

    @param host: The CrosHost object.

    @returns: A list of all the plugged internal microphone nodes.
    """
    return audio_spec.get_plugged_internal_mics(
            host.get_board_type(),
            get_board_name(host),
            host.get_platform(),
            host.host_info_store.get().device_sku)

def get_headphone_node(host):
    """Looks up the expected headphone node for a host.

    The lookup is delegated to audio_spec, keyed by the board name.

    @param host: The CrosHost object.

    @returns: The name of the expected headphone nodes.
    """
    board_name = get_board_name(host)
    return audio_spec.get_headphone_node(board_name)
