autotest: Autoformatted changes
Presubmit complained about formatting but I didn't want to pollute the
previous commits in this series so here is the result of running
git cl format --no-clang-format
BUG=None
TEST=None
Change-Id: I341ec0fbddd1bc0c06ea4206ba126c92666bb9dc
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/2489243
Tested-by: Abhishek Pandit-Subedi <abhishekpandit@chromium.org>
Commit-Queue: Abhishek Pandit-Subedi <abhishekpandit@chromium.org>
Reviewed-by: Shyh-In Hwang <josephsih@chromium.org>
diff --git a/client/cros/multimedia/bluetooth_facade_native.py b/client/cros/multimedia/bluetooth_facade_native.py
index 49e830e..15b6c44 100644
--- a/client/cros/multimedia/bluetooth_facade_native.py
+++ b/client/cros/multimedia/bluetooth_facade_native.py
@@ -1,7 +1,6 @@
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-
"""Facade to access the bluetooth-related functionality."""
from __future__ import absolute_import
@@ -32,8 +31,8 @@
from autotest_lib.client.cros import dbus_util
from autotest_lib.client.cros.udev_helpers import UdevadmInfo, UdevadmTrigger
from autotest_lib.client.cros import xmlrpc_server
-from autotest_lib.client.cros.audio import (
- audio_test_data as audio_test_data_module)
+from autotest_lib.client.cros.audio import (audio_test_data as
+ audio_test_data_module)
from autotest_lib.client.cros.audio import check_quality
from autotest_lib.client.cros.audio import cras_utils
from autotest_lib.client.cros.audio.sox_utils import (
@@ -47,7 +46,6 @@
from six.moves import map
from six.moves import range
-
CheckQualityArgsClass = collections.namedtuple(
'args_type', ['filename', 'rate', 'channel', 'bit_width'])
@@ -76,6 +74,7 @@
@param wrapped_function function to wrap.
"""
+
def decorator(wrapped_function):
"""Call a function and catch DBus errors.
@@ -83,6 +82,7 @@
@return function return value or default_return_value on failure.
"""
+
@functools.wraps(wrapped_function)
def wrapper(*args, **kwargs):
"""Pass args and kwargs to a dbus safe function.
@@ -97,10 +97,10 @@
return wrapped_function(*args, **kwargs)
except dbus.exceptions.DBusException as e:
- logging.debug('Exception while performing operation %s: %s: %s',
- wrapped_function.__name__,
- e.get_dbus_name(),
- e.get_dbus_message())
+ logging.debug(
+ 'Exception while performing operation %s: %s: %s',
+ wrapped_function.__name__, e.get_dbus_name(),
+ e.get_dbus_message())
return (default_return_value, str(e))
return wrapper
@@ -271,9 +271,9 @@
super(PairingAgent, self).__init__(*args, **kwargs)
self._pin = pin
-
@dbus.service.method('org.bluez.Agent1',
- in_signature='o', out_signature='s')
+ in_signature='o',
+ out_signature='s')
def RequestPinCode(self, device_path):
"""Requests pin code for a device.
@@ -284,7 +284,8 @@
@returns: The known pin code.
"""
- logging.info('RequestPinCode for %s; return %s', device_path, self._pin)
+ logging.info('RequestPinCode for %s; return %s', device_path,
+ self._pin)
return self._pin
@@ -363,15 +364,11 @@
# and fetch the bluetoothd job.
self._upstart_conn = dbus.connection.Connection(self.UPSTART_PATH)
self._upstart = self._upstart_conn.get_object(
- None,
- self.UPSTART_MANAGER_PATH)
+ None, self.UPSTART_MANAGER_PATH)
bluetoothd_path = self._upstart.GetJobByName(
- self.BLUETOOTHD_JOB,
- dbus_interface=self.UPSTART_MANAGER_IFACE)
- self._bluetoothd = self._upstart_conn.get_object(
- None,
- bluetoothd_path)
+ self.BLUETOOTHD_JOB, dbus_interface=self.UPSTART_MANAGER_IFACE)
+ self._bluetoothd = self._upstart_conn.get_object(None, bluetoothd_path)
# Arrange for the GLib main loop to be the default.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
@@ -409,11 +406,9 @@
# Create an Advertisement Monitor Helper App Manager instance.
self.advmon_appmgr = adv_monitor_helper.AdvMonitorAppMgr(
- self._system_bus,
- self._dbus_mainloop,
+ self._system_bus, self._dbus_mainloop,
self._adv_monitor_manager)
-
@xmlrpc_server.dbus_safe(False)
def set_debug_log_levels(self, dispatcher_vb, newblue_vb, bluez_vb,
kernel_vb):
@@ -428,9 +423,8 @@
# TODO(b/145163508, b/145749798): update when debug logs is migrated to
# bluez.
- debug_object = self._system_bus.get_object(
- self.BLUETOOTH_SERVICE_NAME,
- self.BLUEZ_DEBUG_LOG_PATH)
+ debug_object = self._system_bus.get_object(self.BLUETOOTH_SERVICE_NAME,
+ self.BLUEZ_DEBUG_LOG_PATH)
debug_object.SetLevels(dbus.Byte(dispatcher_vb),
dbus.Byte(newblue_vb),
dbus.Byte(bluez_vb),
@@ -454,11 +448,12 @@
@returns : True if adapter is Intel made.
"""
# Dict of Intel Adapters that support WRT and vid:pid
- vid_pid_dict = {'HrP2' : '8086:02f0',
- 'ThP2' : '8086:2526',
- 'JfP2': '8086:31dc',
- 'JfP2-2' : '8086:9df0' } # On Sarien/Arcada
-
+ vid_pid_dict = {
+ 'HrP2': '8086:02f0',
+ 'ThP2': '8086:2526',
+ 'JfP2': '8086:31dc',
+ 'JfP2-2': '8086:9df0'
+ } # On Sarien/Arcada
def _get_lspci_vid_pid(output):
""" parse output of lspci -knn and get the vid:pid
@@ -502,15 +497,17 @@
1) Check if the DUT has Intel controller other than StP2
2) Make sure the controller is powered on
"""
- fw_trace_cmd = ('hcitool cmd 3f 7c 01 10 00 00 00 FE 81 02 80 04 00 00'
- ' 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00'
- ' 00 00 00 00 00 00 00')
+ fw_trace_cmd = (
+ 'hcitool cmd 3f 7c 01 10 00 00 00 FE 81 02 80 04 00 00'
+ ' 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00'
+ ' 00 00 00 00 00 00 00')
ddc_read_cmd = 'hcitool cmd 3f 8c 28 01'
ddc_write_cmd_prefix = 'hcitool cmd 3f 8b 03 28 01'
- hw_trace_cmd = ('hcitool cmd 3f 6f 01 08 00 00 00 00 00 00 00 00 01 00'
- ' 00 03 01 03 03 03 10 03 6A 0A 6A 0A 6A 0A 6A 0A 00 00'
- ' 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00'
- ' 00 00 00 00 00 00')
+ hw_trace_cmd = (
+ 'hcitool cmd 3f 6f 01 08 00 00 00 00 00 00 00 00 01 00'
+ ' 00 03 01 03 03 03 10 03 6A 0A 6A 0A 6A 0A 6A 0A 00 00'
+ ' 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00'
+ ' 00 00 00 00 00 00')
multi_comm_trace_str = ('000000F600000000005002000000003F3F3F3'
'F3F003F000000000000000001000000000000000000'
'000000000000000000000000000000000000000000'
@@ -538,8 +535,8 @@
logging.debug('output is %s', output)
return (True, output)
except Exception as e:
- logging.error('Exception %s while executing %s command', str(e),
- msg)
+ logging.error('Exception %s while executing %s command',
+ str(e), msg)
return (False, None)
def _get_ddc_write_cmd(ddc_read_result, ddc_write_cmd_prefix):
@@ -558,10 +555,11 @@
'hcitool 01 8C FC 00 28 01 ===> 58 <===='
"""
- last_line = [i for i in ddc_read_result.strip().split(b'\n')
- if i != ''][-1]
+ last_line = [
+ i for i in ddc_read_result.strip().split(b'\n') if i != ''
+ ][-1]
last_byte = [i for i in last_line.split(b' ') if i != ''][-1]
- processed_byte= hex(int(last_byte, 16) | 0x40).split('0x')[1]
+ processed_byte = hex(int(last_byte, 16) | 0x40).split('0x')[1]
cmd = ddc_write_cmd_prefix + ' ' + processed_byte
logging.debug('ddc_write_cmd is %s', cmd)
return cmd
@@ -591,10 +589,10 @@
logging.info('HW Trace command execution failed')
return False
- logging.debug('Executing the multi_comm_trace cmd %s to file %s'
- ,multi_comm_trace_str, multi_comm_trace_file)
+ logging.debug('Executing the multi_comm_trace cmd %s to file %s',
+ multi_comm_trace_str, multi_comm_trace_file)
with open(multi_comm_trace_file, 'w') as f:
- f.write(multi_comm_trace_str+'\n')
+ f.write(multi_comm_trace_str + '\n')
f.flush()
logging.info('WRT Logs enabled')
@@ -618,11 +616,13 @@
Precondition:
1) enable_wrt_logs has been called
"""
+
def _collect_logs():
"""Execute command to collect wrt logs."""
try:
- with open('/sys/kernel/debug/ieee80211/phy0/iwlwifi/'
- 'iwlmvm/fw_dbg_collect', 'w') as f:
+ with open(
+ '/sys/kernel/debug/ieee80211/phy0/iwlwifi/'
+ 'iwlmvm/fw_dbg_collect', 'w') as f:
f.write('1')
f.flush()
# There is some flakiness in log collection. This sleep
@@ -683,7 +683,8 @@
"""
try:
- self._bluetoothd.Start(dbus.Array(signature='s'), True,
+ self._bluetoothd.Start(dbus.Array(signature='s'),
+ True,
dbus_interface=self.UPSTART_JOB_IFACE)
except dbus.exceptions.DBusException as e:
# if bluetoothd was already started, the exception looks like
@@ -696,10 +697,9 @@
logging.debug('waiting for bluez start')
try:
- utils.poll_for_condition(
- condition=self._update_bluez,
- desc='Bluetooth Daemon has started.',
- timeout=self.ADAPTER_TIMEOUT)
+ utils.poll_for_condition(condition=self._update_bluez,
+ desc='Bluetooth Daemon has started.',
+ timeout=self.ADAPTER_TIMEOUT)
except Exception as e:
logging.error('timeout: error starting bluetoothd: %s', e)
return False
@@ -729,7 +729,6 @@
return True
-
@xmlrpc_server.dbus_safe(False)
def stop_bluetoothd(self):
"""stop bluetoothd.
@@ -738,6 +737,7 @@
False otherwise.
"""
+
def bluez_stopped():
"""Checks the bluetooth daemon status.
@@ -747,7 +747,8 @@
return not self._update_bluez()
try:
- self._bluetoothd.Stop(dbus.Array(signature='s'), True,
+ self._bluetoothd.Stop(dbus.Array(signature='s'),
+ True,
dbus_interface=self.UPSTART_JOB_IFACE)
except dbus.exceptions.DBusException as e:
# If bluetoothd was stopped already, the exception looks like
@@ -759,10 +760,9 @@
logging.debug('waiting for bluez stop')
try:
- utils.poll_for_condition(
- condition=bluez_stopped,
- desc='Bluetooth Daemon has stopped.',
- timeout=self.ADAPTER_TIMEOUT)
+ utils.poll_for_condition(condition=bluez_stopped,
+ desc='Bluetooth Daemon has stopped.',
+ timeout=self.ADAPTER_TIMEOUT)
bluetoothd_stopped = True
except Exception as e:
logging.error('timeout: error stopping bluetoothd: %s', e)
@@ -770,7 +770,6 @@
return bluetoothd_stopped
-
def is_bluetoothd_running(self):
"""Is bluetoothd running?
@@ -779,7 +778,6 @@
"""
return bool(self._get_dbus_proxy_for_bluetoothd())
-
def is_bluetoothd_proxy_valid(self):
"""Checks whether the proxy object for bluetoothd is ok.
@@ -801,7 +799,6 @@
return True
-
def _update_bluez(self):
"""Store a D-Bus proxy for the Bluetooth daemon in self._bluez.
@@ -814,7 +811,6 @@
self._bluez = self._get_dbus_proxy_for_bluetoothd()
return bool(self._bluez)
-
@xmlrpc_server.dbus_safe(False)
def _get_dbus_proxy_for_bluetoothd(self):
"""Get the D-Bus proxy for the Bluetooth daemon.
@@ -838,7 +834,6 @@
logging.error('Error getting dbus proxy for Bluez: %s', e)
return bluez
-
def _update_adapter(self):
"""Store a D-Bus proxy for the local adapter in self._adapter.
@@ -891,7 +886,6 @@
self._advertising = self._get_advertising()
return bool(self._advertising)
-
def _update_adv_monitor_manager(self):
"""Store a D-Bus proxy for the local advertisement monitor manager.
@@ -918,7 +912,6 @@
self._adv_monitor_manager = self._get_adv_monitor_manager()
return bool(self._adv_monitor_manager)
-
@xmlrpc_server.dbus_safe(False)
def _get_adapter(self):
"""Get the D-Bus proxy for the local adapter.
@@ -932,15 +925,13 @@
logging.debug('%s -> %r', path, list(ifaces.keys()))
if self.BLUEZ_ADAPTER_IFACE in ifaces:
logging.debug('using adapter %s', path)
- adapter = self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME,
- path)
+ adapter = self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ path)
return adapter
else:
logging.warning('No adapter found in interface!')
return None
-
@xmlrpc_server.dbus_safe(False)
def _get_advertising(self):
"""Get the D-Bus proxy for the local advertising interface.
@@ -951,7 +942,6 @@
return dbus.Interface(self._adapter,
self.BLUEZ_LE_ADVERTISING_MANAGER_IFACE)
-
@xmlrpc_server.dbus_safe(False)
def _get_adv_monitor_manager(self):
"""Get the D-Bus proxy for the local advertisement monitor manager.
@@ -962,7 +952,6 @@
return dbus.Interface(self._adapter,
self.BLUEZ_ADV_MONITOR_MANAGER_IFACE)
-
@xmlrpc_server.dbus_safe(False)
def reset_on(self):
"""Reset the adapter and settings and power up the adapter.
@@ -972,7 +961,6 @@
"""
return self._reset(set_power=True)
-
@xmlrpc_server.dbus_safe(False)
def reset_off(self):
"""Reset the adapter and settings, leave the adapter powered off.
@@ -982,7 +970,6 @@
"""
return self._reset(set_power=False)
-
def has_adapter(self):
"""Return if an adapter is present.
@@ -995,7 +982,6 @@
"""
return self._has_adapter and self._adapter is not None
-
def is_wake_enabled(self):
"""Checks whether the bluetooth adapter has wake enabled.
@@ -1008,7 +994,6 @@
enabled = self._is_wake_enabled()
return enabled
-
def set_wake_enabled(self, value):
"""Sets wake enabled to the value if path exists.
@@ -1062,7 +1047,6 @@
return False
-
def _reset(self, set_power=False):
"""Remove remote devices and set adapter to set_power state.
@@ -1108,7 +1092,6 @@
return True
-
@xmlrpc_server.dbus_safe(False)
def set_powered(self, powered):
"""Set the adapter power state.
@@ -1128,7 +1111,6 @@
return False
return self._set_powered(powered)
-
@xmlrpc_server.dbus_safe(False)
def _set_powered(self, powered):
"""Set the adapter power state.
@@ -1137,12 +1119,12 @@
"""
logging.debug('_set_powered %r', powered)
- self._adapter.Set(self.BLUEZ_ADAPTER_IFACE, 'Powered',
+ self._adapter.Set(self.BLUEZ_ADAPTER_IFACE,
+ 'Powered',
dbus.Boolean(powered, variant_level=1),
dbus_interface=dbus.PROPERTIES_IFACE)
return True
-
@xmlrpc_server.dbus_safe(False)
def set_discoverable(self, discoverable):
"""Set the adapter discoverable state.
@@ -1163,7 +1145,6 @@
dbus_interface=dbus.PROPERTIES_IFACE)
return True
-
@xmlrpc_server.dbus_safe(False)
def get_discoverable_timeout(self):
"""Get the adapter discoverable_timeout.
@@ -1171,10 +1152,10 @@
@return True on success, False otherwise.
"""
- return int(self._adapter.Get(self.BLUEZ_ADAPTER_IFACE,
- 'DiscoverableTimeout',
- dbus_interface=dbus.PROPERTIES_IFACE))
-
+ return int(
+ self._adapter.Get(self.BLUEZ_ADAPTER_IFACE,
+ 'DiscoverableTimeout',
+ dbus_interface=dbus.PROPERTIES_IFACE))
@xmlrpc_server.dbus_safe(False)
def set_discoverable_timeout(self, discoverable_timeout):
@@ -1192,7 +1173,6 @@
dbus_interface=dbus.PROPERTIES_IFACE)
return True
-
@xmlrpc_server.dbus_safe(False)
def get_pairable_timeout(self):
"""Get the adapter pairable_timeout.
@@ -1200,10 +1180,10 @@
@return True on success, False otherwise.
"""
- return int(self._adapter.Get(self.BLUEZ_ADAPTER_IFACE,
- 'PairableTimeout',
- dbus_interface=dbus.PROPERTIES_IFACE))
-
+ return int(
+ self._adapter.Get(self.BLUEZ_ADAPTER_IFACE,
+ 'PairableTimeout',
+ dbus_interface=dbus.PROPERTIES_IFACE))
@xmlrpc_server.dbus_safe(False)
def set_pairable_timeout(self, pairable_timeout):
@@ -1221,7 +1201,6 @@
dbus_interface=dbus.PROPERTIES_IFACE)
return True
-
@xmlrpc_server.dbus_safe(False)
def set_pairable(self, pairable):
"""Set the adapter pairable state.
@@ -1231,12 +1210,12 @@
@return True on success, False otherwise.
"""
- self._adapter.Set(self.BLUEZ_ADAPTER_IFACE, 'Pairable',
+ self._adapter.Set(self.BLUEZ_ADAPTER_IFACE,
+ 'Pairable',
dbus.Boolean(pairable, variant_level=1),
dbus_interface=dbus.PROPERTIES_IFACE)
return True
-
@xmlrpc_server.dbus_safe(False)
def _get_adapter_properties(self):
"""Read the adapter properties from the Bluetooth Daemon.
@@ -1248,7 +1227,8 @@
if self._bluez and self._adapter:
objects = self._bluez.GetManagedObjects(
dbus_interface=self.BLUEZ_MANAGER_IFACE)
- props = objects[self._adapter.object_path][self.BLUEZ_ADAPTER_IFACE]
+ props = objects[self._adapter.object_path][
+ self.BLUEZ_ADAPTER_IFACE]
else:
props = {}
logging.debug('get_adapter_properties')
@@ -1256,11 +1236,9 @@
logging.debug(i)
return props
-
def get_adapter_properties(self):
return json.dumps(self._get_adapter_properties())
-
def _is_powered_on(self):
return bool(self._get_adapter_properties().get(u'Powered'))
@@ -1331,7 +1309,6 @@
_control = bluetooth_socket.BluetoothControlSocket()
return json.dumps(_control.read_version())
-
def read_supported_commands(self):
"""Read the set of supported commands from the Kernel.
@@ -1345,7 +1322,6 @@
_control = bluetooth_socket.BluetoothControlSocket()
return json.dumps(_control.read_supported_commands())
-
def read_index_list(self):
"""Read the list of currently known controllers from the Kernel.
@@ -1358,7 +1334,6 @@
_control = bluetooth_socket.BluetoothControlSocket()
return json.dumps(_control.read_index_list())
-
def read_info(self):
"""Read the adapter information from the Kernel.
@@ -1374,7 +1349,6 @@
_control = bluetooth_socket.BluetoothControlSocket()
return json.dumps(_control.read_info(0))
-
def add_device(self, address, address_type, action):
"""Add a device to the Kernel action list.
@@ -1390,9 +1364,8 @@
# BluetoothControlSocket idle too long(about 3 secs)
# (b:137603211)
_control = bluetooth_socket.BluetoothControlSocket()
- return json.dumps(_control.add_device(
- 0, address, address_type, action))
-
+ return json.dumps(_control.add_device(0, address, address_type,
+ action))
def remove_device(self, address, address_type):
"""Remove a device from the Kernel action list.
@@ -1408,9 +1381,7 @@
# BluetoothControlSocket idle too long(about 3 secs)
# (b:137603211)
_control = bluetooth_socket.BluetoothControlSocket()
- return json.dumps(_control.remove_device(
- 0, address, address_type))
-
+ return json.dumps(_control.remove_device(0, address, address_type))
@xmlrpc_server.dbus_safe(False)
def _get_devices(self):
@@ -1427,7 +1398,6 @@
devices.append(objects[path][self.BLUEZ_DEVICE_IFACE])
return devices
-
def _encode_base64_json(self, data):
"""Base64 encode and json encode the data.
Required to handle non-ascii data
@@ -1444,7 +1414,6 @@
logging.debug('JSON encoded data is %s', json_encoded)
return json_encoded
-
def get_devices(self):
"""Read information about remote devices known to the adapter.
@@ -1455,7 +1424,6 @@
devices = self._get_devices()
return self._encode_base64_json(devices)
-
@xmlrpc_server.dbus_safe(None)
def get_device_property(self, address, prop_name):
"""Read a property of BT device by directly querying device dbus object
@@ -1477,12 +1445,12 @@
if device_obj:
# Query dbus object for property
- prop_val = device_obj.Get(self.BLUEZ_DEVICE_IFACE, prop_name,
+ prop_val = device_obj.Get(self.BLUEZ_DEVICE_IFACE,
+ prop_name,
dbus_interface=dbus.PROPERTIES_IFACE)
return self._encode_base64_json(prop_val)
-
@xmlrpc_server.dbus_safe(False)
def set_discovery_filter(self, filter):
"""Set the discovery filter.
@@ -1498,7 +1466,6 @@
filter, dbus_interface=self.BLUEZ_ADAPTER_IFACE)
return True
-
@xmlrpc_server.dbus_safe(False)
@dbus_print_error()
def start_discovery(self):
@@ -1515,7 +1482,6 @@
self._adapter.StartDiscovery(dbus_interface=self.BLUEZ_ADAPTER_IFACE)
return (True, None)
-
@dbus_print_error()
def stop_discovery(self):
"""Stop discovery of remote devices.
@@ -1552,7 +1518,6 @@
dbus_interface=self.BLUEZ_ADAPTER_IFACE)
return (True, None)
-
@xmlrpc_server.dbus_safe(False)
@dbus_print_error()
def pause_discovery(self, system_suspend_resume=False):
@@ -1568,11 +1533,10 @@
"""
if not self._adapter:
return (False, "Adapter Not Found")
- self._adapter.PauseDiscovery(
- system_suspend_resume, dbus_interface=self.BLUEZ_ADAPTER_IFACE)
+ self._adapter.PauseDiscovery(system_suspend_resume,
+ dbus_interface=self.BLUEZ_ADAPTER_IFACE)
return (True, None)
-
@xmlrpc_server.dbus_safe(False)
@dbus_print_error()
def unpause_discovery(self, system_suspend_resume=False):
@@ -1588,11 +1552,10 @@
"""
if not self._adapter:
return (False, "Adapter Not Found")
- self._adapter.UnpauseDiscovery(
- system_suspend_resume, dbus_interface=self.BLUEZ_ADAPTER_IFACE)
+ self._adapter.UnpauseDiscovery(system_suspend_resume,
+ dbus_interface=self.BLUEZ_ADAPTER_IFACE)
return (True, None)
-
def get_dev_info(self):
"""Read raw HCI device information.
@@ -1607,7 +1570,6 @@
"""
return json.dumps(self._raw.get_dev_info(0))
-
@dbus_print_error(None)
def get_supported_capabilities(self):
""" Get supported capabilities of the adapter
@@ -1618,7 +1580,6 @@
dbus_interface=self.BLUEZ_ADAPTER_IFACE)
return (json.dumps(value), None)
-
@xmlrpc_server.dbus_safe(False)
def register_profile(self, path, uuid, options):
"""Register new profile (service).
@@ -1632,17 +1593,15 @@
"""
profile_manager = dbus.Interface(
- self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME,
- self.BLUEZ_PROFILE_MANAGER_PATH),
- self.BLUEZ_PROFILE_MANAGER_IFACE)
- dbus_object = self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME, path)
- profile_manager.RegisterProfile(dbus_object, uuid,
- dbus.Dictionary(options, signature='sv'))
+ self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ self.BLUEZ_PROFILE_MANAGER_PATH),
+ self.BLUEZ_PROFILE_MANAGER_IFACE)
+ dbus_object = self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ path)
+ profile_manager.RegisterProfile(
+ dbus_object, uuid, dbus.Dictionary(options, signature='sv'))
return True
-
def has_device(self, address):
"""Checks if the device with a given address exists.
@@ -1665,7 +1624,6 @@
# False otherwise.
return bool(result)
-
@xmlrpc_server.dbus_safe(False)
def _find_device(self, address):
"""Finds the device with a given address.
@@ -1680,13 +1638,11 @@
"""
path = self._get_device_path(address)
if path:
- obj = self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME, path)
+ obj = self._system_bus.get_object(self.BLUEZ_SERVICE_NAME, path)
return dbus.Interface(obj, self.BLUEZ_DEVICE_IFACE)
logging.info('Device not found')
return None
-
@xmlrpc_server.dbus_safe(False)
def _get_device_path(self, address):
"""Gets the path for a device with a given address.
@@ -1710,7 +1666,8 @@
try:
device = self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
device_path)
- found_addr = device.Get(self.BLUEZ_DEVICE_IFACE, 'Address',
+ found_addr = device.Get(self.BLUEZ_DEVICE_IFACE,
+ 'Address',
dbus_interface=dbus.PROPERTIES_IFACE)
if found_addr == address:
@@ -1724,7 +1681,6 @@
logging.debug('No device found at {}'.format(device_path))
return None
-
@xmlrpc_server.dbus_safe(False)
def _setup_pairing_agent(self, pin):
"""Initializes and resiters a PairingAgent to handle authentication.
@@ -1733,19 +1689,19 @@
"""
if self._pairing_agent:
- logging.info('Removing the old agent before initializing a new one')
+ logging.info(
+ 'Removing the old agent before initializing a new one')
self._pairing_agent.remove_from_connection()
self._pairing_agent = None
- self._pairing_agent= PairingAgent(pin, self._system_bus,
- self.AGENT_PATH)
+ self._pairing_agent = PairingAgent(pin, self._system_bus,
+ self.AGENT_PATH)
agent_manager = dbus.Interface(
self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
self.BLUEZ_AGENT_MANAGER_PATH),
self.BLUEZ_AGENT_MANAGER_IFACE)
try:
- agent_obj = self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME,
- self.AGENT_PATH)
+ agent_obj = self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ self.AGENT_PATH)
agent_manager.RegisterAgent(agent_obj,
dbus.String(self._capability))
except dbus.exceptions.DBusException as e:
@@ -1759,9 +1715,8 @@
raise
logging.info('Agent registered: %s', self.AGENT_PATH)
-
@xmlrpc_server.dbus_safe(False)
- def _is_paired(self, device):
+ def _is_paired(self, device):
"""Checks if a device is paired.
@param device: An 'org.bluez.Device1' interface to the device.
@@ -1773,7 +1728,6 @@
paired = props.Get(self.BLUEZ_DEVICE_IFACE, 'Paired')
return bool(paired)
-
@xmlrpc_server.dbus_safe(False)
def device_is_paired(self, address):
"""Checks if a device is paired.
@@ -1789,7 +1743,6 @@
return False
return self._is_paired(device)
-
@xmlrpc_server.dbus_safe(False)
def _is_connected(self, device):
"""Checks if a device is connected.
@@ -1804,8 +1757,6 @@
logging.info('Got connected = %r', connected)
return bool(connected)
-
-
@xmlrpc_server.dbus_safe(False)
def _set_trusted_by_device(self, device, trusted=True):
"""Set the device trusted by device object.
@@ -1827,7 +1778,6 @@
logging.error('_set_trusted_by_device: unexpected error')
return False
-
@xmlrpc_server.dbus_safe(False)
def _set_trusted_by_path(self, device_path, trusted=True):
"""Set the device trusted by the device path.
@@ -1848,7 +1798,6 @@
logging.error('_set_trusted_by_path: unexpected error')
return False
-
@xmlrpc_server.dbus_safe(False)
def set_trusted(self, address, trusted=True):
"""Set the device trusted by address.
@@ -1868,7 +1817,6 @@
logging.error('set_trusted: unexpected error')
return False
-
@xmlrpc_server.dbus_safe(False)
def pair_legacy_device(self, address, pin, trusted, timeout=60):
"""Pairs a device with a given pin code.
@@ -1886,6 +1834,7 @@
@returns: True on success. False otherwise.
"""
+
def connect_reply():
"""Handler when connect succeeded."""
logging.info('Device connected: %s', device_path)
@@ -1909,10 +1858,9 @@
# On finishing pairing, also connect; let connect result exit
# mainloop instead
- device.Connect(
- reply_handler=connect_reply,
- error_handler=connect_error,
- timeout=timeout * 1000)
+ device.Connect(reply_handler=connect_reply,
+ error_handler=connect_error,
+ timeout=timeout * 1000)
def pair_error(error):
"""Handler when pairing failed.
@@ -1946,7 +1894,8 @@
try:
if not self._is_paired(device):
logging.info('Device is not paired. Pair and Connect.')
- device.Pair(reply_handler=pair_reply, error_handler=pair_error,
+ device.Pair(reply_handler=pair_reply,
+ error_handler=pair_error,
timeout=timeout * 1000)
mainloop.run()
elif not self._is_connected(device):
@@ -1961,7 +1910,6 @@
return self._is_paired(device) and self._is_connected(device)
-
@xmlrpc_server.dbus_safe(False)
def remove_device_object(self, address):
"""Removes a device object and the pairing information.
@@ -1978,11 +1926,10 @@
if not device:
logging.error('Device not found')
return False
- self._adapter.RemoveDevice(
- device.object_path, dbus_interface=self.BLUEZ_ADAPTER_IFACE)
+ self._adapter.RemoveDevice(device.object_path,
+ dbus_interface=self.BLUEZ_ADAPTER_IFACE)
return True
-
@xmlrpc_server.dbus_safe(False)
def connect_device(self, address):
"""Connects a device.
@@ -2004,7 +1951,6 @@
device.Connect()
return self._is_connected(device)
-
@xmlrpc_server.dbus_safe(False)
def device_is_connected(self, address):
"""Checks if a device is connected.
@@ -2020,7 +1966,6 @@
return False
return self._is_connected(device)
-
@xmlrpc_server.dbus_safe(False)
def disconnect_device(self, address):
"""Disconnects a device.
@@ -2042,7 +1987,6 @@
device.Disconnect()
return not self._is_connected(device)
-
@xmlrpc_server.dbus_safe(False)
def _device_services_resolved(self, device):
"""Checks if services are resolved.
@@ -2058,7 +2002,6 @@
logging.info('Services resolved = %r', resolved)
return bool(resolved)
-
@xmlrpc_server.dbus_safe(False)
def device_services_resolved(self, address):
"""Checks if service discovery is complete on a device.
@@ -2081,17 +2024,14 @@
return self._device_services_resolved(device)
-
def btmon_start(self):
"""Start btmon monitoring."""
self.btmon.start()
-
def btmon_stop(self):
"""Stop btmon monitoring."""
self.btmon.stop()
-
def btmon_get(self, search_str, start_str):
"""Get btmon output contents.
@@ -2105,7 +2045,6 @@
return self.btmon.get_contents(search_str=search_str,
start_str=start_str)
-
def btmon_find(self, pattern_str):
"""Find if a pattern string exists in btmon output.
@@ -2116,7 +2055,6 @@
"""
return self.btmon.find(pattern_str)
-
def messages_start(self):
"""Start messages monitoring.
@@ -2158,8 +2096,8 @@
return self.messages.LogContains(pattern_str)
@xmlrpc_server.dbus_safe(False)
- def dbus_async_method(self, dbus_method,
- reply_handler, error_handler, *args):
+ def dbus_async_method(self, dbus_method, reply_handler, error_handler,
+ *args):
"""Run an async dbus method.
@param dbus_method: the dbus async method to invoke.
@@ -2172,23 +2110,23 @@
an error string if the dbus method fails or exception occurs
"""
+
def successful_cb():
"""Called when the dbus_method completed successfully."""
reply_handler()
self.dbus_cb_msg = ''
self._dbus_mainloop.quit()
-
def error_cb(error):
"""Called when the dbus_method failed."""
error_handler(error)
self.dbus_cb_msg = str(error)
self._dbus_mainloop.quit()
-
# Call dbus_method with handlers.
try:
- dbus_method(*args, reply_handler=successful_cb,
+ dbus_method(*args,
+ reply_handler=successful_cb,
error_handler=error_cb)
except Exception as e:
logging.error('Exception %s in dbus_async_method ', e)
@@ -2198,7 +2136,6 @@
return self.dbus_cb_msg
-
def advmon_read_supported_types(self):
"""Read the Advertisement Monitor supported monitor types.
@@ -2213,7 +2150,6 @@
dbus_interface=self.DBUS_PROP_IFACE)
return dbus_util.dbus2primitive(types)
-
def advmon_read_supported_features(self):
"""Read the Advertisement Monitor supported features.
@@ -2228,7 +2164,6 @@
dbus_interface=self.DBUS_PROP_IFACE)
return dbus_util.dbus2primitive(features)
-
def advmon_create_app(self):
"""Create an advertisement monitor app.
@@ -2237,7 +2172,6 @@
"""
return self.advmon_appmgr.create_app()
-
def advmon_exit_app(self, app_id):
"""Exit an advertisement monitor app.
@@ -2248,7 +2182,6 @@
"""
return self.advmon_appmgr.exit_app(app_id)
-
def advmon_kill_app(self, app_id):
"""Kill an advertisement monitor app by sending SIGKILL.
@@ -2259,7 +2192,6 @@
"""
return self.advmon_appmgr.kill_app(app_id)
-
def advmon_register_app(self, app_id):
"""Register an advertisement monitor app.
@@ -2270,7 +2202,6 @@
"""
return self.advmon_appmgr.register_app(app_id)
-
def advmon_unregister_app(self, app_id):
"""Unregister an advertisement monitor app.
@@ -2281,7 +2212,6 @@
"""
return self.advmon_appmgr.unregister_app(app_id)
-
def advmon_add_monitor(self, app_id, monitor_data):
"""Create an Advertisement Monitor object.
@@ -2294,7 +2224,6 @@
"""
return self.advmon_appmgr.add_monitor(app_id, monitor_data)
-
def advmon_remove_monitor(self, app_id, monitor_id):
"""Remove the Advertisement Monitor object.
@@ -2306,7 +2235,6 @@
"""
return self.advmon_appmgr.remove_monitor(app_id, monitor_id)
-
def advmon_get_event_count(self, app_id, monitor_id, event):
"""Read the count of a particular event on the given monitor.
@@ -2319,7 +2247,6 @@
"""
return self.advmon_appmgr.get_event_count(app_id, monitor_id, event)
-
def advmon_reset_event_count(self, app_id, monitor_id, event):
"""Reset the count of a particular event on the given monitor.
@@ -2387,10 +2314,10 @@
lambda: logging.info('register_advertisement: succeeded.'),
# error handler
lambda error: logging.error(
- 'register_advertisement: failed: %s', str(error)),
+ 'register_advertisement: failed: %s', str(error)),
# other arguments
- adv.get_path(), dbus.Dictionary({}, signature='sv'))
-
+ adv.get_path(),
+ dbus.Dictionary({}, signature='sv'))
def unregister_advertisement(self, advertisement_data):
"""Unregister an advertisement.
@@ -2422,7 +2349,7 @@
lambda: logging.info('unregister_advertisement: succeeded.'),
# error handler
lambda error: logging.error(
- 'unregister_advertisement: failed: %s', str(error)),
+ 'unregister_advertisement: failed: %s', str(error)),
# other arguments
adv.get_path())
@@ -2432,7 +2359,6 @@
return result
-
def set_advertising_intervals(self, min_adv_interval_ms,
max_adv_interval_ms):
"""Set advertising intervals.
@@ -2449,12 +2375,11 @@
lambda: logging.info('set_advertising_intervals: succeeded.'),
# error handler
lambda error: logging.error(
- 'set_advertising_intervals: failed: %s', str(error)),
+ 'set_advertising_intervals: failed: %s', str(error)),
# other arguments
dbus.UInt16(min_adv_interval_ms),
dbus.UInt16(max_adv_interval_ms))
-
def reset_advertising(self):
"""Reset advertising.
@@ -2476,9 +2401,8 @@
# reply handler
lambda: logging.info('reset_advertising: succeeded.'),
# error handler
- lambda error: logging.error(
- 'reset_advertising: failed: %s', str(error)))
-
+ lambda error: logging.error('reset_advertising: failed: %s',
+ str(error)))
def create_audio_record_directory(self, audio_record_dir):
"""Create the audio recording directory.
@@ -2496,7 +2420,6 @@
audio_record_dir, e)
return False
-
def start_capturing_audio_subprocess(self, audio_data, recording_device):
"""Start capturing audio in a subprocess.
@@ -2514,7 +2437,6 @@
rate=audio_data['rate'],
duration=audio_data['duration'])
-
def stop_capturing_audio_subprocess(self):
"""Stop capturing audio.
@@ -2522,7 +2444,6 @@
"""
return self._cras_test_client.stop_capturing_subprocess()
-
def _generate_playback_file(self, audio_data):
"""Generate the playback file if it does not exist yet.
@@ -2532,7 +2453,8 @@
@param audio_data: the audio test data
"""
if not os.path.exists(audio_data['file']):
- data_format = dict(file_type='raw', sample_format='S16_LE',
+ data_format = dict(file_type='raw',
+ sample_format='S16_LE',
channel=audio_data['channels'],
rate=audio_data['rate'])
@@ -2546,7 +2468,6 @@
frequencies=audio_data['frequencies'])
logging.debug("Raw file generated: %s", audio_data['file'])
-
def start_playing_audio_subprocess(self, audio_data):
"""Start playing audio in a subprocess.
@@ -2566,7 +2487,6 @@
logging.error("start_playing_subprocess() failed: %s", str(e))
return False
-
def stop_playing_audio_subprocess(self):
"""Stop playing audio in the subprocess.
@@ -2574,7 +2494,6 @@
"""
return self._cras_test_client.stop_playing_subprocess()
-
def play_audio(self, audio_data):
"""Play audio.
@@ -2591,7 +2510,6 @@
rate=audio_data['rate'],
duration=audio_data['duration'])
-
def check_audio_frames_legitimacy(self, audio_test_data, recording_device,
recorded_file):
"""Get the number of frames in the recorded audio file.
@@ -2625,7 +2543,6 @@
wav_file = check_quality.WaveFile(recorded_filename)
return wav_file.get_number_frames() > 0
-
def convert_audio_sample_rate(self, input_file, out_file, test_data,
new_rate):
"""Convert audio file to new sample rate.
@@ -2639,17 +2556,27 @@
"""
test_data = json.loads(test_data)
logging.debug('Resampling file {} to new rate {}'.format(
- input_file, new_rate))
+ input_file, new_rate))
- convert_format(input_file, test_data['channels'],
- test_data['bit_width'], test_data['rate'], out_file,
- test_data['channels'], test_data['bit_width'], new_rate,
- 1.0, use_src_header=True, use_dst_header=True)
+ convert_format(input_file,
+ test_data['channels'],
+ test_data['bit_width'],
+ test_data['rate'],
+ out_file,
+ test_data['channels'],
+ test_data['bit_width'],
+ new_rate,
+ 1.0,
+ use_src_header=True,
+ use_dst_header=True)
return os.path.isfile(out_file)
-
- def trim_wav_file(self, in_file, out_file, new_duration, test_data,
+ def trim_wav_file(self,
+ in_file,
+ out_file,
+ new_duration,
+ test_data,
tolerance=0.1):
"""Trim long file to desired length.
@@ -2669,10 +2596,10 @@
test_data = json.loads(test_data)
trim_silence_from_wav_file(in_file, out_file, new_duration)
measured_length = get_file_length(out_file, test_data['channels'],
- test_data['bit_width'], test_data['rate'])
+ test_data['bit_width'],
+ test_data['rate'])
return abs(measured_length - new_duration) <= tolerance
-
def unzip_audio_test_data(self, tar_path, data_dir):
"""Unzip audio test data files.
@@ -2685,11 +2612,13 @@
# creates path to dir to extract test data to by taking name of the
# tarball without the extension eg. <dir>/file.ext to data_dir/file/
audio_test_dir = os.path.join(
- data_dir, os.path.split(tar_path)[1].split('.', 1)[0])
+ data_dir,
+ os.path.split(tar_path)[1].split('.', 1)[0])
unzip_cmd = 'tar -xf {0} -C {1}'.format(tar_path, data_dir)
- unzip_proc = subprocess.Popen(unzip_cmd.split(), stdout=subprocess.PIPE,
+ unzip_proc = subprocess.Popen(unzip_cmd.split(),
+ stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
_, stderr = unzip_proc.communicate()
@@ -2700,7 +2629,6 @@
return unzip_proc.returncode == 0 and os.path.isdir(audio_test_dir)
-
def convert_raw_to_wav(self, input_file, output_file, test_data):
"""Convert raw audio file to wav file.
@@ -2712,11 +2640,11 @@
"""
test_data = json.loads(test_data)
convert_raw_file(input_file, test_data['channels'],
- test_data['bit_width'], test_data['rate'], output_file)
+ test_data['bit_width'], test_data['rate'],
+ output_file)
return os.path.isfile(output_file)
-
def get_primary_frequencies(self, audio_test_data, recording_device,
recorded_file):
"""Get primary frequencies of the audio test file.
@@ -2748,12 +2676,13 @@
check_quality=False,
quality_params=None)
spectra = checker._spectrals
- primary_freq = [float(spectra[i][0][0]) if spectra[i] else 0
- for i in range(len(spectra))]
+ primary_freq = [
+ float(spectra[i][0][0]) if spectra[i] else 0
+ for i in range(len(spectra))
+ ]
primary_freq.sort()
return primary_freq
-
def enable_wbs(self, value):
"""Enable or disable wideband speech (wbs) per the value.
@@ -2763,7 +2692,6 @@
"""
return self._cras_test_client.enable_wbs(value)
-
def set_player_playback_status(self, status):
"""Set playback status for the registered media player.
@@ -2772,7 +2700,6 @@
"""
return self._cras_test_client.set_player_playback_status(status)
-
def set_player_position(self, position):
"""Set media position for the registered media player.
@@ -2781,7 +2708,6 @@
"""
return self._cras_test_client.set_player_position(position)
-
def set_player_metadata(self, metadata):
"""Set metadata for the registered media player.
@@ -2790,7 +2716,6 @@
"""
return self._cras_test_client.set_player_metadata(metadata)
-
def set_player_length(self, length):
"""Set media length for the registered media player.
@@ -2802,12 +2727,11 @@
@param length: length in micro seconds.
"""
- length_variant = dbus.types.Int64(length, variant_level = 1)
+ length_variant = dbus.types.Int64(length, variant_level=1)
length_dict = dbus.Dictionary({'length': length_variant},
- signature='sv')
+ signature='sv')
return self._cras_test_client.set_player_length(length_dict)
-
def select_input_device(self, device_name):
"""Select the audio input device.
@@ -2817,7 +2741,6 @@
"""
return self._cras_test_client.select_input_device(device_name)
-
@xmlrpc_server.dbus_safe(None)
def select_output_node(self, node_type):
"""Select the audio output node.
@@ -2828,7 +2751,6 @@
"""
return cras_utils.set_single_selected_output_node(node_type)
-
@xmlrpc_server.dbus_safe(None)
def get_selected_output_device_type(self):
"""Get the selected audio output node type.
@@ -2838,7 +2760,6 @@
# Note: should convert the dbus.String to the regular string.
return str(cras_utils.get_selected_output_device_type())
-
def get_gatt_attributes_map(self, address):
"""Return a JSON formatted string of the GATT attributes of a device,
keyed by UUID
@@ -2854,7 +2775,7 @@
device_object_path = self._get_device_path(address)
objects = self._bluez.GetManagedObjects(
- dbus_interface=self.BLUEZ_MANAGER_IFACE, byte_arrays=False)
+ dbus_interface=self.BLUEZ_MANAGER_IFACE, byte_arrays=False)
service_map = self._get_service_map(device_object_path, objects)
servs = dict()
@@ -2888,7 +2809,6 @@
return json.dumps(attribute_map)
-
def _get_gatt_interface(self, uuid, object_path, interface):
"""Get dbus interface by uuid
@param uuid: a string of uuid
@@ -2898,9 +2818,8 @@
"""
return dbus.Interface(
- self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME, object_path), interface)
-
+ self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ object_path), interface)
def get_gatt_service_property(self, object_path, property_name):
"""Get property from a service attribute
@@ -2911,9 +2830,9 @@
none otherwise
"""
- return self.get_gatt_attribute_property(
- object_path, self.BLUEZ_GATT_SERV_IFACE, property_name)
-
+ return self.get_gatt_attribute_property(object_path,
+ self.BLUEZ_GATT_SERV_IFACE,
+ property_name)
def get_gatt_characteristic_property(self, object_path, property_name):
"""Get property from a characteristic attribute
@@ -2924,9 +2843,9 @@
none otherwise
"""
- return self.get_gatt_attribute_property(
- object_path, self.BLUEZ_GATT_CHAR_IFACE, property_name)
-
+ return self.get_gatt_attribute_property(object_path,
+ self.BLUEZ_GATT_CHAR_IFACE,
+ property_name)
def get_gatt_descriptor_property(self, object_path, property_name):
"""Get property from descriptor attribute
@@ -2937,9 +2856,9 @@
none otherwise
"""
- return self.get_gatt_attribute_property(
- object_path, self.BLUEZ_GATT_DESC_IFACE, property_name)
-
+ return self.get_gatt_attribute_property(object_path,
+ self.BLUEZ_GATT_DESC_IFACE,
+ property_name)
@xmlrpc_server.dbus_safe(None)
def get_gatt_attribute_property(self, object_path, interface,
@@ -2952,8 +2871,8 @@
none otherwise
"""
- gatt_object = self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME, object_path)
+ gatt_object = self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ object_path)
prop = self._get_dbus_object_property(gatt_object, interface,
property_name)
logging.info(prop)
@@ -2969,7 +2888,6 @@
return list(map(str, prop))
return prop
-
@xmlrpc_server.dbus_safe(None)
def gatt_characteristic_read_value(self, uuid, object_path):
"""Perform method ReadValue on a characteristic attribute
@@ -2984,7 +2902,6 @@
value = dbus_interface.ReadValue(dbus.Dictionary({}, signature='sv'))
return _dbus_byte_array_to_b64_string(value)
-
@xmlrpc_server.dbus_safe(None)
def gatt_descriptor_read_value(self, uuid, object_path):
"""Perform method ReadValue on a descriptor attribute
@@ -2999,7 +2916,6 @@
value = dbus_interface.ReadValue(dbus.Dictionary({}, signature='sv'))
return _dbus_byte_array_to_b64_string(value)
-
@xmlrpc_server.dbus_safe(False)
def _get_attribute_map(self, object_path, dbus_interface, objects):
"""Gets a map of object paths under an object path.
@@ -3018,8 +2934,7 @@
if object_path:
for path, ifaces in six.iteritems(objects):
- if (dbus_interface in ifaces and
- path.startswith(object_path)):
+ if (dbus_interface in ifaces and path.startswith(object_path)):
uuid = ifaces[dbus_interface]['UUID'].lower()
attr_map[uuid] = path
@@ -3028,16 +2943,14 @@
return attr_map
-
def _get_service_map(self, device_path, objects):
"""Gets a map of service paths for a device.
@param device_path: the object path of the device.
@param objects: The managed objects.
"""
- return self._get_attribute_map(
- device_path, self.BLUEZ_GATT_SERV_IFACE, objects)
-
+ return self._get_attribute_map(device_path, self.BLUEZ_GATT_SERV_IFACE,
+ objects)
def _get_characteristic_map(self, serv_path, objects):
"""Gets a map of characteristic paths for a service.
@@ -3045,9 +2958,8 @@
@param serv_path: the object path of the service.
@param objects: The managed objects.
"""
- return self._get_attribute_map(
- serv_path, self.BLUEZ_GATT_CHAR_IFACE, objects)
-
+ return self._get_attribute_map(serv_path, self.BLUEZ_GATT_CHAR_IFACE,
+ objects)
def _get_descriptor_map(self, chrc_path, objects):
"""Gets a map of descriptor paths for a characteristic.
@@ -3055,13 +2967,12 @@
@param chrc_path: the object path of the characteristic.
@param objects: The managed objects.
"""
- return self._get_attribute_map(
- chrc_path, self.BLUEZ_GATT_DESC_IFACE, objects)
-
+ return self._get_attribute_map(chrc_path, self.BLUEZ_GATT_DESC_IFACE,
+ objects)
@xmlrpc_server.dbus_safe(None)
def _get_dbus_object_property(self, dbus_object, dbus_interface,
- dbus_property):
+ dbus_property):
"""Get the property in an object.
@param dbus_object: a dbus object
@@ -3072,9 +2983,8 @@
"""
return dbus_object.Get(dbus_interface,
- dbus_property,
- dbus_interface=dbus.PROPERTIES_IFACE)
-
+ dbus_property,
+ dbus_interface=dbus.PROPERTIES_IFACE)
@xmlrpc_server.dbus_safe(False)
def get_characteristic_map(self, address):
@@ -3094,11 +3004,11 @@
if device_path:
objects = self._bluez.GetManagedObjects(
- dbus_interface=self.BLUEZ_MANAGER_IFACE, byte_arrays=False)
+ dbus_interface=self.BLUEZ_MANAGER_IFACE, byte_arrays=False)
for path, ifaces in six.iteritems(objects):
- if (self.BLUEZ_GATT_CHAR_IFACE in ifaces and
- path.startswith(device_path)):
+ if (self.BLUEZ_GATT_CHAR_IFACE in ifaces
+ and path.startswith(device_path)):
uuid = ifaces[self.BLUEZ_GATT_CHAR_IFACE]['UUID'].lower()
char_map[uuid] = path
else:
@@ -3106,7 +3016,6 @@
return char_map
-
@xmlrpc_server.dbus_safe(None)
def _get_char_object(self, uuid, address):
"""Gets a characteristic object.
@@ -3126,9 +3035,8 @@
logging.error("path not found: %s %s", uuid, address)
return None
return dbus.Interface(
- self._system_bus.get_object(self.BLUEZ_SERVICE_NAME, path),
- self.BLUEZ_GATT_CHAR_IFACE)
-
+ self._system_bus.get_object(self.BLUEZ_SERVICE_NAME, path),
+ self.BLUEZ_GATT_CHAR_IFACE)
@xmlrpc_server.dbus_safe(None)
def read_characteristic(self, uuid, address):
@@ -3152,7 +3060,6 @@
value = char_obj.ReadValue(dbus.Dictionary({}, signature='sv'))
return _dbus_byte_array_to_b64_string(value)
-
@xmlrpc_server.dbus_safe(None)
def write_characteristic(self, uuid, address, value):
"""Performs a write operation on a gatt characteristic.
@@ -3176,7 +3083,6 @@
char_obj.WriteValue(dbus_value, dbus.Dictionary({}, signature='sv'))
return True
-
@xmlrpc_server.dbus_safe(None)
def exchange_messages(self, tx_object_path, rx_object_path, value):
"""Performs a write operation on a gatt characteristic and wait for
@@ -3199,13 +3105,13 @@
self._chrc_property = None
self._signal_watch = self._system_bus.add_signal_receiver(
- self._property_changed,
- signal_name='PropertiesChanged',
- path=rx_object_path)
+ self._property_changed,
+ signal_name='PropertiesChanged',
+ path=rx_object_path)
self._timeout_id = gobject.timeout_add(
- self.PROPERTY_UPDATE_TIMEOUT_MILLI_SECS,
- self._property_wait_timeout)
+ self.PROPERTY_UPDATE_TIMEOUT_MILLI_SECS,
+ self._property_wait_timeout)
write_value = _b64_string_to_dbus_byte_array(value)
tx_obj.WriteValue(write_value, dbus.Dictionary({}, signature='sv'))
@@ -3214,11 +3120,10 @@
return _dbus_byte_array_to_b64_string(self._chrc_property)
-
def _property_changed(self, *args, **kwargs):
"""Handler for properties changed signal."""
gobject.source_remove(self._timeout_id)
- self._signal_watch.remove();
+ self._signal_watch.remove()
changed_prop = args
logging.info(changed_prop)
@@ -3227,23 +3132,21 @@
if self._dbus_mainloop.is_running():
self._dbus_mainloop.quit()
-
def _property_wait_timeout(self):
"""Timeout handler when waiting for properties update signal."""
- self._signal_watch.remove();
+ self._signal_watch.remove()
if self._dbus_mainloop.is_running():
logging.warn("quit main loop due to timeout")
self._dbus_mainloop.quit()
# Return false so that this method will not be called again.
return False
-
@xmlrpc_server.dbus_safe(False)
def _get_gatt_characteristic_object(self, object_path):
return dbus.Interface(
- self._system_bus.get_object(self.BLUEZ_SERVICE_NAME, object_path),
- self.BLUEZ_GATT_CHAR_IFACE)
-
+ self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ object_path),
+ self.BLUEZ_GATT_CHAR_IFACE)
@xmlrpc_server.dbus_safe(False)
def start_notify(self, object_path, cccd_value):
@@ -3274,7 +3177,6 @@
logging.error('start_notify: unexpected error')
return False
-
@xmlrpc_server.dbus_safe(False)
def stop_notify(self, object_path):
"""Stops the notification session on the gatt characteristic.
@@ -3300,7 +3202,6 @@
logging.error('stop_notify: unexpected error')
return False
-
@xmlrpc_server.dbus_safe(False)
def is_notifying(self, object_path):
"""Is the GATT characteristic in a notifying session?
@@ -3313,7 +3214,6 @@
return self.get_gatt_characteristic_property(object_path, 'Notifying')
-
@xmlrpc_server.dbus_safe(False)
def is_characteristic_path_resolved(self, uuid, address):
"""Checks whether a characteristic is in the object tree.
@@ -3330,7 +3230,6 @@
"""
return bool(self.get_characteristic_map(address).get(uuid))
-
@xmlrpc_server.dbus_safe(False)
def get_connection_info(self, address):
"""Get device connection info.
@@ -3355,7 +3254,6 @@
logging.error('get_connection_info: unexpected error')
return None
-
@xmlrpc_server.dbus_safe(False)
def set_le_connection_parameters(self, address, parameters):
"""Set the LE connection parameters.
@@ -3373,15 +3271,14 @@
return not self.dbus_async_method(
plugin_device.SetLEConnectionParameters,
# reply handler
- lambda: logging.info(
- 'set_le_connection_parameters: succeeded.'),
+ lambda: logging.info('set_le_connection_parameters: succeeded.'
+ ),
# error handler
- lambda error: logging.error(
- 'set_le_connection_parameters: failed: %s', str(error)),
+ lambda error: logging.
+ error('set_le_connection_parameters: failed: %s', str(error)),
# other arguments
parameters)
-
@xmlrpc_server.dbus_safe(False)
def _get_plugin_device_interface(self, address):
"""Get the BlueZ Chromium device plugin interface.
@@ -3400,12 +3297,9 @@
return None
return dbus.Interface(
- self._system_bus.get_object(
- self.BLUEZ_SERVICE_NAME,
- path),
+ self._system_bus.get_object(self.BLUEZ_SERVICE_NAME, path),
self.BLUEZ_PLUGIN_DEVICE_IFACE)
-
def bt_caused_last_resume(self):
"""Checks if last resume from suspend was caused by bluetooth
@@ -3432,7 +3326,8 @@
# *next* 5 logs in time, since we are piping the powerd file backwards
# with tac command
resume_indicator = 'powerd_suspend returned'
- cmd = 'tac {} | grep -B 5 -m1 "{}"'.format(event_file, resume_indicator)
+ cmd = 'tac {} | grep -B 5 -m1 "{}"'.format(event_file,
+ resume_indicator)
try:
last_resume_details = utils.run(cmd).stdout
@@ -3450,7 +3345,6 @@
return False
-
def do_suspend(self, seconds, expect_bt_wake):
"""Suspend DUT using the power manager.
@@ -3474,7 +3368,7 @@
# was early, and cause of the resume
bt_caused_wake = self.bt_caused_last_resume()
logging.info('Cause for resume: {}'.format(
- 'BT' if bt_caused_wake else 'Not BT'))
+ 'BT' if bt_caused_wake else 'Not BT'))
if not expect_bt_wake and bt_caused_wake:
raise sys_power.SuspendFailure('BT woke us unexpectedly')
@@ -3490,7 +3384,6 @@
return True
-
def get_wlan_vid_pid(self):
""" Return vendor id and product id of the wlan chip on BT/WiFi module
@@ -3513,7 +3406,6 @@
logging.debug('returning vid:%s pid:%s', vid, pid)
return (vid, pid)
-
def get_bt_module_name(self):
""" Return bluetooth module name for non-USB devices
@@ -3521,19 +3413,21 @@
dict.Otherwise it returns the raw string read.
"""
# map the string read from device to chipset name
- chipset_string_dict = { 'qcom,wcn3991-bt\x00' : 'WCN3991'}
+ chipset_string_dict = {'qcom,wcn3991-bt\x00': 'WCN3991'}
- hci_device = '/sys/class/bluetooth/hci0'
+ hci_device = '/sys/class/bluetooth/hci0'
real_path = os.path.realpath(hci_device)
logging.debug('real path is %s', real_path)
- if 'usb' in real_path:
+ if 'usb' in real_path:
return ''
- device_path = os.path.join(real_path, 'device', 'of_node', 'compatible')
+ device_path = os.path.join(real_path, 'device', 'of_node',
+ 'compatible')
try:
chipset_string = open(device_path).read()
- logging.debug('read string %s from %s', chipset_string, device_path)
+ logging.debug('read string %s from %s', chipset_string,
+ device_path)
except Exception as e:
logging.error('Exception %s while reading from file', str(e),
device_path)
diff --git a/client/cros/multimedia/multimedia_xmlrpc_server.py b/client/cros/multimedia/multimedia_xmlrpc_server.py
index 7b8439e..e77997e 100755
--- a/client/cros/multimedia/multimedia_xmlrpc_server.py
+++ b/client/cros/multimedia/multimedia_xmlrpc_server.py
@@ -49,24 +49,34 @@
arc_res = arc_resource.ArcResource()
self._facades = {
- 'assistant' : assistant_facade_native.AssistantFacadeNative(
- resource),
- 'audio': audio_facade_native.AudioFacadeNative(
- resource, arc_resource=arc_res),
- 'bluetooth': bluetooth_facade_native.BluetoothFacadeNative(),
- 'video': video_facade_native.VideoFacadeNative(
- resource, arc_resource=arc_res),
- 'display': display_facade_native.DisplayFacadeNative(resource),
- 'system': system_facade_native.SystemFacadeNative(),
- 'usb': usb_facade_native.USBFacadeNative(),
- 'browser': browser_facade_native.BrowserFacadeNative(resource),
- 'input': input_facade_native.InputFacadeNative(),
- 'cfm_main_screen': cfm_facade_native.CFMFacadeNative(
- resource, 'hotrod'),
- 'cfm_mimo_screen': cfm_facade_native.CFMFacadeNative(
- resource, 'control'),
- 'kiosk': kiosk_facade_native.KioskFacadeNative(resource),
- 'graphics': graphics_facade_native.GraphicsFacadeNative()
+ 'assistant':
+ assistant_facade_native.AssistantFacadeNative(resource),
+ 'audio':
+ audio_facade_native.AudioFacadeNative(resource,
+ arc_resource=arc_res),
+ 'bluetooth':
+ bluetooth_facade_native.BluetoothFacadeNative(),
+ 'video':
+ video_facade_native.VideoFacadeNative(resource,
+ arc_resource=arc_res),
+ 'display':
+ display_facade_native.DisplayFacadeNative(resource),
+ 'system':
+ system_facade_native.SystemFacadeNative(),
+ 'usb':
+ usb_facade_native.USBFacadeNative(),
+ 'browser':
+ browser_facade_native.BrowserFacadeNative(resource),
+ 'input':
+ input_facade_native.InputFacadeNative(),
+ 'cfm_main_screen':
+ cfm_facade_native.CFMFacadeNative(resource, 'hotrod'),
+ 'cfm_mimo_screen':
+ cfm_facade_native.CFMFacadeNative(resource, 'control'),
+ 'kiosk':
+ kiosk_facade_native.KioskFacadeNative(resource),
+ 'graphics':
+ graphics_facade_native.GraphicsFacadeNative()
}
diff --git a/server/cros/bluetooth/bluetooth_adapter_quick_tests.py b/server/cros/bluetooth/bluetooth_adapter_quick_tests.py
index cb58078..db633ef 100644
--- a/server/cros/bluetooth/bluetooth_adapter_quick_tests.py
+++ b/server/cros/bluetooth/bluetooth_adapter_quick_tests.py
@@ -117,7 +117,9 @@
# server, which log out the user.
self.factory = remote_facade_factory.RemoteFacadeFactory(
- host, no_chrome=not self.start_browser, disable_arc=True,
+ host,
+ no_chrome=not self.start_browser,
+ disable_arc=True,
retry_rpc=False)
try:
self.bluetooth_facade = self.factory.create_bluetooth_facade()
diff --git a/server/cros/bluetooth/bluetooth_adapter_tests.py b/server/cros/bluetooth/bluetooth_adapter_tests.py
index e3a9ed4..9b9f1eb 100644
--- a/server/cros/bluetooth/bluetooth_adapter_tests.py
+++ b/server/cros/bluetooth/bluetooth_adapter_tests.py
@@ -1028,9 +1028,11 @@
del self.bluetooth_facade
if hasattr(self, 'input_facade'):
del self.input_facade
- self.factory = remote_facade_factory.RemoteFacadeFactory(self.host,
+ self.factory = remote_facade_factory.RemoteFacadeFactory(
+ self.host,
disable_arc=True,
- no_chrome=not self.start_browser, retry_rpc=False)
+ no_chrome=not self.start_browser,
+ retry_rpc=False)
self.bluetooth_facade = self.factory.create_bluetooth_facade()
self.input_facade = self.factory.create_input_facade()
diff --git a/server/cros/bluetooth/bluetooth_device.py b/server/cros/bluetooth/bluetooth_device.py
index 2c245de..8a8499f 100644
--- a/server/cros/bluetooth/bluetooth_device.py
+++ b/server/cros/bluetooth/bluetooth_device.py
@@ -40,7 +40,7 @@
XMLRPC_LOG_PATH = '/var/log/bluetooth_xmlrpc_device.log'
XMLRPC_REQUEST_TIMEOUT_SECONDS = 180
- def __init__(self, device_host, remote_facade_proxy = None):
+ def __init__(self, device_host, remote_facade_proxy=None):
"""Construct a BluetoothDevice.
@param device_host: host object representing a remote host.
diff --git a/server/cros/multimedia/bluetooth_facade_adapter.py b/server/cros/multimedia/bluetooth_facade_adapter.py
index 03d067e..0141cbc 100644
--- a/server/cros/multimedia/bluetooth_facade_adapter.py
+++ b/server/cros/multimedia/bluetooth_facade_adapter.py
@@ -1,7 +1,6 @@
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-
"""An adapter to remotely access the bluetooth facade on DUT."""
from autotest_lib.server.cros.bluetooth.bluetooth_device import BluetoothDevice
@@ -14,6 +13,7 @@
class on initialization, can be accessed from its _client property.
"""
+
def __init__(self, host, remote_facade_proxy):
"""Construct an BluetoothFacadeRemoteAdapter.
diff --git a/server/cros/multimedia/remote_facade_factory.py b/server/cros/multimedia/remote_facade_factory.py
index 166135f..06e4fd3 100644
--- a/server/cros/multimedia/remote_facade_factory.py
+++ b/server/cros/multimedia/remote_facade_factory.py
@@ -83,8 +83,12 @@
XMLRPC_RETRY_DELAY = 10
REBOOT_TIMEOUT = 60
- def __init__(self, host, no_chrome, extra_browser_args=None,
- disable_arc=False, retry_rpc=True):
+ def __init__(self,
+ host,
+ no_chrome,
+ extra_browser_args=None,
+ disable_arc=False,
+ retry_rpc=True):
"""Construct a RemoteFacadeProxy.
@param host: Host object representing a remote host.
@@ -323,8 +327,13 @@
"""
- def __init__(self, host, no_chrome=False, install_autotest=True,
- results_dir=None, extra_browser_args=None, disable_arc=False,
+ def __init__(self,
+ host,
+ no_chrome=False,
+ install_autotest=True,
+ results_dir=None,
+ extra_browser_args=None,
+ disable_arc=False,
retry_rpc=True):
"""Construct a RemoteFacadeFactory.
@@ -420,8 +429,8 @@
def create_kiosk_facade(self):
- """"Creates a kiosk facade object."""
- return kiosk_facade_adapter.KioskFacadeRemoteAdapter(
+ """"Creates a kiosk facade object."""
+ return kiosk_facade_adapter.KioskFacadeRemoteAdapter(
self._client, self._proxy)