blob: 592762b5afb1ceb5b258b408ae38df5a4a979227 [file] [log] [blame]
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Server side bluetooth tests on Advertisement Monitor API"""
import time
import logging
import array
from autotest_lib.client.bin import utils
from autotest_lib.server.cros.bluetooth import bluetooth_adapter_tests
class TestMonitor():
"""Local object hosting the test values for Advertisement Monitor object.
This class holds the values of parameters for creating an Advertisement
Monitor object.
"""
# Index of the pattern data in the patterns filter.
PATTERN_DATA_IDX = 2
def __init__(self, app_id):
"""Construction of a local monitor object.
@param app_id: the app id associated with the monitor.
"""
self.type = None
self.rssi = []
self.patterns = []
self.monitor_id = None
self.app_id = app_id
def _bytes(self, str_data):
"""Convert string data to byte array.
@param str_data: the string data.
@returns: the byte array.
"""
return [b for b in array.array('B', str_data)]
def update_type(self, monitor_type):
"""Update the monitor type.
@param monitor_type: type of the monitor.
"""
self.type = monitor_type
def update_rssi(self, monitor_rssi):
"""Update the RSSI filter values.
@param rssi: the list of rssi threshold and timeout values.
"""
self.rssi = monitor_rssi
def update_patterns(self, monitor_patterns):
"""Update the content filter patterns.
@param patterns: the list of start position, ad type and patterns.
"""
# Convert string patterns to byte array, if any.
for pattern in monitor_patterns:
if isinstance(pattern[self.PATTERN_DATA_IDX], str):
pattern[self.PATTERN_DATA_IDX] = self._bytes(
pattern[self.PATTERN_DATA_IDX])
self.patterns = monitor_patterns
def update_monitor_id(self, monitor_id):
"""Store the monitor id returned by add_monitor().
@param monitor_id: the monitor id.
"""
self.monitor_id = monitor_id
def get_monitor_data(self):
"""Return the monitor parameters.
@returns: List containing the monitor data.
"""
return [self.type, self.rssi, self.patterns]
def get_monitor_id(self):
"""Return the monitor id.
@returns: monitor id if monitor is already added, None otherwise.
"""
return self.monitor_id
def get_app_id(self):
"""Return the application id.
@returns: app id associated to the monitor object.
"""
return self.app_id
class BluetoothAdapterAdvMonitorTests(
bluetooth_adapter_tests.BluetoothAdapterTests):
"""Server side bluetooth adapter advertising Test.
This class comprises a number of test cases to verify bluetooth
Advertisement Monitor API.
Refer to the test plan doc for more details: go/bt-advmon-api-test-plan
"""
ADD_MONITOR_POLLING_TIMEOUT_SECS = 3
ADD_MONITOR_POLLING_SLEEP_SECS = 1
PAIR_TEST_SLEEP_SECS = 5
# Non-zero count value is used to indicate the case where multiple
# DeviceFound/DeviceLost events are expected to occur.
MULTIPLE_EVENTS = -1
test_case_log = bluetooth_adapter_tests.test_case_log
test_retry_and_log = bluetooth_adapter_tests.test_retry_and_log
def read_supported_types(self):
"""Read the Advertisement Monitor supported monitor types.
@returns: List of supported advertisement monitor types.
"""
return self.bluetooth_facade.advmon_read_supported_types()
def read_supported_features(self):
"""Read the Advertisement Monitor supported features.
@returns: List of supported advertisement monitor features.
"""
return self.bluetooth_facade.advmon_read_supported_features()
def create_app(self):
"""Create an advertisement monitor app.
@returns: app id, once the app is created.
"""
return self.bluetooth_facade.advmon_create_app()
def exit_app(self, app_id):
"""Exit an advertisement monitor app.
@param app_id: the app id.
@returns: True on success, False otherwise.
"""
return self.bluetooth_facade.advmon_exit_app(app_id)
def kill_app(self, app_id):
"""Kill an advertisement monitor app by sending SIGKILL.
@param app_id: the app id.
@returns: True on success, False otherwise.
"""
return self.bluetooth_facade.advmon_kill_app(app_id)
def register_app(self, app_id):
"""Register an advertisement monitor app.
@param app_id: the app id.
@returns: True on success, False otherwise.
"""
return self.bluetooth_facade.advmon_register_app(app_id)
def unregister_app(self, app_id):
"""Unregister an advertisement monitor app.
@param app_id: the app id.
@returns: True on success, False otherwise.
"""
return self.bluetooth_facade.advmon_unregister_app(app_id)
def add_monitor(self, app_id, monitor_data):
"""Create an Advertisement Monitor object.
@param app_id: the app id.
@param monitor_data: the list containing monitor type, RSSI filter
values and patterns.
@returns: monitor id, once the monitor is created, None otherwise.
"""
return self.bluetooth_facade.advmon_add_monitor(app_id, monitor_data)
def remove_monitor(self, app_id, monitor_id):
"""Remove the Advertisement Monitor object.
@param app_id: the app id.
@param monitor_id: the monitor id.
@returns: True on success, False otherwise.
"""
return self.bluetooth_facade.advmon_remove_monitor(app_id, monitor_id)
def get_event_count(self, app_id, monitor_id, event='All'):
"""Read the count of a particular event on the given monitor.
@param app_id: the app id.
@param monitor_id: the monitor id.
@param event: name of the specific event or 'All' for all events.
@returns: count of the specific event or dict of counts of all events.
"""
return self.bluetooth_facade.advmon_get_event_count(app_id,
monitor_id,
event)
def reset_event_count(self, app_id, monitor_id, event='All'):
"""Reset the count of a particular event on the given monitor.
@param app_id: the app id.
@param monitor_id: the monitor id.
@param event: name of the specific event or 'All' for all events.
@returns: True on success, False otherwise.
"""
return self.bluetooth_facade.advmon_reset_event_count(app_id,
monitor_id,
event)
@test_retry_and_log(False)
def test_supported_types(self):
"""Test supported monitor types.
@returns: True on success, False otherwise.
"""
supported_types = self.read_supported_types()
for supported_type in supported_types:
logging.info('type: %s', supported_type)
# TODO(b/169658213) - add check for supported types.
return True
@test_retry_and_log(False)
def test_supported_features(self):
"""Test supported features.
@returns: True on success, False otherwise.
"""
supported_features = self.read_supported_features()
for supported_feature in supported_features:
logging.info('feature: %s', supported_feature)
# TODO(b/169658213) - add check for supported features.
return True
@test_retry_and_log(False)
def test_exit_app(self, app_id):
"""Test exit application.
@param app_id: the app id.
@returns: True on success, False otherwise.
"""
return self.exit_app(app_id)
@test_retry_and_log(False)
def test_kill_app(self, app_id):
"""Test kill application.
@param app_id: the app id.
@returns: True on success, False otherwise.
"""
return self.kill_app(app_id)
@test_retry_and_log(False)
def test_register_app(self, app_id, expected=True):
"""Test register application.
@param app_id: the app id.
@param expected: expected result of the RegisterMonitor method.
@returns: True on success, False otherwise.
"""
return self.register_app(app_id) == expected
@test_retry_and_log(False)
def test_unregister_app(self, app_id, expected=True):
"""Test unregister application.
@param app_id: the app id.
@param expected: expected result of the UnregisterMonitor method.
@returns: True on success, False otherwise.
"""
return self.unregister_app(app_id) == expected
@test_retry_and_log(False)
def test_monitor_activate(self, monitor, expected):
"""Test if the Activate method on the monitor has been invoked or not.
@param monitor: the local monitor object.
@param expected: expected state of the Activate event.
@returns: True on success, False otherwise.
"""
app_id = monitor.get_app_id()
monitor_id = monitor.get_monitor_id()
if monitor_id is None:
return False
def _check_activate():
"""Handler for the activate event."""
return self.get_event_count(app_id, monitor_id, 'Activate') == 1
activated = False
try:
utils.poll_for_condition(
condition=_check_activate,
timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS,
sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS,
desc='Waiting for activate')
activated = True
except utils.TimeoutError as e:
logging.error('activate: %s', e)
except:
logging.error('activate: unexpected error')
return expected == activated
@test_retry_and_log(False)
def test_monitor_release(self, monitor, expected):
"""Test if the Release method on the monitor has been invoked or not.
@param monitor: the local monitor object.
@param expected: expected state of the Release event.
@returns: True on success, False otherwise.
"""
app_id = monitor.get_app_id()
monitor_id = monitor.get_monitor_id()
if monitor_id is None:
return False
def _check_release():
"""Handler for the release event."""
return self.get_event_count(app_id, monitor_id, 'Release') == 1
released = False
try:
utils.poll_for_condition(
condition=_check_release,
timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS,
sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS,
desc='Waiting for release')
released = True
except utils.TimeoutError as e:
logging.error('release: %s', e)
except Exception as e:
logging.error('release: %s', e)
except:
logging.error('release: unexpected error')
return expected == released
@test_retry_and_log(False)
def test_device_found(self, monitor, count, delay=0):
"""Test if the DeviceFound method on a monitor has been invoked or not.
@param monitor: the local monitor object.
@param count: expected count of the DeviceFound events.
@param delay: wait until 'delay' seconds before reading the event count.
@returns: True on success, False otherwise.
"""
app_id = monitor.get_app_id()
monitor_id = monitor.get_monitor_id()
if monitor_id is None:
return False
if delay:
time.sleep(delay)
checked_count = self.get_event_count(app_id, monitor_id, 'DeviceFound')
if count == self.MULTIPLE_EVENTS:
return checked_count > 1
return checked_count == count
@test_retry_and_log(False)
def test_device_lost(self, monitor, count, delay=0):
"""Test if the DeviceLost method on a monitor has been invoked or not.
@param monitor: the local monitor object.
@param count: expected count of the DeviceLost events.
@param delay: wait until 'delay' seconds before reading the event count.
@returns: True on success, False otherwise.
"""
app_id = monitor.get_app_id()
monitor_id = monitor.get_monitor_id()
if monitor_id is None:
return False
if delay:
time.sleep(delay)
checked_count = self.get_event_count(app_id, monitor_id, 'DeviceLost')
if count == self.MULTIPLE_EVENTS:
return checked_count > 1
return checked_count == count
@test_retry_and_log(False)
def test_reset_event_count(self, monitor, event='All'):
"""Test resetting count of a particular event on the given monitor.
@param monitor: the local monitor object.
@param event: name of the specific event or 'All' for all events.
@returns: True on success, False otherwise.
"""
return self.reset_event_count(monitor.get_app_id(),
monitor.get_monitor_id(),
event)
@test_retry_and_log(False)
def test_add_monitor(self, monitor, expected_activate=None,
expected_release=None):
"""Test adding a monitor.
@param monitor: the local monitor object.
@param expected_activate: expected state of the Activate event.
@param expected_release: expected state of the Release event.
@returns: True on success, False otherwise.
"""
app_id = monitor.get_app_id()
monitor_id = self.add_monitor(app_id, monitor.get_monitor_data())
if monitor_id is None:
return False
monitor.update_monitor_id(monitor_id)
checked_activate = True
if expected_activate is not None:
checked_activate = self.test_monitor_activate(
monitor, expected_activate)
checked_release = True
if expected_release is not None:
checked_release = self.test_monitor_release(
monitor, expected_release)
if self.get_event_count(app_id, monitor_id, 'Release') != 0:
self.remove_monitor(app_id, monitor_id)
monitor.update_monitor_id(None)
self.results = {
'activated': checked_activate,
'released': checked_release
}
return all(self.results.values())
@test_retry_and_log(False)
def test_remove_monitor(self, monitor):
"""Test removing a monitor.
@param monitor: the local monitor object.
@returns: True on success, False otherwise.
"""
app_id = monitor.get_app_id()
monitor_id = monitor.get_monitor_id()
if monitor_id is None:
return False
ret = self.remove_monitor(app_id, monitor_id)
monitor.update_monitor_id(None)
if ret is None:
return False
return True
@test_retry_and_log(False)
def test_setup_peer_devices(self):
"""Test availability of the peer devices.
@returns: True on success, False otherwise.
"""
self.peer_keybd = None
self.peer_mouse = None
for device_type, device_list in self.devices.items():
for device in device_list:
if device_type is 'BLE_KEYBOARD':
self.peer_keybd = device
elif device_type is 'BLE_MOUSE':
self.peer_mouse = device
if self.peer_keybd is not None and self.peer_mouse is not None:
self.test_stop_peer_device_adv(self.peer_keybd)
self.test_stop_peer_device_adv(self.peer_mouse)
self.results = {
'keybd': self.peer_keybd is not None,
'mouse': self.peer_mouse is not None
}
return all(self.results.values())
@test_retry_and_log(False)
def test_start_peer_device_adv(self, device, duration=0):
"""Test enabling the peer device advertisements.
@param device: the device object.
@param duration: the duration of the advertisement.
@returns: True on success, False otherwise.
"""
ret = self.test_device_set_discoverable(device, True)
if duration:
time.sleep(duration)
return ret
@test_retry_and_log(False)
def test_stop_peer_device_adv(self, device, duration=0):
"""Test disabling the peer device advertisements.
@param device: the device object.
@param duration: the duration of the advertisement disable.
@returns: True on success, False otherwise.
"""
ret = self.test_device_set_discoverable(device, False)
if duration:
time.sleep(duration)
return ret
def advmon_test_monitor_creation(self):
"""Test case: MONITOR_CREATION
Validate register/unregister app and create/remove monitor.
"""
# Create a test app instance.
app1 = self.create_app()
monitor1 = TestMonitor(app1)
monitor1.update_type('or_patterns')
monitor1.update_rssi([-40, 5, -60, 5])
monitor1.update_patterns([
[0, 0x19, [0xc2, 0x03]],
])
monitor2 = TestMonitor(app1)
monitor2.update_type('or_patterns')
monitor2.update_rssi([-40, 10, -60, 10])
monitor2.update_patterns([
[0, 0x03, [0x12, 0x18]],
])
# Read supported types and features, should not fail.
self.test_supported_types()
self.test_supported_features()
# Activate/Release should not get called.
self.test_add_monitor(monitor1,
expected_activate=False,
expected_release=False)
# Register the app, should not fail.
self.test_register_app(app1)
# Already registered app path, should fail with AlreadyExists.
self.test_register_app(app1, expected=False)
# Activate should get called for the monitor added before register app.
self.test_monitor_activate(monitor1, expected=True)
# Correct monitor parameters, activate should get called.
self.test_add_monitor(monitor2, expected_activate=True)
# Remove a monitor, should not fail.
self.test_remove_monitor(monitor1)
# Unregister the app, should not fail.
self.test_unregister_app(app1)
# Already unregistered app path, should fail with DoesNotExists.
self.test_unregister_app(app1, expected=False)
# Release should get called for a monitor not removed before unregister.
self.test_monitor_release(monitor2, expected=True)
# Remove another monitor, should not fail.
self.test_remove_monitor(monitor2)
# Terminate the test app instance.
self.test_exit_app(app1)
def advmon_test_monitor_validity(self):
"""Test case: MONITOR_VALIDITY
Validate monitor parameters - monitor type, patterns, RSSI filter
values.
"""
# Create a test app instance.
app1 = self.create_app()
monitor1 = TestMonitor(app1)
monitor1.update_type('incorrect_pattern')
monitor1.update_rssi([-40, 5, -60, 5])
monitor1.update_patterns([
[0, 0x19, [0xc2, 0x03]],
])
monitor2 = TestMonitor(app1)
monitor2.update_type('or_patterns')
monitor2.update_rssi([-40, 10, -60, 10])
monitor2.update_patterns([
[0, 0x03, [0x12, 0x18]],
])
# Register the app, should not fail.
self.test_register_app(app1)
# Incorrect monitor type, release should get called.
self.test_add_monitor(monitor1, expected_release=True)
# Incorrect rssi parameters, release should get called.
monitor2.update_rssi([-40, 0, -60, 10])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_rssi([-40, 10, -60, 0])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_rssi([40, 10, -60, 10])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_rssi([-140, 10, -60, 10])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_rssi([-40, 10, 60, 10])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_rssi([-40, 10, -160, 10])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_rssi([-60, 10, -40, 10])
self.test_add_monitor(monitor2, expected_release=True)
# Unset the rssi filter parameters.
monitor2.update_rssi([127, 0, 127, 0])
# Incorrect pattern parameters, release should get called.
monitor2.update_patterns([
[32, 0x09, 'MOUSE'],
])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_patterns([
[0, 0x00, 'MOUSE'],
])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_patterns([
[0, 0x40, 'MOUSE'],
])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_patterns([
[0, 0x09, '0123456789ABCDEF0123456789ABCDEF0'],
])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_patterns([
[32, 0x09, [0xc2, 0x03]],
[0, 3, [0x12, 0x18]],
])
self.test_add_monitor(monitor2, expected_release=True)
monitor2.update_patterns([
[0, 0x19, [0xc2, 0x03]],
[0, 0x00, [0x12, 0x18]],
])
self.test_add_monitor(monitor2, expected_release=True)
# Correct pattern parameters, activate should get called.
monitor2.update_patterns([
[0, 0x09, 'MOUSE'],
])
self.test_add_monitor(monitor2, expected_activate=True)
self.test_remove_monitor(monitor2)
monitor2.update_rssi([-40, 10, -60, 10])
monitor2.update_patterns([
[0, 0x19, [0xc2, 0x03]],
[0, 0x03, [0x12, 0x18]],
])
self.test_add_monitor(monitor2, expected_activate=True)
self.test_remove_monitor(monitor2)
# Unregister the app, should not fail.
self.test_unregister_app(app1)
# Terminate the test app instance.
self.test_exit_app(app1)
def advmon_test_pattern_filter_1(self):
"""Test case: PATTERN_FILTER_1
Verify matching of advertisements w.r.t. various pattern values and
different AD Data Types - Local Name Service UUID and Device Type.
"""
self.test_setup_peer_devices()
# Create a test app instance.
app1 = self.create_app()
monitor1 = TestMonitor(app1)
monitor1.update_type('or_patterns')
monitor1.update_rssi([-60, 3, -80, 3])
# Register the app, should not fail.
self.test_register_app(app1)
monitor1.update_patterns([
[5, 0x09, '_REF'],
])
self.test_add_monitor(monitor1, expected_activate=True)
# Local name 'KEYBD_REF' should match.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=1)
# Local name 'MOUSE_REF' should match.
self.test_start_peer_device_adv(self.peer_mouse, duration=5)
self.test_device_found(monitor1, count=2)
self.test_stop_peer_device_adv(self.peer_keybd)
self.test_stop_peer_device_adv(self.peer_mouse)
self.test_remove_monitor(monitor1)
monitor1.update_patterns([
[0, 0x03, [0x12, 0x18]],
])
self.test_add_monitor(monitor1, expected_activate=True)
# Service UUID 0x1812 should match.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=1)
# Service UUID 0x1812 should match.
self.test_start_peer_device_adv(self.peer_mouse, duration=5)
self.test_device_found(monitor1, count=2)
self.test_stop_peer_device_adv(self.peer_keybd)
self.test_stop_peer_device_adv(self.peer_mouse)
self.test_remove_monitor(monitor1)
monitor1.update_patterns([
[0, 0x19, [0xc1, 0x03]],
[0, 0x09, 'MOUSE'],
])
self.test_add_monitor(monitor1, expected_activate=True)
# Device type 0xc103 should match.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=1)
# Local name 'MOUSE_REF' should match.
self.test_start_peer_device_adv(self.peer_mouse, duration=5)
self.test_device_found(monitor1, count=2)
self.test_stop_peer_device_adv(self.peer_keybd)
self.test_stop_peer_device_adv(self.peer_mouse)
self.test_remove_monitor(monitor1)
monitor1.update_patterns([
[0, 0x19, [0xc1, 0x03]],
[0, 0x19, [0xc3, 0x03]],
])
self.test_add_monitor(monitor1, expected_activate=True)
# Device type 0xc103 should match.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=1)
# Device type 0xc203 should not match.
self.test_start_peer_device_adv(self.peer_mouse, duration=5)
self.test_device_found(monitor1, count=1)
self.test_stop_peer_device_adv(self.peer_keybd)
self.test_stop_peer_device_adv(self.peer_mouse)
self.test_remove_monitor(monitor1)
# Unregister the app, should not fail.
self.test_unregister_app(app1)
# Terminate the test app instance.
self.test_exit_app(app1)
def advmon_test_rssi_filter_1(self):
"""Test case: RSSI_FILTER_1
Verify unset RSSI filter and filter with no matching RSSI values.
"""
self.test_setup_peer_devices()
# Create a test app instance.
app1 = self.create_app()
monitor1 = TestMonitor(app1)
monitor1.update_type('or_patterns')
monitor1.update_patterns([
[0, 0x03, [0x12, 0x18]],
])
# Register the app, should not fail.
self.test_register_app(app1)
monitor1.update_rssi([127, 0, 127, 0])
self.test_add_monitor(monitor1, expected_activate=True)
# Unset RSSI filter, adv should match multiple times.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS)
# Unset RSSI filter, DeviceLost should not get triggered.
self.test_stop_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_lost(monitor1, count=0)
self.test_remove_monitor(monitor1)
monitor1.update_rssi([-10, 5, -20, 5])
self.test_add_monitor(monitor1, expected_activate=True)
# Adv RSSI lower than RSSI filter, DeviceFound should not get triggered.
self.test_start_peer_device_adv(self.peer_keybd, duration=10)
self.test_device_found(monitor1, count=0)
# No device was found earlier, so DeviceLost should not get triggered.
self.test_stop_peer_device_adv(self.peer_keybd, duration=10)
self.test_device_lost(monitor1, count=0)
self.test_remove_monitor(monitor1)
# Unregister the app, should not fail.
self.test_unregister_app(app1)
# Terminate the test app instance.
self.test_exit_app(app1)
def advmon_test_rssi_filter_2(self):
"""Test case: RSSI_FILTER_2
Verify RSSI filter matching with multiple peer devices.
"""
self.test_setup_peer_devices()
# Create a test app instance.
app1 = self.create_app()
monitor1 = TestMonitor(app1)
monitor1.update_type('or_patterns')
monitor1.update_patterns([
[0, 0x03, [0x12, 0x18]],
])
# Register the app, should not fail.
self.test_register_app(app1)
monitor1.update_rssi([-60, 3, -80, 3])
self.test_add_monitor(monitor1, expected_activate=True)
# DeviceFound should get triggered only once per device.
self.test_start_peer_device_adv(self.peer_keybd, duration=10)
self.test_device_found(monitor1, count=1)
# DeviceFound should get triggered for another device.
self.test_start_peer_device_adv(self.peer_mouse, duration=10)
self.test_device_found(monitor1, count=2)
# DeviceLost should get triggered only once per device.
self.test_stop_peer_device_adv(self.peer_keybd, duration=10)
self.test_device_lost(monitor1, count=1)
# DeviceLost should get triggered for another device.
self.test_stop_peer_device_adv(self.peer_mouse, duration=10)
self.test_device_lost(monitor1, count=2)
self.test_remove_monitor(monitor1)
monitor1.update_rssi([-60, 10, -80, 10])
self.test_add_monitor(monitor1, expected_activate=True)
# Device was online for short period of time, so DeviceFound should
# not get triggered.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=0)
# Device did not come back online, DeviceFound should not get triggered.
# No device was found earlier, so DeviceLost should not get triggered.
self.test_stop_peer_device_adv(self.peer_keybd, duration=15)
self.test_device_found(monitor1, count=0)
self.test_device_lost(monitor1, count=0)
self.test_remove_monitor(monitor1)
# Unregister the app, should not fail.
self.test_unregister_app(app1)
# Terminate the test app instance.
self.test_exit_app(app1)
def advmon_test_rssi_filter_3(self):
"""Test case: RSSI_FILTER_3
Verify reset of RSSI timers based on advertisements.
"""
self.test_setup_peer_devices()
# Create a test app instance.
app1 = self.create_app()
monitor1 = TestMonitor(app1)
monitor1.update_type('or_patterns')
monitor1.update_patterns([
[0, 0x03, [0x12, 0x18]],
])
# Register the app, should not fail.
self.test_register_app(app1)
monitor1.update_rssi([-60, 10, -80, 10])
self.test_add_monitor(monitor1, expected_activate=True)
# DeviceFound should not get triggered before timeout.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=0)
# DeviceFound should not get triggered as device went offline.
# No device was found earlier, so DeviceLost should not get triggered.
self.test_stop_peer_device_adv(self.peer_keybd, duration=10)
self.test_device_found(monitor1, count=0)
self.test_device_lost(monitor1, count=0)
# Timer should get reset, so DeviceFound should not get triggered.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=0)
# DeviceFound should get triggered once timer completes.
self.test_device_found(monitor1, count=1, delay=10)
# DeviceLost should not get triggered before timeout.
self.test_stop_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_lost(monitor1, count=0)
# Timer should get reset, so DeviceLost should not get triggered.
# DeviceFound should not get triggered as device is not lost yet.
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_lost(monitor1, count=0)
self.test_device_found(monitor1, count=1)
# Timer should get reset, so DeviceLost should not get triggered.
self.test_stop_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_lost(monitor1, count=0)
# DeviceLost should get triggered once timer completes.
self.test_device_lost(monitor1, count=1, delay=10)
self.test_remove_monitor(monitor1)
# Unregister the app, should not fail.
self.test_unregister_app(app1)
# Terminate the test app instance.
self.test_exit_app(app1)
def advmon_test_fg_bg_combination(self):
"""Test case: FG_BG_COMBINATION
Verify background scanning and foreground scanning do not interfere
working of each other.
"""
self.test_setup_peer_devices()
# Create a test app instance.
app1 = self.create_app()
monitor1 = TestMonitor(app1)
monitor1.update_type('or_patterns')
monitor1.update_patterns([
[0, 0x03, [0x12, 0x18]],
])
monitor1.update_rssi([127, 0, 127, 0])
# Register the app, should not fail.
self.test_register_app(app1)
# Activate should get invoked.
self.test_add_monitor(monitor1, expected_activate=True)
# Pair/connect LE Mouse.
self.test_start_peer_device_adv(self.peer_mouse, duration=5)
time.sleep(self.PAIR_TEST_SLEEP_SECS)
self.test_discover_device(self.peer_mouse.address)
time.sleep(self.PAIR_TEST_SLEEP_SECS)
self.test_pairing(self.peer_mouse.address, self.peer_mouse.pin)
time.sleep(self.PAIR_TEST_SLEEP_SECS)
self.test_connection_by_adapter(self.peer_mouse.address)
self.test_connection_by_device(self.peer_mouse)
# DeviceFound should get triggered for keyboard.
self.test_reset_event_count(monitor1)
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS)
self.test_stop_peer_device_adv(self.peer_keybd)
# Start foreground scanning.
self.test_start_discovery()
# Disconnect LE mouse.
self.test_disconnection_by_device(self.peer_mouse)
# Remove the monitor.
self.test_remove_monitor(monitor1)
# Activate should get invoked.
self.test_add_monitor(monitor1, expected_activate=True)
# Connect LE mouse.
self.test_connection_by_device(self.peer_mouse)
# DeviceFound should get triggered for keyboard.
self.test_reset_event_count(monitor1)
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS)
self.test_stop_peer_device_adv(self.peer_keybd)
# Stop foreground scanning.
self.test_stop_discovery()
# Disconnect LE mouse.
self.test_disconnection_by_device(self.peer_mouse)
# DeviceFound should get triggered for keyboard.
self.test_reset_event_count(monitor1)
self.test_start_peer_device_adv(self.peer_keybd, duration=5)
self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS)
self.test_stop_peer_device_adv(self.peer_keybd)
# Remove the monitor.
self.test_remove_monitor(monitor1)
# Connect LE mouse.
self.test_connection_by_device(self.peer_mouse)
# Unregister the app, should not fail.
self.test_unregister_app(app1)
# Terminate the test app instance.
self.test_exit_app(app1)