| # Lint as: python2, python3 |
| # Copyright 2016 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Server side bluetooth adapter subtests.""" |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| from datetime import datetime, timedelta |
| import errno |
| import functools |
| import six.moves.http_client |
| import inspect |
| import logging |
| import multiprocessing |
| import os |
| import re |
| import socket |
| import threading |
| import time |
| |
| import common |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.bin.input import input_event_recorder as recorder |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket |
| from autotest_lib.client.cros.chameleon import chameleon |
| from autotest_lib.server.cros.bluetooth import bluetooth_peer_update |
| from autotest_lib.server.cros.bluetooth import bluetooth_test_utils |
| from autotest_lib.server import test |
| |
| from autotest_lib.client.bin.input.linux_input import ( |
| BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL, |
| REL_WHEEL_HI_RES, KEY_PLAYCD, KEY_PAUSECD, KEY_STOPCD, KEY_NEXTSONG, |
| KEY_PREVIOUSSONG) |
| from autotest_lib.server.cros.bluetooth.bluetooth_gatt_client_utils import ( |
| GATT_ClientFacade, GATT_Application, GATT_HIDApplication) |
| from autotest_lib.server.cros.multimedia import remote_facade_factory |
| import six |
| from six.moves import map |
| from six.moves import range |
| from six.moves import zip |
| |
| |
| # Module-level alias so the rest of this file can write Event for recorder.Event. |
| Event = recorder.Event |
| |
# Chipset name -> list of id pairs identifying that chipset.
# NOTE(review): the name suggests each pair is (vendor id, product id) -
# confirm against the code that consumes this table.
CHIPSET_TO_VIDPID = {
    'BRCM-4354': [('0x002d', '0x4354')],
    'MVL-8897': [('0x02df', '0x912d')],
    'MVL-8997': [('0x1b4b', '0x2b42')],
    'QCA-9462': [('0x168c', '0x0034')],
    'QCA-6174A-5': [('0x168c', '0x003e')],
    'QCA-6174A-3': [('0x271', '0x050a')],  # UART
    'Intel-AX200': [('0x8086', '0x2723')],  # CcP2
    'Intel-AX201': [('0x8086', '0x02f0')],  # HrP2
    'Intel-AC9260': [('0x8086', '0x2526')],  # ThP2
    'Intel-AC9560': [
        ('0x8086', '0x31dc'),  # JfP2
        ('0x8086', '0x9df0'),
    ],
    'Intel-AC7260': [
        ('0x8086', '0x08b1'),  # WP2
        ('0x8086', '0x08b2'),
    ],
    'Intel-AC7265': [
        ('0x8086', '0x095a'),  # StP2
        ('0x8086', '0x095b'),
    ],
    'Realtek-RTL8822C-USB': [('0x10ec', '0xc822')],
}

# We have a number of chipsets that are no longer supported. Known issues
# related to firmware will be ignored on these devices (b/169328792).
UNSUPPORTED_CHIPSETS = [
    'BRCM-4354',
    'MVL-8897',
    'MVL-8997',
    'Intel-AC7260',
    'Intel-AC7265',
]

# Location of data traces relative to this (bluetooth_adapter_tests.py) file
BT_ADAPTER_TEST_PATH = os.path.dirname(__file__)
TRACE_LOCATION = os.path.join(BT_ADAPTER_TEST_PATH, 'input_traces/keyboard')

RESUME_DELTA = -5
| |
# Delay binding the methods since host is only available at run time.
# Each value takes a Bluetooth peer and returns that peer's getter for the
# given device type; calling the returned getter yields the device object.
SUPPORTED_DEVICE_TYPES = {
    'MOUSE': lambda btpeer: btpeer.get_bluetooth_hid_mouse,
    'KEYBOARD': lambda btpeer: btpeer.get_bluetooth_hid_keyboard,
    'BLE_MOUSE': lambda btpeer: btpeer.get_ble_mouse,
    'BLE_KEYBOARD': lambda btpeer: btpeer.get_ble_keyboard,
    # Tester allows us to test DUT's discoverability, etc. from a peer
    'BLUETOOTH_TESTER': lambda btpeer: btpeer.get_bluetooth_tester,
    # This is a base object that does not emulate any Bluetooth device.
    # This object is preferred when only a pure XMLRPC server is needed
    # on the btpeer host, e.g., to perform servod methods.
    'BLUETOOTH_BASE': lambda btpeer: btpeer.get_bluetooth_base,
    # A phone device that supports Bluetooth
    'BLE_PHONE': lambda btpeer: btpeer.get_ble_phone,
    # A Bluetooth audio device emulating a headphone
    'BLUETOOTH_AUDIO': lambda btpeer: btpeer.get_bluetooth_audio,
}
| |
# Log snippet to search for -> key recorded in the test results when the
# snippet is found (see _flag_common_failures).
COMMON_FAILURES = {
    'Freeing adapter /org/bluez/hci': 'adapter_freed',
    '/var/spool/crash/bluetoothd': 'bluetoothd_crashed',
}

# TODO(b/150898182) - Don't run some tests on tablet form factors
# This list was generated by looking for tablet models on Goldeneye and removing
# the ones that were not launched
TABLET_MODELS = [
    'kakadu',
    'kodama',
    'krane',
    'dru',
    'druwl',
    'dumo',
]

# TODO(b/161005264) - Some tests rely on software rotation to pass, so we must
# know which models don't use software rotation. Use a static list until we can
# query the bluez API instead. Extended advertising is supported on platforms
# on 4.19 and 5.4, with HrP2, JfP2, CcP2, RTL8822C, or QCN3991 chipsets.
EXT_ADV_MODELS = [
    'ezkinil', 'trembyle', 'drawcia', 'drawlat',
    'drawman', 'maglia', 'magolor', 'sarien',
    'arcada', 'akemi', 'drallion', 'drallion360',
    'hatch', 'stryke', 'helios', 'dragonair',
    'dratini', 'duffy', 'jinlon', 'kaisa',
    'kindred', 'kled', 'puff', 'kohaku',
    'nightfury', 'morphius', 'lazor', 'trogdor',
]

# TODO(b/158336394) Realtek: Powers down during suspend due to high power usage
#                            during S3.
# TODO(b/168152910) Marvell: Powers down during suspend due to flakiness when
#                            entering suspend. This will also skip the tests
#                            for Veyron (which don't power down right now) but
#                            reconnect tests are still enabled for that platform
#                            to check for suspend stability.
SUSPEND_POWER_DOWN_CHIPSETS = [
    'Realtek-RTL8822C-USB',
    'MVL-8897',
    'MVL-8997',
]
| |
| |
def method_name():
    """Return the name of the function or method that called this helper.

    This function is meant to be invoked from inside a method so that the
    method can learn its own name without hard-coding it.

    @returns: the string name of the calling function or method.
    """
    # f_back is the frame of whoever called method_name().
    caller_frame = inspect.currentframe().f_back
    return caller_frame.f_code.co_name
| |
| |
| def _run_method(method, method_name, *args, **kwargs): |
| """Run a target method and capture exceptions if any. |
| |
| This is just a wrapper of the target method so that we do not need to |
| write the exception capturing structure repeatedly. The method could |
| be either a device method or a facade method. |
| |
| @param method: the method to run |
| @param method_name: the name of the method |
| |
| @returns: the return value of target method() if successful. |
| False otherwise. |
| |
| """ |
| result = False |
| try: |
| result = method(*args, **kwargs) |
| except Exception as e: |
| logging.error('%s: %s', method_name, e) |
| except: |
| logging.error('%s: unexpected error', method_name) |
| return result |
| |
| |
def get_bluetooth_emulated_device(btpeer, device_type):
    """Get the bluetooth emulated device object.

    Looks up the getter for device_type in SUPPORTED_DEVICE_TYPES, creates
    the device object from btpeer, initializes it, and caches the queried
    properties (capabilities, name, address, pin, class of service/device,
    device type, authentication mode, serial port) as attributes on it.

    @param btpeer: the Bluetooth peer device
    @param device_type : the bluetooth device type, e.g., 'MOUSE'

    @returns: the bluetooth device object

    @raises: TestError if device_type is not supported, or if one of the
             device's methods keeps failing after every remediation step.

    """

    def _retry_device_method(method_name, legal_falsy_values=()):
        """retry the emulated device's method.

        The method is invoked as device.xxxx() e.g., device.GetAdvertisedName().

        Note that the method name string is provided to get the device's actual
        method object at run time through getattr(). The rebinding is required
        because a new device may have been created previously or during the
        execution of fix_serial_device().

        Given a device's method, it is not feasible to get the method name
        through __name__ attribute. This limitation is due to the fact that
        the device is a dotted object of an XML RPC server proxy.
        As an example, with the method name 'GetAdvertisedName', we could
        derive the corresponding method device.GetAdvertisedName. On the
        contrary, given device.GetAdvertisedName, it is not feasible to get the
        method name by device.GetAdvertisedName.__name__

        Also note that if the device method fails, we would try remediation
        step and retry the device method. The remediation steps are
             1) re-creating the serial device.
             2) reset (powercycle) the bluetooth dongle.
             3) reboot Bluetooth peer.
        If the device method still fails after these steps, we fail the test

        By default no falsy result is accepted; callers list the falsy values
        (e.g. 0 or None) that are legitimate for the method being called.

        @param method_name: the string of the method name.
        @param legal_falsy_values: Values that are falsy but might be OK.

        @returns: the result returned by the device's method if the call was
                  successful

        @raises: TestError if the devices's method fails or if repair of
                 peripheral kit fails

        """

        action_table = [('recreate', 'Fixing the serial device'),
                        ('reset', 'Power cycle the peer device'),
                        ('reboot', 'Reboot the chamleond host')]

        for i, (action, description) in enumerate(action_table):
            # Report human-friendly, 1-based attempt numbers in both logs.
            attempt = i + 1
            logging.info('Attempt %s : %s ', attempt, method_name)

            # 'device' is the enclosing function's variable; it is assigned
            # before the first call to this helper.
            result = _run_method(getattr(device, method_name), method_name)
            if _is_successful(result, legal_falsy_values):
                return result

            logging.error('%s failed the %s time. Attempting to %s',
                          method_name, attempt, description)
            if not fix_serial_device(btpeer, device, action):
                logging.info('%s failed', description)
            else:
                logging.info('%s successful', description)

        # Try one last time after the final remediation action.
        result = _run_method(getattr(device, method_name), method_name)
        if _is_successful(result, legal_falsy_values):
            return result

        raise error.TestError('Failed to execute %s. Bluetooth peer device is '
                              'not working' % method_name)


    if device_type not in SUPPORTED_DEVICE_TYPES:
        raise error.TestError('The device type is not supported: %s' %
                              device_type)

    # Get the bluetooth device object and query some important properties.
    device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)()

    # Get some properties of the kit
    # NOTE: Strings updated here must be kept in sync with Btpeer.
    device._capabilities = _retry_device_method('GetCapabilities')
    device._transports = device._capabilities["CAP_TRANSPORTS"]
    device._is_le_only = ("TRANSPORT_LE" in device._transports and
                          len(device._transports) == 1)
    device._has_pin = device._capabilities["CAP_HAS_PIN"]
    device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"]

    _retry_device_method('Init')
    logging.info('device type: %s', device_type)

    device.name = _retry_device_method('GetAdvertisedName')
    logging.info('device name: %s', device.name)

    device.address = _retry_device_method('GetLocalBluetoothAddress')
    logging.info('address: %s', device.address)

    # A missing pin (None) is only acceptable when the kit reports no pin.
    pin_falsy_values = [] if device._has_pin else [None]
    device.pin = _retry_device_method('GetPinCode', pin_falsy_values)
    logging.info('pin: %s', device.pin)

    class_falsy_values = [None] if device._is_le_only else [0]

    # Class of service is None for LE-only devices. Don't fail or parse it.
    device.class_of_service = _retry_device_method('GetClassOfService',
                                                   class_falsy_values)
    if device._is_le_only:
        parsed_class_of_service = device.class_of_service
    else:
        parsed_class_of_service = "0x%04X" % device.class_of_service
    logging.info('class of service: %s', parsed_class_of_service)

    device.class_of_device = _retry_device_method('GetClassOfDevice',
                                                  class_falsy_values)
    # Class of device is None for LE-only devices. Don't fail or parse it.
    if device._is_le_only:
        parsed_class_of_device = device.class_of_device
    else:
        parsed_class_of_device = "0x%04X" % device.class_of_device
    logging.info('class of device: %s', parsed_class_of_device)

    device.device_type = _retry_device_method('GetDeviceType')
    logging.info('device type: %s', device.device_type)

    # The authentication mode is only queried for devices that are not
    # LE-only; it stays None otherwise.
    device.authentication_mode = None
    if not device._is_le_only:
        device.authentication_mode = _retry_device_method(
                'GetAuthenticationMode')
        logging.info('authentication mode: %s', device.authentication_mode)

    device.port = _retry_device_method('GetPort')
    logging.info('serial port: %s\n', device.port)

    return device
| |
| |
def recreate_serial_device(device):
    """Create and connect to a new serial device.

    The existing serial device (if any) is closed first. Failures are logged
    together with their cause and reported through the return value;
    KeyboardInterrupt and SystemExit are not intercepted.

    @param device: the bluetooth HID device

    @returns: True if the serial device is re-created successfully.
              False if closing the old device or creating the new one failed
              (this includes device being None).

    """
    logging.info('Remove the old serial device and create a new one.')
    if device is not None:
        try:
            device.Close()
        except Exception as e:
            logging.error('failed to close the serial device: %s', e)
            return False
    try:
        device.CreateSerialDevice()
    except Exception as e:
        logging.error('failed to invoke CreateSerialDevice: %s', e)
        return False
    return True
| |
| |
def _check_device_init(device, operation):
    """Check that the serial device is usable after a recovery operation.

    Verifies, in order, that the device can initialize, that its serial
    connection is present, and that it can enter command mode. The first
    failing step is logged and stops the check.

    @param device: the bluetooth device.
    @param operation: name of the recovery operation that was just performed;
                      used only in log messages.

    @returns: True if all three steps succeed. False otherwise.

    """
    logging.info('Checking device status...')

    initialized = _run_method(device.Init, 'Init')
    if not initialized:
        logging.info('device.Init: failed after %s', operation)
        return False

    # Called directly (not through _run_method), so an exception raised
    # here propagates to the caller.
    connected = device.CheckSerialConnection()
    if not connected:
        logging.info('device.CheckSerialConnection: failed after %s', operation)
        return False

    in_command_mode = _run_method(device.EnterCommandMode, 'EnterCommandMode')
    if not in_command_mode:
        logging.info('device.EnterCommandMode: failed after %s', operation)
        return False

    logging.info('The device is created successfully after %s.', operation)
    return True
| |
def _reboot_btpeer(btpeer, device):
    """Reboot the Bluetooth peer and re-check the device afterwards.

    The device is closed and power cycled before the peer is rebooted,
    since a reboot may not power cycle it.

    @param btpeer: the Bluetooth peer to reboot.
    @param device: the bluetooth device attached to that peer.

    @returns: the result of _check_device_init() after the reboot.

    """
    # Chameleond fizz hosts should have write protect removed and
    # set_gbb_flags set to 0 to minimize boot time
    settle_after_powercycle_secs = 1
    settle_after_reboot_secs = 10

    # Close the bluetooth peripheral device and power cycle it first.
    device.Close()
    logging.info("Powercycling the device")
    device.PowerCycle()
    time.sleep(settle_after_powercycle_secs)

    logging.info('rebooting Bluetooth peer...')
    btpeer.reboot()

    # Every btpeer reboot takes a bit more than the settle time, so wait
    # that long before starting to probe the btpeer board.
    time.sleep(settle_after_reboot_secs)
    return _check_device_init(device, 'reboot')
| |
def _reset_device_power(device):
    """Power cycle the device and verify that it comes back up.

    @param device: the bluetooth device.

    @returns: False if device.PowerCycle() reports failure; otherwise the
              result of _check_device_init() after the power cycle.

    """
    RESET_SLEEP_SECS = 1
    try:
        if not device.PowerCycle():
            logging.info('device.PowerCycle() failed')
            return False
    except Exception as e:
        # Best effort: an exception here is logged with its cause and the
        # device is still probed below. KeyboardInterrupt and SystemExit are
        # intentionally not intercepted.
        logging.error('exception in device.PowerCycle: %s', e)
    else:
        logging.info('device powercycled')
        time.sleep(RESET_SLEEP_SECS)
    return _check_device_init(device, 'reset')
| |
| def _is_successful(result, legal_falsy_values=[]): |
| """Is the method result considered successful? |
| |
| Some method results, for example that of class_of_service, may be 0 which is |
| considered a valid result. Occassionally, None is acceptable. |
| |
| The default values exist for uses of this function before the options were |
| added, ideally we should change zero_ok to False. |
| |
| @param result: a method result |
| @param legal_falsy_values: Values that are falsy but might be OK. |
| |
| @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if |
| result is None and none_ok. |
| """ |
| truthiness_of_result = bool(result) |
| return truthiness_of_result or result in legal_falsy_values |
| |
| |
def _flag_common_failures(instance):
    """Checks if a common failure has occurred during the test run

    Scans system logs for known signs of failure. If a failure is discovered,
    it is added to the test results, to make it easier to identify common root
    causes from Stainless

    @param instance: the test instance; its bluetooth_facade is queried with
                     each key of COMMON_FAILURES, and matches are recorded in
                     instance.results when that is a dict (or dict subclass).
    """

    for fail_tag, fail_log in COMMON_FAILURES.items():
        if not instance.bluetooth_facade.messages_find(fail_tag):
            continue

        logging.error('Detected failure tag: %s', fail_tag)
        # We mark this instance's results with the discovered failure.
        # Results that are not a mapping (e.g. None) are left untouched.
        if isinstance(instance.results, dict):
            instance.results[fail_log] = True
| |
| |
def fix_serial_device(btpeer, device, operation='reset'):
    """Fix the serial device.

    This function tries to fix the serial device by one of
        (1) 'recreate': re-creating a serial device,
        (2) 'reset': power cycling the usb port to which device is connected,
        (3) 'reboot': rebooting the Bluetooth peer.

    The operation argument selects which of the steps above is performed.

    Note that rebooting the btpeer board or resetting the device will remove
    the state on the peripheral which might cause test failures. Please use
    reset/reboot only before or after a test.

    @param btpeer: the Bluetooth peer
    @param device: the bluetooth device.
    @param operation: Recovery operation to perform 'recreate/reset/reboot'

    @returns: True if the serial device is fixed. False otherwise, including
              when operation is not one of the three known values.

    """
    if operation == 'recreate':
        # Check the serial connection. Fix it if needed.
        if not device.CheckSerialConnection():
            # No serial connection: re-check whether the device can be
            # initialized at all.
            return _check_device_init(device, operation)

        # The USB serial connection still exists.
        # Re-connection suffices to solve the problem. The problem
        # is usually caused by serial port change. For example,
        # the serial port changed from /dev/ttyUSB0 to /dev/ttyUSB1.
        logging.info('retry: creating a new serial device...')
        return recreate_serial_device(device)

    if operation == 'reset':
        # Powercycle the USB port where the bluetooth peer device is connected.
        # RN-42 and RN-52 share the same vid:pid so both will be powercycled.
        # This will only work on fizz host with write protection removed.
        # Note that the state on the device will be lost.
        return _reset_device_power(device)

    if operation == 'reboot':
        # Reboot the Bluetooth peer device.
        # The device is power cycled before rebooting Bluetooth peer device
        return _reboot_btpeer(btpeer, device)

    logging.error('fix_serial_device Invalid operation %s', operation)
    return False
| |
| |
def retry(test_method, instance, *args, **kwargs):
    """Execute the target facade test_method(). Retry if failing the first time.

    A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
    e.g., BluetoothAdapterTests.test_bluetoothd_running().

    Before the second attempt, instances that have a 'use_btpeer' attribute
    get each of their devices repaired with fix_serial_device(..., "recreate").

    @param test_method: the test method to retry
    @param instance: the object passed to test_method as its first argument

    @returns: True if the return value of test_method() is successful.
              False otherwise, including when a device could not be repaired
              before the second attempt.

    """
    name = test_method.__name__

    def _attempt():
        """Run test_method once and report whether it succeeded."""
        outcome = _run_method(test_method, name, instance, *args, **kwargs)
        return _is_successful(outcome)

    if _attempt():
        return True

    logging.error('%s failed at the 1st time: (%s)', name,
                  str(instance.results))
    logging.info('%s: retry the 2nd time.', name)
    time.sleep(1)

    # If this test does not use any attached serial device, just re-run
    # the test.
    if not hasattr(instance, 'use_btpeer'):
        return _attempt()

    # Try to fix every serial device before running the test again.
    for device_type in SUPPORTED_DEVICE_TYPES:
        for device in instance.devices[device_type]:
            # fix_serial_device in 'recreate' mode doesn't require btpeer,
            # so None is passed for convenience.
            if not fix_serial_device(None, device, "recreate"):
                return False

    logging.info('%s: retry the 2nd time.', name)
    return _attempt()
| |
| |
| def test_retry_and_log(test_method_or_retry_flag, |
| messages_start=True, |
| messages_stop=True): |
| """A decorator that logs test results, collects error messages, and retries |
| on request. |
| |
| @param test_method_or_retry_flag: either the test_method or a retry_flag. |
| There are some possibilities of this argument: |
| 1. the test_method to conduct and retry: should retry the test_method. |
| This occurs with |
| @test_retry_and_log |
| 2. the retry flag is True. Should retry the test_method. |
| This occurs with |
| @test_retry_and_log(True) |
| 3. the retry flag is False. Do not retry the test_method. |
| This occurs with |
| @test_retry_and_log(False) |
| |
| @param messages_start: Start collecting messages before running the test |
| @param messages_stop: Stop collecting messages after running the test and |
| analyze the results. |
| |
| @returns: a wrapper of the test_method with test log. The retry mechanism |
| would depend on the retry flag. |
| |
| """ |
| |
| def decorator(test_method): |
| """A decorator wrapper of the decorated test_method. |
| |
| @param test_method: the test method being decorated. |
| |
| @returns the wrapper of the test method. |
| |
| """ |
| @functools.wraps(test_method) |
| def wrapper(instance, *args, **kwargs): |
| """A wrapper of the decorated method. |
| |
| @param instance: an BluetoothAdapterTests instance |
| |
| @returns the result of the test method |
| |
| """ |
| instance.results = None |
| fail_msg = None |
| test_result = False |
| should_raise = hasattr(instance, 'fail_fast') and instance.fail_fast |
| |
| instance.last_test_method = test_method.__name__ |
| syslog_captured = False |
| |
| try: |
| if messages_start: |
| # Grab /var/log/messages output during test run |
| instance.bluetooth_facade.messages_start() |
| |
| if callable(test_method_or_retry_flag |
| ) or test_method_or_retry_flag: |
| test_result = retry(test_method, instance, *args, **kwargs) |
| else: |
| test_result = test_method(instance, *args, **kwargs) |
| |
| if messages_stop: |
| syslog_captured = instance.bluetooth_facade.messages_stop() |
| |
| if test_result: |
| logging.info('[*** passed: {}]'.format( |
| test_method.__name__)) |
| else: |
| if syslog_captured: |
| _flag_common_failures(instance) |
| fail_msg = '[--- failed: {} ({})]'.format( |
| test_method.__name__, str(instance.results)) |
| logging.error(fail_msg) |
| instance.fails.append(fail_msg) |
| # Log TestError and TestNA and let the quicktest wrapper catch it. |
| # Those errors should skip out of the testcase entirely. |
| except error.TestNAError as e: |
| fail_msg = '[--- TESTNA {} ({})]'.format( |
| test_method.__name__, str(e)) |
| logging.error(fail_msg) |
| raise |
| except error.TestError as e: |
| fail_msg = '[--- ERROR {} ({})]'.format( |
| test_method.__name__, str(e)) |
| logging.error(fail_msg) |
| raise |
| except error.TestFail as e: |
| fail_msg = '[--- failed {} ({})]'.format( |
| test_method.__name__, str(e)) |
| logging.error(fail_msg) |
| instance.fails.append(fail_msg) |
| should_raise = True |
| |
| # Check whether we should fail fast |
| if fail_msg and should_raise: |
| logging.info('Fail fast') |
| raise error.TestFail(instance.fails) |
| |
| return test_result |
| |
| return wrapper |
| |
| if callable(test_method_or_retry_flag): |
| # If the decorator function comes with no argument like |
| # @test_retry_and_log |
| return decorator(test_method_or_retry_flag) |
| else: |
| # If the decorator function comes with an argument like |
| # @test_retry_and_log(False) |
| return decorator |
| |
| |
| def test_case_log(method): |
| """A decorator for test case methods. |
| |
| The main purpose of this decorator is to display the test case name |
| in the test log which looks like |
| |
| <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...> |
| |
| @param method: the test case method to decorate. |
| |
| @returns: a wrapper function of the decorated method. |
| |
| """ |
| @functools.wraps(method) |
| def wrapper(instance, *args, **kwargs): |
| """Log the name of the wrapped method before execution""" |
| logging.info('\n<... %s ...>', method.__name__) |
| method(instance, *args, **kwargs) |
| return wrapper |
| |
| |
class BluetoothAdapterTests(test.test):
    """Server side bluetooth adapter tests.

    This test class tries to thoroughly verify most of the important work
    states of a bluetooth adapter.

    The various test methods are supposed to be invoked by actual autotest
    tests such as server/cros/site_tests/bluetooth_Adapter*.

    """
    version = 1

    # Adapter timeouts and polling intervals, in seconds.
    ADAPTER_ACTION_SLEEP_SECS = 1
    ADAPTER_PAIRING_TIMEOUT_SECS = 60
    ADAPTER_CONNECTION_TIMEOUT_SECS = 30
    # Wait after connect for input device to be ready for use
    ADAPTER_HID_INPUT_DELAY = 5
    ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
    ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
    # 30 seconds too short sometimes
    ADAPTER_DISCOVER_TIMEOUT_SECS = 60
    ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
    ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30
    ADAPTER_WAKE_ENABLE_TIMEOUT_SECS = 30

    ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10
    ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1

    HID_REPORT_SLEEP_SECS = 1

    DEFAULT_START_DELAY_SECS = 0
    DEFAULT_HOLD_INTERVAL_SECS = 10
    DEFAULT_HOLD_TIMEOUT_SECS = 60
    DEFAULT_HOLD_SLEEP_SECS = 1

    # Default suspend time in seconds for suspend resume.
    SUSPEND_TIME_SECS = 10
    SUSPEND_ENTER_SECS = 10
    RESUME_TIME_SECS = 30
    RESUME_INTERNAL_TIMEOUT_SECS = 180

    # Minimum RSSI required for peer devices during testing
    MIN_RSSI = -70

    # hci0 is the default hci device if there is no external bluetooth dongle.
    EXPECTED_HCI = 'hci0'

    CLASS_OF_SERVICE_MASK = 0xFFE000
    CLASS_OF_DEVICE_MASK = 0x001FFF

    # Constants about advertising.
    # NOTE(review): 'DAFAULT' looks like a misspelling of 'DEFAULT'; the
    # names are left as they are because code outside this view may use them.
    DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 181.25
    DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 181.25
    ADVERTISING_INTERVAL_UNIT = 0.625

    # Error messages about advertising dbus methods.
    ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = (
            'org.bluez.Error.NotPermitted: Maximum advertisements reached')
    ERROR_INVALID_ADVERTISING_INTERVALS = (
            'org.bluez.Error.InvalidArguments: Invalid arguments')

    # Supported profiles by chrome os.
    SUPPORTED_UUIDS = {
        'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
        'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
        'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
        'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
        'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb',
    }

    # Board list for name/ID test check. These devices don't need to be tested
    REFERENCE_BOARDS = [
        'rambi',
        'nyan',
        'oak',
        'reef',
        'yorp',
        'bip',
        'volteer',
        'volteer2',
    ]

    # Path for btmon logs
    BTMON_DIR_LOG_PATH = '/var/log/btmon'

    # Path for usbmon logs
    USBMON_DIR_LOG_PATH = '/var/log/usbmon'

    # The agent capability of various device types.
    AGENT_CAPABILITY = {
        'BLUETOOTH_AUDIO': 'NoInputNoOutput',
    }
| |
def assert_on_fail(self, result, raiseNA=False):
    """Raise an error if the given test result is false-like.

    Wrap calls to test methods (i.e. with @test_retry_and_log) with this
    function and failures will raise instead of continuing the test.

    For example:
        self.assert_on_fail(self.test_pairing(...))

    @param result: Result of test method called.
    @param raiseNA: Whether to raise TestNAError instead of TestFail

    @raises error.TestNAError
    @raises error.TestFail
    """
    if result:
        return

    # The message names the most recently run test method.
    failure_msg = 'Assert on fail: {}'.format(self.last_test_method)
    logging.error(failure_msg)

    exception_type = error.TestNAError if raiseNA else error.TestFail
    raise exception_type(failure_msg)
| |
| |
| # TODO(b/131170539) remove when sarien/arcada no longer have _signed |
| # postfix |
def get_base_platform_name(self):
    """Returns the DUT platform name

    If the DUT is a DVT device, _signed or _unsigned may be appended
    to the device name, which we should ignore in our BT tests

    @returns: String name of the DUT's platform with every occurrence of
              _signed and _unsigned removed
    """
    base_name = self.host.get_platform()
    for signing_tag in ('_signed', '_unsigned'):
        base_name = base_name.replace(signing_tag, '')
    return base_name
| |
| |
def group_btpeers_type(self):
    """Group all Bluetooth peers by the type of their detected device.

    Fills self.btpeer_group with, for every supported device type, the list
    of peers from self.host.btpeer_list on which that device type responds
    to CheckSerialConnection(). The grouping is cached in
    self.btpeer_group_copy and reused on later calls.
    """

    # Use previously created btpeer_group instead of creating new
    if self.btpeer_group_copy:
        logging.info('Using previously created btpeer group')
        for device_type in SUPPORTED_DEVICE_TYPES:
            # Copy the list so later removals do not alter the cache.
            self.btpeer_group[device_type] = \
                    self.btpeer_group_copy[device_type][:]
        return

    # Create new btpeer_group
    for device_type in SUPPORTED_DEVICE_TYPES:
        self.btpeer_group[device_type] = list()
        # Create copy of btpeer_group
        self.btpeer_group_copy[device_type] = list()

    for idx, btpeer in enumerate(self.host.btpeer_list):
        # Tracks whether this particular peer matched any device type.
        detected = False
        for device_type, gen_device_func in SUPPORTED_DEVICE_TYPES.items():
            try:
                device = gen_device_func(btpeer)()
                if device.CheckSerialConnection():
                    self.btpeer_group[device_type].append(btpeer)
                    logging.info('%d-th btpeer find device %s',
                                 idx, device_type)
                    # Create copy of btpeer_group
                    self.btpeer_group_copy[device_type].append(btpeer)
                    detected = True
            except Exception as e:
                # A peer that cannot provide this device type is simply
                # not added to the group for it.
                logging.debug('Error with initializing %s on %d-th '
                              'btpeer: %s', device_type, idx, e)
        if not detected:
            logging.error('No device is detected on %d-th btpeer', idx)

    logging.debug("self.bt_group is %s", self.btpeer_group)
| |
| |
def wait_for_device(self, device, timeout=10):
    """Waits for device to become available again

    We reset raspberry pi peer between tests. This method helps us wait to
    prevent us from trying to use the device before it comes back up again.

    @param device: proxy object of peripheral device
    @param timeout: timeout passed to utils.poll_for_condition

    @raises: error.TestError if the device does not answer before the
             polling times out
    """

    def is_device_ready():
        """Tries to use a service of the device

        @returns: True if device is available to provide service
                  False otherwise
        """
        # Call a simple (fast) function to determine if device is online
        # and reachable. If we can query this property, we know the
        # device is available for us to use
        try:
            device.GetCapabilities()
        except Exception:
            return False
        else:
            return True

    try:
        utils.poll_for_condition(condition=is_device_ready,
                                 desc='wait_for_device',
                                 timeout=timeout)
    except utils.TimeoutError:
        raise error.TestError('Peer is not available after waiting')
| |
| |
def clear_raspi_device(self, device):
    """Clears a device on a raspi peer by resetting bluetooth stack

    Errors that are expected while the stack resets are ignored; anything
    else is re-raised. Afterwards this waits until the device answers again.

    @param device: proxy object of peripheral device
    """

    try:
        device.ResetStack()

    except socket.error as sock_err:
        # Ignore conn reset, expected during stack reset
        connection_was_reset = sock_err.errno == errno.ECONNRESET
        if not connection_was_reset:
            raise

    except chameleon.ChameleonConnectionError as conn_err:
        # Ignore chameleon conn reset, expected during stack reset
        if str(errno.ECONNRESET) not in str(conn_err):
            raise

    except six.moves.http_client.BadStatusLine:
        # BadStatusLine occurs occasionally when chameleon
        # is restarted. We ignore it here
        logging.error('Ignoring badstatusline exception')

    # Catch generic Fault exception by rpc server, ignore
    # method not available as it indicates platform didn't
    # support method and that's ok
    except Exception as rpc_err:
        if (rpc_err.__class__.__name__ != 'Fault' or
                    'is not supported' not in str(rpc_err)):
            raise

    # Ensure device is back online before continuing
    self.wait_for_device(device, timeout=30)
| |
def device_set_powered(self, device, powered):
    """Set raspi BT powered state.

    @param device: the peer device whose adapter is switched.
    @param powered: Powered state to set on Raspi. Truthy calls
                    device.AdapterPowerOn(), falsy device.AdapterPowerOff().
    """
    switch_power = device.AdapterPowerOn if powered else device.AdapterPowerOff
    switch_power()
| |
def get_device_rasp(self, device_num, on_start=True):
    """Get all bluetooth device objects from Bluetooth peer devices

    This method should be called only after group_btpeers_type

    @param device_num : dict of {device_type:number}, to specify the number
                        of device needed for each device_type.

    @param on_start: boolean describing whether the requested clear is for a
                     new test, or in the middle of a current one

    @returns: True if Success. False if more devices are requested in total
              than there are Bluetooth peers, or if there are too few peers
              for one of the requested device types.
    """

    logging.info("in get_device_rasp %s onstart %s", device_num, on_start)
    total_num_devices = sum(device_num.values())
    num_btpeers = len(self.host.btpeer_list)
    if total_num_devices > num_btpeers:
        logging.error(
                'Total number of devices %s is greater than the'
                ' number of Bluetooth peers %s', total_num_devices,
                num_btpeers)
        return False

    for device_type, number in device_num.items():
        num_available = len(self.btpeer_group[device_type])
        if num_available < number:
            logging.error('Number of Bluetooth peers with device type '
                          '%s is %d, which is less than needed %d',
                          device_type, num_available, number)
            return False

        # Iterate over a slice copy because the group lists are modified
        # inside the loop.
        for btpeer in self.btpeer_group[device_type][:number]:
            logging.info("getting emulated %s", device_type)
            device = self.reset_device(btpeer, device_type, on_start)

            self.devices[device_type].append(device)

            # Remove this btpeer from btpeer_group since it is already
            # configured as a specific device
            for temp_device in SUPPORTED_DEVICE_TYPES:
                if btpeer in self.btpeer_group[temp_device]:
                    self.btpeer_group[temp_device].remove(btpeer)

    return True
| |
| |
def get_device(self, device_type, on_start=True):
    """Set up the single peer (self.host.btpeer) as the requested device.

    The resulting device object is recorded in self.devices[device_type].

    @param device_type : the bluetooth device type, e.g., 'MOUSE'

    @param on_start: boolean describing whether the requested clear is for
                     a new test, or in the middle of a current one

    @returns: the bluetooth device object

    """
    known_devices = self.devices[device_type]
    known_devices.append(
            self.reset_device(self.host.btpeer, device_type, on_start))

    return self.devices[device_type][-1]
| |
| |
def reset_device(self, peer, device_type, clear_device=True):
    """Prepare a peer so that it can act as the given device type.

    @param peer: the peer device to reset with new device type
    @param device_type : the new bluetooth device type, e.g., 'MOUSE'
    @param clear_device: whether to clear the device state

    @returns: the bluetooth device object

    """
    device = get_bluetooth_emulated_device(peer, device_type)

    # Start from a clean peer state when the caller asks for it
    if clear_device:
        self.clear_raspi_device(device)

    try:
        # Tell generic chameleon to bind to this device type
        device.SpecifyDeviceType(device_type)

    except Exception as rpc_err:
        # The rpc server raises a generic Fault when the platform does not
        # provide the method; that case is fine and is silently accepted.
        method_unsupported = (rpc_err.__class__.__name__ == 'Fault' and
                              'is not supported' in str(rpc_err))
        if not method_unsupported:
            logging.error("got exception %s", str(rpc_err))
            raise

    return device
| |
| |
def is_device_available(self, btpeer, device_type):
    """Check whether the linked peer can provide the named device.

    @param btpeer: the peer to query
    @param device_type: the bluetooth HID device type, e.g., 'MOUSE'

    @returns: True if it is able to resolve the device, false otherwise
    """

    device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)()
    try:
        # The proxy cannot be compared against None directly, so call a
        # fast method every peripheral must support instead. The call
        # fails if the object behind the proxy does not exist.
        device.GetCapabilities()
    except Exception:
        return False
    else:
        return True
| |
| |
def list_devices_available(self):
    """Count, per device type, the btpeers able to provide that device.

    Device types that no peer supports are left out of the result.

    @returns: dict mapping HID device types to number of supporting peers
              available, e.g. {'MOUSE':1, 'KEYBOARD':1}
    """
    devices_available = {}
    for device_type in SUPPORTED_DEVICE_TYPES:
        supporting_peers = sum(
                1 for btpeer in self.host.btpeer_list
                if self.is_device_available(btpeer, device_type))
        if supporting_peers:
            devices_available[device_type] = supporting_peers

    logging.debug("devices available are %s", devices_available)
    return devices_available
| |
| |
def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS):
    """Put the DUT into suspend and wait until it has resumed.

    The boot id is read first; the device time is read right after the
    asynchronous suspend is requested. Both are handed to
    test_wait_for_resume.

    @param suspend_time: the suspend time in secs; also used as the
                         resume timeout
    @raises error.TestFail if the device reboots during suspend
            (NOTE(review): carried over from the previous docstring; the
            raise is not visible in this method - confirm in the helpers)

    """
    boot_id_before = self.host.get_boot_id()
    suspend_proc = self.suspend_async(suspend_time=suspend_time)
    suspend_requested_at = self.bluetooth_facade.get_device_time()

    # Give the system some time to enter suspend
    self.test_suspend_and_wait_for_sleep(
            suspend_proc, sleep_timeout=self.SUSPEND_ENTER_SECS)

    # Suspend itself is not under test here, so the resume deadline is
    # deliberately lenient
    self.test_wait_for_resume(boot_id_before,
                              suspend_proc,
                              resume_timeout=suspend_time,
                              test_start_time=suspend_requested_at)
| |
| |
def reboot(self):
    """Reboot the DUT, then rebuild the facades and restore test settings.

    After the reboot the facade factory, bluetooth facade and input facade
    are recreated, debug logging is re-enabled, cellular and ui services
    are disabled again and new btmon/usbmon captures are started.
    """
    self.host.reboot()

    # The facades must be recreated after a reboot. Drop the old proxies
    # first: deleting them after the new ones exist would invoke a
    # disconnection.
    for stale_attr in ('factory', 'bluetooth_facade', 'input_facade'):
        if hasattr(self, stale_attr):
            delattr(self, stale_attr)

    self.factory = remote_facade_factory.RemoteFacadeFactory(
            self.host, disable_arc=True, no_chrome=not self.start_browser)
    self.bluetooth_facade = self.factory.create_bluetooth_facade()
    self.input_facade = self.factory.create_input_facade()

    # Chrome resets verbose debugging to its default (disabled)
    self.enable_disable_debug_log(enable=True)

    # Cellular and ui have to be disabled again as well
    self.enable_disable_cellular(enable=False)
    self.enable_disable_ui(enable=False)

    self.start_new_btmon()
    self.start_new_usbmon()
| |
| |
def _wait_till_condition_holds(self, func, method_name,
                               timeout=DEFAULT_HOLD_TIMEOUT_SECS,
                               sleep_interval=DEFAULT_HOLD_SLEEP_SECS,
                               hold_interval=DEFAULT_HOLD_INTERVAL_SECS,
                               start_delay=DEFAULT_START_DELAY_SECS):
    """Wait for func() to hold true for a period of time.

    Timeouts and other errors raised while polling are logged and reported
    as a False result. Exceptions that do not derive from Exception
    (KeyboardInterrupt, SystemExit) are no longer swallowed and propagate
    to the caller.

    @param func: the function to wait for.
    @param method_name: the invoking class method.
    @param timeout: number of seconds to wait before giving up.
    @param sleep_interval: the interval in seconds to sleep between
                           invoking func().
    @param hold_interval: the interval in seconds for the condition to
                          remain true
    @param start_delay: interval in seconds to wait before starting

    @returns: True if the condition is met,
              False otherwise

    """
    if start_delay > 0:
        logging.debug('waiting for %s secs before checking %s',
                      start_delay, method_name)
        time.sleep(start_delay)

    try:
        utils.poll_till_condition_holds(condition=func,
                                        timeout=timeout,
                                        sleep_interval=sleep_interval,
                                        hold_interval=hold_interval,
                                        desc=('Waiting %s' % method_name))
    except utils.TimeoutError as e:
        logging.error('%s: %s', method_name, e)
    except Exception as e:
        logging.error('%s: %s', method_name, e)
        logging.error(
                'bluetoothd possibly crashed. Check out /var/log/messages.')
    else:
        return True
    return False
| |
| |
def _wait_for_condition(self, func, method_name,
                        timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
                        sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS,
                        start_delay=DEFAULT_START_DELAY_SECS):
    """Wait for the func() to become True.

    Timeouts and other errors raised while polling are logged and reported
    as a False result. Exceptions that do not derive from Exception
    (KeyboardInterrupt, SystemExit) are no longer swallowed and propagate
    to the caller.

    @param func: the function to wait for.
    @param method_name: the invoking class method.
    @param timeout: number of seconds to wait before giving up.
    @param sleep_interval: the interval in seconds to sleep between
                           invoking func().
    @param start_delay: interval in seconds to wait before starting

    @returns: True if the condition is met,
              False otherwise

    """
    if start_delay > 0:
        logging.debug('waiting for %s secs before checking %s',
                      start_delay, method_name)
        time.sleep(start_delay)

    try:
        utils.poll_for_condition(condition=func,
                                 timeout=timeout,
                                 sleep_interval=sleep_interval,
                                 desc=('Waiting %s' % method_name))
    except utils.TimeoutError as e:
        logging.error('%s: %s', method_name, e)
    except Exception as e:
        logging.error('%s: %s', method_name, e)
        logging.error(
                'bluetoothd possibly crashed. Check out /var/log/messages.')
    else:
        return True
    return False
| |
def ignore_failure(instance, test_method, *args, **kwargs):
    """Run a test method without letting its failure fail the test batch.

    Sometimes a test method is needed as a plain function, only for its
    result. instance.fails is copied before the call and put back if the
    test method returns a falsy result, so the failure is not recorded.

    @param instance: object whose fails list is protected
    @param test_method: test_method
    @param args: positional arguments passed to test_method
    @param kwargs: keyword arguments passed to test_method
    @returns: result of the test_method
    """
    fails_before = instance.fails[:]
    outcome = test_method(*args, **kwargs)
    if outcome:
        return outcome

    logging.info("%s failure is ignored",test_method.__name__)
    instance.fails = fails_before
    return outcome
| |
| |
def start_agent(self, device):
    """Start the device's pairing agent when its type has a capability.

    The capability is looked up in self.AGENT_CAPABILITY by device type;
    nothing is started when there is no (truthy) entry.

    @param device: the peer device
    """
    kind = device.GetDeviceType()
    capability = self.AGENT_CAPABILITY.get(kind)
    if not capability:
        return
    device.StartPairingAgent(capability)
| |
| |
def stop_agent(self, device):
    """Stop the device's pairing agent when its type has a capability.

    The capability is looked up in self.AGENT_CAPABILITY by device type;
    nothing is stopped when there is no (truthy) entry.

    @param device: the peer device
    """
    kind = device.GetDeviceType()
    if not self.AGENT_CAPABILITY.get(kind):
        return
    device.StopPairingAgent()
| |
| |
| # ------------------------------------------------------------------- |
| # Adapter standalone tests |
| # ------------------------------------------------------------------- |
| |
| |
def service_exists(self, service_name):
    """Check whether the DUT knows a service of the given name.

    @param service_name: name of the service

    @returns: True if service status can be queried, else False
    """
    try:
        # 'initctl status' on a non-existent service makes host.run raise
        # AutoservRunError; a clean return means the service exists.
        self.host.run('initctl status {}'.format(service_name))
    except error.AutoservRunError:
        return False
    else:
        return True
| |
| |
def service_enabled(self, service_name):
    """Report whether initctl shows the service as running on the DUT.

    @param service_name: name of the service

    @throws: AutoservRunError is thrown if there is no service with the
             provided name installed on the DUT.

    @returns: True if service is currently running, else False
    """
    status = self.host.run('initctl status {}'.format(service_name))
    return 'start/running' in status.stdout
| |
| |
def _initctl_services(self, services, command):
    """Use initctl to control service on the DUT

    The command is validated before the DUT is touched, so an unknown
    command returns False even when the service list is empty or none of
    the services exist.

    @param services: list of string service names
    @param command: initctl command on the services
                    'start': to start the service
                    'stop': to stop the service
                    'restart': to restart the service

    @returns: True if services were set successfully, else False
    """
    if command not in ('start', 'stop', 'restart'):
        logging.error('unknown command {} on services {}'.format(
                command, services))
        return False

    for service in services:
        # Some platforms will not support all services. In these cases,
        # no need to fail, since they won't interfere with our tests
        if not self.service_exists(service):
            logging.debug('Service %s does not exist on DUT', service)
            continue

        if command == 'restart':
            # A stopped job is simply started instead of restarted.
            action = 'restart' if self.service_enabled(service) else 'start'
            self.host.run('initctl {} {}'.format(action, service))
            logging.info('Service {} restarted'.format(service))
            continue

        # A sample call to enable or disable a service is as follows:
        # "initctl stop modemfwd"
        enable = command == 'start'
        if self.service_enabled(service) != enable:
            self.host.run('initctl {} {}'.format(command, service))

            # Verify that the state change really happened
            if self.service_enabled(service) != enable:
                logging.error(
                        'Failed to set initctl service %s to state %d',
                        service, enable)
                return False

        if enable:
            logging.info('Service {} enabled'.format(service))
        else:
            logging.info('Service {} disabled'.format(service))

    return True
| |
| |
def enable_disable_services(self, services, enable):
    """Start or stop the given services on the DUT.

    @param services: list of string service names
    @param enable: True to enable services, False to disable

    @returns: True if services were set successfully, else False
    """
    if enable:
        command = 'start'
    else:
        command = 'stop'
    return self._initctl_services(services, command)
| |
| |
def enable_disable_cellular(self, enable):
    """Enable or disable the cellular services on the DUT.

    The services handled are 'modemmanager' and 'modemfwd'.

    @param enable: True to enable cellular services
                   False to disable cellular services

    @returns: True if services were set successfully, else False
    """
    return self.enable_disable_services(['modemmanager', 'modemfwd'],
                                        enable)
| |
| |
def enable_disable_ui(self, enable):
    """Enable or disable the 'ui' service on the DUT.

    @param enable: True to enable UI services
                   False to disable UI services

    @returns: True if services were set successfully, else False
    """
    return self.enable_disable_services(['ui'], enable)
| |
| |
def restart_services(self, services):
    """Restart the given services on the DUT through initctl.

    @param services: the services, e.g., ['cras',]

    @returns: True if services were set successfully, else False
    """
    restarted = self._initctl_services(services, 'restart')
    return restarted
| |
| |
def restart_cras(self):
    """Restart the cras service on the DUT.

    @returns: True if cras was restarted successfully, else False
    """
    cras_only = ['cras']
    return self.restart_services(cras_only)
| |
| |
def enable_disable_debug_log(self, enable):
    """Switch all debug logs in the DUT on or off.

    The boolean is converted to 1 or 0 and passed as each of the four
    level arguments of bluetooth_facade.set_debug_log_levels.

    @param enable: True to enable all of the debug log,
                   False to disable all of the debug log.
    """
    levels = (int(enable),) * 4
    self.bluetooth_facade.set_debug_log_levels(*levels)
| |
| |
def start_new_btmon(self):
    """Restart btmon on the DUT with a freshly named log file.

    Any running btmon is killed, the log directory is created if needed
    and a background btmon is started that writes to
    btsnoop_<timestamp> inside self.BTMON_DIR_LOG_PATH.
    """
    # Only one btmon capture should run at a time
    self.host.run('pkill btmon || true')

    log_dir = self.BTMON_DIR_LOG_PATH
    self.host.run('mkdir -p %s' % log_dir)

    # Timestamp format, e.g. 2020_02_20_17_52_45
    timestamp = time.strftime("%Y_%m_%d_%H_%M_%S")
    log_path = '%s/%s' % (log_dir, 'btsnoop_%s' % timestamp)
    self.host.run_background('btmon -SAw %s' % log_path)
| |
def start_new_usbmon(self):
    """Restart the usbmon capture on the DUT with a freshly named log file.

    Any running tcpdump is killed, the log directory is created if needed
    and a background tcpdump on usbmon0 is started that writes to
    usbmon_<timestamp> inside self.USBMON_DIR_LOG_PATH.
    """
    # The capture is done by tcpdump, so that is the process to kill
    self.host.run('pkill tcpdump || true')

    log_dir = self.USBMON_DIR_LOG_PATH
    self.host.run('mkdir -p %s' % log_dir)

    # Timestamp format, e.g. 2020_02_20_17_52_45
    timestamp = time.strftime("%Y_%m_%d_%H_%M_%S")
    log_path = '%s/%s' % (log_dir, 'usbmon_%s' % timestamp)
    self.host.run_background('tcpdump -i usbmon0 -w %s' % log_path)
| |
| |
def log_message(self, msg):
    """Forward a string to the bluetooth facade's log_message.

    @param msg: the string to write to the log
    """
    facade = self.bluetooth_facade
    facade.log_message(msg)
| |
def is_wrt_supported(self):
    """Check if the Bluetooth adapter supports WRT logs.

    @returns: the result of bluetooth_facade.is_wrt_supported()
    """
    supported = self.bluetooth_facade.is_wrt_supported()
    return supported
| |
def enable_wrt_logs(self):
    """Enable WRT logs from Intel Adapters.

    @returns: the result of bluetooth_facade.enable_wrt_logs()
    """
    enabled = self.bluetooth_facade.enable_wrt_logs()
    return enabled
| |
def collect_wrt_logs(self):
    """Collect WRT logs from Intel Adapters.

    @returns: the result of bluetooth_facade.collect_wrt_logs()
    """
    collected = self.bluetooth_facade.collect_wrt_logs()
    return collected
| |
def get_num_connected_devices(self):
    """Count the remote devices currently connected to the DUT.

    Empty entries and entries without a truthy 'Connected' value are not
    counted.

    @returns: The number of devices known to bluez with the Connected
              property active
    """
    return sum(1 for dev in self.bluetooth_facade.get_devices()
               if dev and dev.get('Connected', 0))
| |
@test_retry_and_log
def test_bluetoothd_running(self):
    """Test that bluetoothd is running.

    @returns: the result of bluetooth_facade.is_bluetoothd_running()
    """
    daemon_running = self.bluetooth_facade.is_bluetoothd_running()
    return daemon_running
| |
| |
@test_retry_and_log
def test_start_bluetoothd(self):
    """Test that bluetoothd could be started successfully.

    @returns: the result of bluetooth_facade.start_bluetoothd()
    """
    daemon_started = self.bluetooth_facade.start_bluetoothd()
    return daemon_started
| |
| |
@test_retry_and_log
def test_stop_bluetoothd(self):
    """Test that bluetoothd could be stopped successfully.

    @returns: the result of bluetooth_facade.stop_bluetoothd()
    """
    daemon_stopped = self.bluetooth_facade.stop_bluetoothd()
    return daemon_stopped
| |
| |
@test_retry_and_log
def test_has_adapter(self):
    """Verify that there is an adapter.

    This will return True only if both the kernel and bluetooth daemon
    see the adapter.

    @returns: the result of bluetooth_facade.has_adapter()
    """
    adapter_present = self.bluetooth_facade.has_adapter()
    return adapter_present
| |
@test_retry_and_log
def test_adapter_work_state(self):
    """Test that the bluetooth adapter is in the correct working state.

    The adapter must be detectable, become powered on, and report
    self.EXPECTED_HCI as its hci device. The individual outcomes are
    stored in self.results.

    @returns: True if all three checks pass, False otherwise
    """
    adapter_present = self.bluetooth_facade.has_adapter()
    powered = self._wait_for_condition(
            self.bluetooth_facade.is_powered_on, method_name())
    on_expected_hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI

    self.results = {}
    self.results['has_adapter'] = adapter_present
    self.results['is_powered_on'] = powered
    self.results['hci'] = on_expected_hci
    return all(self.results.values())
| |
@test_retry_and_log(False)
def test_adapter_wake_enabled(self):
    """Test that the bluetooth adapter is wakeup enabled.

    Waits up to self.ADAPTER_WAKE_ENABLE_TIMEOUT_SECS for the facade to
    report wake as enabled and stores the outcome in self.results.

    @returns: True if wake became enabled in time, False otherwise
    """
    became_enabled = self._wait_for_condition(
            self.bluetooth_facade.is_wake_enabled,
            method_name(),
            timeout=self.ADAPTER_WAKE_ENABLE_TIMEOUT_SECS)

    self.results = dict([('wake_enabled', became_enabled)])
    return any(self.results.values())
| |
@test_retry_and_log(False)
def test_device_wake_allowed(self, device_address):
    """Test that given device can wake the system.

    @param device_address: address of the device whose 'WakeAllowed'
                           property is read

    @returns: True if the property is truthy, False otherwise
    """
    wake_allowed = self.bluetooth_facade.get_device_property(
            device_address, 'WakeAllowed')
    self.results = {'Wake allowed': wake_allowed}

    return all(self.results.values())
| |
@test_retry_and_log(False)
def test_device_wake_not_allowed(self, device_address):
    """Test that given device cannot wake the system.

    @param device_address: address of the device whose 'WakeAllowed'
                           property is read

    @returns: True if the property is falsy, False otherwise
    """
    wake_allowed = self.bluetooth_facade.get_device_property(
            device_address, 'WakeAllowed')
    self.results = {'Wake not allowed': not wake_allowed}

    return all(self.results.values())
| |
@test_retry_and_log(False)
def test_adapter_set_wake_disabled(self):
    """Disable wake and verify it was written.

    @returns: True if bluetooth_facade.set_wake_enabled(False) reported
              success, False otherwise
    """
    wake_disabled = self.bluetooth_facade.set_wake_enabled(False)
    self.results = dict([('disable_wake', wake_disabled)])
    return all(self.results.values())
| |
@test_retry_and_log
def test_power_on_adapter(self):
    """Test that the adapter could be powered on successfully.

    Requests power on, then waits for the facade to report the adapter as
    powered. Both outcomes are stored in self.results.

    @returns: True if both steps succeeded, False otherwise
    """
    requested = self.bluetooth_facade.set_powered(True)
    powered = self._wait_for_condition(
            self.bluetooth_facade.is_powered_on, method_name())

    self.results = {}
    self.results['power_on'] = requested
    self.results['is_powered_on'] = powered
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_power_off_adapter(self):
    """Test that the adapter could be powered off successfully.

    Requests power off, then waits for the facade to stop reporting the
    adapter as powered. Both outcomes are stored in self.results.

    @returns: True if both steps succeeded, False otherwise
    """
    def adapter_is_off():
        """Return True once the adapter is no longer powered on."""
        return not self.bluetooth_facade.is_powered_on()

    requested = self.bluetooth_facade.set_powered(False)
    powered_off = self._wait_for_condition(adapter_is_off, method_name())

    self.results = {}
    self.results['power_off'] = requested
    self.results['is_powered_off'] = powered_off
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_reset_on_adapter(self):
    """Test that the adapter could be reset on successfully.

    This includes restarting bluetoothd, and removing the settings
    and cached devices.

    @returns: True if the reset succeeded and the adapter became powered
              on, False otherwise
    """
    reset_done = self.bluetooth_facade.reset_on()
    powered = self._wait_for_condition(
            self.bluetooth_facade.is_powered_on, method_name())

    self.results = {}
    self.results['reset_on'] = reset_done
    self.results['is_powered_on'] = powered
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_reset_off_adapter(self):
    """Test that the adapter could be reset off successfully.

    This includes restarting bluetoothd, and removing the settings
    and cached devices.

    @returns: True if the reset succeeded and the adapter ended up powered
              off, False otherwise
    """
    def adapter_is_off():
        """Return True once the adapter is no longer powered on."""
        return not self.bluetooth_facade.is_powered_on()

    reset_done = self.bluetooth_facade.reset_off()
    powered_off = self._wait_for_condition(adapter_is_off, method_name())

    self.results = {}
    self.results['reset_off'] = reset_done
    self.results['is_powered_off'] = powered_off
    return all(self.results.values())
| |
| |
def test_is_powered_off(self):
    """Check if the adapter is powered off.

    The outcome is stored in self.results.

    @returns: True if the facade reports the adapter as not powered on
    """
    powered_on = self.bluetooth_facade.is_powered_on()
    self.results = {'is_powered_off': not powered_on}
    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_is_facade_valid(self):
    """Checks whether the bluetooth facade is in a good state.

    If bluetoothd restarts (i.e. due to a crash), the object proxies will
    no longer be valid (because the session will be closed). When the
    facade is not valid, bluetoothd is started again; the test still
    reports a failure in that case because 'initially_ok' is False.

    @returns: True only if the facade was valid from the start
    """
    facade = self.bluetooth_facade
    initially_ok = facade.is_bluetoothd_valid()
    # start_bluetoothd() is only attempted when the facade was invalid
    bluez_started = initially_ok or facade.start_bluetoothd()

    self.results = {}
    self.results['initially_ok'] = initially_ok
    self.results['bluez_started'] = bluez_started
    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_is_adapter_valid(self):
    """Verify the bluetooth adapter is retrievable at test start

    When the adapter is missing the DUT is rebooted to recover, and the
    test then raises in every case.

    @raises: error.TestNAError if we fail to retrieve the adapter on
             an unsupported chipset
             error.TestFail if we fail to retrieve the adapter on any other
             platform

    @returns: True if the adapter was located properly
    """
    if self.bluetooth_facade.has_adapter():
        return True

    logging.error('No adapter available, rebooting to recover')
    self.reboot()

    chipset = self.get_chipset_name()
    if not chipset:
        raise error.TestFail('Unknown adapter is missing')

    # A missing adapter is a rare but known issue on several platforms
    # that have no vendor support (b/169328792). Since there is no fix
    # possible, we forgive these failures by raising a TestNA.
    if chipset in UNSUPPORTED_CHIPSETS:
        raise error.TestNAError('Unsupported adapter is missing')

    raise error.TestFail('Adapter is missing')
| |
@test_retry_and_log
def test_UUIDs(self):
    """Test that basic profiles are supported.

    self.results is set to the list of UUIDs from self.SUPPORTED_UUIDS
    that the adapter does not report.

    @returns: True if no expected UUID is missing, False otherwise
    """
    reported = self.bluetooth_facade.get_UUIDs()
    missing = [expected for expected in self.SUPPORTED_UUIDS.values()
               if expected not in reported]
    self.results = missing
    return not missing
| |
| |
@test_retry_and_log
def test_start_discovery(self):
    """Test that the adapter could start discovery.

    Only the first element of the pair returned by
    bluetooth_facade.start_discovery() is used as the start result.

    @returns: True if discovery was started and the adapter reports that
              it is discovering, False otherwise
    """
    started, _ = self.bluetooth_facade.start_discovery()
    discovering = self._wait_for_condition(
            self.bluetooth_facade.is_discovering, method_name())

    self.results = {}
    self.results['start_discovery'] = started
    self.results['is_discovering'] = discovering
    return all(self.results.values())
| |
@test_retry_and_log(False)
def test_is_discovering(self):
    """Test that the adapter is already discovering.

    @returns: True if the facade reports discovering within the wait
              period, False otherwise
    """
    discovering = self._wait_for_condition(
            self.bluetooth_facade.is_discovering, method_name())

    self.results = dict([('is_discovering', discovering)])
    return all(self.results.values())
| |
@test_retry_and_log
def test_stop_discovery(self):
    """Test that the adapter could stop discovery.

    Passes immediately, without touching self.results, when the adapter
    is not discovering in the first place.

    @returns: True if discovery is (or ends up) stopped, False otherwise
    """
    def discovery_stopped():
        """Return True once the adapter no longer reports discovering."""
        return not self.bluetooth_facade.is_discovering()

    if not self.bluetooth_facade.is_discovering():
        return True

    stopped, _ = self.bluetooth_facade.stop_discovery()
    not_discovering = self._wait_for_condition(discovery_stopped,
                                               method_name())

    self.results = {}
    self.results['stop_discovery'] = stopped
    self.results['is_not_discovering'] = not_discovering
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_discoverable(self):
    """Test that the adapter could be set discoverable.

    @returns: True if the property was set and the adapter reports being
              discoverable, False otherwise
    """
    requested = self.bluetooth_facade.set_discoverable(True)
    discoverable = self._wait_for_condition(
            self.bluetooth_facade.is_discoverable, method_name())

    self.results = {}
    self.results['set_discoverable'] = requested
    self.results['is_discoverable'] = discoverable
    return all(self.results.values())
| |
@test_retry_and_log(False)
def test_is_discoverable(self):
    """Test that the adapter is discoverable.

    @returns: True if the facade reports discoverable within the wait
              period, False otherwise
    """
    discoverable = self._wait_for_condition(
            self.bluetooth_facade.is_discoverable, method_name())

    self.results = dict([('is_discoverable', discoverable)])
    return all(self.results.values())
| |
| |
def _test_timeout_property(self, set_property, check_property, set_timeout,
                           get_timeout, property_name,
                           timeout_values = [0, 60, 180]):
    """Common method to test (Discoverable/Pairable)Timeout.

    This is used to test
    - DiscoverableTimeout property
    - PairableTimeout property

    For every value in timeout_values the test
    - sets the timeout and waits until reading it back gives that value
    - sets the adapter property to True and waits for it to be active
    - polls the property every 2 seconds; it fails if the property is
      false before the timeout or still true after it
    Readings taken within 3 seconds of the timeout are ignored.

    Note : Value of 0 means it never times out, so that run passes once
    the property has stayed true for more than 30 seconds.

    The property value and timeout read before the test are written back
    at the end, whatever the outcome.

    @param set_property: callable taking the property value to set
    @param check_property: callable returning the current property value
    @param set_timeout: callable taking the timeout in seconds; a falsy
                        return value is treated as a failure
    @param get_timeout: callable returning the current timeout
    @param property_name: name used in log messages and result keys
    @param timeout_values: timeouts in seconds to test, in order

    @returns: True if every timeout value passed, False otherwise
    """
    def check_timeout(timeout):
        """Check for timeout value in loop while recording failures."""
        observed = get_timeout()
        if timeout != observed:
            logging.debug('%s timeout value read %s does not '
                          'match value set %s, yet', property_name,
                          observed, timeout)
            return False
        return True

    # NOTE(review): method_name() is called from inside this helper and
    # presumably reports the calling function's name, so the helper keeps
    # its original name - confirm before renaming.
    def _test_timeout_property(timeout):
        """Run the check for a single timeout value."""
        # minimum distance from the timeout before a reading is trusted
        MIN_DELTA_SECS = 3
        # pause between two readings of the property
        WAIT_TIME_SECS = 2

        # Set and read back the timeout value
        if not set_timeout(timeout):
            logging.error('Setting the %s timeout failed',property_name)
            return False

        timeout_applied = self._wait_for_condition(
                lambda : check_timeout(timeout), 'check_'+property_name)
        if not timeout_applied:
            logging.error('checking %s_timeout value timed out',
                          property_name)
            return False

        # Turn the property on; it has to stay true until the timeout
        # expires and be false afterwards.
        property_set = set_property(True)
        property_is_true = self._wait_for_condition(check_property,
                                                    method_name())

        self.results = { 'set_%s' % property_name : property_set,
                         'is_%s' % property_name: property_is_true}
        logging.debug(self.results)

        if not all(self.results.values()):
            logging.error('Setting %s failed',property_name)
            return False

        start_time = time.time()
        while True:
            time.sleep(WAIT_TIME_SECS)
            cur_time = time.time()
            property_set = check_property()
            time_elapsed = cur_time - start_time

            # Readings close to the timeout are unreliable; skip them to
            # avoid spurious failures.
            if abs(int(timeout - time_elapsed)) < MIN_DELTA_SECS:
                continue

            # Timeout of zero seconds mean that the adapter never times
            # out. Check for 30 seconds and then exit the test.
            if timeout == 0:
                if not property_set:
                    logging.error('Adapter is not %s after %.2f '
                                  'secs with a timeout of zero ',
                                  property_name, time_elapsed)
                    return False
                if time_elapsed > 30:
                    logging.debug('Adapter %s after %.2f seconds '
                                  'with timeout of zero as expected' ,
                                  property_name, time_elapsed)
                    return True
                continue

            # Before the timeout the property must still be true
            if time_elapsed < timeout:
                if property_set:
                    continue
                logging.error('Adapter is not %s after %.2f '
                              'secs before timeout of %.2f',
                              property_name, time_elapsed, timeout)
                return False

            # After the timeout the property must have turned false
            if property_set:
                logging.error('Adapter is still %s after '
                              ' %.2f secs with timeout of %.2f',
                              property_name, time_elapsed, timeout)
                return False

            logging.debug('Adapter not %s after %.2f '
                          'secs with timeout of %.2f as expected ',
                          property_name, time_elapsed, timeout)
            return True

    default_value = check_property()
    default_timeout = get_timeout()

    result = []
    try:
        for timeout in timeout_values:
            result.append(_test_timeout_property(timeout))
        passed = all(result)
        logging.debug("Test returning %s", passed)
        return passed
    except:
        logging.error("exception in test_%s_timeout",property_name)
        raise
    finally:
        # Set the property back to default value permanently before
        # exiting the test
        set_timeout(0)
        set_property(default_value)
        # Set the timeout back to default value before exiting the test
        set_timeout(default_timeout)
| |
| |
@test_retry_and_log
def test_discoverable_timeout(self, timeout_values = [0, 60, 180]):
    """Test adapter dbus property DiscoverableTimeout.

    @param timeout_values: timeouts in seconds to test, in order

    @returns: the result of _test_timeout_property for the discoverable
              property
    """
    facade = self.bluetooth_facade
    return self._test_timeout_property(
            set_property=facade.set_discoverable,
            check_property=facade.is_discoverable,
            set_timeout=facade.set_discoverable_timeout,
            get_timeout=facade.get_discoverable_timeout,
            property_name='discoverable',
            timeout_values=timeout_values)
| |
@test_retry_and_log
def test_pairable_timeout(self, timeout_values = [0, 60, 180]):
    """Test adapter dbus property PairableTimeout.

    @param timeout_values: timeouts in seconds to test, in order

    @returns: the result of _test_timeout_property for the pairable
              property
    """
    facade = self.bluetooth_facade
    return self._test_timeout_property(
            set_property=facade.set_pairable,
            check_property=facade.is_pairable,
            set_timeout=facade.set_pairable_timeout,
            get_timeout=facade.get_pairable_timeout,
            property_name='pairable',
            timeout_values=timeout_values)
| |
| |
@test_retry_and_log
def test_nondiscoverable(self):
    """Test that the adapter could be set non-discoverable.

    @returns: True if the property was cleared and the adapter stops
              reporting being discoverable, False otherwise
    """
    def not_discoverable():
        """Return True once the adapter is no longer discoverable."""
        return not self.bluetooth_facade.is_discoverable()

    requested = self.bluetooth_facade.set_discoverable(False)
    nondiscoverable = self._wait_for_condition(not_discoverable,
                                               method_name())

    self.results = {}
    self.results['set_nondiscoverable'] = requested
    self.results['is_nondiscoverable'] = nondiscoverable
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_pairable(self):
    """Test that the adapter could be set pairable.

    @returns: True if the property was set and the adapter reports being
              pairable, False otherwise
    """
    requested = self.bluetooth_facade.set_pairable(True)
    pairable = self._wait_for_condition(
            self.bluetooth_facade.is_pairable, method_name())

    self.results = {}
    self.results['set_pairable'] = requested
    self.results['is_pairable'] = pairable
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_nonpairable(self):
    """Test that the adapter could be set non-pairable.

    @returns: True if the property was cleared and the adapter stops
              reporting being pairable, False otherwise
    """
    def not_pairable():
        """Return True once the adapter is no longer pairable."""
        return not self.bluetooth_facade.is_pairable()

    requested = self.bluetooth_facade.set_pairable(False)
    nonpairable = self._wait_for_condition(not_pairable, method_name())

    self.results = {}
    self.results['set_nonpairable'] = requested
    self.results['is_nonpairable'] = nonpairable
    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_check_valid_adapter_id(self):
    """Check that the adapter's Bluetooth ID (Modalias) is well formed.

    Reference boards are exempt and always pass.

    @returns: True if the adapter ID follows the expected format (or the
            platform is a reference board), False otherwise

    """
    platform = self.get_base_platform_name()
    properties = self.get_adapter_properties()

    # Reference boards are not required to carry a valid ID.
    if platform in self.REFERENCE_BOARDS:
        return True

    bluetooth_id = properties['Modalias']
    logging.debug('Saw Bluetooth ID of: %s', bluetooth_id)

    # A valid Device ID is
    #   <00E0(Google)>/<C405(Chrome OS)>/<non-zero versionNumber>
    # The negative lookahead rejects an all-zero version number.
    expected_format = 'bluetooth:v00E0pC405d(?!0000)'
    return re.match(expected_format, bluetooth_id) is not None
| |
| |
@test_retry_and_log(False)
def test_check_valid_alias(self):
    """Check that the adapter's Bluetooth alias is well formed.

    The alias is expected to start with '<board type>_' followed by four
    lowercase alphanumeric characters (compared case-insensitively).
    Reference boards are exempt and always pass.

    @returns: True if the adapter alias follows the expected format (or the
            platform is a reference board), False otherwise

    """
    platform = self.get_base_platform_name()
    properties = self.get_adapter_properties()

    # Reference boards are not required to carry a valid alias.
    if platform in self.REFERENCE_BOARDS:
        return True

    alias = properties['Alias']
    logging.debug('Saw Bluetooth Alias of: %s', alias)

    board_type = self.host.get_board_type().lower()
    expected_format = '%s_[a-z0-9]{4}' % board_type

    self.results = {}

    matched = re.match(expected_format, alias.lower()) is not None
    if not matched:
        # Record the offending alias so it shows up in the results.
        logging.info('unexpected alias %s found', alias)
        self.results['alias_found'] = alias

    self.results['alias_was_correct'] = matched
    return all(self.results.values())
| |
| |
| # ------------------------------------------------------------------- |
| # Tests about general discovering, pairing, and connection |
| # ------------------------------------------------------------------- |
| |
| |
@test_retry_and_log(False)
def test_discover_device(self,
                         device_address,
                         start_discovery=True,
                         stop_discovery=True):
    """Test that the adapter could discover the specified device address.

    If the adapter already knows the device, the test passes immediately
    without touching discovery. Otherwise discovery is (optionally) started,
    the adapter is polled until the device shows up, and discovery is
    (optionally) stopped again.

    @param device_address: Address of the device.
    @param start_discovery: Whether to start discovery. Set to False if you
                            call start_discovery before calling this.
    @param stop_discovery: Whether to stop discovery at the end. If this is
                           set to False, make sure to call
                           test_stop_discovery afterwards.

    @returns: True if the device is found. False otherwise.

    """
    has_device_initially = False
    discovery_stopped = False
    is_not_discovering = False
    device_discovered = False
    # If start discovery is not set, discovery must already be started
    discovery_started = not start_discovery
    has_device = self.bluetooth_facade.has_device

    if has_device(device_address):
        has_device_initially = True
    else:
        if start_discovery:
            # NOTE(review): stop_discovery() below is unpacked as a pair
            # while this result is used as-is -- confirm start_discovery()
            # really returns a plain boolean.
            discovery_started = self.bluetooth_facade.start_discovery()

        if discovery_started:
            try:
                utils.poll_for_condition(
                        condition=(lambda: has_device(device_address)),
                        timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
                        sleep_interval=
                        self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
                        desc='Waiting for discovering %s' % device_address)
                device_discovered = True
            except utils.TimeoutError as e:
                logging.error('test_discover_device: %s', e)
            except Exception as e:
                logging.error('test_discover_device: %s', e)
                # The two literals are concatenated, so the first one needs
                # a trailing space to keep the sentences apart.
                err = ('bluetoothd probably crashed. '
                       'Check out /var/log/messages')
                logging.error(err)
            except:
                logging.error('test_discover_device: unexpected error')

        # Discovery is only stopped here when this method started it.
        if start_discovery and stop_discovery:
            discovery_stopped, _ = self.bluetooth_facade.stop_discovery()
            is_not_discovering = self._wait_for_condition(
                    lambda: not self.bluetooth_facade.is_discovering(),
                    method_name())

    self.results = {
            'has_device_initially': has_device_initially,
            'should_start_discovery': start_discovery,
            'should_stop_discovery': stop_discovery,
            'start_discovery': discovery_started,
            'stop_discovery': discovery_stopped,
            'is_not_discovering': is_not_discovering,
            'device_discovered': device_discovered}

    # Make sure a discovered device properly started and stopped discovery
    device_found = device_discovered and discovery_started and (
            discovery_stopped and is_not_discovering
            if stop_discovery else True)

    return has_device_initially or device_found
| |
| |
def _test_discover_by_device(self, device):
    """Ask the peer device to discover the adapter.

    @param device: Meta device to represent peer device.

    @returns: the result of device.Discover() for the adapter address.

    """
    adapter_address = self.bluetooth_facade.address
    return device.Discover(adapter_address)
| |
@test_retry_and_log(False, messages_start=False, messages_stop=False)
def test_discover_by_device(self, device):
    """Test that the device could discover the adapter address.

    The peer is asked once right away; if that does not find the adapter,
    the request is repeated until it succeeds or the discovery timeout
    expires.

    @param device: Meta device to represent peer device.

    @returns: True if the adapter is found by the device.
    """
    adapter_discovered = False
    discover_by_device = self._test_discover_by_device
    discovered_initially = discover_by_device(device)

    if not discovered_initially:
        try:
            utils.poll_for_condition(
                    condition=(lambda: discover_by_device(device)),
                    timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
                    sleep_interval=
                    self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
                    desc='Waiting for adapter to be discovered')
            adapter_discovered = True
        except utils.TimeoutError as e:
            logging.error('test_discover_by_device: %s', e)
        except Exception as e:
            logging.error('test_discover_by_device: %s', e)
            # The two literals are concatenated, so the first one needs a
            # trailing space to keep the sentences apart.
            err = ('bluetoothd probably crashed. '
                   'Check out /var/log/messages')
            logging.error(err)
        except:
            logging.error('test_discover_by_device: unexpected error')

    # Either the first attempt or the polling needs to have succeeded.
    self.results = {
            'adapter_discovered_initially': discovered_initially,
            'adapter_discovered': adapter_discovered
    }
    return any(self.results.values())
| |
@test_retry_and_log(False, messages_start=False, messages_stop=False)
def test_discover_by_device_fails(self, device):
    """Check that the peer device is unable to discover the adapter.

    A single discovery request is issued; there is no polling.

    @param device: Meta device to represent peer device.

    @returns False if the adapter is found by the device.
    """
    found = self._test_discover_by_device(device)
    self.results = {'adapter_discovered': found}
    return not found
| |
@test_retry_and_log(False, messages_start=False, messages_stop=False)
def test_device_set_discoverable(self, device, discoverable):
    """Test that we could set the peer device to discoverable.

    @param device: Meta device to represent peer device.
    @param discoverable: the value handed to device.SetDiscoverable().

    @returns: True if SetDiscoverable() completed without raising.
              False otherwise.
    """
    try:
        device.SetDiscoverable(discoverable)
    except Exception as e:
        # Report why the request failed instead of dropping the error
        # silently; interpreter-level interrupts are left to propagate.
        logging.error('test_device_set_discoverable: %s', e)
        return False

    return True
| |
@test_retry_and_log
def test_pairing(self, device_address, pin, trusted=True):
    """Verify that the adapter can pair with the given device.

    Pairing is retried until it succeeds or the pairing timeout expires.
    Afterwards the connection info and the connected state of the device
    are checked as well. Nothing is attempted if the adapter does not know
    the device.

    @param device_address: Address of the device.
    @param pin: pin code to pair with the device.
    @param trusted: indicating whether to set the device trusted.

    @returns: True if pairing succeeds. False otherwise.

    """

    def _attempt_pairing():
        """Issue one legacy pairing request and return its result."""
        return self.bluetooth_facade.pair_legacy_device(
                device_address, pin, trusted,
                self.ADAPTER_PAIRING_TIMEOUT_SECS)

    device_known = False
    pairing_ok = False
    link_up = False
    info_available = False
    num_connected_before = self.get_num_connected_devices()

    if self.bluetooth_facade.has_device(device_address):
        device_known = True
        try:
            utils.poll_for_condition(
                    condition=_attempt_pairing,
                    timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
                    sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
                    desc='Waiting for pairing %s' % device_address)
        except utils.TimeoutError as e:
            logging.error('test_pairing: %s', e)
        except:
            logging.error('test_pairing: unexpected error')
        else:
            pairing_ok = True

        # These checks run whether or not the pairing attempt succeeded.
        connection_info = self.bluetooth_facade.get_connection_info(
                device_address)
        info_available = connection_info is not None
        link_up = self.bluetooth_facade.device_is_connected(device_address)

    outcome = {}
    outcome['has_device'] = device_known
    outcome['paired'] = pairing_ok
    outcome['connected'] = link_up
    outcome['connection_info_retrievable'] = info_available
    outcome['connection_num'] = num_connected_before + 1
    self.results = outcome

    return all(outcome.values())
| |
@test_retry_and_log
def test_remove_pairing(self, device_address):
    """Verify that the adapter can remove a paired device.

    The device has to be paired when this is called; otherwise no removal
    is attempted and the test fails.

    @param device_address: Address of the device.

    @returns: True if the device is removed successfully. False otherwise.

    """
    was_paired = self.bluetooth_facade.device_is_paired(device_address)
    removal_ok = False
    no_longer_paired = False

    if was_paired:
        removal_ok = self.bluetooth_facade.remove_device_object(
                device_address)
        still_paired = self.bluetooth_facade.device_is_paired(
                device_address)
        no_longer_paired = not still_paired

    outcome = {}
    outcome['device_is_paired_initially'] = was_paired
    outcome['remove_pairing'] = removal_ok
    outcome['pairing_removed'] = no_longer_paired
    self.results = outcome
    return all(outcome.values())
| |
| |
| def test_set_trusted(self, device_address, trusted=True): |
| """Test whether the device with the specified address is trusted. |
| |
| @param device_address: Address of the device. |
| @param trusted : True or False indicating if trusted is expected. |
| |
| @returns: True if the device's "Trusted" property is as specified; |
| False otherwise. |
| |
| """ |
| |
| set_trusted = self.bluetooth_facade.set_trusted( |
| device_address, trusted) |
| |
| actual_trusted = self.bluetooth_facade.get_device_property( |
| device_address, 'Trusted') |
| |
| self.results = { |
| 'set_trusted': set_trusted, |
| 'actual trusted': actual_trusted, |
| 'expected trusted': trusted} |
| return actual_trusted == trusted |
| |
| |
@test_retry_and_log
def test_connection_by_adapter(self, device_address):
    """Verify that the adapter of the dut can connect to the device.

    It is the caller's responsibility to pair to the device before
    doing connection. The connect request is retried until it succeeds or
    the pairing timeout expires; nothing is attempted if the adapter does
    not know the device.

    @param device_address: Address of the device.

    @returns: True if connection is performed. False otherwise.

    """
    device_known = self.bluetooth_facade.has_device(device_address)
    link_up = False

    if device_known:
        # Normalize to a plain boolean for the results dictionary.
        device_known = True
        try:
            utils.poll_for_condition(
                    condition=lambda: self.bluetooth_facade.connect_device(
                            device_address),
                    timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
                    sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
                    desc='Waiting for connecting to %s' % device_address)
        except utils.TimeoutError as e:
            logging.error('test_connection_by_adapter: %s', e)
        except:
            logging.error('test_connection_by_adapter: unexpected error')
        else:
            link_up = True
    else:
        device_known = False

    self.results = {'has_device': device_known, 'connected': link_up}
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_disconnection_by_adapter(self, device_address):
    """Verify that the adapter of the dut can disconnect the device.

    @param device_address: Address of the device.

    @returns: the result of the facade's disconnect_device() call, i.e.
              True if disconnection is performed. False otherwise.

    """
    disconnected = self.bluetooth_facade.disconnect_device(device_address)
    return disconnected
| |
| |
def _enter_command_mode(self, device):
    """Put the device into command mode.

    Call this before using the device to make sure it is in command mode.
    A failure is logged as an error.

    @param device: the bluetooth HID device

    @returns: True if successful. False otherwise.

    """
    outcome = _run_method(device.EnterCommandMode, 'EnterCommandMode')
    entered = _is_successful(outcome)
    if entered:
        return entered

    logging.error('EnterCommandMode failed')
    return entered
| |
| |
@test_retry_and_log
def test_connection_by_device(self, device):
    """Verify that the device can initiate a connection to the adapter.

    This emulates the behavior that a device may initiate a
    connection request after waking up from power saving mode.

    The device is first put into command mode; if that fails the test
    fails right away.

    @param device: the bluetooth HID device

    @returns: True if connection is performed correctly by device and
              the adapter also enters connection state.
              False otherwise.

    """
    if not self._enter_command_mode(device):
        return False

    test_name = 'test_connection_by_device'

    # Step 1: ask the peer to connect to the adapter.
    initiated = False
    adapter_address = self.bluetooth_facade.address
    try:
        initiated = device.ConnectToRemoteAddress(adapter_address)
    except Exception as e:
        logging.error('%s (device): %s', test_name, e)
    except:
        logging.error('%s (device): unexpected error', test_name)

    # Step 2: wait until the adapter reports the peer as connected.
    seen_by_adapter = False
    peer_address = device.address
    is_connected = self.bluetooth_facade.device_is_connected
    try:
        utils.poll_for_condition(
                condition=lambda: is_connected(peer_address),
                timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
                desc=('Waiting for connection from %s' % peer_address))
        seen_by_adapter = True

        # Although the connect may be complete, it can take a few
        # seconds for the input device to be ready for use
        time.sleep(self.ADAPTER_HID_INPUT_DELAY)
    except utils.TimeoutError as e:
        logging.error('%s (adapter): %s', test_name, e)
    except:
        logging.error('%s (adapter): unexpected error', test_name)

    outcome = {}
    outcome['connection_by_device'] = initiated
    outcome['connection_seen_by_adapter'] = seen_by_adapter
    self.results = outcome
    return all(outcome.values())
| |
@test_retry_and_log(True, messages_start=False, messages_stop=False)
def test_connection_by_device_only(self, device, adapter_address):
    """Verify that the device can connect to the adapter, peer side only.

    This is a modified version of test_connection_by_device that only
    communicates with the peer device and not the host (in case the host is
    suspended for example).

    @param device: the bluetooth peer device
    @param adapter_address: address of the adapter

    @returns: True if the connection was established by the device or False.
    """
    initiated = device.ConnectToRemoteAddress(adapter_address)
    if initiated:
        # Although the connect may be complete, it can take a few
        # seconds for the input device to be ready for use
        time.sleep(self.ADAPTER_HID_INPUT_DELAY)

    self.results = {'connection_by_device': initiated}
    return bool(initiated)
| |
| |
@test_retry_and_log
def test_disconnection_by_device(self, device):
    """Verify that the device can disconnect from the adapter.

    This emulates the behavior that a device may initiate a
    disconnection request before going into power saving mode.

    Note: should not try to enter command mode in this method. When
          a device is connected, there is no way to enter command mode.
          One could just issue a special disconnect command without
          entering command mode.

    @param device: the bluetooth HID device

    @returns: True if disconnection is performed correctly by device and
              the adapter also observes the disconnection.
              False otherwise.

    """
    test_name = 'test_disconnection_by_device'

    # Step 1: ask the peer to drop the link.
    requested = False
    try:
        device.Disconnect()
    except Exception as e:
        logging.error('%s (device): %s', test_name, e)
    except:
        logging.error('%s (device): unexpected error', test_name)
    else:
        requested = True

    # Step 2: wait until the adapter no longer reports the peer connected.
    seen_by_adapter = False
    peer_address = device.address
    is_connected = self.bluetooth_facade.device_is_connected
    try:
        utils.poll_for_condition(
                condition=lambda: not is_connected(peer_address),
                timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
                desc=('Waiting for disconnection from %s' % peer_address))
    except utils.TimeoutError as e:
        logging.error('%s (adapter): %s', test_name, e)
    except:
        logging.error('%s (adapter): unexpected error', test_name)
    else:
        seen_by_adapter = True

    outcome = {}
    outcome['disconnection_by_device'] = requested
    outcome['disconnection_seen_by_adapter'] = seen_by_adapter
    self.results = outcome
    return all(outcome.values())
| |
| |
@test_retry_and_log(False)
def test_device_is_connected(self, device_address):
    """Verify that the given device address is currently connected.

    The connected state is polled until it becomes true or the connection
    timeout expires. The test fails if the adapter does not know the
    device.

    @param device_address: Address of the device.

    @returns: True if the device is connected.
              False otherwise.
    """
    test_name = 'test_device_is_connected'
    device_known = False
    link_up = False

    if self.bluetooth_facade.has_device(device_address):
        device_known = True
        try:
            utils.poll_for_condition(
                    condition=lambda:
                    self.bluetooth_facade.device_is_connected(
                            device_address),
                    timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
                    sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
                    desc='Waiting to check connection to %s' %
                         device_address)
        except utils.TimeoutError as e:
            logging.error('%s: %s', test_name, e)
        except:
            logging.error('%s: unexpected error', test_name)
        else:
            link_up = True

    self.results = {'has_device': device_known, 'connected': link_up}
    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_device_is_not_connected(self, device_address):
    """Verify that the given device address is NOT currently connected.

    A device the adapter does not know counts as not connected. For a
    known device the connected state is polled until it becomes false or
    the connection timeout expires. Errors other than the timeout are
    logged and re-raised.

    @param device_address: Address of the device.

    @returns: True if the device is NOT connected.
              False otherwise.

    """
    test_name = 'test_device_is_not_connected'
    not_connected = False

    if not self.bluetooth_facade.has_device(device_address):
        not_connected = True
    else:
        try:
            utils.poll_for_condition(
                    condition=lambda:
                    not self.bluetooth_facade.device_is_connected(
                            device_address),
                    timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
                    sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
                    desc='Waiting to check connection to %s' %
                         device_address)
            not_connected = True
        except utils.TimeoutError as e:
            logging.error('%s: %s', test_name, e)
        except:
            logging.error('%s: unexpected error', test_name)
            raise

    self.results = {'not_connected': not_connected}
    return all(self.results.values())
| |
| |
@test_retry_and_log
def test_device_is_paired(self, device_address):
    """Test that the device address given is currently paired.

    The paired state is polled until it becomes true or the pairing
    timeout expires. The test fails if the adapter does not know the
    device.

    @param device_address: Address of the device.

    @returns: True if the device is paired.
              False otherwise.

    """
    def _is_paired():
        """Test if device is paired.

        @returns: True if device is paired. False otherwise.

        """
        return self.bluetooth_facade.device_is_paired(device_address)


    method_name = 'test_device_is_paired'
    has_device = False
    paired = False
    if self.bluetooth_facade.has_device(device_address):
        has_device = True
        try:
            # The description ends up in the timeout error, so it has to
            # name what is actually being waited for: pairing.
            utils.poll_for_condition(
                    condition=_is_paired,
                    timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
                    sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
                    desc='Waiting for %s to be paired' % device_address)
            paired = True
        except utils.TimeoutError as e:
            logging.error('%s: %s', method_name, e)
        except:
            logging.error('%s: unexpected error', method_name)
    self.results = {'has_device': has_device, 'paired': paired}
    return all(self.results.values())
| |
| |
def _get_device_name(self, device_address):
    """Read the device's 'Name' property and cache it on the instance.

    The value is stored in self.discovered_device_name.

    @param device_address: Address of the device.

    @returns: True if a non-empty device name was obtained.
              False otherwise.

    """
    name = self.bluetooth_facade.get_device_property(device_address, 'Name')
    self.discovered_device_name = name
    return bool(name)
| |
| |
@test_retry_and_log
def test_device_name(self, device_address, expected_device_name):
    """Test that the device name discovered by the adapter is correct.

    The device's name is polled until a non-empty name is obtained or the
    name discovery timeout expires, and is then compared with the expected
    name.

    @param device_address: Address of the device.
    @param expected_device_name: the bluetooth device name

    @returns: True if the discovered_device_name is expected_device_name.
              False otherwise.

    """
    # Forget any name cached by an earlier call. _get_device_name only
    # assigns the attribute after the facade call returns, so if every
    # lookup raises, the comparison below would otherwise use a stale name
    # (or fail on a missing attribute).
    self.discovered_device_name = None

    try:
        utils.poll_for_condition(
                condition=lambda: self._get_device_name(device_address),
                timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
                sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
                desc='Waiting for device name of %s' % device_address)
    except utils.TimeoutError as e:
        logging.error('test_device_name: %s', e)
    except:
        logging.error('test_device_name: unexpected error')

    self.results = {
            'expected_device_name': expected_device_name,
            'discovered_device_name': self.discovered_device_name}
    return self.discovered_device_name == expected_device_name
| |
| |
@test_retry_and_log
def test_device_class_of_service(self, device_address,
                                 expected_class_of_service):
    """Verify the class of service found in the device's 'Class' property.

    The class of service is the 'Class' property masked with
    CLASS_OF_SERVICE_MASK. A missing (falsy) 'Class' property yields None.

    @param device_address: Address of the device.
    @param expected_class_of_service: the expected class of service

    @returns: True if the discovered class of service matches the
              expected class of service. False otherwise.

    """
    device_class = self.bluetooth_facade.get_device_property(
            device_address, 'Class')

    if device_class:
        discovered = device_class & self.CLASS_OF_SERVICE_MASK
    else:
        discovered = None

    outcome = {}
    outcome['device_class'] = device_class
    outcome['expected_class_of_service'] = expected_class_of_service
    outcome['discovered_class_of_service'] = discovered
    self.results = outcome
    return discovered == expected_class_of_service
| |
| |
@test_retry_and_log
def test_device_class_of_device(self, device_address,
                                expected_class_of_device):
    """Verify the class of device found in the device's 'Class' property.

    The class of device is the 'Class' property masked with
    CLASS_OF_DEVICE_MASK. A missing (falsy) 'Class' property yields None.

    @param device_address: Address of the device.
    @param expected_class_of_device: the expected class of device

    @returns: True if the discovered class of device matches the
              expected class of device. False otherwise.

    """
    device_class = self.bluetooth_facade.get_device_property(
            device_address, 'Class')

    if device_class:
        discovered = device_class & self.CLASS_OF_DEVICE_MASK
    else:
        discovered = None

    outcome = {}
    outcome['device_class'] = device_class
    outcome['expected_class_of_device'] = expected_class_of_device
    outcome['discovered_class_of_device'] = discovered
    self.results = outcome
    return discovered == expected_class_of_device
| |
| |
def _get_btmon_log(self, method, logging_timespan=1):
    """Capture the btmon log when executing the specified method.

    The value returned by the method (or '' when there is no method) is
    stored in self.advertising_msg.

    @param method: the method to capture log.
                   The method would be executed only when it is not None.
                   This allows us to simply capture btmon log without
                   executing any command.
    @param logging_timespan: capture btmon log for logging_timespan seconds.

    """
    self.bluetooth_le_facade.btmon_start()
    try:
        self.advertising_msg = method() if method else ''
        time.sleep(logging_timespan)
    finally:
        # Stop the capture even if the method raised, so that a failing
        # command does not leave btmon logging behind.
        self.bluetooth_le_facade.btmon_stop()
| |
| |
def convert_to_adv_jiffies(self, adv_interval_ms):
    """Express an advertising interval in jiffies.

    A jiffy here is one ADVERTISING_INTERVAL_UNIT, i.e., 0.625 ms according
    to the original description of this method.

    @param adv_interval_ms: an advertising interval in milliseconds

    @returns: the equivalent jiffies

    """
    unit_ms = self.ADVERTISING_INTERVAL_UNIT
    jiffies = adv_interval_ms / unit_ms
    return jiffies
| |
| |
def compute_duration(self, max_adv_interval_ms):
    """Derive the advertising duration from the max advertising interval.

    The duration is approximated as the max interval converted to seconds
    plus a 10% margin:
        duration = max_adv_interval_ms / 1000.0 * 1.1

    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: duration in seconds.

    """
    interval_secs = max_adv_interval_ms / 1000.0
    return interval_secs * 1.1
| |
| |
def compute_logging_timespan(self, max_adv_interval_ms):
    """Work out how long btmon needs to record for.

    The timespan is the advertising duration multiplied by
    self.count_advertisements, but never less than 1.

    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: logging_timespan in seconds.

    """
    per_advertisement = self.compute_duration(max_adv_interval_ms)
    total = self.count_advertisements * per_advertisement
    return max(total, 1)
| |
| |
@test_retry_and_log(False)
def test_check_duration_and_intervals(self, min_adv_interval_ms,
                                      max_adv_interval_ms,
                                      number_advertisements):
    """Verify that every advertisement is scheduled according to the
    duration and intervals.

    An advertisement would be scheduled at the time span of
         duration * number_advertisements

    The btmon log is searched for 'LE Set Advertising Data' commands and
    their 'Company' ids; the time between consecutive commands of the same
    company id has to be within 10% of the expected time span. The min and
    max advertising intervals have to show up in the btmon log as well.

    @param min_adv_interval_ms: min advertising interval in milliseconds.
    @param max_adv_interval_ms: max advertising interval in milliseconds.
    @param number_advertisements: the number of existing advertisements

    @returns: True if all advertisements are scheduled based on the
            duration and intervals.

    """

    def within_tolerance(expected, actual, max_error=0.1):
        """Determine if the percent error is within specified tolerance.

        @param expected: The expected value.
        @param actual: The actual (measured) value.
        @param max_error: The maximum percent error acceptable.

        @returns: True if the percent error is less than or equal to
                  max_error.
        """
        if expected == 0:
            # A relative error is undefined for a zero expectation; only
            # an exact match is acceptable then.
            return actual == 0
        return abs(expected - actual) / abs(expected) <= max_error

    start_str = 'Set Advertising Intervals:'
    search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
    search_str = '|'.join(search_strings)

    contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
                                                  start_str=start_str)

    # Company string looks like
    #   Company: not assigned (65283)
    company_pattern = re.compile(r'Company:.*\((\d+)\)')

    # The string with timestamp looks like
    #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
    # The match after '[hciN]' is non-greedy so that the whole timestamp is
    # captured; a greedy match would swallow the leading digits of
    # timestamps of 10 seconds and more.
    set_adv_time_pattern = re.compile(
            r'LE Set Advertising Data.*\[hci\d+\].*?(\d+\.\d+)')

    # Maps a manufacturer id to the timestamps of its advertising data
    # commands. A timestamp line is paired with the company line that
    # follows it.
    adv_timestamps = {}
    timestamp = None
    manufacturer_id = None
    for line in contents:
        result = set_adv_time_pattern.search(line)
        if result:
            timestamp = float(result.group(1))

        result = company_pattern.search(line)
        if result:
            manufacturer_id = '0x%04x' % int(result.group(1))

        # Compare against None so that a timestamp of 0.0 is not dropped.
        if timestamp is not None and manufacturer_id is not None:
            adv_timestamps.setdefault(manufacturer_id, []).append(timestamp)
            timestamp = None
            manufacturer_id = None

    duration = self.compute_duration(max_adv_interval_ms)
    expected_timespan = duration * number_advertisements

    check_duration = True
    for manufacturer_id, values in six.iteritems(adv_timestamps):
        logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
        timespans = [later - earlier
                     for earlier, later in zip(values, values[1:])]
        errors = [timespan for timespan in timespans
                  if not within_tolerance(expected_timespan, timespan)]
        logging.debug('timespans: %s', timespans)
        logging.debug('errors: %s', errors)
        if errors:
            check_duration = False

    # Verify that the advertising intervals are also correct.
    min_adv_interval_ms_found, max_adv_interval_ms_found = (
            self._verify_advertising_intervals(min_adv_interval_ms,
                                               max_adv_interval_ms))

    # Both interval checks take part in the verdict.
    self.results = {
            'check_duration': check_duration,
            'min_adv_interval_ms_found': min_adv_interval_ms_found,
            'max_adv_interval_ms_found': max_adv_interval_ms_found,
    }
    return all(self.results.values())
| |
| |
def _get_min_max_intervals_strings(self, min_adv_interval_ms,
                                   max_adv_interval_ms):
    """Get the min and max advertising intervals strings shown in btmon.

    Advertising intervals shown in the btmon log look like
        Min advertising interval: 1280.000 msec (0x0800)
        Max advertising interval: 1280.000 msec (0x0800)

    The hexadecimal number is the interval expressed in units of
    ADVERTISING_INTERVAL_UNIT.

    @param min_adv_interval_ms: min advertising interval in milliseconds.
    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: the min and max intervals strings.

    """
    # The division yields a float, which the %x conversion rejects on
    # python3; convert to an integer explicitly.
    min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
               (min_adv_interval_ms,
                int(min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)))
    logging.debug('min_adv_interval_ms: %s', min_str)

    max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
               (max_adv_interval_ms,
                int(max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT)))
    logging.debug('max_adv_interval_ms: %s', max_str)

    return (min_str, max_str)
| |
| |
def _verify_advertising_intervals(self, min_adv_interval_ms,
                                  max_adv_interval_ms):
    """Look for the min and max advertising intervals in the btmon log.

    Advertising intervals look like
        Min advertising interval: 1280.000 msec (0x0800)
        Max advertising interval: 1280.000 msec (0x0800)

    @param min_adv_interval_ms: min advertising interval in milliseconds.
    @param max_adv_interval_ms: max advertising interval in milliseconds.

    @returns: a tuple of (True, True) if both min and max advertising
              intervals could be found. Otherwise, the corresponding element
              in the tuple is False.

    """
    expected_min, expected_max = self._get_min_max_intervals_strings(
            min_adv_interval_ms, max_adv_interval_ms)

    find_in_btmon = self.bluetooth_le_facade.btmon_find
    min_found = find_in_btmon(expected_min)
    max_found = find_in_btmon(expected_max)
    return min_found, max_found
| |
| |
def _verify_scan_response_data(self, adv_data):
    """Verify advertisement's scan response data is correct

    Unlike the other fixed advertising fields, Scan Response Data is set
    in a tag-value data format. This function helps verify the data format
    for specific tag values to ensure scan response was propagated correctly

    Only the 16 bit Service Data tag (0x16) is checked; other tags are
    skipped.

    @param adv_data: Dictionary defining advertising fields to be registered
                     with bluetoothd daemon's RegisterAdvertisement interface

    @returns: True if all Registered Scan Response tags were located in
              btmon trace, False otherwise
    """

    scan_rsp = adv_data.get('ScanResponseData')
    if not scan_rsp:
        return True

    for tag, data in scan_rsp.items():
        # Validate 16 bit Service Data tag
        if int(tag, 16) != 0x16:
            continue

        # First two bytes of data are endian-corrected UUID, followed
        # by service data. Each byte is zero padded to two hex digits so
        # that bytes below 0x10 keep their leading zero.
        uuid = '%02x%02x' % (data[1], data[0])
        data_str = ''.join('%02x' % byte for byte in data[2:])

        # Service data has the following format in btmon trace:
        # Service Data (UUID 0xfef3): 01020304
        search_str = 'Service Data (UUID 0x{}): {}'.format(uuid, data_str)

        # Fail if data can't be located in btmon trace
        if not self.bluetooth_le_facade.btmon_find(search_str):
            return False

    return True
| |
| def test_advertising_flags(self, flag_strs=[]): |
| """Verify that advertising flags are set in registered advertisement |
| |
| Each flag has a specific descriptor that appears in btmon trace. This |
| simple checker validates that the desired flag descriptors appear in |
| btmon trace when the advertisement was registered. |
| |
| @param flag_strs: Flag string descriptors expected in btmon trace |
| |
| #returns: True if all flag descriptors were located, False otherwise |
| """ |
| |
| for flag_str in flag_strs: |
| if not self.bluetooth_le_facade.btmon_find(flag_str): |
| logging.info( |
| 'Flag descriptor not located: {}'.format(flag_str)) |
| return False |
| |
| return True |
| |
def ext_adv_enabled(self):
    """Tell whether this platform supports extended advertising.

    The decision is made by looking up the base platform name in
    EXT_ADV_MODELS.

    @returns True if extended advertising is supported, else False
    """
    return self.get_base_platform_name() in EXT_ADV_MODELS
| |
| |
@test_retry_and_log(False)
def test_register_advertisement(self, advertisement_data, instance_id,
                                min_adv_interval_ms, max_adv_interval_ms):
    """Verify that an advertisement is registered correctly.

    The advertisement is registered while btmon is logging, and the
    captured log is then checked for the following data:
        - advertisement added (with the expected instance id)
        - manufacturer data (every manufacturer id must be found)
        - service UUIDs
        - service data
        - advertising intervals (skipped for 'broadcast' advertisements)
        - scan response data
        - advertising enabled

    The individual outcomes are stored in self.results.

    @param advertisement_data: the data of an advertisement to register.
    @param instance_id: the instance id which starts at 1.
    @param min_adv_interval_ms: min_adv_interval in milliseconds.
    @param max_adv_interval_ms: max_adv_interval in milliseconds.

    @returns: True if the advertisement is registered correctly.
              False otherwise.

    """
    # When registering a new advertisement, it is possible that another
    # instance is advertising. It may need to wait for all other
    # advertisements to complete advertising once.
    self.count_advertisements += 1
    logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.register_advertisement(
                    advertisement_data),
            logging_timespan=logging_timespan)

    facade = self.bluetooth_le_facade

    # Verify that a new advertisement is added.
    advertisement_added = (
            facade.btmon_find('Advertising Added') and
            facade.btmon_find('Instance: %d' % instance_id))

    # Verify that the manufacturer data could be found for EVERY
    # manufacturer id; stop at the first one that is missing so a later
    # successful lookup cannot mask an earlier failure.
    manufacturer_data_found = True
    for manufacturer_id in advertisement_data.get('ManufacturerData', {}):
        # The 'not assigned' text below means the manufacturer id
        # is not actually assigned to any real manufacturer.
        # For real 16-bit manufacturer UUIDs, refer to
        #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
        # The id is parsed as a hex string and searched for in decimal.
        if not facade.btmon_find(
                'Company: not assigned (%d)' % int(manufacturer_id, 16)):
            manufacturer_data_found = False
            break

    # Verify that all service UUIDs could be found.
    service_uuids_found = True
    for uuid in advertisement_data.get('ServiceUUIDs', []):
        # Each UUID is searched for with a '0x' prefix prepended, to
        # match btmon lines such as
        #   Heart Rate (0x180D)
        #   Battery Service (0x180F)
        # NOTE(review): this implies the entries themselves carry no
        # '0x' prefix (e.g. '180D') -- confirm against the callers.
        # For actual 16-bit service UUIDs, refer to
        #   https://www.bluetooth.com/specifications/gatt/services
        if not facade.btmon_find('0x%s' % uuid):
            service_uuids_found = False
            break

    # Verify service data.
    service_data_found = True
    for uuid, data in advertisement_data.get('ServiceData', {}).items():
        # A service data looks like
        #   Service Data (UUID 0x9999): 0001020304
        # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
        data_str = ''.join('%02x' % n for n in data)
        if not facade.btmon_find(
                'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
            service_data_found = False
            break

    # Broadcast advertisements are overwritten in some kernel versions to
    # be more aggressive. Verify that the advertising intervals are correct
    # if this mode is not used
    if advertisement_data.get('Type') != 'broadcast':
        min_adv_interval_ms_found, max_adv_interval_ms_found = (
                self._verify_advertising_intervals(min_adv_interval_ms,
                                                   max_adv_interval_ms))
    else:
        min_adv_interval_ms_found = True
        max_adv_interval_ms_found = True

    scan_rsp_correct = self._verify_scan_response_data(advertisement_data)

    # Verify advertising is enabled.
    advertising_enabled = facade.btmon_find('Advertising: Enabled (0x01)')

    self.results = {
            'advertisement_added': advertisement_added,
            'manufacturer_data_found': manufacturer_data_found,
            'service_uuids_found': service_uuids_found,
            'service_data_found': service_data_found,
            'min_adv_interval_ms_found': min_adv_interval_ms_found,
            'max_adv_interval_ms_found': max_adv_interval_ms_found,
            'scan_rsp_correct': scan_rsp_correct,
            'advertising_enabled': advertising_enabled,
    }
    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_fail_to_register_advertisement(self, advertisement_data,
                                        min_adv_interval_ms,
                                        max_adv_interval_ms):
    """Verify that registration fails once max advertisements are reached.

    Registering one advertisement too many is expected to produce an
    error message that looks like:

        org.bluez.Error.Failed: Maximum advertisements reached

    @param advertisement_data: the advertisement to register.
    @param min_adv_interval_ms: min_adv_interval in milliseconds.
    @param max_adv_interval_ms: max_adv_interval in milliseconds.

    @returns: True if the error message is received correctly.
              False otherwise.

    """
    timespan = self.compute_logging_timespan(max_adv_interval_ms)
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.register_advertisement(
                    advertisement_data),
            logging_timespan=timespan)

    # The registration must have been rejected with the "max
    # advertisements reached" error.
    got_expected_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT in
                          self.advertising_msg)

    # btmon must not show any newly added advertisement.
    added_seen = self.bluetooth_le_facade.btmon_find('Advertising Added:')

    self.results = {
            'failed_to_register_error': got_expected_error,
            'advertisement_not_added': not added_seen,
    }

    # With extended advertising, a failed registration leaves no events
    # in btmon, so the remaining checks only make sense when software
    # advertisement rotation is in use (extended advertising unavailable).
    if self.ext_adv_enabled():
        return all(self.results.values())

    # The advertising intervals must still be the expected ones.
    min_found, max_found = self._verify_advertising_intervals(
            min_adv_interval_ms, max_adv_interval_ms)

    # Advertising must remain enabled.
    still_enabled = self.bluetooth_le_facade.btmon_find(
            'Advertising: Enabled (0x01)')

    self.results['min_adv_interval_ms_found'] = min_found
    self.results['max_adv_interval_ms_found'] = max_found
    self.results['advertising_enabled'] = still_enabled

    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_unregister_advertisement(self, advertisement_data, instance_id,
                                  advertising_disabled):
    """Verify that an advertisement is unregistered correctly.

    Two things are checked in the btmon log:
        - the advertisement with the given instance id was removed
        - advertising status: only checked when advertising_disabled is
          True, in which case a "Disabled" message must be present and
          an "Enabled" message must be absent.

    @param advertisement_data: the data of an advertisement to unregister.
    @param instance_id: the instance id of the advertisement to remove.
    @param advertising_disabled: is advertising disabled? This happens
            only when all advertisements are removed.

    @returns: True if the advertisement is unregistered correctly.
              False otherwise.

    """
    self.count_advertisements -= 1
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.unregister_advertisement(
                    advertisement_data))

    facade = self.bluetooth_le_facade

    # The removal is confirmed only if both the "removed" event and the
    # matching instance id show up; the instance lookup is skipped when
    # the event itself is missing.
    removed = facade.btmon_find('Advertising Removed')
    if removed:
        removed = facade.btmon_find('Instance: %d' % instance_id)

    # The advertising status is only enforced when the last advertisement
    # is removed. For any earlier removal, 'Advertising: Enabled (0x01)'
    # may or may not be observed, and insisting on it would make the test
    # flaky. Sleeping before checking would work around that, at the cost
    # of waiting up to 10 seconds for long durations and intervals.
    # Since a test case always runs test_check_duration_and_intervals()
    # after un-registering one or all advertisements, skipping the check
    # here is safe.
    enabled_seen = facade.btmon_find('Advertising: Enabled (0x01)')
    disabled_seen = facade.btmon_find('Advertising: Disabled (0x00)')
    if advertising_disabled:
        status_ok = disabled_seen and not enabled_seen
    else:
        status_ok = True

    self.results = {
            'advertisement_removed': removed,
            'advertising_status_correct': status_ok,
    }
    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_set_advertising_intervals(self, min_adv_interval_ms,
                                   max_adv_interval_ms):
    """Verify that new advertising intervals are set correctly.

    Setting advertising intervals does not enable or disable
    advertising, so the advertising status is not checked here.

    @param min_adv_interval_ms: the min advertising interval in ms.
    @param max_adv_interval_ms: the max advertising interval in ms.

    @returns: True if the new advertising intervals are correct.
              False otherwise.

    """
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.set_advertising_intervals(
                    min_adv_interval_ms, max_adv_interval_ms))

    # bluetoothd logs the intervals in advertising jiffies. With
    # intervals of 200 ms and 200 ms, the log looks like
    #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
    min_jiffies = self.convert_to_adv_jiffies(min_adv_interval_ms)
    max_jiffies = self.convert_to_adv_jiffies(max_adv_interval_ms)
    expected_log = ('bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
                    % (min_jiffies, max_jiffies))

    self.results = {
            'adv_intervals_found':
                    self.bluetooth_le_facade.btmon_find(expected_log),
    }
    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_fail_to_set_advertising_intervals(
        self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
        orig_min_adv_interval_ms, orig_max_adv_interval_ms):
    """Verify that setting invalid advertising intervals results in error.

    If invalid min/max advertising intervals are given, it would incur
    the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
    Note that valid advertising intervals fall between 20 ms and 10,240 ms.

    Besides the error, this also checks that every min/max advertising
    interval reported by btmon after the rejected request still equals
    the original value. The individual outcomes are stored in
    self.results.

    @param invalid_min_adv_interval_ms: the invalid min advertising interval
            in ms.
    @param invalid_max_adv_interval_ms: the invalid max advertising interval
            in ms.
    @param orig_min_adv_interval_ms: the original min advertising interval
            in ms.
    @param orig_max_adv_interval_ms: the original max advertising interval
            in ms.

    @returns: True if it fails to set invalid advertising intervals.
              False otherwise.

    """
    self._get_btmon_log(
            lambda: self.bluetooth_le_facade.set_advertising_intervals(
                    invalid_min_adv_interval_ms,
                    invalid_max_adv_interval_ms))

    # Verify that the invalid error is observed in the dbus error callback
    # message.
    invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
                               self.advertising_msg)

    # Verify that the min/max advertising intervals remain the same
    # after setting the invalid advertising intervals.
    #
    # In btmon log, we would see the following message first.
    #   bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
    # And then, we should check if "Min advertising interval" and
    # "Max advertising interval" remain the same.
    start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
            self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
            self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))

    search_str = '|'.join(['Min advertising interval:',
                           'Max advertising interval:'])

    contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
                                                  start_str=start_str)

    # The min/max advertising intervals of all advertisements should remain
    # the same as the previous valid ones.
    #
    # A raw string keeps '\d' and '\.' as regex escapes rather than
    # (invalid) string escapes, and a real alternation group replaces the
    # former '[Min|Max]' character class, which only matched the single
    # trailing letter of "Min"/"Max".
    min_max_pattern = re.compile(
            r'(Min|Max) advertising interval: (\d*\.\d*) msec')
    correct_orig_min_adv_interval = True
    correct_orig_max_adv_interval = True
    for line in contents:
        result = min_max_pattern.search(line)
        if not result:
            continue
        kind = result.group(1)
        interval = float(result.group(2))
        if kind == 'Min' and interval != orig_min_adv_interval_ms:
            correct_orig_min_adv_interval = False
        elif kind == 'Max' and interval != orig_max_adv_interval_ms:
            correct_orig_max_adv_interval = False

    self.results = {
            'invalid_intervals_error': invalid_intervals_error,
            'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
            'correct_orig_max_adv_interval': correct_orig_max_adv_interval,
    }

    return all(self.results.values())
| |
| |
@test_retry_and_log(False)
def test_check_advertising_intervals(self, min_adv_interval_ms,
                                     max_adv_interval_ms):
    """Check that btmon reports the expected advertising intervals.

    @param min_adv_interval_ms: the min advertising interval in ms.
    @param max_adv_interval_ms: the max advertising interval in ms.

    @returns: True if the advertising intervals are correct.
              False otherwise.

    """
    # No action is passed here; only the btmon log is collected.
    self._get_btmon_log(None)

    # Look for both expected interval strings in the collected log.
    min_found, max_found = self._verify_advertising_intervals(
            min_adv_interval_ms, max_adv_interval_ms)

    outcome = {}
    outcome['min_adv_interval_ms_found'] = min_found
    outcome['max_adv_interval_ms_found'] = max_found
    self.results = outcome
    return all(self.results.values())
| |
| |
| @test_retry_and_log(False) |
| def test_reset_advertising(self, instance_ids=[]): |
| """Verify that advertising is reset correctly. |
| |
| Note that reset advertising would set advertising intervals to |
| the default values. However, we would not be able to observe |
| the values change until new advertisements are registered. |
| Therefore, it is required that a test_register_advertisement() |
| test is conducted after this test. |
| |
| If instance_ids is [], all advertisements would still be removed |
| if there are any. However, no need to check 'Advertising Removed' |
| in btmon log since we may or may not be able to observe the message. |
| This feature is needed if this test is invoked as the first one in |
| a test case to reset advertising. In this situation, this test does |
| not know how many advertisements exist. |
| |
|