blob: 83367becfaba1bf226159b7cb5866cf49adea39d [file] [log] [blame]
from autotest_lib.client.common_lib.cros.cfm import cras_input_node
from autotest_lib.client.common_lib.cros.cfm import cras_output_node
class CrasNodeCollector(object):
"""Utility class for obtaining node data from cras_test_client."""
DEVICE_COLUMN_COUNT = 2
NODE_COLUMN_COUNT = 8
DEVICE_HEADERS = ['ID', 'Name']
OUTPUT_NODE_HEADERS = ['Stable Id', 'ID', 'Vol', 'Plugged', 'L/R swapped',
'Time Hotword', 'Type', 'Name']
INPUT_NODE_HEADERS = ['Stable Id', 'ID', 'Gain', 'Plugged', 'L/Rswapped',
'Time Hotword', 'Type', 'Name']
def __init__(self, host):
"""
Constructor
@param host the device under test (CrOS).
"""
self._host = host
def _replace_multiple_whitespace_with_one(self, string):
"""
Replace multiple sequential whitespaces with a single whitespace.
@returns a string
"""
return ' '.join(string.split())
def _construct_columns(self, columns_str, columns_count):
"""
Constructs a list of strings from a single string.
@param columns_str A whitespace separated list of values.
@param columns_count number of columns to create
@returns a list with strings.
"""
# 1) Replace multiple whitespaces with one
# 2) Split on whitespace, create nof_columns columns
columns_str = self._replace_multiple_whitespace_with_one(columns_str)
return columns_str.split(None, columns_count-1)
def _collect_input_device_data(self):
cmd = ("cras_test_client --dump_server_info "
" | awk '/Input Devices:/,/Input Nodes:/'")
lines = self._host.run_output(cmd).split('\n')
# Ignore the first two lines ("Input Devices" and headers) and the
# last line ("Input Nodes")
lines = lines[2:-1]
rows = [self._construct_columns(line, self.DEVICE_COLUMN_COUNT)
for line in lines]
return [dict(zip(self.DEVICE_HEADERS, row)) for row in rows]
def _collect_output_device_data(self):
cmd = ("cras_test_client --dump_server_info "
" | awk '/Output Devices:/,/Output Nodes:/'")
lines = self._host.run_output(cmd).split('\n')
# Ignore the first two lines ("Output Devices" and headers) and the
# last line ("Output Nodes")
lines = lines[2:-1]
rows = [self._construct_columns(line, self.DEVICE_COLUMN_COUNT)
for line in lines]
return [dict(zip(self.DEVICE_HEADERS, row)) for row in rows]
def _collect_output_node_cras_data(self):
"""
Collects output nodes data using cras_test_client.
@returns a list of dictionaries where keys are in OUTPUT_NODE_HEADERS
"""
# It's a bit hacky to use awk; we should probably do the parsing
# in Python instead using textfsm or some other lib.
cmd = ("cras_test_client --dump_server_info"
"| awk '/Output Nodes:/,/Input Devices:/'")
lines = self._host.run_output(cmd).split('\n')
# Ignore the first two lines ("Output Nodes:" and headers) and the
# last line ("Input Devices:")
lines = lines[2:-1]
rows = [self._construct_columns(line, self.NODE_COLUMN_COUNT)
for line in lines]
return [dict(zip(self.OUTPUT_NODE_HEADERS, row)) for row in rows]
def _collect_input_node_cras_data(self):
"""
Collects input nodes data using cras_test_client.
@returns a list of dictionaries where keys are in INPUT_NODE_HEADERS
"""
cmd = ("cras_test_client --dump_server_info "
" | awk '/Input Nodes:/,/Attached clients:/'")
lines = self._host.run_output(cmd).split('\n')
# Ignore the first two lines ("Input Nodes:" and headers) and the
# last line ("Attached clients:")
lines = lines[2:-1]
rows = [self._construct_columns(line, self.NODE_COLUMN_COUNT)
for line in lines]
return [dict(zip(self.INPUT_NODE_HEADERS, row)) for row in rows]
def _get_device_name(self, device_data, device_id):
for device in device_data:
if device['ID'] == device_id:
return device['Name']
return None
def _create_input_node(self, node_data, device_data):
"""
Create a CrasInputNode.
@param node_data Input Node data
@param device_data Input Device data
@return CrasInputNode
"""
device_id = node_data['ID'].split(':')[0]
device_name = self._get_device_name(device_data, device_id)
return cras_input_node.CrasInputNode(
node_id=node_data['ID'],
node_name=node_data['Name'],
gain=node_data['Gain'],
node_type=node_data['Type'],
device_id=device_id,
device_name=device_name)
def _create_output_node(self, node_data, device_data):
"""
Create a CrasOutputNode.
@param node_data Output Node data
@param device_data Output Device data
@return CrasOutputNode
"""
device_id = node_data['ID'].split(':')[0]
device_name = self._get_device_name(device_data, device_id)
return cras_output_node.CrasOutputNode(
node_id=node_data['ID'],
node_type=node_data['Type'],
node_name=node_data['Name'],
volume=node_data['Vol'],
device_id=device_id,
device_name=device_name)
def get_input_nodes(self):
device_data = self._collect_input_device_data()
node_data = self._collect_input_node_cras_data()
return [self._create_input_node(d, device_data) for d in node_data]
def get_output_nodes(self):
device_data = self._collect_output_device_data()
node_data = self._collect_output_node_cras_data()
return [self._create_output_node(d, device_data) for d in node_data]