blob: ec87a91fa6d9a775636bc9edbc6e891940eb7ac0 [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import random
import re
import subprocess
import time
from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.audio import audio_helper
_STREAM_TYPE_INPUT = 0
_STREAM_TYPE_OUTPUT = 1
class audio_CrasStress(test.test):
"""Checks if output buffer will drift to super high level."""
version = 2
_MAX_STREAMS = 3
_LOOP_COUNT = 300
_INPUT_BUFFER_LEVEL = '.*?READ_AUDIO.*?hw_level.*?(\d+).*?'
_OUTPUT_BUFFER_LEVEL = '.*?FILL_AUDIO.*?hw_level.*?(\d+).*?'
_CHECK_PERIOD_TIME_SECS = 1 # Check buffer level every second.
"""
We only run 1024 and 512 block size streams in this test. So buffer level
of input device should stay between 0 and 1024. Buffer level of output
device should between 1024 to 2048. Sometimes it will be a little more.
Therefore, we set input buffer criteria to 2 * 1024 and output buffer
criteria to 3 * 1024.
"""
_RATES = ['48000', '44100']
_BLOCK_SIZES = ['512', '1024']
_INPUT_BUFFER_DRIFT_CRITERIA = 2 * 1024
_OUTPUT_BUFFER_DRIFT_CRITERIA = 3 * 1024
def _new_stream(self, stream_type):
"""Runs new stream by cras_test_client."""
if stream_type == _STREAM_TYPE_INPUT:
cmd = ['cras_test_client', '--capture_file', '/dev/null']
else:
cmd = ['cras_test_client', '--playback_file', '/dev/zero']
cmd += ['--rate', self._RATES[random.randint(0, 1)],
'--block_size', self._BLOCK_SIZES[random.randint(0, 1)]]
return subprocess.Popen(cmd)
def _check_buffer_level(self, stream_type):
buffer_level = self._get_buffer_level(stream_type)
if stream_type == _STREAM_TYPE_INPUT:
logging.debug("Max input buffer level: %d", buffer_level)
if buffer_level > self._INPUT_BUFFER_DRIFT_CRITERIA:
audio_helper.dump_audio_diagnostics(
os.path.join(self.resultsdir, "audio_diagnostics.txt"))
raise error.TestFail('Input buffer level %d drift too high' %
buffer_level)
if stream_type == _STREAM_TYPE_OUTPUT:
logging.debug("Max output buffer level: %d", buffer_level)
if buffer_level > self._OUTPUT_BUFFER_DRIFT_CRITERIA:
audio_helper.dump_audio_diagnostics(
os.path.join(self.resultsdir, "audio_diagnostics.txt"))
raise error.TestFail('Output buffer level %d drift too high' %
buffer_level)
def cleanup(self):
"""Clean up all streams."""
while len(self._streams) > 0:
self._streams[0].kill()
self._streams.remove(self._streams[0])
def run_once(self, input_stream=True, output_stream=True):
"""
Repeatedly add output streams of random configurations and
remove them to verify if output buffer level would drift.
@params input_stream: If true, run input stream in the test.
@params output_stream: If true, run output stream in the test.
"""
if not input_stream and not output_stream:
raise error.TestError('Not supported mode.')
self._streams = []
loop_count = 0
past_time = time.time()
while loop_count < self._LOOP_COUNT:
# 1 for adding stream, 0 for removing stream.
add = random.randint(0, 1)
if not self._streams:
add = 1
elif len(self._streams) == self._MAX_STREAMS:
add = 0
if add == 1:
# 0 for input stream, 1 for output stream.
stream_type = random.randint(0, 1)
if not input_stream:
stream_type = _STREAM_TYPE_OUTPUT
elif not output_stream:
stream_type = _STREAM_TYPE_INPUT
self._streams.append(self._new_stream(stream_type))
else:
self._streams[0].kill()
self._streams.remove(self._streams[0])
time.sleep(0.1)
now = time.time()
# Check buffer level.
if now - past_time > self._CHECK_PERIOD_TIME_SECS:
past_time = now
if input_stream:
self._check_buffer_level(_STREAM_TYPE_INPUT)
if output_stream:
self._check_buffer_level(_STREAM_TYPE_OUTPUT)
loop_count += 1
def _get_buffer_level(self, stream_type):
"""Gets a rough number about current buffer level.
@returns: The current buffer level.
"""
if stream_type == _STREAM_TYPE_INPUT:
match_str = self._INPUT_BUFFER_LEVEL
else:
match_str = self._OUTPUT_BUFFER_LEVEL
proc = subprocess.Popen(['cras_test_client', '--dump_a'],
stdout=subprocess.PIPE)
output, err = proc.communicate()
buffer_level = 0
for line in output.split('\n'):
search = re.match(match_str, line)
if search:
tmp = int(search.group(1))
if tmp > buffer_level:
buffer_level = tmp
return buffer_level