blob: 7daf0a6dcca2b3d28cc734f174d5ac715d1a1b6a [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Feedback implementation for audio with closed-loop cable."""
import logging
import os
import tempfile
import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.feedback import client
from autotest_lib.server.brillo import audio_utils
from autotest_lib.server.brillo import host_utils
# Constants used when recording playback.
#
_REC_FILENAME = 'rec_file.wav'
_REC_DURATION = 10
# Number of channels to record.
_DEFAULT_NUM_CHANNELS = 1
# Recording sample rate (48kHz).
_DEFAULT_SAMPLE_RATE = 48000
# Recording sample format is signed 16-bit PCM (two bytes).
_DEFAULT_SAMPLE_WIDTH = 2
# Default frequency to generate audio at (used for recording).
_DEFAULT_FREQUENCY = 440
# The peak when recording silence is 5% of the max volume.
_SILENCE_THRESHOLD = 0.05
def _max_volume(sample_width):
"""Returns the maximum possible volume.
This is the highest absolute value of an integer of a given width.
If the sample width is one, then we assume an unsigned intger. For all other
sample sizes, we assume that the format is signed.
@param sample_width: The sample width in bytes.
"""
return (1 << 8) if sample_width == 1 else (1 << (sample_width * 8 - 1))
class Client(client.Client):
"""Audio closed-loop feedback implementation.
This class (and the queries it instantiates) perform playback and recording
of audio on the DUT itself, with the assumption that the audio in/out
connections are cross-wired with a cable. It provides some shared logic
that queries can use for handling the DUT as well as maintaining shared
state between queries (such as an audible volume threshold).
"""
def __init__(self):
"""Construct the client library."""
super(Client, self).__init__()
self.host = None
self.dut_tmp_dir = None
self.tmp_dir = None
def set_audible_threshold(self, threshold):
"""Sets the audible volume threshold.
@param threshold: New threshold value.
"""
self.audible_threshold = threshold
# Interface overrides.
#
def _initialize_impl(self, test, host):
"""Initializes the feedback object.
@param test: An object representing the test case.
@param host: An object representing the DUT.
"""
self.host = host
self.tmp_dir = test.tmpdir
self.dut_tmp_dir = host.get_tmp_dir()
def _finalize_impl(self):
"""Finalizes the feedback object."""
pass
def _new_query_impl(self, query_id):
"""Instantiates a new query.
@param query_id: A query identifier.
@return A query object.
@raise error.TestError: Query is not supported.
"""
if query_id == client.QUERY_AUDIO_PLAYBACK_SILENT:
return SilentPlaybackAudioQuery(self)
elif query_id == client.QUERY_AUDIO_PLAYBACK_AUDIBLE:
return AudiblePlaybackAudioQuery(self)
elif query_id == client.QUERY_AUDIO_RECORDING:
return RecordingAudioQuery(self)
else:
raise error.TestError('Unsupported query (%s)' % query_id)
class _PlaybackAudioQuery(client.OutputQuery):
"""Playback query base class."""
def __init__(self, client):
"""Constructor.
@param client: The instantiating client object.
"""
super(_PlaybackAudioQuery, self).__init__()
self.client = client
self.dut_rec_filename = None
self.local_tmp_dir = None
self.recording_pid = None
def _get_local_rec_filename(self):
"""Waits for recording to finish and copies the file to the host.
@return A string of the local filename containing the recorded audio.
@raise error.TestError: Error while validating the recording.
"""
# Wait for recording to finish.
timeout = _REC_DURATION + 5
if not host_utils.wait_for_process(self.client.host,
self.recording_pid, timeout):
raise error.TestError(
'Recording did not terminate within %d seconds' % timeout)
_, local_rec_filename = tempfile.mkstemp(
prefix='recording-', suffix='.wav', dir=self.local_tmp_dir)
self.client.host.get_file(self.dut_rec_filename,
local_rec_filename, delete_dest=True)
return local_rec_filename
# Implementation overrides.
#
def _prepare_impl(self,
sample_width=_DEFAULT_SAMPLE_WIDTH,
sample_rate=_DEFAULT_SAMPLE_RATE,
num_channels=_DEFAULT_NUM_CHANNELS,
duration_secs=_REC_DURATION):
"""Implementation of query preparation logic.
@sample_width: Sample width to record at.
@sample_rate: Sample rate to record at.
@num_channels: Number of channels to record at.
@duration_secs: Duration (in seconds) to record for.
"""
self.num_channels = num_channels
self.sample_rate = sample_rate
self.sample_width = sample_width
self.dut_rec_filename = os.path.join(self.client.dut_tmp_dir,
_REC_FILENAME)
self.local_tmp_dir = tempfile.mkdtemp(dir=self.client.tmp_dir)
# Trigger recording in the background.
cmd = ('slesTest_recBuffQueue -c%d -d%d -r%d -%d %s' %
(num_channels, duration_secs, sample_rate, sample_width,
self.dut_rec_filename))
logging.info("Recording cmd: %s", cmd)
self.recording_pid = host_utils.run_in_background(self.client.host, cmd)
class SilentPlaybackAudioQuery(_PlaybackAudioQuery):
"""Implementation of a silent playback query."""
def __init__(self, client):
super(SilentPlaybackAudioQuery, self).__init__(client)
# Implementation overrides.
#
def _validate_impl(self):
"""Implementation of query validation logic."""
local_rec_filename = self._get_local_rec_filename()
try:
silence_peaks = audio_utils.check_wav_file(
local_rec_filename,
num_channels=self.num_channels,
sample_rate=self.sample_rate,
sample_width=self.sample_width)
except ValueError as e:
raise error.TestFail('Invalid file attributes: %s' % e)
silence_peak = max(silence_peaks)
# Fail if the silence peak volume exceeds the maximum allowed.
max_vol = _max_volume(self.sample_width) * _SILENCE_THRESHOLD
if silence_peak > max_vol:
logging.error('Silence peak level (%d) exceeds the max allowed '
'(%d)', silence_peak, max_vol)
raise error.TestFail('Environment is too noisy')
# Update the client audible threshold, if so instructed.
audible_threshold = silence_peak * 15
logging.info('Silent peak level (%d) is below the max allowed (%d); '
'setting audible threshold to %d',
silence_peak, max_vol, audible_threshold)
self.client.set_audible_threshold(audible_threshold)
class AudiblePlaybackAudioQuery(_PlaybackAudioQuery):
"""Implementation of an audible playback query."""
def __init__(self, client):
super(AudiblePlaybackAudioQuery, self).__init__(client)
def _check_peaks(self):
"""Ensure that peak recording volume exceeds the threshold."""
local_rec_filename = self._get_local_rec_filename()
try:
audible_peaks = audio_utils.check_wav_file(
local_rec_filename,
num_channels=self.num_channels,
sample_rate=self.sample_rate,
sample_width=self.sample_width)
except ValueError as e:
raise error.TestFail('Invalid file attributes: %s' % e)
min_channel, min_audible_peak = min(enumerate(audible_peaks),
key=lambda p: p[1])
if min_audible_peak < self.client.audible_threshold:
logging.error(
'Audible peak level (%d) is less than expected (%d) for '
'channel %d', min_audible_peak,
self.client.audible_threshold, min_channel)
raise error.TestFail(
'The played audio peak level is below the expected '
'threshold. Either playback did not work, or the volume '
'level is too low. Check the audio connections and '
'settings on the DUT.')
logging.info('Audible peak level (%d) exceeds the threshold (%d)',
min_audible_peak, self.client.audible_threshold)
# Implementation overrides.
#
def _validate_impl(self, audio_file=None):
"""Implementation of query validation logic.
@audio_file: File to compare recorded audio to.
"""
self._check_peaks()
# If the reference audio file is available, then perform an additional
# check.
if audio_file:
local_rec_filename = self._get_local_rec_filename()
audio_utils.compare_file(reference_audio_filename=audio_file,
test_audio_filename=local_rec_filename)
class RecordingAudioQuery(client.InputQuery):
"""Implementation of a recording query."""
def __init__(self, client):
super(RecordingAudioQuery, self).__init__()
self.client = client
def _prepare_impl(self, use_file=False,
sample_width=_DEFAULT_SAMPLE_WIDTH,
sample_rate=_DEFAULT_SAMPLE_RATE,
num_channels=_DEFAULT_NUM_CHANNELS,
duration_secs=_REC_DURATION,
frequency=_DEFAULT_FREQUENCY):
"""Implementation of query preparation logic.
@param use_file: A bool to indicate whether a file should be used for
playback. The other arguments are only valid if
use_file is True.
@param sample_width: Size of samples in bytes.
@param sample_rate: Recording sample rate in hertz.
@param num_channels: Number of channels to use for playback.
@param duration_secs: Number of seconds to play audio for.
@param frequency: Frequency of sine wave to generate.
"""
self.use_file = use_file
self.sample_rate = sample_rate
self.sample_width = sample_width
self.num_channels = num_channels
self.duration_secs = duration_secs
self.frequency = frequency
def _emit_impl(self):
"""Implementation of query emission logic."""
if self.use_file:
self.reference_filename, dut_play_file = \
audio_utils.generate_sine_file(
self.client.host, self.num_channels,
self.sample_rate, self.sample_width,
self.duration_secs, self.frequency,
self.client.tmp_dir)
playback_cmd = 'slesTest_playFdPath %s 0' % dut_play_file
self.client.host.run(playback_cmd)
else:
self.client.host.run('slesTest_sawtoothBufferQueue')
def _validate_impl(self, captured_audio_file,
peak_percent_min=1, peak_percent_max=100):
"""Implementation of query validation logic.
@param captured_audio_file: Path to the recorded WAV file.
@peak_percent_min: Lower bound on peak recorded volume as percentage of
max molume (0-100). Default is 1%.
@peak_percent_max: Upper bound on peak recorded volume as percentage of
max molume (0-100). Default is 100% (no limit).
"""
try:
recorded_peaks = audio_utils.check_wav_file(
captured_audio_file, num_channels=self.num_channels,
sample_rate=self.sample_rate,
sample_width=self.sample_width)
except ValueError as e:
raise error.TestFail('Recorded audio file is invalid: %s' % e)
max_volume = _max_volume(self.sample_width)
peak_min = max_volume * peak_percent_min / 100
peak_max = max_volume * peak_percent_max / 100
for channel, recorded_peak in enumerate(recorded_peaks):
if recorded_peak < peak_min:
logging.error(
'Recorded audio peak level (%d) is less than expected '
'(%d) for channel %d', recorded_peak, peak_min, channel)
raise error.TestFail(
'The recorded audio peak level is below the expected '
'threshold. Either recording did not capture the '
'produced audio, or the recording level is too low. '
'Check the audio connections and settings on the DUT.')
if recorded_peak > peak_max:
logging.error(
'Recorded audio peak level (%d) is more than expected '
'(%d) for channel %d', recorded_peak, peak_max, channel)
raise error.TestFail(
'The recorded audio peak level exceeds the expected '
'maximum. Either recording captured much background '
'noise, or the recording level is too high. Check the '
'audio connections and settings on the DUT.')
if self.use_file:
audio_utils.compare_file(
reference_audio_filename=self.reference_filename,
test_audio_filename=captured_audio_file)