blob: 3a206f99930ebeddf646dd3b8d32012723147822 [file] [log] [blame]
#!/usr/bin/python
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Command line tool to analyze wave file and detect artifacts."""
import argparse
import collections
import json
import logging
import math
import numpy
import os
import pprint
import subprocess
import tempfile
import wave
# Normal autotest environment.
try:
import common
from autotest_lib.client.cros.audio import audio_analysis
from autotest_lib.client.cros.audio import audio_data
from autotest_lib.client.cros.audio import audio_quality_measurement
# Standalone execution without autotest environment.
except ImportError:
import audio_analysis
import audio_data
import audio_quality_measurement
# Holder for quality parameters used in audio_quality_measurement module.
QualityParams = collections.namedtuple('QualityParams',
['block_size_secs',
'frequency_error_threshold',
'delay_amplitude_threshold',
'noise_amplitude_threshold',
'burst_amplitude_threshold'])
def add_args(parser):
"""Adds command line arguments."""
parser.add_argument('filename', metavar='FILE', type=str,
help='The wav or raw file to check.'
'The file format is determined by file extension.'
'For raw format, user must also pass -b, -r, -c'
'for bit width, rate, and number of channels.')
parser.add_argument('--debug', action='store_true', default=False,
help='Show debug message.')
parser.add_argument('--spectral-only', action='store_true', default=False,
help='Only do spectral analysis on each channel.')
parser.add_argument('--freqs', metavar='FREQ', type=float,
nargs='*',
help='Expected frequencies in the channels. '
'Frequencies are separated by space. '
'E.g.: --freqs 1000 2000. '
'It means only the first two '
'channels (1000Hz, 2000Hz) are to be checked. '
'Unwanted channels can be specified by 0. '
'E.g.: --freqs 1000 0 2000 0 3000. '
'It means only channe 0,2,4 are to be examined.')
parser.add_argument('--freq-threshold', metavar='FREQ_THRESHOLD', type=float,
default=5,
help='Frequency difference threshold in Hz. '
'Default is 5Hz')
parser.add_argument('--ignore-high-freq', metavar='HIGH_FREQ_THRESHOLD',
type=float, default=5000,
help='Frequency threshold in Hz to be ignored for '
'high frequency. Default is 5KHz')
parser.add_argument('--output-file', metavar='OUTPUT_FILE', type=str,
help='Output file to dump analysis result in JSON format')
parser.add_argument('-b', '--bit-width', metavar='BIT_WIDTH', type=int,
default=32,
help='For raw file. Bit width of a sample. '
'Assume sample format is little-endian signed int. '
'Default is 32')
parser.add_argument('-r', '--rate', metavar='RATE', type=int,
default=48000,
help='For raw file. Sampling rate. Default is 48000')
parser.add_argument('-c', '--channel', metavar='CHANNEL', type=int,
default=8,
help='For raw file. Number of channels. '
'Default is 8.')
# Arguments for quality measurement customization.
parser.add_argument(
'--quality-block-size-secs',
metavar='BLOCK_SIZE_SECS', type=float,
default=audio_quality_measurement.DEFAULT_BLOCK_SIZE_SECS,
help='Block size for quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-frequency-error-threshold',
metavar='FREQ_ERR_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_FREQUENCY_ERROR,
help='Frequency error threshold for identifying sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-delay-amplitude-threshold',
metavar='DELAY_AMPLITUDE_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_DELAY_AMPLITUDE_THRESHOLD,
help='Amplitude ratio threshold for identifying delay in sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-noise-amplitude-threshold',
metavar='NOISE_AMPLITUDE_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_NOISE_AMPLITUDE_THRESHOLD,
help='Amplitude ratio threshold for identifying noise in sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
parser.add_argument(
'--quality-burst-amplitude-threshold',
metavar='BURST_AMPLITUDE_THRESHOLD', type=float,
default=audio_quality_measurement.DEFAULT_BURST_AMPLITUDE_THRESHOLD,
help='Amplitude ratio threshold for identifying burst in sine wave'
'in quality measurement. '
'Refer to audio_quality_measurement module for detail.')
def parse_args(parser):
"""Parses args."""
args = parser.parse_args()
return args
class WaveFileException(Exception):
"""Error in WaveFile."""
pass
class WaveFormatExtensibleException(Exception):
"""Wave file is in WAVE_FORMAT_EXTENSIBLE format which is not supported."""
pass
class WaveFile(object):
"""Class which handles wave file reading.
Properties:
raw_data: audio_data.AudioRawData object for data in wave file.
rate: sampling rate.
"""
def __init__(self, filename):
"""Inits a wave file.
@param filename: file name of the wave file.
"""
self.raw_data = None
self.rate = None
self._wave_reader = None
self._n_channels = None
self._sample_width_bits = None
self._n_frames = None
self._binary = None
try:
self._read_wave_file(filename)
except WaveFormatExtensibleException:
logging.warning(
'WAVE_FORMAT_EXTENSIBLE is not supproted. '
'Try command "sox in.wav -t wavpcm out.wav" to convert '
'the file to WAVE_FORMAT_PCM format.')
self._convert_and_read_wav_file(filename)
def _convert_and_read_wav_file(self, filename):
"""Converts the wav file and read it.
Converts the file into WAVE_FORMAT_PCM format using sox command and
reads its content.
@param filename: The wave file to be read.
@raises: RuntimeError: sox is not installed.
"""
# Checks if sox is installed.
try:
subprocess.check_output(['sox', '--version'])
except:
raise RuntimeError('sox command is not installed. '
'Try sudo apt-get install sox')
with tempfile.NamedTemporaryFile(suffix='.wav') as converted_file:
command = ['sox', filename, '-t', 'wavpcm', converted_file.name]
logging.debug('Convert the file using sox: %s', command)
subprocess.check_call(command)
self._read_wave_file(converted_file.name)
def _read_wave_file(self, filename):
"""Reads wave file header and samples.
@param filename: The wave file to be read.
@raises WaveFormatExtensibleException: Wave file is in
WAVE_FORMAT_EXTENSIBLE format.
@raises WaveFileException: Wave file format is not supported.
"""
try:
self._wave_reader = wave.open(filename, 'r')
self._read_wave_header()
self._read_wave_binary()
except wave.Error as e:
if 'unknown format: 65534' in str(e):
raise WaveFormatExtensibleException()
else:
logging.exception('Unsupported wave format')
raise WaveFileException()
finally:
if self._wave_reader:
self._wave_reader.close()
def _read_wave_header(self):
"""Reads wave file header.
@raises WaveFileException: wave file is compressed.
"""
# Header is a tuple of
# (nchannels, sampwidth, framerate, nframes, comptype, compname).
header = self._wave_reader.getparams()
logging.debug('Wave header: %s', header)
self._n_channels = header[0]
self._sample_width_bits = header[1] * 8
self.rate = header[2]
self._n_frames = header[3]
comptype = header[4]
compname = header[5]
if comptype != 'NONE' or compname != 'not compressed':
raise WaveFileException('Can not support compressed wav file.')
def _read_wave_binary(self):
"""Reads in samples in wave file."""
self._binary = self._wave_reader.readframes(self._n_frames)
format_str = 'S%d_LE' % self._sample_width_bits
self.raw_data = audio_data.AudioRawData(
binary=self._binary,
channel=self._n_channels,
sample_format=format_str)
class QualityCheckerError(Exception):
"""Error in QualityChecker."""
pass
class CompareFailure(QualityCheckerError):
"""Exception when frequency comparison fails."""
pass
class QualityFailure(QualityCheckerError):
"""Exception when quality check fails."""
pass
class QualityChecker(object):
"""Quality checker controls the flow of checking quality of raw data."""
def __init__(self, raw_data, rate):
"""Inits a quality checker.
@param raw_data: An audio_data.AudioRawData object.
@param rate: Sampling rate.
"""
self._raw_data = raw_data
self._rate = rate
self._spectrals = []
self._quality_result = []
def do_spectral_analysis(self, ignore_high_freq, check_quality,
quality_params):
"""Gets the spectral_analysis result.
@param ignore_high_freq: Ignore high frequencies above this threshold.
@param check_quality: Check quality of each channel.
@param quality_params: A QualityParams object for quality measurement.
"""
self.has_data()
for channel_idx in xrange(self._raw_data.channel):
signal = self._raw_data.channel_data[channel_idx]
max_abs = max(numpy.abs(signal))
logging.debug('Channel %d max abs signal: %f', channel_idx, max_abs)
if max_abs == 0:
logging.info('No data on channel %d, skip this channel',
channel_idx)
continue
saturate_value = audio_data.get_maximum_value_from_sample_format(
self._raw_data.sample_format)
normalized_signal = audio_analysis.normalize_signal(
signal, saturate_value)
logging.debug('saturate_value: %f', saturate_value)
logging.debug('max signal after normalized: %f', max(normalized_signal))
spectral = audio_analysis.spectral_analysis(
normalized_signal, self._rate)
logging.debug('Channel %d spectral:\n%s', channel_idx,
pprint.pformat(spectral))
# Ignore high frequencies above the threshold.
spectral = [(f, c) for (f, c) in spectral if f < ignore_high_freq]
logging.info('Channel %d spectral after ignoring high frequencies '
'above %f:\n%s', channel_idx, ignore_high_freq,
pprint.pformat(spectral))
if check_quality:
quality = audio_quality_measurement.quality_measurement(
signal=normalized_signal,
rate=self._rate,
dominant_frequency=spectral[0][0],
block_size_secs=quality_params.block_size_secs,
frequency_error_threshold=quality_params.frequency_error_threshold,
delay_amplitude_threshold=quality_params.delay_amplitude_threshold,
noise_amplitude_threshold=quality_params.noise_amplitude_threshold,
burst_amplitude_threshold=quality_params.burst_amplitude_threshold)
logging.debug('Channel %d quality:\n%s', channel_idx,
pprint.pformat(quality))
self._quality_result.append(quality)
self._spectrals.append(spectral)
def has_data(self):
"""Checks if data has been set.
@raises QualityCheckerError: if data or rate is not set yet.
"""
if not self._raw_data or not self._rate:
raise QualityCheckerError('Data and rate is not set yet')
def check_freqs(self, expected_freqs, freq_threshold):
"""Checks the dominant frequencies in the channels.
@param expected_freq: A list of frequencies. If frequency is 0, it
means this channel should be ignored.
@param freq_threshold: The difference threshold to compare two
frequencies.
"""
logging.debug('expected_freqs: %s', expected_freqs)
for idx, expected_freq in enumerate(expected_freqs):
if expected_freq == 0:
continue
if not self._spectrals[idx]:
raise CompareFailure(
'Failed at channel %d: no dominant frequency' % idx)
dominant_freq = self._spectrals[idx][0][0]
if abs(dominant_freq - expected_freq) > freq_threshold:
raise CompareFailure(
'Failed at channel %d: %f is too far away from %f' % (
idx, dominant_freq, expected_freq))
def check_quality(self):
"""Checks the quality measurement results on each channel.
@raises: QualityFailure when there is artifact.
"""
error_msgs = []
for idx, quality_res in enumerate(self._quality_result):
artifacts = quality_res['artifacts']
if artifacts['noise_before_playback']:
error_msgs.append(
'Found noise before playback: %s' % (
artifacts['noise_before_playback']))
if artifacts['noise_after_playback']:
error_msgs.append(
'Found noise after playback: %s' % (
artifacts['noise_after_playback']))
if artifacts['delay_during_playback']:
error_msgs.append(
'Found delay during playback: %s' % (
artifacts['delay_during_playback']))
if artifacts['burst_during_playback']:
error_msgs.append(
'Found burst during playback: %s' % (
artifacts['burst_during_playback']))
if error_msgs:
raise QualityFailure('Found bad quality: %s', '\n'.join(error_msgs))
def dump(self, output_file):
"""Dumps the result into a file in json format.
@param output_file: A file path to dump spectral and quality
measurement result of each channel.
"""
dump_dict = {
'spectrals': self._spectrals,
'quality_result': self._quality_result
}
with open(output_file, 'w') as f:
json.dump(dump_dict, f)
class CheckQualityError(Exception):
"""Error in check_quality main function."""
pass
def read_audio_file(args):
"""Reads audio file.
@param args: The namespace parsed from command line arguments.
@returns: A tuple (raw_data, rate) where raw_data is
audio_data.AudioRawData, rate is sampling rate.
"""
if args.filename.endswith('.wav'):
wavefile = WaveFile(args.filename)
raw_data = wavefile.raw_data
rate = wavefile.rate
elif args.filename.endswith('.raw'):
binary = None
with open(args.filename, 'r') as f:
binary = f.read()
raw_data = audio_data.AudioRawData(
binary=binary,
channel=args.channel,
sample_format='S%d_LE' % args.bit_width)
rate = args.rate
else:
raise CheckQualityError(
'File format for %s is not supported' % args.filename)
return raw_data, rate
def get_quality_params(args):
"""Gets quality parameters in arguments.
@param args: The namespace parsed from command line arguments.
@returns: A QualityParams object.
"""
quality_params = QualityParams(
block_size_secs=args.quality_block_size_secs,
frequency_error_threshold=args.quality_frequency_error_threshold,
delay_amplitude_threshold=args.quality_delay_amplitude_threshold,
noise_amplitude_threshold=args.quality_noise_amplitude_threshold,
burst_amplitude_threshold=args.quality_burst_amplitude_threshold)
return quality_params
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Check signal quality of a wave file. Each channel should'
' either be all zeros, or sine wave of a fixed frequency.')
add_args(parser)
args = parse_args(parser)
level = logging.DEBUG if args.debug else logging.INFO
format = '%(asctime)-15s:%(levelname)s:%(pathname)s:%(lineno)d: %(message)s'
logging.basicConfig(format=format, level=level)
raw_data, rate = read_audio_file(args)
checker = QualityChecker(raw_data, rate)
quality_params = get_quality_params(args)
checker.do_spectral_analysis(ignore_high_freq=args.ignore_high_freq,
check_quality=(not args.spectral_only),
quality_params=quality_params)
if args.output_file:
checker.dump(args.output_file)
if args.freqs:
checker.check_freqs(args.freqs, args.freq_threshold)
if not args.spectral_only:
checker.check_quality()