| # Copyright (c) 2013 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """This module provides cras audio utilities.""" |
| |
| import logging |
| import re |
| |
| from autotest_lib.client.cros.audio import cmd_utils |
| |
| _CRAS_TEST_CLIENT = '/usr/bin/cras_test_client' |
| |
| |
| class CrasUtilsError(Exception): |
| """Error in CrasUtils.""" |
| pass |
| |
| |
| def playback(blocking=True, *args, **kargs): |
| """A helper function to execute the playback_cmd. |
| |
| @param blocking: Blocks this call until playback finishes. |
| @param args: args passed to playback_cmd. |
| @param kargs: kargs passed to playback_cmd. |
| |
| @returns: The process running the playback command. Note that if the |
| blocking parameter is true, this will return a finished process. |
| """ |
| process = cmd_utils.popen(playback_cmd(*args, **kargs)) |
| if blocking: |
| cmd_utils.wait_and_check_returncode(process) |
| return process |
| |
| |
| def capture(*args, **kargs): |
| """A helper function to execute the capture_cmd. |
| |
| @param args: args passed to capture_cmd. |
| @param kargs: kargs passed to capture_cmd. |
| |
| """ |
| cmd_utils.execute(capture_cmd(*args, **kargs)) |
| |
| |
| def playback_cmd(playback_file, block_size=None, duration=None, |
| pin_device=None, channels=2, rate=48000): |
| """Gets a command to playback a file with given settings. |
| |
| @param playback_file: the name of the file to play. '-' indicates to |
| playback raw audio from the stdin. |
| @param pin_device: the device id to playback on. |
| @param block_size: the number of frames per callback(dictates latency). |
| @param duration: seconds to playback. |
| @param channels: number of channels. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--playback_file', playback_file] |
| if pin_device is not None: |
| args += ['--pin_device', str(pin_device)] |
| if block_size is not None: |
| args += ['--block_size', str(block_size)] |
| if duration is not None: |
| args += ['--duration', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| return args |
| |
| |
| def capture_cmd( |
| capture_file, block_size=None, duration=10, channels=1, rate=48000): |
| """Gets a command to capture the audio into the file with given settings. |
| |
| @param capture_file: the name of file the audio to be stored in. |
| @param block_size: the number of frames per callback(dictates latency). |
| @param duration: seconds to record. If it is None, duration is not set, |
| and command will keep capturing audio until it is |
| terminated. |
| @param channels: number of channels. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--capture_file', capture_file] |
| if block_size is not None: |
| args += ['--block_size', str(block_size)] |
| if duration is not None: |
| args += ['--duration', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| return args |
| |
| |
| def listen_cmd( |
| capture_file, block_size=None, duration=10, channels=1, rate=48000): |
| """Gets a command to listen on hotword and record audio into the file with |
| given settings. |
| |
| @param capture_file: the name of file the audio to be stored in. |
| @param block_size: the number of frames per callback(dictates latency). |
| @param duration: seconds to record. If it is None, duration is not set, |
| and command will keep capturing audio until it is |
| terminated. |
| @param channels: number of channels. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--listen_for_hotword', capture_file] |
| if block_size is not None: |
| args += ['--block_size', str(block_size)] |
| if duration is not None: |
| args += ['--duration', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| return args |
| |
| |
| def loopback(*args, **kargs): |
| """A helper function to execute loopback_cmd. |
| |
| @param args: args passed to loopback_cmd. |
| @param kargs: kargs passed to loopback_cmd. |
| |
| """ |
| |
| cmd_utils.execute(loopback_cmd(*args, **kargs)) |
| |
| |
| def loopback_cmd(output_file, duration=10, channels=2, rate=48000): |
| """Gets a command to record the loopback. |
| |
| @param output_file: The name of the file the loopback to be stored in. |
| @param channels: The number of channels of the recorded audio. |
| @param duration: seconds to record. |
| @param rate: the sampling rate. |
| |
| @returns: The command args put in a list of strings. |
| |
| """ |
| args = [_CRAS_TEST_CLIENT] |
| args += ['--loopback_file', output_file] |
| args += ['--duration_seconds', str(duration)] |
| args += ['--num_channels', str(channels)] |
| args += ['--rate', str(rate)] |
| return args |
| |
| |
| def get_cras_nodes_cmd(): |
| """Gets a command to query the nodes from Cras. |
| |
| @returns: The command to query nodes information from Cras using dbus-send. |
| |
| """ |
| return ('dbus-send --system --type=method_call --print-reply ' |
| '--dest=org.chromium.cras /org/chromium/cras ' |
| 'org.chromium.cras.Control.GetNodes') |
| |
| |
| def set_system_volume(volume): |
| """Set the system volume. |
| |
| @param volume: the system output vlume to be set(0 - 100). |
| |
| """ |
| get_cras_control_interface().SetOutputVolume(volume) |
| |
| |
| def set_node_volume(node_id, volume): |
| """Set the volume of the given output node. |
| |
| @param node_id: the id of the output node to be set the volume. |
| @param volume: the volume to be set(0-100). |
| |
| """ |
| get_cras_control_interface().SetOutputNodeVolume(node_id, volume) |
| |
| |
| def set_capture_gain(gain): |
| """Set the system capture gain. |
| |
| @param gain the capture gain in db*100 (100 = 1dB) |
| |
| """ |
| get_cras_control_interface().SetInputGain(gain) |
| |
| |
| def get_cras_control_interface(private=False): |
| """Gets Cras DBus control interface. |
| |
| @param private: Set to True to use a new instance for dbus.SystemBus |
| instead of the shared instance. |
| |
| @returns: A dBus.Interface object with Cras Control interface. |
| |
| @raises: ImportError if this is not called on Cros device. |
| |
| """ |
| try: |
| import dbus |
| except ImportError, e: |
| logging.exception( |
| 'Can not import dbus: %s. This method should only be ' |
| 'called on Cros device.', e) |
| raise |
| bus = dbus.SystemBus(private=private) |
| cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras') |
| return dbus.Interface(cras_object, 'org.chromium.cras.Control') |
| |
| |
| def get_cras_nodes(): |
| """Gets nodes information from Cras. |
| |
| @returns: A dict containing information of each node. |
| |
| """ |
| return get_cras_control_interface().GetNodes() |
| |
| |
| def get_selected_nodes(): |
| """Gets selected output nodes and input nodes. |
| |
| @returns: A tuple (output_nodes, input_nodes) where each |
| field is a list of selected node IDs returned from Cras DBus API. |
| Note that there may be multiple output/input nodes being selected |
| at the same time. |
| |
| """ |
| output_nodes = [] |
| input_nodes = [] |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active']: |
| if node['IsInput']: |
| input_nodes.append(node['Id']) |
| else: |
| output_nodes.append(node['Id']) |
| return (output_nodes, input_nodes) |
| |
| |
| def set_selected_output_node_volume(volume): |
| """Sets the selected output node volume. |
| |
| @param volume: the volume to be set (0-100). |
| |
| """ |
| selected_output_node_ids, _ = get_selected_nodes() |
| for node_id in selected_output_node_ids: |
| set_node_volume(node_id, volume) |
| |
| |
| def get_active_stream_count(): |
| """Gets the number of active streams. |
| |
| @returns: The number of active streams. |
| |
| """ |
| return int(get_cras_control_interface().GetNumberOfActiveStreams()) |
| |
| |
| def set_system_mute(is_mute): |
| """Sets the system mute switch. |
| |
| @param is_mute: Set True to mute the system playback. |
| |
| """ |
| get_cras_control_interface().SetOutputMute(is_mute) |
| |
| |
| def set_capture_mute(is_mute): |
| """Sets the capture mute switch. |
| |
| @param is_mute: Set True to mute the capture. |
| |
| """ |
| get_cras_control_interface().SetInputMute(is_mute) |
| |
| |
| def node_type_is_plugged(node_type, nodes_info): |
| """Determine if there is any node of node_type plugged. |
| |
| This method is used in the AudioLoopbackDongleLabel class, where the |
| call is executed on autotest server. Use get_cras_nodes instead if |
| the call can be executed on Cros device. |
| |
| Since Cras only reports the plugged node in GetNodes, we can |
| parse the return value to see if there is any node with the given type. |
| For example, if INTERNAL_MIC is of intereset, the pattern we are |
| looking for is: |
| |
| dict entry( |
| string "Type" |
| variant string "INTERNAL_MIC" |
| ) |
| |
| @param node_type: A str representing node type defined in CRAS_NODE_TYPES. |
| @param nodes_info: A str containing output of command get_nodes_cmd. |
| |
| @returns: True if there is any node of node_type plugged. False otherwise. |
| |
| """ |
| match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type, |
| nodes_info) |
| return True if match else False |
| |
| |
| # Cras node types reported from Cras DBus control API. |
| CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB', |
| 'BLUETOOTH', 'LINEOUT', 'UNKNOWN'] |
| CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH', |
| 'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN', |
| 'KEYBOARD_MIC', 'HOTWORD', 'FRONT_MIC', 'REAR_MIC'] |
| CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES |
| |
| |
| def get_filtered_node_types(callback): |
| """Returns the pair of filtered output node types and input node types. |
| |
| @param callback: A callback function which takes a node as input parameter |
| and filter the node based on its return value. |
| |
| @returns: A tuple (output_node_types, input_node_types) where each |
| field is a list of node types defined in CRAS_NODE_TYPES, |
| and their 'attribute_name' is True. |
| |
| """ |
| output_node_types = [] |
| input_node_types = [] |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if callback(node): |
| node_type = str(node['Type']) |
| if node_type not in CRAS_NODE_TYPES: |
| raise RuntimeError( |
| 'node type %s is not valid' % node_type) |
| if node['IsInput']: |
| input_node_types.append(node_type) |
| else: |
| output_node_types.append(node_type) |
| return (output_node_types, input_node_types) |
| |
| |
| def get_selected_node_types(): |
| """Returns the pair of active output node types and input node types. |
| |
| @returns: A tuple (output_node_types, input_node_types) where each |
| field is a list of selected node types defined in CRAS_NODE_TYPES. |
| |
| """ |
| def is_selected(node): |
| """Checks if a node is selected. |
| |
| A node is selected if its Active attribute is True. |
| |
| @returns: True is a node is selected, False otherwise. |
| |
| """ |
| return node['Active'] |
| |
| return get_filtered_node_types(is_selected) |
| |
| |
| def get_selected_input_device_name(): |
| """Returns the device name of the active input node. |
| |
| @returns: device name string. E.g. kbl_r5514_5663_max: :0,1 |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] and node['IsInput']: |
| return node['DeviceName'] |
| return None |
| |
| |
| def get_selected_output_device_name(): |
| """Returns the device name of the active output node. |
| |
| @returns: device name string. E.g. mtk-rt5650: :0,0 |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] and not node['IsInput']: |
| return node['DeviceName'] |
| return None |
| |
| |
| def get_selected_output_device_type(): |
| """Returns the device type of the active output node. |
| |
| @returns: device type string. E.g. INTERNAL_SPEAKER |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] and not node['IsInput']: |
| return node['Type'] |
| return None |
| |
| |
| def get_plugged_node_types(): |
| """Returns the pair of plugged output node types and input node types. |
| |
| @returns: A tuple (output_node_types, input_node_types) where each |
| field is a list of plugged node types defined in CRAS_NODE_TYPES. |
| |
| """ |
| def is_plugged(node): |
| """Checks if a node is plugged and is not unknown node. |
| |
| Cras DBus API only reports plugged node, so every node reported by Cras |
| DBus API is plugged. However, we filter out UNKNOWN node here because |
| the existence of unknown node depends on the number of redundant |
| playback/record audio device created on audio card. Also, the user of |
| Cras will ignore unknown nodes. |
| |
| @returns: True if a node is plugged and is not an UNKNOWN node. |
| |
| """ |
| return node['Type'] != 'UNKNOWN' |
| |
| return get_filtered_node_types(is_plugged) |
| |
| |
| def set_selected_node_types(output_node_types, input_node_types): |
| """Sets selected node types. |
| |
| @param output_node_types: A list of output node types. None to skip setting. |
| @param input_node_types: A list of input node types. None to skip setting. |
| |
| """ |
| if len(output_node_types) == 1: |
| set_single_selected_output_node(output_node_types[0]) |
| elif output_node_types: |
| set_selected_output_nodes(output_node_types) |
| if len(input_node_types) == 1: |
| set_single_selected_input_node(input_node_types[0]) |
| elif input_node_types: |
| set_selected_input_nodes(input_node_types) |
| |
| |
| def set_single_selected_output_node(node_type): |
| """Sets one selected output node. |
| |
| Note that Chrome UI uses SetActiveOutputNode of Cras DBus API |
| to select one output node. |
| |
| @param node_type: A node type. |
| |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['IsInput']: |
| continue |
| if node['Type'] == node_type: |
| set_active_output_node(node['Id']) |
| |
| |
| def set_single_selected_input_node(node_type): |
| """Sets one selected input node. |
| |
| Note that Chrome UI uses SetActiveInputNode of Cras DBus API |
| to select one input node. |
| |
| @param node_type: A node type. |
| |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if not node['IsInput']: |
| continue |
| if node['Type'] == node_type: |
| set_active_input_node(node['Id']) |
| |
| |
| def set_selected_output_nodes(types): |
| """Sets selected output node types. |
| |
| Note that Chrome UI uses SetActiveOutputNode of Cras DBus API |
| to select one output node. Here we use add/remove active output node |
| to support multiple nodes. |
| |
| @param types: A list of output node types. |
| |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['IsInput']: |
| continue |
| if node['Type'] in types: |
| add_active_output_node(node['Id']) |
| elif node['Active']: |
| remove_active_output_node(node['Id']) |
| |
| |
| def set_selected_input_nodes(types): |
| """Sets selected input node types. |
| |
| Note that Chrome UI uses SetActiveInputNode of Cras DBus API |
| to select one input node. Here we use add/remove active input node |
| to support multiple nodes. |
| |
| @param types: A list of input node types. |
| |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if not node['IsInput']: |
| continue |
| if node['Type'] in types: |
| add_active_input_node(node['Id']) |
| elif node['Active']: |
| remove_active_input_node(node['Id']) |
| |
| |
| def set_active_input_node(node_id): |
| """Sets one active input node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().SetActiveInputNode(node_id) |
| |
| |
| def set_active_output_node(node_id): |
| """Sets one active output node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().SetActiveOutputNode(node_id) |
| |
| |
| def add_active_output_node(node_id): |
| """Adds an active output node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().AddActiveOutputNode(node_id) |
| |
| |
| def add_active_input_node(node_id): |
| """Adds an active input node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().AddActiveInputNode(node_id) |
| |
| |
| def remove_active_output_node(node_id): |
| """Removes an active output node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().RemoveActiveOutputNode(node_id) |
| |
| |
| def remove_active_input_node(node_id): |
| """Removes an active input node. |
| |
| @param node_id: node id. |
| |
| """ |
| get_cras_control_interface().RemoveActiveInputNode(node_id) |
| |
| |
| def get_node_id_from_node_type(node_type, is_input): |
| """Gets node id from node type. |
| |
| @param types: A node type defined in CRAS_NODE_TYPES. |
| @param is_input: True if the node is input. False otherwise. |
| |
| @returns: A string for node id. |
| |
| @raises: CrasUtilsError: if unique node id can not be found. |
| |
| """ |
| nodes = get_cras_nodes() |
| find_ids = [] |
| for node in nodes: |
| if node['Type'] == node_type and node['IsInput'] == is_input: |
| find_ids.append(node['Id']) |
| if len(find_ids) != 1: |
| raise CrasUtilsError( |
| 'Can not find unique node id from node type %s' % node_type) |
| return find_ids[0] |
| |
| |
| def get_device_id_of(node_id): |
| """Gets the device id of the node id. |
| |
| The conversion logic is replicated from the CRAS's type definition at |
| third_party/adhd/cras/src/common/cras_types.h. |
| |
| @param node_id: A string for node id. |
| |
| @returns: A string for device id. |
| |
| @raise: CrasUtilsError: if device id is invalid. |
| """ |
| device_id = str(long(node_id) >> 32) |
| if device_id == "0": |
| raise CrasUtilsError('Got invalid device_id: 0') |
| return device_id |
| |
| |
| def get_device_id_from_node_type(node_type, is_input): |
| """Gets device id from node type. |
| |
| @param types: A node type defined in CRAS_NODE_TYPES. |
| @param is_input: True if the node is input. False otherwise. |
| |
| @returns: A string for device id. |
| |
| """ |
| node_id = get_node_id_from_node_type(node_type, is_input) |
| return get_device_id_of(node_id) |
| |
| |
| def get_active_node_volume(): |
| """Returns volume from active node. |
| |
| @returns: int for volume |
| |
| @raises: CrasUtilsError: if node volume cannot be found. |
| """ |
| nodes = get_cras_nodes() |
| for node in nodes: |
| if node['Active'] == 1 and node['IsInput'] == 0: |
| return int(node['NodeVolume']) |
| raise CrasUtilsError('Cannot find active node volume from nodes.') |