# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module provides utilities to do audio data analysis."""
import logging
import numpy
import operator
# Only peaks with coefficient greater than 0.01 of the first peak should be
# considered. Note that this corresponds to -40dB in the spectrum.
DEFAULT_MIN_PEAK_RATIO = 0.01
PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
# The minimum RMS value of meaningful audio data.
MEANINGFUL_RMS_THRESHOLD = 0.001
class RMSTooSmallError(Exception):
"""Error when signal RMS is too small."""
pass
class EmptyDataError(Exception):
"""Error when signal is empty."""
def normalize_signal(signal, saturate_value):
"""Normalizes the signal with respect to the saturate value.
@param signal: A list for one-channel PCM data.
@param saturate_value: The maximum value that the PCM data might be.
    @returns: A numpy array containing the normalized signal. The normalized
              signal reaches -1 or 1 where the input saturates.
"""
signal = numpy.array(signal)
return signal / float(saturate_value)
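
# Illustrative sketch, not part of the original module: _example_normalize_signal
# is a hypothetical helper showing normalize_signal on 16-bit PCM data, where
# the saturate value is 32767. The sample values below are arbitrary.
def _example_normalize_signal():
    """Normalizes a few 16-bit PCM samples.
    @returns: A numpy array roughly equal to [0.0, 0.5, -0.5, 1.0, -1.0].
    """
    samples = [0, 16384, -16384, 32767, -32767]
    return normalize_signal(samples, 32767)
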
def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
"""Gets the dominant frequencies by spectral analysis.
@param signal: A list of numbers for one-channel PCM data. This should be
normalized to [-1, 1] so the function can check if signal RMS
is too small to be meaningful.
@param rate: Sampling rate.
    @param min_peak_ratio: The minimum peak_i/peak_0 coefficient ratio for a
                           peak other than the largest peak peak_0 to be
                           reported. This is to ignore peaks that are too
                           small compared to the first peak peak_0.
    @param peak_window_size_hz: The window size in Hz to find the peaks.
                                The minimum difference between found peaks will
                                be half of this value.
    @returns: A list of tuples:
              [(peak_frequency_0, peak_coefficient_0),
               (peak_frequency_1, peak_coefficient_1),
               (peak_frequency_2, peak_coefficient_2), ...]
              where the tuples are sorted by coefficients in descending order.
              The last peak_coefficient will be no less than
              peak_coefficient_0 * min_peak_ratio.
              If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)].
"""
# Checks the signal is meaningful.
if len(signal) == 0:
raise EmptyDataError('Signal data is empty')
signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
logging.debug('signal RMS = %s', signal_rms)
# If RMS is too small, set dominant frequency and coefficient to 0.
if signal_rms < MEANINGFUL_RMS_THRESHOLD:
logging.warning(
'RMS %s is too small to be meaningful. Set frequency to 0.',
signal_rms)
return [(0, 0)]
logging.debug('Doing spectral analysis ...')
    # First, pass the signal through a window function to mitigate spectral
    # leakage.
y_conv_w = signal * numpy.hanning(len(signal))
length = len(y_conv_w)
# x_f is the frequency in Hz, y_f is the transformed coefficient.
x_f = _rfft_freq(length, rate)
y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
# y_f is complex so consider its absolute value for magnitude.
abs_y_f = numpy.abs(y_f)
threshold = max(abs_y_f) * min_peak_ratio
    # Suppresses all coefficients that are below threshold.
    abs_y_f[abs_y_f < threshold] = 0
    # Gets the peak detection window size in indices.
    # x_f[1] is the frequency difference per index.
peak_window_size = int(peak_window_size_hz / x_f[1])
# Detects peaks.
peaks = peak_detection(abs_y_f, peak_window_size)
    # Transforms the peak locations from indices back to frequencies.
results = []
for index, value in peaks:
results.append((x_f[index], value))
return results
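
# Illustrative sketch, not part of the original module: _example_spectral_analysis
# is a hypothetical helper that runs spectral_analysis on a synthesized 1 kHz
# sine wave sampled at 48 kHz. The frequency, rate and duration are arbitrary.
def _example_spectral_analysis():
    """Runs spectral analysis on a pure 1 kHz tone.
    @returns: The detected peaks. The dominant peak frequency is expected to
              be close to 1000 Hz.
    """
    rate = 48000
    num_samples = rate // 2  # 0.5 second of data.
    times = numpy.arange(num_samples) / float(rate)
    tone = numpy.sin(2.0 * numpy.pi * 1000.0 * times)
    peaks = spectral_analysis(tone, rate)
    logging.debug('Detected peaks: %r', peaks)
    return peaks
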
def _rfft_freq(length, rate):
"""Gets the frequency at each index of real FFT.
@param length: The window length of FFT.
@param rate: Sampling rate.
@returns: A numpy array containing frequency corresponding to
numpy.fft.rfft result at each index.
"""
# The difference in Hz between each index.
val = rate / float(length)
    # Only care about half of the frequencies for an FFT on a real signal.
result_length = length // 2 + 1
return numpy.linspace(0, (result_length - 1) * val, result_length)
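
# Illustrative sketch, not part of the original module: _example_rfft_freq is a
# hypothetical helper. For a 1-second window at a 48000 Hz sampling rate,
# _rfft_freq returns 24001 frequencies spaced 1 Hz apart, i.e.
# [0.0, 1.0, 2.0, ..., 24000.0].
def _example_rfft_freq():
    """Gets the real-FFT bin frequencies of a 1-second window at 48000 Hz.
    @returns: A numpy array of 24001 frequencies with 1 Hz spacing.
    """
    freqs = _rfft_freq(48000, 48000)
    # freqs[1] is the bin spacing in Hz; here it is 1.0.
    logging.debug('Bin spacing: %s Hz, bins: %d', freqs[1], len(freqs))
    return freqs
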
def peak_detection(array, window_size):
"""Detects peaks in an array.
A point (i, array[i]) is a peak if array[i] is the maximum among
array[i - half_window_size] to array[i + half_window_size].
If array[i - half_window_size] to array[i + half_window_size] are all equal,
then there is no peak in this window.
    Note that we only consider peaks with values greater than 0.
    @param array: The array of numbers in which to detect peaks.
    @param window_size: The window size in samples to detect peaks.
    @returns: A list of tuples:
              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
              where the tuples are sorted by peak values in descending order.
"""
    half_window_size = window_size // 2
length = len(array)
def mid_is_peak(array, mid, left, right):
"""Checks if value at mid is the largest among left to right in array.
@param array: A list of numbers.
@param mid: The mid index.
@param left: The left index.
        @param right: The right index.
        @returns: A tuple (is_peak, next_candidate)
                  is_peak is True if array[mid] is the maximum among numbers
in array between index [left, right] inclusively.
next_candidate is the index of next candidate for peak if
is_peak is False. It is the index of maximum value in
[mid + 1, right]. If is_peak is True, next_candidate is
right + 1.
"""
value_mid = array[mid]
is_peak = True
next_peak_candidate_index = None
# Check the left half window.
for index in xrange(left, mid):
if array[index] >= value_mid:
is_peak = False
break
# Mid is at the end of array.
if mid == right:
return is_peak, right + 1
# Check the right half window and also record next candidate.
# Favor the larger index for next_peak_candidate_index.
for index in xrange(right, mid, -1):
if (next_peak_candidate_index is None or
array[index] > array[next_peak_candidate_index]):
next_peak_candidate_index = index
if array[next_peak_candidate_index] >= value_mid:
is_peak = False
if is_peak:
next_peak_candidate_index = right + 1
return is_peak, next_peak_candidate_index
results = []
mid = 0
next_candidate_idx = None
while mid < length:
left = max(0, mid - half_window_size)
right = min(length - 1, mid + half_window_size)
        # Only consider values greater than 0.
        if array[mid] == 0:
            mid = mid + 1
            continue
is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
if is_peak:
results.append((mid, array[mid]))
# Use the next candidate found in [mid + 1, right], or right + 1.
mid = next_candidate_idx
# Sort the peaks by values.
return sorted(results, key=lambda x: x[1], reverse=True)
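
# Illustrative sketch, not part of the original module: _example_peak_detection
# is a hypothetical helper that runs peak_detection on a small hand-made array.
def _example_peak_detection():
    """Detects peaks in a toy array with two local maxima.
    With window_size 3, the peaks are at index 2 (value 5) and index 6
    (value 4), sorted by value.
    @returns: [(2, 5), (6, 4)]
    """
    array = [0, 1, 5, 1, 0, 2, 4, 2, 0]
    return peak_detection(array, 3)
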
# The default pattern matching threshold. By experiment, this threshold
# can tolerate normal noise of 0.3 amplitude when the sine wave signal
# amplitude is 1.
PATTERN_MATCHING_THRESHOLD = 0.85
# The default block size of pattern matching.
ANOMALY_DETECTION_BLOCK_SIZE = 120
def anomaly_detection(signal, rate, freq,
block_size=ANOMALY_DETECTION_BLOCK_SIZE,
threshold=PATTERN_MATCHING_THRESHOLD):
"""Detects anomaly in a sine wave signal.
This method detects anomaly in a sine wave signal by matching
patterns of each block.
For each moving window of block in the test signal, checks if there
is any block in golden signal that is similar to this block of test signal.
If there is such a block in golden signal, then this block of test
signal is matched and there is no anomaly in this block of test signal.
If there is any block in test signal that is not matched, then this block
covers an anomaly.
The block of test signal starts from index 0, and proceeds in steps of
half block size. The overlapping of test signal blocks makes sure there must
be at least one block covering the transition from sine wave to anomaly.
@param signal: A 1-D array-like object for 1-channel PCM data.
@param rate: The sampling rate.
@param freq: The expected frequency of signal.
@param block_size: The block size in samples to detect anomaly.
    @param threshold: The correlation index threshold for a block to be judged
                      as matched.
    @returns: A list containing the detected anomaly times in seconds.
"""
if len(signal) == 0:
raise EmptyDataError('Signal data is empty')
golden_y = _generate_golden_pattern(rate, freq, block_size)
results = []
    for start in xrange(0, len(signal), block_size // 2):
end = start + block_size
test_signal = signal[start:end]
matched = _moving_pattern_matching(golden_y, test_signal, threshold)
if not matched:
results.append(start)
results = [float(x) / rate for x in results]
return results
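
# Illustrative sketch, not part of the original module: a hypothetical helper
# that mutes 10 ms of a 1 kHz tone at 48 kHz and runs anomaly_detection on it.
# The frequency, rate and muted range are arbitrary example values.
def _example_anomaly_detection():
    """Detects the muted gap in a tone as an anomaly.
    @returns: A list of anomaly times; it is expected to contain times near
              0.25 second, where the gap starts.
    """
    rate = 48000
    times = numpy.arange(rate // 2) / float(rate)  # 0.5 second of data.
    tone = numpy.sin(2.0 * numpy.pi * 1000.0 * times)
    tone[12000:12480] = 0  # Mute 10 ms starting at 0.25 second.
    return anomaly_detection(tone, rate, 1000)
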
def _generate_golden_pattern(rate, freq, block_size):
"""Generates a golden pattern of certain frequency.
    The golden pattern must cover all the possible waveforms in a block. So, we
    need a golden pattern covering 1 period + 1 block size, such that a test
    block can start anywhere within a period and still extend one block size.
    |<--- 1 period --->|<--- 1 block --->|
    (one full sine period followed by one extra block of the sine wave)
@param rate: The sampling rate.
@param freq: The frequency of golden pattern.
@param block_size: The block size in samples to detect anomaly.
@returns: A 1-D array for golden pattern.
"""
samples_in_a_period = int(rate / freq) + 1
samples_in_golden_pattern = samples_in_a_period + block_size
golden_x = numpy.linspace(
0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
samples_in_golden_pattern)
golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
return golden_y
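
# Illustrative sketch, not part of the original module: a hypothetical helper
# showing the golden pattern size for a 1 kHz tone at 48 kHz with the default
# block size of 120 samples.
def _example_generate_golden_pattern():
    """Generates the golden pattern for 1 kHz at 48 kHz.
    One period is int(48000 / 1000) + 1 = 49 samples, so the pattern has
    49 + 120 = 169 samples.
    @returns: A 1-D array of 169 samples.
    """
    return _generate_golden_pattern(48000, 1000, ANOMALY_DETECTION_BLOCK_SIZE)
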
def _moving_pattern_matching(golden_signal, test_signal, threshold):
"""Checks if test_signal is similar to any block of golden_signal.
    Compares the test signal with each block of the golden signal by
    correlation index. If any block of the golden signal is similar to the
    test signal, then it is matched.
@param golden_signal: A 1-D array for golden signal.
@param test_signal: A 1-D array for test signal.
    @param threshold: The correlation index threshold for a block to be judged
                      as matched.
@returns: True if there is a match. False otherwise.
@raises: ValueError: if test signal is longer than golden signal.
"""
if len(golden_signal) < len(test_signal):
raise ValueError('Test signal is longer than golden signal')
block_length = len(test_signal)
number_of_movings = len(golden_signal) - block_length + 1
correlation_indices = []
for moving_index in xrange(number_of_movings):
# Cuts one block of golden signal from start index.
# The block length is the same as test signal.
start = moving_index
end = start + block_length
golden_signal_block = golden_signal[start:end]
try:
correlation_index = _get_correlation_index(
golden_signal_block, test_signal)
except TestSignalNormTooSmallError:
logging.info('Caught one block of test signal that has no meaningful norm')
return False
correlation_indices.append(correlation_index)
# Checks if the maximum correlation index is high enough.
max_corr = max(correlation_indices)
if max_corr < threshold:
logging.debug('Got one unmatched block with max_corr: %s', max_corr)
return False
return True
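
# Illustrative sketch, not part of the original module: a hypothetical helper
# showing that a block cut directly from the golden signal should match, while
# a block of random noise is very unlikely to match. The start index 30 is
# arbitrary.
def _example_moving_pattern_matching():
    """Matches a golden block and a noise block against the golden signal.
    @returns: A tuple (matched_golden, matched_noise); matched_golden is
              expected to be True and matched_noise is very likely False.
    """
    golden = _generate_golden_pattern(48000, 1000, ANOMALY_DETECTION_BLOCK_SIZE)
    golden_block = golden[30:30 + ANOMALY_DETECTION_BLOCK_SIZE]
    noise_block = numpy.random.uniform(-1, 1, ANOMALY_DETECTION_BLOCK_SIZE)
    matched_golden = _moving_pattern_matching(
        golden, golden_block, PATTERN_MATCHING_THRESHOLD)
    matched_noise = _moving_pattern_matching(
        golden, noise_block, PATTERN_MATCHING_THRESHOLD)
    return matched_golden, matched_noise
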
class GoldenSignalNormTooSmallError(Exception):
"""Exception when golden signal norm is too small."""
pass
class TestSignalNormTooSmallError(Exception):
"""Exception when test signal norm is too small."""
pass
_MINIMUM_SIGNAL_NORM = 0.001
def _get_correlation_index(golden_signal, test_signal):
"""Computes correlation index of two signal of same length.
@param golden_signal: An 1-D array-like object.
@param test_signal: An 1-D array-like object.
@raises: ValueError: if two signal have different lengths.
@raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
@raises: TestSignalNormTooSmallError: if test signal norm is too small.
@returns: The correlation index.
"""
if len(golden_signal) != len(test_signal):
raise ValueError(
'Only accepts signal of same length: %s, %s' % (
len(golden_signal), len(test_signal)))
norm_golden = numpy.linalg.norm(golden_signal)
norm_test = numpy.linalg.norm(test_signal)
if norm_golden <= _MINIMUM_SIGNAL_NORM:
raise GoldenSignalNormTooSmallError(
'No meaningful data as norm is too small.')
if norm_test <= _MINIMUM_SIGNAL_NORM:
raise TestSignalNormTooSmallError(
'No meaningful data as norm is too small.')
    # The 'valid' cross-correlation result of two signals of the same length
    # contains only one number.
correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
return correlation / (norm_golden * norm_test)
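
# Illustrative sketch, not part of the original module: a hypothetical helper
# showing that the correlation index is 1.0 for identical signals, -1.0 for a
# sign-flipped copy, and close to 0 for a quarter-period shift (orthogonal
# signals).
def _example_get_correlation_index():
    """Computes correlation indices for identical, inverted and shifted sines.
    @returns: A tuple of three correlation indices, expected to be close to
              (1.0, -1.0, 0.0).
    """
    times = numpy.arange(1000) / 1000.0
    sine = numpy.sin(2.0 * numpy.pi * times)
    cosine = numpy.cos(2.0 * numpy.pi * times)
    identical = _get_correlation_index(sine, sine)
    inverted = _get_correlation_index(sine, -sine)
    orthogonal = _get_correlation_index(sine, cosine)
    return identical, inverted, orthogonal
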