blob: 09e97822932173f7899db8bb909bf0c4de7dc574 [file] [log] [blame]
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Server side Bluetooth audio tests."""
import logging
import time
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.bluetooth.bluetooth_audio_test_data import (
A2DP, HFP_WBS, HFP_NBS, audio_test_data)
from autotest_lib.server.cros.bluetooth.bluetooth_adapter_tests import (
BluetoothAdapterTests, test_retry_and_log)
class BluetoothAdapterAudioTests(BluetoothAdapterTests):
"""Server side Bluetooth adapter audio test class."""
DEVICE_TYPE = 'BLUETOOTH_AUDIO'
FREQUENCY_TOLERANCE_RATIO = {
A2DP: 0.01,
HFP_WBS: 0.01,
# NBS provides lower audio quality and thus has larger tolerance.
HFP_NBS: 0.05,
}
WAIT_DAEMONS_READY_SECS = 1
def _get_pulseaudio_bluez_source(self, get_source_method, device,
test_profile):
"""Get the specified bluez device number in the pulseaudio source list.
@param get_source_method: the method to get distinct bluez source
@param device: the bluetooth peer device
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
@returns: True if the specified bluez source is derived
"""
sources = device.ListSources(test_profile)
logging.debug('ListSources()\n%s', sources)
self.bluez_source = get_source_method(test_profile)
result = bool(self.bluez_source)
if result:
logging.debug('bluez_source device number: %s', self.bluez_source)
else:
logging.debug('waiting for bluez_source ready in pulseaudio...')
return result
def _get_pulseaudio_bluez_sink(self, get_sink_method, device, test_profile):
"""Get the specified bluez device number in the pulseaudio sink list.
@param get_sink_method: the method to get distinct bluez sink
@param device: the bluetooth peer device
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
@returns: True if the specified bluez sink is derived
"""
sinks = device.ListSinks(test_profile)
logging.debug('ListSinks()\n%s', sinks)
self.bluez_sink = get_sink_method(test_profile)
result = bool(self.bluez_sink)
if result:
logging.debug('bluez_sink device number: %s', self.bluez_sink)
else:
logging.debug('waiting for bluez_sink ready in pulseaudio...')
return result
def _get_pulseaudio_bluez_source_a2dp(self, device, test_profile):
"""Get the a2dp bluez source device number.
@param device: the bluetooth peer device
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
@returns: True if the specified a2dp bluez source is derived
"""
return self._get_pulseaudio_bluez_source(
device.GetBluezSourceA2DPDevice, device, test_profile)
def _get_pulseaudio_bluez_source_hfp(self, device, test_profile):
"""Get the hfp bluez source device number.
@param device: the bluetooth peer device
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
@returns: True if the specified hfp bluez source is derived
"""
return self._get_pulseaudio_bluez_source(
device.GetBluezSourceHFPDevice, device, test_profile)
def _get_pulseaudio_bluez_sink_hfp(self, device, test_profile):
"""Get the hfp bluez sink device number.
@param device: the bluetooth peer device
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
@returns: True if the specified hfp bluez sink is derived
"""
return self._get_pulseaudio_bluez_sink(
device.GetBluezSinkHFPDevice, device, test_profile)
def _check_audio_frames_legitimacy(self, audio_test_data, recording_device):
"""Check if audio frames in the recorded file are legitimate.
For a wav file, a simple check is to make sure the recorded audio file
is not empty.
For a raw file, a simple check is to make sure the recorded audio file
are not all zeros.
@param audio_test_data: a dictionary about the audio test data
defined in client/cros/bluetooth/bluetooth_audio_test_data.py
@param recording_device: which device recorded the audio,
possible values are 'recorded_by_dut' or 'recorded_by_peer'
@returns: True if audio frames are legitimate.
"""
result = self.bluetooth_facade.check_audio_frames_legitimacy(
audio_test_data, recording_device)
if not result:
self.results = {'audio_frames_legitimacy': 'empty or all zeros'}
logging.error('The recorded audio file is empty or all zeros.')
return result
def _check_frequency(self, test_profile, recorded_freq, expected_freq):
"""Check if the recorded frequency is within tolerance.
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
@param recorded_freq: the frequency of recorded audio
@param expected_freq: the expected frequency
@returns: True if the recoreded frequency falls within the tolerance of
the expected frequency
"""
tolerance = expected_freq * self.FREQUENCY_TOLERANCE_RATIO[test_profile]
return abs(expected_freq - recorded_freq) <= tolerance
def _check_primary_frequencies(self, test_profile, audio_test_data,
recording_device):
"""Check if the recorded frequencies meet expectation.
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
@param audio_test_data: a dictionary about the audio test data
defined in client/cros/bluetooth/bluetooth_audio_test_data.py
@param recording_device: which device recorded the audio,
possible values are 'recorded_by_dut' or 'recorded_by_peer'
@returns: True if the recorded frequencies of all channels fall within
the tolerance of expected frequencies
"""
recorded_frequencies = self.bluetooth_facade.get_primary_frequencies(
audio_test_data, recording_device)
expected_frequencies = audio_test_data['frequencies']
final_result = True
self.results = dict()
if len(recorded_frequencies) < len(expected_frequencies):
logging.error('recorded_frequencies: %s, expected_frequencies: %s',
str(recorded_frequencies), str(expected_frequencies))
final_result = False
else:
for channel, expected_freq in enumerate(expected_frequencies):
recorded_freq = recorded_frequencies[channel]
ret_val = self._check_frequency(
test_profile, recorded_freq, expected_freq)
pass_fail_str = 'pass' if ret_val else 'fail'
result = ('primary frequency %d (expected %d): %s' %
(recorded_freq, expected_freq, pass_fail_str))
self.results['Channel %d' % channel] = result
logging.info('Channel %d: %s', channel, result)
if not ret_val:
final_result = False
logging.debug(str(self.results))
if not final_result:
logging.error('Failure at checking primary frequencies')
return final_result
def _poll_for_condition(self, condition, timeout=20, sleep_interval=1,
desc='waiting for condition'):
try:
utils.poll_for_condition(condition=condition,
timeout=timeout,
sleep_interval=sleep_interval,
desc=desc)
except Exception as e:
raise error.TestError('Exception occurred when %s' % desc)
def initialize_bluetooth_audio(self, device, test_profile):
"""Initialize the Bluetooth audio task.
Note: pulseaudio is not stable. Need to restart it in the beginning.
@param device: the bluetooth peer device
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
"""
if not device.StartPulseaudio(test_profile):
raise error.TestError('Failed to start pulseaudio.')
logging.debug('pulseaudio is started.')
if test_profile in (HFP_WBS, HFP_NBS):
if device.StartOfono():
logging.debug('ofono is started.')
else:
raise error.TestError('Failed to start ofono.')
elif device.StopOfono():
logging.debug('ofono is stopped.')
else:
logging.warn('Failed to stop ofono. Ignored.')
# Need time to complete starting services.
time.sleep(self.WAIT_DAEMONS_READY_SECS)
def cleanup_bluetooth_audio(self, device, test_profile):
"""Cleanup for Bluetooth audio.
@param device: the bluetooth peer device
@param test_profile: the test profile used, A2DP, HFP_WBS or HFP_NBS
"""
if device.StopPulseaudio():
logging.debug('pulseaudio is stopped.')
else:
logging.warn('Failed to stop pulseaudio. Ignored.')
if device.StopOfono():
logging.debug('ofono is stopped.')
else:
logging.warn('Failed to stop ofono. Ignored.')
def initialize_bluetooth_player(self, device):
"""Initialize the Bluetooth media player.
@param device: the Bluetooth peer device.
"""
if not device.ExportMediaPlayer():
raise error.TestError('Failed to export media player.')
logging.debug('mpris-proxy is started.')
# Wait for player to show up and observed by playerctl.
desc='waiting for media player'
self._poll_for_condition(
lambda: bool(device.GetExportedMediaPlayer()), desc=desc)
def cleanup_bluetooth_player(self, device):
"""Cleanup for Bluetooth media player.
@param device: the bluetooth peer device.
"""
device.UnexportMediaPlayer()
# ---------------------------------------------------------------
# Definitions of all bluetooth audio test cases
# ---------------------------------------------------------------
@test_retry_and_log(False)
def test_a2dp_sinewaves(self, device):
"""Test Case: a2dp sinewaves
@param device: the bluetooth peer device
@returns: True if the recorded primary frequency is within the
tolerance of the playback sine wave frequency.
"""
a2dp_test_data = audio_test_data[A2DP]
# Wait for pulseaudio a2dp bluez source
desc='waiting for pulseaudio a2dp bluez source'
logging.debug(desc)
self._poll_for_condition(
lambda: self._get_pulseaudio_bluez_source_a2dp(device, A2DP),
desc=desc)
# Start recording audio on the peer Bluetooth audio device.
logging.debug('Start recording a2dp')
if not device.StartRecordingAudioSubprocess(A2DP):
raise error.TestError(
'Failed to record on the peer Bluetooth audio device.')
# Play stereo audio on the DUT.
logging.debug('Play audio')
if not self.bluetooth_facade.play_audio(a2dp_test_data):
raise error.TestError('DUT failed to play audio.')
# Stop recording audio on the peer Bluetooth audio device.
logging.debug('Stop recording a2dp')
if not device.StopRecordingingAudioSubprocess():
msg = 'Failed to stop recording on the peer Bluetooth audio device'
logging.error(msg)
# Copy the recorded audio file to the DUT for spectrum analysis.
logging.debug('Scp recorded file')
recorded_file = a2dp_test_data['recorded_by_peer']
device.ScpToDut(recorded_file, recorded_file, self.host.ip)
# Check if the audio frames in the recorded file are legitimate.
if not self._check_audio_frames_legitimacy(a2dp_test_data,
'recorded_by_peer'):
return False
# Check if the primary frequencies of recorded file meet expectation.
check_freq_result = self._check_primary_frequencies(
A2DP, a2dp_test_data, 'recorded_by_peer')
return check_freq_result
@test_retry_and_log(False)
def test_hfp_dut_as_source(self, device, test_profile):
"""Test Case: hfp sinewave streaming from dut to peer device
@param device: the bluetooth peer device
@param test_profile: which test profile is used, HFP_WBS or HFP_NBS
@returns: True if the recorded primary frequency is within the
tolerance of the playback sine wave frequency.
"""
hfp_test_data = audio_test_data[test_profile]
# Select audio input device.
desc='waiting for cras to select audio input device'
logging.debug(desc)
self._poll_for_condition(
lambda: self.bluetooth_facade.select_input_device(device.name),
desc=desc)
# Enable HFP profile.
logging.debug('Start recording audio on DUT')
if not self.bluetooth_facade.start_capturing_audio_subprocess(
hfp_test_data, 'recorded_by_peer'):
raise error.TestError('Peer failed to start capturing audio.')
# Wait for pulseaudio bluez hfp source
desc='waiting for pulseaudio bluez hfp source'
logging.debug(desc)
self._poll_for_condition(
lambda: self._get_pulseaudio_bluez_source_hfp(device,
test_profile),
desc=desc)
logging.debug('Start recording audio on Pi')
# Start recording audio on the peer Bluetooth audio device.
if not device.StartRecordingAudioSubprocess(test_profile):
raise error.TestError(
'Failed to record on the peer Bluetooth audio device.')
# Play audio on the DUT in a non-blocked way.
# If there are issues, cras_test_client playing back might be blocked
# forever. We would like to avoid the testing procedure from that.
logging.debug('Start playing audio')
if not self.bluetooth_facade.start_playing_audio_subprocess(
hfp_test_data):
raise error.TestError('DUT failed to play audio.')
time.sleep(hfp_test_data['duration'])
logging.debug('Stop recording audio on Pi')
# Stop recording audio on the peer Bluetooth audio device.
if not device.StopRecordingingAudioSubprocess():
msg = 'Failed to stop recording on the peer Bluetooth audio device'
logging.error(msg)
# Disable HFP profile.
logging.debug('Stop recording audio on DUT')
if not self.bluetooth_facade.stop_capturing_audio_subprocess():
raise error.TestError('DUT failed to stop capturing audio.')
# Stop playing audio on DUT.
logging.debug('Stop playing audio on DUT')
if not self.bluetooth_facade.stop_playing_audio_subprocess():
raise error.TestError('DUT failed to stop playing audio.')
# Copy the recorded audio file to the DUT for spectrum analysis.
logging.debug('Scp to DUT')
recorded_file = hfp_test_data['recorded_by_peer']
device.ScpToDut(recorded_file, recorded_file, self.host.ip)
# Check if the audio frames in the recorded file are legitimate.
if not self._check_audio_frames_legitimacy(hfp_test_data,
'recorded_by_peer'):
return False
# Check if the primary frequencies of recorded file meet expectation.
check_freq_result = self._check_primary_frequencies(
test_profile, hfp_test_data, 'recorded_by_peer')
return check_freq_result
@test_retry_and_log(False)
def test_hfp_dut_as_sink(self, device, test_profile):
"""Test Case: hfp sinewave streaming from peer device to dut
@param device: the bluetooth peer device
@param test_profile: which test profile is used, HFP_WBS or HFP_NBS
@returns: True if the recorded primary frequency is within the
tolerance of the playback sine wave frequency.
"""
hfp_test_data = audio_test_data[test_profile]
# Select audio input device.
desc='waiting for cras to select audio input device'
logging.debug(desc)
self._poll_for_condition(
lambda: self.bluetooth_facade.select_input_device(device.name),
desc=desc)
# Enable HFP profile.
logging.debug('Start recording audio on DUT')
if not self.bluetooth_facade.start_capturing_audio_subprocess(
hfp_test_data, 'recorded_by_dut'):
raise error.TestError('DUT failed to start capturing audio.')
# Wait for pulseaudio bluez hfp source
desc='waiting for pulseaudio bluez hfp sink'
logging.debug(desc)
self._poll_for_condition(
lambda: self._get_pulseaudio_bluez_sink_hfp(device,
test_profile),
desc=desc)
# Select audio input device.
logging.debug('Select input device')
if not self.bluetooth_facade.select_input_device(device.name):
raise error.TestError('DUT failed to select audio input device.')
# Start playing audio on chameleon.
logging.debug('Start playing audio on Pi')
if not device.StartPlayingAudioSubprocess(test_profile):
err = 'Failed to start playing audio file on the peer device'
raise error.TestError(err)
time.sleep(hfp_test_data['duration'])
# Stop playing audio on chameleon.
logging.debug('Stop playing audio on Pi')
if not device.StopPlayingAudioSubprocess():
err = 'Failed to stop playing audio on the peer device'
raise error.TestError(err)
# Disable HFP profile.
logging.debug('Stop recording audio on DUT')
if not self.bluetooth_facade.stop_capturing_audio_subprocess():
raise error.TestError('DUT failed to stop capturing audio.')
# Check if the audio frames in the recorded file are legitimate.
if not self._check_audio_frames_legitimacy(hfp_test_data,
'recorded_by_dut'):
return False
# Check if the primary frequencies of recorded file meet expectation.
check_freq_result = self._check_primary_frequencies(
test_profile, hfp_test_data, 'recorded_by_dut')
return check_freq_result
@test_retry_and_log(False)
def test_avrcp_commands(self, device):
"""Test Case: Test AVRCP commands issued by peer can be received at DUT
The very first AVRCP command (Linux evdev event) the DUT receives
contains extra information than just the AVRCP event, e.g. EV_REP
report used to specify delay settings. Send the first command before
the actual test starts to avoid dealing with them during test.
The peer device name is required to monitor the event reception on the
DUT. However, as the peer device itself already registered with the
kernel as an udev input device. The AVRCP profile will register as an
separate input device with the Bluetooth device as its name.
Temporarily substitute the device name with its Bluetooth address.
@param device: the Bluetooth peer device
@returns: True if the all AVRCP commands received by DUT, false
otherwise
"""
device.SendMediaPlayerCommand('play')
name = device.name
device.name = device.address.lower()
result_pause = self.test_avrcp_event(device,
device.SendMediaPlayerCommand, 'pause')
result_play = self.test_avrcp_event(device,
device.SendMediaPlayerCommand, 'play')
result_stop = self.test_avrcp_event(device,
device.SendMediaPlayerCommand, 'stop')
result_next = self.test_avrcp_event(device,
device.SendMediaPlayerCommand, 'next')
result_previous = self.test_avrcp_event(device,
device.SendMediaPlayerCommand, 'previous')
device.name = name
self.results = {'pause': result_pause, 'play': result_play,
'stop': result_stop, 'next': result_next,
'previous': result_previous}
return all(self.results.values())
@test_retry_and_log(False)
def test_avrcp_media_info(self, device):
"""Test Case: Test AVRCP media info sent by DUT can be received by peer
The test update all media information twice to prevent previous
leftover data affect the current iteration of test. Then compare the
expected results against the information received on the peer device.
This test verifies media information including: playback status,
length, title, artist, and album. Position of the media is not
currently support as playerctl on the peer side cannot correctly
retrieve such information.
Length and position information are transmitted in the unit of
microsecond. However, BlueZ process those time data in the resolution
of millisecond. Discard microsecond detail when comparing those media
information.
@param device: the Bluetooth peer device
@returns: True if the all AVRCP media info received by DUT, false
otherwise
"""
# First round of updating media information to overwrite all leftovers.
init_status = 'stopped'
init_length = 20200414
init_position = 8686868
init_metadata = {'album': 'metadata_album_init',
'artist': 'metadata_artist_init',
'title': 'metadata_title_init'}
self.bluetooth_facade.set_player_playback_status(init_status)
self.bluetooth_facade.set_player_length(init_length)
self.bluetooth_facade.set_player_position(init_position)
self.bluetooth_facade.set_player_metadata(init_metadata)
# Second round of updating for actual testing.
expected_status = 'playing'
expected_length = 68686868
expected_position = 20200414
expected_metadata = {'album': 'metadata_album_expected',
'artist': 'metadata_artist_expected',
'title': 'metadata_title_expected'}
self.bluetooth_facade.set_player_playback_status(expected_status)
self.bluetooth_facade.set_player_length(expected_length)
self.bluetooth_facade.set_player_position(expected_position)
self.bluetooth_facade.set_player_metadata(expected_metadata)
received_media_info = device.GetMediaPlayerMediaInfo()
logging.debug(received_media_info)
result_status = bool(expected_status ==
received_media_info.get('status').lower())
result_album = bool(expected_metadata['album'] ==
received_media_info.get('album'))
result_artist = bool(expected_metadata['artist'] ==
received_media_info.get('artist'))
result_title = bool(expected_metadata['title'] ==
received_media_info.get('title'))
result_length = bool(expected_length / 1000 ==
int(received_media_info.get('length')) / 1000)
self.results = {'status': result_status, 'album': result_album,
'artist': result_artist, 'title': result_title,
'length': result_length}
return all(self.results.values())