| # Copyright 2016 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This module provides utilities to detect some artifacts and measure the |
| quality of audio.""" |
| |
| import logging |
| import math |
| import numpy |
| |
| # Normal autotest environment. |
| try: |
| import common |
| from autotest_lib.client.cros.audio import audio_analysis |
| # Standalone execution without autotest environment. |
| except ImportError: |
| import audio_analysis |
| |
| |
| # The input signal should be one sine wave with fixed frequency which |
| # can have silence before and/or after sine wave. |
| # For example: |
| # silence sine wave silence |
| # -----------|VVVVVVVVVVVVV|----------- |
| # (a) (b) (c) |
| # This module detects these artifacts: |
| # 1. Detect noise in (a) and (c). |
| # 2. Detect delay in (b). |
| # 3. Detect burst in (b). |
| # Assume the transitions between (a)(b) and (b)(c) are smooth and |
| # amplitude increases/decreases linearly. |
| # This module will detect artifacts in the sine wave. |
| # This module also estimates the equivalent noise level by teager operator. |
| # This module also detects volume changes in the sine wave. However, volume |
| # changes may be affected by delay or burst. |
| # Some artifacts may cause each other. |
| |
| # In this module, amplitude and frequency are derived from Hilbert transform. |
| # Both amplitude and frequency are a function of time. |
| |
| # To detect each artifact, each point will be compared with |
| # average amplitude of its block. The block size will be 1.5 ms. |
| # Using average amplitude can mitigate the error caused by |
| # Hilbert transform and noise. |
| # In some case, for more accuracy, the block size may be modified |
| # to other values. |
| DEFAULT_BLOCK_SIZE_SECS = 0.0015 |
| |
| # If the difference between average frequency of this block and |
| # dominant frequency of full signal is less than 0.5 times of |
| # dominant frequency, this block is considered to be within the |
| # sine wave. In most cases, if there is no sine wave(only noise), |
| # average frequency will be much greater than 5 times of |
| # dominant frequency. |
| # Also, for delay during playback, the frequency will be about 0 |
| # in perfect situation or much greater than 5 times of dominant |
| # frequency if it's noised. |
| DEFAULT_FREQUENCY_ERROR = 0.5 |
| |
| # If the amplitude of some sample is less than 0.6 times of the |
| # average amplitude of its left/right block, it will be considered |
| # as a delay during playing. |
| DEFAULT_DELAY_AMPLITUDE_THRESHOLD = 0.6 |
| |
| # If the average amplitude of the block before or after playing |
| # is more than 0.5 times to the average amplitude of the wave, |
| # it will be considered as a noise artifact. |
| DEFAULT_NOISE_AMPLITUDE_THRESHOLD = 0.5 |
| |
| # In the sine wave, if the amplitude is more than 1.4 times of |
| # its left side and its right side, it will be considered as |
| # a burst. |
| DEFAULT_BURST_AMPLITUDE_THRESHOLD = 1.4 |
| |
| # When detecting burst, if the amplitude is lower than 0.5 times |
| # average amplitude, we ignore it. |
| DEFAULT_BURST_TOO_SMALL = 0.5 |
| |
| # For a signal which is the combination of sine wave with fixed frequency f and |
| # amplitude 1 and standard noise with amplitude k, the average teager value is |
| # nearly linear to the noise level k. |
| # Given frequency f, we simulate a sine wave with default noise level and |
| # calculate its average teager value. Then, we can estimate the equivalent |
| # noise level of input signal by the average teager value of input signal. |
| DEFAULT_STANDARD_NOISE = 0.005 |
| |
| # For delay, burst, volume increasing/decreasing, if two delay( |
| # burst, volume increasing/decreasing) happen within |
| # DEFAULT_SAME_EVENT_SECS seconds, we consider they are the |
| # same event. |
| DEFAULT_SAME_EVENT_SECS = 0.001 |
| |
| # When detecting increasing/decreasing volume of signal, if the amplitude |
| # is lower than 0.1 times average amplitude, we ignore it. |
| DEFAULT_VOLUME_CHANGE_TOO_SMALL = 0.1 |
| |
| # If average amplitude of right block is less/more than average |
| # amplitude of left block times DEFAULT_VOLUME_CHANGE_AMPLITUDE, it will be |
| # considered as decreasing/increasing on volume. |
| DEFAULT_VOLUME_CHANGE_AMPLITUDE = 0.1 |
| |
| # If the increasing/decreasing volume event is too close to the start or the end |
| # of sine wave, we consider its volume change as part of rising/falling phase in |
| # the start/end. |
| NEAR_START_OR_END_SECS = 0.01 |
| |
| # After applying Hilbert transform, the resulting amplitude and frequency may be |
| # extremely large in the start and/or the end part. Thus, we will append zeros |
| # before and after the whole wave for 0.1 secs. |
| APPEND_ZEROS_SECS = 0.1 |
| |
| # If the noise event is too close to the start or the end of the data, we |
| # consider its noise as part of artifacts caused by edge effect of Hilbert |
| # transform. |
| # For example, originally, the data duration is 10 seconds. |
| # We append 0.1 seconds of zeros in the beginning and the end of the data, so |
| # the data becomes 10.2 seocnds long. |
| # Then, we apply Hilbert transform to 10.2 seconds of data. |
| # Near 0.1 seconds and 10.1 seconds, there will be edge effect of Hilbert |
| # transform. We do not want these be treated as noise. |
| # If NEAR_DATA_START_OR_END_SECS is set to 0.01, then the noise happened |
| # at [0, 0.11] and [10.09, 10.1] will be ignored. |
| NEAR_DATA_START_OR_END_SECS = 0.01 |
| |
| # If the noise event is too close to the start or the end of the sine wave in |
| # the data, we consider its noise as part of artifacts caused by edge effect of |
| # Hilbert transform. |
| # A |-------------|vvvvvvvvvvvvvvvvvvvvvvv|-------------| |
| # B |ooooooooo| d | | d |ooooooooo| |
| # |
| # A is full signal. It contains a sine wave and silence before and after sine |
| # wave. |
| # In B, |oooo| shows the parts that we are going to check for noise before/after |
| # sine wave. | d | is determined by NEAR_SINE_START_OR_END_SECS. |
| NEAR_SINE_START_OR_END_SECS = 0.01 |
| |
| |
| class SineWaveNotFound(Exception): |
| """Error when there's no sine wave found in the signal""" |
| pass |
| |
| |
| def hilbert(x): |
| """Hilbert transform copied from scipy. |
| |
| More information can be found here: |
| http://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.hilbert.html |
| |
| @param x: Real signal data to transform. |
| |
| @returns: Analytic signal of x, we can further extract amplitude and |
| frequency from it. |
| |
| """ |
| x = numpy.asarray(x) |
| if numpy.iscomplexobj(x): |
| raise ValueError("x must be real.") |
| axis = -1 |
| N = x.shape[axis] |
| if N <= 0: |
| raise ValueError("N must be positive.") |
| |
| Xf = numpy.fft.fft(x, N, axis=axis) |
| h = numpy.zeros(N) |
| if N % 2 == 0: |
| h[0] = h[N // 2] = 1 |
| h[1:N // 2] = 2 |
| else: |
| h[0] = 1 |
| h[1:(N + 1) // 2] = 2 |
| |
| if len(x.shape) > 1: |
| ind = [newaxis] * x.ndim |
| ind[axis] = slice(None) |
| h = h[ind] |
| x = numpy.fft.ifft(Xf * h, axis=axis) |
| return x |
| |
| |
| def noised_sine_wave(frequency, rate, noise_level): |
| """Generates a sine wave of 2 second with specified noise level. |
| |
| @param frequency: Frequency of sine wave. |
| @param rate: Sampling rate. |
| @param noise_level: Required noise level. |
| |
| @returns: A sine wave with specified noise level. |
| |
| """ |
| wave = [] |
| for index in xrange(0, rate * 2): |
| sample = 2.0 * math.pi * frequency * float(index) / float(rate) |
| sine_wave = math.sin(sample) |
| noise = noise_level * numpy.random.standard_normal() |
| wave.append(sine_wave + noise) |
| return wave |
| |
| |
| def average_teager_value(wave, amplitude): |
| """Computes the normalized average teager value. |
| |
| After averaging the teager value, we will normalize the value by |
| dividing square of amplitude. |
| |
| @param wave: Wave to apply teager operator. |
| @param amplitude: Average amplitude of given wave. |
| |
| @returns: Average teager value. |
| |
| """ |
| teager_value, length = 0, len(wave) |
| for i in range(1, length - 1): |
| ith_teager_value = abs(wave[i] * wave[i] - wave[i - 1] * wave[i + 1]) |
| ith_teager_value *= max(1, abs(wave[i])) |
| teager_value += ith_teager_value |
| teager_value = (float(teager_value) / length) / (amplitude ** 2) |
| return teager_value |
| |
| |
| def noise_level(amplitude, frequency, rate, teager_value_of_input): |
| """Computes the noise level compared with standard_noise. |
| |
| For a signal which is the combination of sine wave with fixed frequency f |
| and amplitude 1 and standard noise with amplitude k, the average teager |
| value is nearly linear to the noise level k. |
| Thus, we can compute the average teager value of a sine wave with |
| standard_noise. Then, we can estimate the noise level of given input. |
| |
| @param amplitude: Amplitude of input audio. |
| @param frequency: Dominant frequency of input audio. |
| @param rate: Sampling rate. |
| @param teager_value_of_input: Average teager value of input audio. |
| |
| @returns: A float value denotes the audio is equivalent to have |
| how many times of noise compared with its amplitude. |
| For example, 0.02 denotes that the wave has a noise which |
| has standard distribution with standard deviation being |
| 0.02 times the amplitude of the wave. |
| |
| """ |
| standard_noise = DEFAULT_STANDARD_NOISE |
| |
| # Generates the standard sine wave with stdandard_noise level of noise. |
| standard_wave = noised_sine_wave(frequency, rate, standard_noise) |
| |
| # Calculates the average teager value. |
| teager_value_of_std_wave = average_teager_value(standard_wave, amplitude) |
| |
| return (teager_value_of_input / teager_value_of_std_wave) * standard_noise |
| |
| |
| def error(f1, f2): |
| """Calculates the relative error between f1 and f2. |
| |
| @param f1: Exact value. |
| @param f2: Test value. |
| |
| @returns: Relative error between f1 and f2. |
| |
| """ |
| return abs(float(f1) - float(f2)) / float(f1) |
| |
| |
| def hilbert_analysis(signal, rate, block_size): |
| """Finds amplitude and frequency of each time of signal by Hilbert transform. |
| |
| @param signal: The wave to analyze. |
| @param rate: Sampling rate. |
| @param block_size: The size of block to transform. |
| |
| @returns: A tuple of list: (amplitude, frequency) composed of |
| amplitude and frequency of each time. |
| |
| """ |
| # To apply Hilbert transform, the wave will be transformed |
| # segment by segment. For each segment, its size will be |
| # block_size and we will only take middle part of it. |
| # Thus, each segment looks like: |-----|=====|=====|-----|. |
| # "=...=" part will be taken while "-...-" part will be ignored. |
| # |
| # The whole size of taken part will be half of block_size |
| # which will be hilbert_block. |
| # The size of each ignored part will be half of hilbert_block |
| # which will be half_hilbert_block. |
| hilbert_block = block_size // 2 |
| half_hilbert_block = hilbert_block // 2 |
| # As mentioned above, for each block, we will only take middle |
| # part of it. Thus, the whole transformation will be completed as: |
| # |=====|=====|-----| |-----|=====|=====|-----| |
| # |-----|=====|=====|-----| |-----|=====|=====| |
| # |-----|=====|=====|-----| |
| # Specially, beginning and ending part may not have ignored part. |
| length = len(signal) |
| result = [] |
| for left_border in xrange(0, length, hilbert_block): |
| right_border = min(length, left_border + hilbert_block) |
| temp_left_border = max(0, left_border - half_hilbert_block) |
| temp_right_border = min(length, right_border + half_hilbert_block) |
| temp = hilbert(signal[temp_left_border:temp_right_border]) |
| for index in xrange(left_border, right_border): |
| result.append(temp[index - temp_left_border]) |
| result = numpy.asarray(result) |
| amplitude = numpy.abs(result) |
| phase = numpy.unwrap(numpy.angle(result)) |
| frequency = numpy.diff(phase) / (2.0 * numpy.pi) * rate |
| #frequency.append(frequency[len(frequency)-1]) |
| frequecny = numpy.append(frequency, frequency[len(frequency) - 1]) |
| return (amplitude, frequency) |
| |
| |
| def find_block_average_value(arr, side_block_size, block_size): |
| """For each index, finds average value of its block, left block, right block. |
| |
| It will find average value for each index in the range. |
| |
| For each index, the range of its block is |
| [max(0, index - block_size / 2), min(length - 1, index + block_size / 2)] |
| For each index, the range of its left block is |
| [max(0, index - size_block_size), index] |
| For each index, the range of its right block is |
| [index, min(length - 1, index + side_block_size)] |
| |
| @param arr: The array to be computed. |
| @param side_block_size: the size of the left_block and right_block. |
| @param block_size: the size of the block. |
| |
| @returns: A tuple of lists: (left_block_average_array, |
| right_block_average_array, |
| block_average_array) |
| """ |
| length = len(arr) |
| left_border, right_border = 0, 1 |
| left_block_sum = arr[0] |
| right_block_sum = arr[0] |
| left_average_array = numpy.zeros(length) |
| right_average_array = numpy.zeros(length) |
| block_average_array = numpy.zeros(length) |
| for index in xrange(0, length): |
| while left_border < index - side_block_size: |
| left_block_sum -= arr[left_border] |
| left_border += 1 |
| while right_border < min(length, index + side_block_size): |
| right_block_sum += arr[right_border] |
| right_border += 1 |
| |
| left_average_value = float(left_block_sum) / (index - left_border + 1) |
| right_average_value = float(right_block_sum) / (right_border - index) |
| left_average_array[index] = left_average_value |
| right_average_array[index] = right_average_value |
| |
| if index + 1 < length: |
| left_block_sum += arr[index + 1] |
| right_block_sum -= arr[index] |
| left_border, right_border = 0, 1 |
| block_sum = 0 |
| for index in xrange(0, length): |
| while left_border < index - block_size / 2: |
| block_sum -= arr[left_border] |
| left_border += 1 |
| while right_border < min(length, index + block_size / 2): |
| block_sum += arr[right_border] |
| right_border += 1 |
| |
| average_value = float(block_sum) / (right_border - left_border) |
| block_average_array[index] = average_value |
| return (left_average_array, right_average_array, block_average_array) |
| |
| |
| def find_start_end_index(dominant_frequency, |
| block_frequency_delta, |
| block_size, |
| frequency_error_threshold): |
| """Finds start and end index of sine wave. |
| |
| For each block with size of block_size, we check that whether its frequency |
| is close enough to the dominant_frequency. If yes, we will consider this |
| block to be within the sine wave. |
| Then, it will return the start and end index of sine wave indicating that |
| sine wave is between [start_index, end_index) |
| It's okay if the whole signal only contains sine wave. |
| |
| @param dominant_frequency: Dominant frequency of signal. |
| @param block_frequency_delta: Average absolute difference between dominant |
| frequency and frequency of each block. For |
| each index, its block is |
| [max(0, index - block_size / 2), |
| min(length - 1, index + block_size / 2)] |
| @param block_size: Block size in samples. |
| |
| @returns: A tuple composed of (start_index, end_index) |
| |
| """ |
| length = len(block_frequency_delta) |
| |
| # Finds the start/end time index of playing based on dominant frequency |
| start_index, end_index = length - 1 , 0 |
| for index in xrange(0, length): |
| left_border = max(0, index - block_size / 2) |
| right_border = min(length - 1, index + block_size / 2) |
| frequency_error = block_frequency_delta[index] / dominant_frequency |
| if frequency_error < frequency_error_threshold: |
| start_index = min(start_index, left_border) |
| end_index = max(end_index, right_border + 1) |
| return (start_index, end_index) |
| |
| |
| def noise_detection(start_index, end_index, |
| block_amplitude, average_amplitude, |
| rate, noise_amplitude_threshold): |
| """Detects noise before/after sine wave. |
| |
| If average amplitude of some sample's block before start of wave or after |
| end of wave is more than average_amplitude times noise_amplitude_threshold, |
| it will be considered as a noise. |
| |
| @param start_index: Start index of sine wave. |
| @param end_index: End index of sine wave. |
| @param block_amplitude: An array for average amplitude of each block, where |
| amplitude is computed from Hilbert transform. |
| @param average_amplitude: Average amplitude of sine wave. |
| @param rate: Sampling rate |
| @param noise_amplitude_threshold: If the average amplitude of a block is |
| higher than average amplitude of the wave times |
| noise_amplitude_threshold, it will be considered as |
| noise before/after playback. |
| |
| @returns: A tuple of lists indicating the time that noise happens: |
| (noise_before_playing, noise_after_playing). |
| |
| """ |
| length = len(block_amplitude) |
| amplitude_threshold = average_amplitude * noise_amplitude_threshold |
| same_event_samples = rate * DEFAULT_SAME_EVENT_SECS |
| |
| # Detects noise before playing. |
| noise_time_point = [] |
| last_noise_end_time_point = [] |
| previous_noise_index = None |
| times = 0 |
| for index in xrange(0, length): |
| # Ignore noise too close to the beginning or the end of sine wave. |
| # Check the docstring of NEAR_SINE_START_OR_END_SECS. |
| if ((start_index - rate * NEAR_SINE_START_OR_END_SECS) <= index and |
| (index < end_index + rate * NEAR_SINE_START_OR_END_SECS)): |
| continue |
| |
| # Ignore noise too close to the beginning or the end of original data. |
| # Check the docstring of NEAR_DATA_START_OR_END_SECS. |
| if (float(index) / rate <= |
| NEAR_DATA_START_OR_END_SECS + APPEND_ZEROS_SECS): |
| continue |
| if (float(length - index) / rate <= |
| NEAR_DATA_START_OR_END_SECS + APPEND_ZEROS_SECS): |
| continue |
| if block_amplitude[index] > amplitude_threshold: |
| same_event = False |
| if previous_noise_index: |
| same_event = (index - previous_noise_index) < same_event_samples |
| if not same_event: |
| index_start_sec = float(index) / rate - APPEND_ZEROS_SECS |
| index_end_sec = float(index + 1) / rate - APPEND_ZEROS_SECS |
| noise_time_point.append(index_start_sec) |
| last_noise_end_time_point.append(index_end_sec) |
| times += 1 |
| index_end_sec = float(index + 1) / rate - APPEND_ZEROS_SECS |
| last_noise_end_time_point[times - 1] = index_end_sec |
| previous_noise_index = index |
| |
| noise_before_playing, noise_after_playing = [], [] |
| for i in xrange(times): |
| duration = last_noise_end_time_point[i] - noise_time_point[i] |
| if noise_time_point[i] < float(start_index) / rate - APPEND_ZEROS_SECS: |
| noise_before_playing.append((noise_time_point[i], duration)) |
| else: |
| noise_after_playing.append((noise_time_point[i], duration)) |
| |
| return (noise_before_playing, noise_after_playing) |
| |
| |
| def delay_detection(start_index, end_index, |
| block_amplitude, average_amplitude, |
| dominant_frequency, |
| rate, |
| left_block_amplitude, |
| right_block_amplitude, |
| block_frequency_delta, |
| delay_amplitude_threshold, |
| frequency_error_threshold): |
| """Detects delay during playing. |
| |
| For each sample, we will check whether the average amplitude of its block |
| is less than average amplitude of its left block and its right block times |
| delay_amplitude_threshold. Also, we will check whether the frequency of |
| its block is far from the dominant frequency. |
| If at least one constraint fulfilled, it will be considered as a delay. |
| |
| @param start_index: Start index of sine wave. |
| @param end_index: End index of sine wave. |
| @param block_amplitude: An array for average amplitude of each block, where |
| amplitude is computed from Hilbert transform. |
| @param average_amplitude: Average amplitude of sine wave. |
| @param dominant_frequency: Dominant frequency of signal. |
| @param rate: Sampling rate |
| @param left_block_amplitude: Average amplitude of left block of each index. |
| Ref to find_block_average_value function. |
| @param right_block_amplitude: Average amplitude of right block of each index. |
| Ref to find_block_average_value function. |
| @param block_frequency_delta: Average absolute difference frequency to |
| dominant frequency of block of each index. |
| Ref to find_block_average_value function. |
| @param delay_amplitude_threshold: If the average amplitude of a block is |
| lower than average amplitude of the wave times |
| delay_amplitude_threshold, it will be considered |
| as delay. |
| @param frequency_error_threshold: Ref to DEFAULT_FREQUENCY_ERROR |
| |
| @returns: List of delay occurrence: |
| [(time_1, duration_1), (time_2, duration_2), ...], |
| where time and duration are in seconds. |
| |
| """ |
| delay_time_points = [] |
| last_delay_end_time_points = [] |
| previous_delay_index = None |
| times = 0 |
| same_event_samples = rate * DEFAULT_SAME_EVENT_SECS |
| start_time = float(start_index) / rate - APPEND_ZEROS_SECS |
| end_time = float(end_index) / rate - APPEND_ZEROS_SECS |
| for index in xrange(start_index, end_index): |
| if block_amplitude[index] > average_amplitude * delay_amplitude_threshold: |
| continue |
| now_time = float(index) / rate - APPEND_ZEROS_SECS |
| if abs(now_time - start_time) < NEAR_START_OR_END_SECS: |
| continue |
| if abs(now_time - end_time) < NEAR_START_OR_END_SECS: |
| continue |
| # If amplitude less than its left/right side and small enough, |
| # it will be considered as a delay. |
| amp_threshold = average_amplitude * delay_amplitude_threshold |
| left_threshold = delay_amplitude_threshold * left_block_amplitude[index] |
| amp_threshold = min(amp_threshold, left_threshold) |
| right_threshold = delay_amplitude_threshold * right_block_amplitude[index] |
| amp_threshold = min(amp_threshold, right_threshold) |
| |
| frequency_error = block_frequency_delta[index] / dominant_frequency |
| |
| amplitude_too_small = block_amplitude[index] < amp_threshold |
| frequency_not_match = frequency_error > frequency_error_threshold |
| |
| if amplitude_too_small or frequency_not_match: |
| same_event = False |
| if previous_delay_index: |
| same_event = (index - previous_delay_index) < same_event_samples |
| if not same_event: |
| index_start_sec = float(index) / rate - APPEND_ZEROS_SECS |
| index_end_sec = float(index + 1) / rate - APPEND_ZEROS_SECS |
| delay_time_points.append(index_start_sec) |
| last_delay_end_time_points.append(index_end_sec) |
| times += 1 |
| previous_delay_index = index |
| index_end_sec = float(index + 1) / rate - APPEND_ZEROS_SECS |
| last_delay_end_time_points[times - 1] = index_end_sec |
| |
| delay_list = [] |
| for i in xrange(len(delay_time_points)): |
| duration = last_delay_end_time_points[i] - delay_time_points[i] |
| delay_list.append( (delay_time_points[i], duration) ) |
| return delay_list |
| |
| |
| def burst_detection(start_index, end_index, |
| block_amplitude, average_amplitude, |
| dominant_frequency, |
| rate, |
| left_block_amplitude, |
| right_block_amplitude, |
| block_frequency_delta, |
| burst_amplitude_threshold, |
| frequency_error_threshold): |
| """Detects burst during playing. |
| |
| For each sample, we will check whether the average amplitude of its block is |
| more than average amplitude of its left block and its right block times |
| burst_amplitude_threshold. Also, we will check whether the frequency of |
| its block is not compatible to the dominant frequency. |
| If at least one constraint fulfilled, it will be considered as a burst. |
| |
| @param start_index: Start index of sine wave. |
| @param end_index: End index of sine wave. |
| @param block_amplitude: An array for average amplitude of each block, where |
| amplitude is computed from Hilbert transform. |
| @param average_amplitude: Average amplitude of sine wave. |
| @param dominant_frequency: Dominant frequency of signal. |
| @param rate: Sampling rate |
| @param left_block_amplitude: Average amplitude of left block of each index. |
| Ref to find_block_average_value function. |
| @param right_block_amplitude: Average amplitude of right block of each index. |
| Ref to find_block_average_value function. |
| @param block_frequency_delta: Average absolute difference frequency to |
| dominant frequency of block of each index. |
| @param burst_amplitude_threshold: If the amplitude is higher than average |
| amplitude of its left block and its right block |
| times burst_amplitude_threshold. It will be |
| considered as a burst. |
| @param frequency_error_threshold: Ref to DEFAULT_FREQUENCY_ERROR |
| |
| @returns: List of burst occurence: [time_1, time_2, ...], |
| where time is in seconds. |
| |
| """ |
| burst_time_points = [] |
| previous_burst_index = None |
| same_event_samples = rate * DEFAULT_SAME_EVENT_SECS |
| for index in xrange(start_index, end_index): |
| # If amplitude higher than its left/right side and large enough, |
| # it will be considered as a burst. |
| if block_amplitude[index] <= average_amplitude * DEFAULT_BURST_TOO_SMALL: |
| continue |
| if abs(index - start_index) < rate * NEAR_START_OR_END_SECS: |
| continue |
| if abs(index - end_index) < rate * NEAR_START_OR_END_SECS: |
| continue |
| amp_threshold = average_amplitude * DEFAULT_BURST_TOO_SMALL |
| left_threshold = burst_amplitude_threshold * left_block_amplitude[index] |
| amp_threshold = max(amp_threshold, left_threshold) |
| right_threshold = burst_amplitude_threshold * right_block_amplitude[index] |
| amp_threshold = max(amp_threshold, right_threshold) |
| |
| frequency_error = block_frequency_delta[index] / dominant_frequency |
| |
| amplitude_too_large = block_amplitude[index] > amp_threshold |
| frequency_not_match = frequency_error > frequency_error_threshold |
| |
| if amplitude_too_large or frequency_not_match: |
| same_event = False |
| if previous_burst_index: |
| same_event = index - previous_burst_index < same_event_samples |
| if not same_event: |
| burst_time_points.append(float(index) / rate - APPEND_ZEROS_SECS) |
| previous_burst_index = index |
| |
| return burst_time_points |
| |
| |
| def changing_volume_detection(start_index, end_index, |
| average_amplitude, |
| rate, |
| left_block_amplitude, |
| right_block_amplitude, |
| volume_changing_amplitude_threshold): |
| """Finds volume changing during playback. |
| |
| For each index, we will compare average amplitude of its left block and its |
| right block. If average amplitude of right block is more than average |
| amplitude of left block times (1 + DEFAULT_VOLUME_CHANGE_AMPLITUDE), it will |
| be considered as an increasing volume. If the one of right block is less |
| than that of left block times (1 - DEFAULT_VOLUME_CHANGE_AMPLITUDE), it will |
| be considered as a decreasing volume. |
| |
| @param start_index: Start index of sine wave. |
| @param end_index: End index of sine wave. |
| @param average_amplitude: Average amplitude of sine wave. |
| @param rate: Sampling rate |
| @param left_block_amplitude: Average amplitude of left block of each index. |
| Ref to find_block_average_value function. |
| @param right_block_amplitude: Average amplitude of right block of each index. |
| Ref to find_block_average_value function. |
| @param volume_changing_amplitude_threshold: If the average amplitude of right |
| block is higher or lower than |
| that of left one times this |
| value, it will be considered as |
| a volume change. |
| Also refer to |
| DEFAULT_VOLUME_CHANGE_AMPLITUDE |
| |
| @returns: List of volume changing composed of 1 for increasing and |
| -1 for decreasing. |
| |
| """ |
| length = len(left_block_amplitude) |
| |
| # Detects rising and/or falling volume. |
| previous_rising_index, previous_falling_index = None, None |
| changing_time = [] |
| changing_events = [] |
| amplitude_threshold = average_amplitude * DEFAULT_VOLUME_CHANGE_TOO_SMALL |
| same_event_samples = rate * DEFAULT_SAME_EVENT_SECS |
| for index in xrange(start_index, end_index): |
| # Skips if amplitude is too small. |
| if left_block_amplitude[index] < amplitude_threshold: |
| continue |
| if right_block_amplitude[index] < amplitude_threshold: |
| continue |
| # Skips if changing is from start or end time |
| if float(abs(start_index - index)) / rate < NEAR_START_OR_END_SECS: |
| continue |
| if float(abs(end_index - index)) / rate < NEAR_START_OR_END_SECS: |
| continue |
| |
| delta_margin = volume_changing_amplitude_threshold |
| if left_block_amplitude[index] > 0: |
| delta_margin *= left_block_amplitude[index] |
| |
| increasing_threshold = left_block_amplitude[index] + delta_margin |
| decreasing_threshold = left_block_amplitude[index] - delta_margin |
| |
| if right_block_amplitude[index] > increasing_threshold: |
| same_event = False |
| if previous_rising_index: |
| same_event = index - previous_rising_index < same_event_samples |
| if not same_event: |
| changing_time.append(float(index) / rate - APPEND_ZEROS_SECS) |
| changing_events.append(+1) |
| previous_rising_index = index |
| if right_block_amplitude[index] < decreasing_threshold: |
| same_event = False |
| if previous_falling_index: |
| same_event = index - previous_falling_index < same_event_samples |
| if not same_event: |
| changing_time.append(float(index) / rate - APPEND_ZEROS_SECS) |
| changing_events.append(-1) |
| previous_falling_index = index |
| |
| # Combines consecutive increasing/decreasing event. |
| combined_changing_events, prev = [], 0 |
| for i in xrange(len(changing_events)): |
| if changing_events[i] == prev: |
| continue |
| combined_changing_events.append((changing_time[i], changing_events[i])) |
| prev = changing_events[i] |
| return combined_changing_events |
| |
| |
| def quality_measurement( |
| signal, rate, |
| dominant_frequency=None, |
| block_size_secs=DEFAULT_BLOCK_SIZE_SECS, |
| frequency_error_threshold=DEFAULT_FREQUENCY_ERROR, |
| delay_amplitude_threshold=DEFAULT_DELAY_AMPLITUDE_THRESHOLD, |
| noise_amplitude_threshold=DEFAULT_NOISE_AMPLITUDE_THRESHOLD, |
| burst_amplitude_threshold=DEFAULT_BURST_AMPLITUDE_THRESHOLD, |
| volume_changing_amplitude_threshold=DEFAULT_VOLUME_CHANGE_AMPLITUDE): |
| """Detects several artifacts and estimates the noise level. |
| |
| This method detects artifact before playing, after playing, and delay |
| during playing. Also, it estimates the noise level of the signal. |
| To avoid the influence of noise, it calculates amplitude and frequency |
| block by block. |
| |
| @param signal: A list of numbers for one-channel PCM data. The data should |
| be normalized to [-1,1]. |
| @param rate: Sampling rate |
| @param dominant_frequency: Dominant frequency of signal. Set None to |
| recalculate the frequency in this function. |
| @param block_size_secs: Block size in seconds. The measurement will be done |
| block-by-block using average amplitude and frequency |
| in each block to avoid noise. |
| @param frequency_error_threshold: Ref to DEFAULT_FREQUENCY_ERROR. |
| @param delay_amplitude_threshold: If the average amplitude of a block is |
| lower than average amplitude of the wave |
| times delay_amplitude_threshold, it will |
| be considered as delay. |
| Also refer to delay_detection and |
| DEFAULT_DELAY_AMPLITUDE_THRESHOLD. |
| @param noise_amplitude_threshold: If the average amplitude of a block is |
| higher than average amplitude of the wave |
| times noise_amplitude_threshold, it will |
| be considered as noise before/after |
| playback. |
| Also refer to noise_detection and |
| DEFAULT_NOISE_AMPLITUDE_THRESHOLD. |
| @param burst_amplitude_threshold: If the average amplitude of a block is |
| higher than average amplitude of its left |
| block and its right block times |
| burst_amplitude_threshold. It will be |
| considered as a burst. |
| Also refer to burst_detection and |
| DEFAULT_BURST_AMPLITUDE_THRESHOLD. |
| @param volume_changing_amplitude_threshold: If the average amplitude of right |
| block is higher or lower than |
| that of left one times this |
| value, it will be considered as |
| a volume change. |
| Also refer to |
| changing_volume_detection and |
| DEFAULT_VOLUME_CHANGE_AMPLITUDE |
| |
| @returns: A dictoinary of detection/estimation: |
| {'artifacts': |
| {'noise_before_playback': |
| [(time_1, duration_1), (time_2, duration_2), ...], |
| 'noise_after_playback': |
| [(time_1, duration_1), (time_2, duration_2), ...], |
| 'delay_during_playback': |
| [(time_1, duration_1), (time_2, duration_2), ...], |
| 'burst_during_playback': |
| [time_1, time_2, ...] |
| }, |
| 'volume_changes': |
| [(time_1, flag_1), (time_2, flag_2), ...], |
| 'equivalent_noise_level': level |
| } |
| where durations and time points are in seconds. And, |
| equivalence_noise_level is the quotient of noise and |
| wave which refers to DEFAULT_STANDARD_NOISE. |
| volume_changes is a list of tuples containing time |
| stamps and decreasing/increasing flags for volume |
| change events. |
| |
| """ |
| # Calculates the block size, from seconds to samples. |
| block_size = int(block_size_secs * rate) |
| |
| signal = numpy.concatenate((numpy.zeros(int(rate * APPEND_ZEROS_SECS)), |
| signal, |
| numpy.zeros(int(rate * APPEND_ZEROS_SECS)))) |
| signal = numpy.array(signal, dtype=float) |
| length = len(signal) |
| |
| # Calculates the amplitude and frequency. |
| amplitude, frequency = hilbert_analysis(signal, rate, block_size) |
| |
| # Finds the dominant frequency. |
| if not dominant_frequency: |
| dominant_frequency = audio_analysis.spectral_analysis(signal, rate)[0][0] |
| |
| # Finds the array which contains absolute difference between dominant |
| # frequency and frequency at each time point. |
| frequency_delta = abs(frequency - dominant_frequency) |
| |
| # Computes average amplitude of each type of block |
| res = find_block_average_value(amplitude, block_size * 2, block_size) |
| left_block_amplitude, right_block_amplitude, block_amplitude = res |
| |
| # Computes average absolute difference of frequency and dominant frequency |
| # of the block of each index |
| _, _, block_frequency_delta = find_block_average_value(frequency_delta, |
| block_size * 2, |
| block_size) |
| |
| # Finds start and end index of sine wave. |
| start_index, end_index = find_start_end_index(dominant_frequency, |
| block_frequency_delta, |
| block_size, |
| frequency_error_threshold) |
| |
| if start_index > end_index: |
| raise SineWaveNotFound('No sine wave found in signal') |
| |
| logging.debug('Found sine wave: start: %s, end: %s', |
| float(start_index) / rate - APPEND_ZEROS_SECS, |
| float(end_index) / rate - APPEND_ZEROS_SECS) |
| |
| sum_of_amplitude = float(sum(amplitude[start_index:end_index])) |
| # Finds average amplitude of sine wave. |
| average_amplitude = sum_of_amplitude / (end_index - start_index) |
| |
| # Finds noise before and/or after playback. |
| noise_before_playing, noise_after_playing = noise_detection( |
| start_index, end_index, |
| block_amplitude, average_amplitude, |
| rate, |
| noise_amplitude_threshold) |
| |
| # Finds delay during playback. |
| delays = delay_detection(start_index, end_index, |
| block_amplitude, average_amplitude, |
| dominant_frequency, |
| rate, |
| left_block_amplitude, |
| right_block_amplitude, |
| block_frequency_delta, |
| delay_amplitude_threshold, |
| frequency_error_threshold) |
| |
| # Finds burst during playback. |
| burst_time_points = burst_detection(start_index, end_index, |
| block_amplitude, average_amplitude, |
| dominant_frequency, |
| rate, |
| left_block_amplitude, |
| right_block_amplitude, |
| block_frequency_delta, |
| burst_amplitude_threshold, |
| frequency_error_threshold) |
| |
| # Finds volume changing during playback. |
| volume_changes = changing_volume_detection( |
| start_index, end_index, |
| average_amplitude, |
| rate, |
| left_block_amplitude, |
| right_block_amplitude, |
| volume_changing_amplitude_threshold) |
| |
| # Calculates the average teager value. |
| teager_value = average_teager_value(signal[start_index:end_index], |
| average_amplitude) |
| |
| # Finds out the noise level. |
| noise = noise_level(average_amplitude, dominant_frequency, |
| rate, |
| teager_value) |
| |
| return {'artifacts': |
| {'noise_before_playback': noise_before_playing, |
| 'noise_after_playback': noise_after_playing, |
| 'delay_during_playback': delays, |
| 'burst_during_playback': burst_time_points |
| }, |
| 'volume_changes': volume_changes, |
| 'equivalent_noise_level': noise |
| } |