blob: bdd5b3449a8b6d7695e536170a5b25b2a47c0776 [file] [log] [blame]
#!/usr/bin/python2
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Command line tool to analyze wave file and detect artifacts."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import collections
import json
import logging
import numpy
import pprint
import subprocess
import tempfile
import wave
from six.moves import range
# Normal autotest environment.
try:
import common
from autotest_lib.client.cros.audio import audio_analysis
from autotest_lib.client.cros.audio import audio_data
from autotest_lib.client.cros.audio import audio_quality_measurement
# Standalone execution without autotest environment.
except ImportError:
import audio_analysis
import audio_data
import audio_quality_measurement
# Holder for quality parameters used in audio_quality_measurement module.
QualityParams = collections.namedtuple('QualityParams',
['block_size_secs',
'frequency_error_threshold',
'delay_amplitude_threshold',
'noise_amplitude_threshold',
'burst_amplitude_threshold'])
def add_args(parser):
"""Adds command line arguments."""
parser.add_argument('filename', metavar='FILE', type=str,
help='The wav or raw file to check.'
'The file format is determined by file extension.'
'For raw format, user must also pass -b, -r, -c'
'for bit width, rate, and number of channels.')
parser.add_argument('--debug', action='store_true', default=False,
help='Show debug message.')
parser.add_argument('--spectral-only', action='store_true', default=False,
help='Only do spectral analysis on each channel.')
parser.add_argument('--freqs', metavar='FREQ', type=float,
nargs='*',
help='Expected frequencies in the channels. '
'Frequencies are separated by space. '
'E.g.: --freqs 1000 2000. '
'It means only the first two '
'channels (1000Hz, 2000Hz) are to be checked. '
'Unwanted channels can be specified by 0. '
'E.g.: --freqs 1000 0 2000 0 3000. '
'It means only channe 0,2,4 are to be examined.')
parser.add_argument('--freq-threshold', metavar='FREQ_THRESHOLD', type=float,
default=5,
help='Frequency difference threshold in Hz. '
'Default is 5Hz')
parser.add_argument('--ignore-high-freq', metavar='HIGH_FREQ_THRESHOLD',
type=float, default=5000,
help='Frequency threshold in Hz to be ignored for '
'high frequency. Default is 5KHz')
parser.add_argument('--output-file', metavar='OUTPUT_FILE', type=str,
help='Output file to dump analysis result in JSON format')
parser.add_argument('-b', '--bit-width', metavar='BIT_WIDTH', type=int,
default=32,
help='For raw file. Bit width of a sample. '
'Assume sample format is little-endian signed int. '
'Default is 32')
parser.add_argument('-r', '--rate', metavar='RATE', type=int,
default=48000,
help='For raw file. Sampling rate. Default is 48000')
parser.add_argument('-c', '--channel', metavar='CHANNEL', type=int,
default=8,
help='For raw file. Number of channels. '
'Default is 8.')
# Arguments for quality measurement customization.
parser.add_argument(
'--quality-block-size-secs',
metavar='BLOCK_SIZE_SECS', type=float,
default=audio_quality_measurement.DEFAULT_BLOCK_SIZE_SECS,
help='Block size for quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-frequency-error-threshold',
metavar='FREQ_ERR_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_FREQUENCY_ERROR,
help='Frequency error threshold for identifying sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-delay-amplitude-threshold',
metavar='DELAY_AMPLITUDE_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_DELAY_AMPLITUDE_THRESHOLD,
help='Amplitude ratio threshold for identifying delay in sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-noise-amplitude-threshold',
metavar='NOISE_AMPLITUDE_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_NOISE_AMPLITUDE_THRESHOLD,
help='Amplitude ratio threshold for identifying noise in sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-burst-amplitude-threshold',
metavar='BURST_AMPLITUDE_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_BURST_AMPLITUDE_THRESHOLD,
help='Amplitude ratio threshold for identifying burst in sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
def parse_args(parser):
"""Parses args."""
args = parser.parse_args()
return args
class WaveFileException(Exception):
"""Error in WaveFile."""
pass
class WaveFormatExtensibleException(Exception):
"""Wave file is in WAVE_FORMAT_EXTENSIBLE format which is not supported."""
pass
class WaveFile(object):
"""Class which handles wave file reading.
Properties:
raw_data: audio_data.AudioRawData object for data in wave file.
rate: sampling rate.
"""
def __init__(self, filename):
"""Inits a wave file.
@param filename: file name of the wave file.
"""
self.raw_data = None
self.rate = None
self._wave_reader = None
self._n_channels = None
self._sample_width_bits = None
self._n_frames = None
self._binary = None
try:
self._read_wave_file(filename)
except WaveFormatExtensibleException:
logging.warning(
'WAVE_FORMAT_EXTENSIBLE is not supproted. '
'Try command "sox in.wav -t wavpcm out.wav" to convert '
'the file to WAVE_FORMAT_PCM format.')
self._convert_and_read_wav_file(filename)
def _convert_and_read_wav_file(self, filename):
"""Converts the wav file and read it.
Converts the file into WAVE_FORMAT_PCM format using sox command and
reads its content.
@param filename: The wave file to be read.
@raises: RuntimeError: sox is not installed.
"""
# Checks if sox is installed.
try:
subprocess.check_output(['sox', '--version'])
except:
raise RuntimeError('sox command is not installed. '
'Try sudo apt-get install sox')
with tempfile.NamedTemporaryFile(suffix='.wav') as converted_file:
command = ['sox', filename, '-t', 'wavpcm', converted_file.name]
logging.debug('Convert the file using sox: %s', command)
subprocess.check_call(command)
self._read_wave_file(converted_file.name)
def _read_wave_file(self, filename):
"""Reads wave file header and samples.
@param filename: The wave file to be read.
@raises WaveFormatExtensibleException: Wave file is in
WAVE_FORMAT_EXTENSIBLE format.
@raises WaveFileException: Wave file format is not supported.
"""
try:
self._wave_reader = wave.open(filename, 'r')
self._read_wave_header()
self._read_wave_binary()
except wave.Error as e:
if 'unknown format: 65534' in str(e):
raise WaveFormatExtensibleException()
else:
logging.exception('Unsupported wave format')
raise WaveFileException()
finally:
if self._wave_reader:
self._wave_reader.close()
def _read_wave_header(self):
"""Reads wave file header.
@raises WaveFileException: wave file is compressed.
"""
# Header is a tuple of
# (nchannels, sampwidth, framerate, nframes, comptype, compname).
header = self._wave_reader.getparams()
logging.debug('Wave header: %s', header)
self._n_channels = header[0]
self._sample_width_bits = header[1] * 8
self.rate = header[2]
self._n_frames = header[3]
comptype = header[4]
compname = header[5]
if comptype != 'NONE' or compname != 'not compressed':
raise WaveFileException('Can not support compressed wav file.')
def _read_wave_binary(self):
"""Reads in samples in wave file."""
self._binary = self._wave_reader.readframes(self._n_frames)
format_str = 'S%d_LE' % self._sample_width_bits
self.raw_data = audio_data.AudioRawData(
binary=self._binary,
channel=self._n_channels,
sample_format=format_str)
def get_number_frames(self):
"""Get the number of frames in the wave file."""
return self._n_frames
class QualityCheckerError(Exception):
"""Error in QualityChecker."""
pass
class CompareFailure(QualityCheckerError):
"""Exception when frequency comparison fails."""
pass
class QualityFailure(QualityCheckerError):
"""Exception when quality check fails."""
pass
class QualityChecker(object):
"""Quality checker controls the flow of checking quality of raw data."""
def __init__(self, raw_data, rate):
"""Inits a quality checker.
@param raw_data: An audio_data.AudioRawData object.
@param rate: Sampling rate.
"""
self._raw_data = raw_data
self._rate = rate
self._spectrals = []
self._quality_result = []
def do_spectral_analysis(self, ignore_high_freq, check_quality,
quality_params):
"""Gets the spectral_analysis result.
@param ignore_high_freq: Ignore high frequencies above this threshold.
@param check_quality: Check quality of each channel.
@param quality_params: A QualityParams object for quality measurement.
"""
self.has_data()
for channel_idx in range(self._raw_data.channel):
signal = self._raw_data.channel_data[channel_idx]
max_abs = max(numpy.abs(signal))
logging.debug('Channel %d max abs signal: %f', channel_idx, max_abs)
if max_abs == 0:
logging.info('No data on channel %d, skip this channel',
channel_idx)
continue
saturate_value = audio_data.get_maximum_value_from_sample_format(
self._raw_data.sample_format)
normalized_signal = audio_analysis.normalize_signal(
signal, saturate_value)
logging.debug('saturate_value: %f', saturate_value)
logging.debug('max signal after normalized: %f', max(normalized_signal))
spectral = audio_analysis.spectral_analysis(
normalized_signal, self._rate)
logging.debug('Channel %d spectral:\n%s', channel_idx,
pprint.pformat(spectral))
# Ignore high frequencies above the threshold.
spectral = [(f, c) for (f, c) in spectral if f < ignore_high_freq]
logging.info('Channel %d spectral after ignoring high frequencies '
'above %f:\n%s', channel_idx, ignore_high_freq,
pprint.pformat(spectral))
if check_quality:
quality = audio_quality_measurement.quality_measurement(
signal=normalized_signal,
rate=self._rate,
dominant_frequency=spectral[0][0],
block_size_secs=quality_params.block_size_secs,
frequency_error_threshold=quality_params.frequency_error_threshold,
delay_amplitude_threshold=quality_params.delay_amplitude_threshold,
noise_amplitude_threshold=quality_params.noise_amplitude_threshold,
burst_amplitude_threshold=quality_params.burst_amplitude_threshold)
logging.debug('Channel %d quality:\n%s', channel_idx,
pprint.pformat(quality))
self._quality_result.append(quality)
self._spectrals.append(spectral)
def has_data(self):
"""Checks if data has been set.
@raises QualityCheckerError: if data or rate is not set yet.
"""
if not self._raw_data or not self._rate:
raise QualityCheckerError('Data and rate is not set yet')
def check_freqs(self, expected_freqs, freq_threshold):
"""Checks the dominant frequencies in the channels.
@param expected_freq: A list of frequencies. If frequency is 0, it
means this channel should be ignored.
@param freq_threshold: The difference threshold to compare two
frequencies.
"""
logging.debug('expected_freqs: %s', expected_freqs)
for idx, expected_freq in enumerate(expected_freqs):
if expected_freq == 0:
continue
if not self._spectrals[idx]:
raise CompareFailure(
'Failed at channel %d: no dominant frequency' % idx)
dominant_freq = self._spectrals[idx][0][0]
if abs(dominant_freq - expected_freq) > freq_threshold:
raise CompareFailure(
'Failed at channel %d: %f is too far away from %f' % (
idx, dominant_freq, expected_freq))
def check_quality(self):
"""Checks the quality measurement results on each channel.
@raises: QualityFailure when there is artifact.
"""
error_msgs = []
for idx, quality_res in enumerate(self._quality_result):
artifacts = quality_res['artifacts']
if artifacts['noise_before_playback']:
error_msgs.append(
'Found noise before playback: %s' % (
artifacts['noise_before_playback']))
if artifacts['noise_after_playback']:
error_msgs.append(
'Found noise after playback: %s' % (
artifacts['noise_after_playback']))
if artifacts['delay_during_playback']:
error_msgs.append(
'Found delay during playback: %s' % (
artifacts['delay_during_playback']))
if artifacts['burst_during_playback']:
error_msgs.append(
'Found burst during playback: %s' % (
artifacts['burst_during_playback']))
if error_msgs:
raise QualityFailure('Found bad quality: %s', '\n'.join(error_msgs))
def dump(self, output_file):
"""Dumps the result into a file in json format.
@param output_file: A file path to dump spectral and quality
measurement result of each channel.
"""
dump_dict = {
'spectrals': self._spectrals,
'quality_result': self._quality_result
}
with open(output_file, 'w') as f:
json.dump(dump_dict, f)
class CheckQualityError(Exception):
"""Error in check_quality main function."""
pass
def read_audio_file(args):
"""Reads audio file.
@param args: The namespace parsed from command line arguments.
@returns: A tuple (raw_data, rate) where raw_data is
audio_data.AudioRawData, rate is sampling rate.
"""
if args.filename.endswith('.wav'):
wavefile = WaveFile(args.filename)
raw_data = wavefile.raw_data
rate = wavefile.rate
elif args.filename.endswith('.raw'):
binary = None
with open(args.filename, 'r') as f:
binary = f.read()
raw_data = audio_data.AudioRawData(
binary=binary,
channel=args.channel,
sample_format='S%d_LE' % args.bit_width)
rate = args.rate
else:
raise CheckQualityError(
'File format for %s is not supported' % args.filename)
return raw_data, rate
def get_quality_params(args):
"""Gets quality parameters in arguments.
@param args: The namespace parsed from command line arguments.
@returns: A QualityParams object.
"""
quality_params = QualityParams(
block_size_secs=args.quality_block_size_secs,
frequency_error_threshold=args.quality_frequency_error_threshold,
delay_amplitude_threshold=args.quality_delay_amplitude_threshold,
noise_amplitude_threshold=args.quality_noise_amplitude_threshold,
burst_amplitude_threshold=args.quality_burst_amplitude_threshold)
return quality_params
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Check signal quality of a wave file. Each channel should'
' either be all zeros, or sine wave of a fixed frequency.')
add_args(parser)
args = parse_args(parser)
level = logging.DEBUG if args.debug else logging.INFO
format = '%(asctime)-15s:%(levelname)s:%(pathname)s:%(lineno)d: %(message)s'
logging.basicConfig(format=format, level=level)
raw_data, rate = read_audio_file(args)
checker = QualityChecker(raw_data, rate)
quality_params = get_quality_params(args)
checker.do_spectral_analysis(ignore_high_freq=args.ignore_high_freq,
check_quality=(not args.spectral_only),
quality_params=quality_params)
if args.output_file:
checker.dump(args.output_file)
if args.freqs:
checker.check_freqs(args.freqs, args.freq_threshold)
if not args.spectral_only:
checker.check_quality()