| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Server side bluetooth tests on Advertisement Monitor API""" |
| |
| import time |
| import logging |
| import array |
| |
| from autotest_lib.client.bin import utils |
| from autotest_lib.server.cros.bluetooth import bluetooth_adapter_tests |
| from autotest_lib.client.common_lib import error |
| |
| |
| class TestMonitor(): |
| """Local object hosting the test values for Advertisement Monitor object. |
| |
| This class holds the values of parameters for creating an Advertisement |
| Monitor object. |
| |
| """ |
| |
| # Index of the pattern data in the patterns filter. |
| PATTERN_DATA_IDX = 2 |
| |
| def __init__(self, app_id): |
| """Construction of a local monitor object. |
| |
| @param app_id: the app id associated with the monitor. |
| |
| """ |
| self.type = None |
| self.rssi = [] |
| self.patterns = [] |
| self.monitor_id = None |
| self.app_id = app_id |
| |
| |
| def _bytes(self, str_data): |
| """Convert string data to byte array. |
| |
| @param str_data: the string data. |
| |
| @returns: the byte array. |
| |
| """ |
| return [b for b in array.array('B', str_data)] |
| |
| |
| def update_type(self, monitor_type): |
| """Update the monitor type. |
| |
| @param monitor_type: type of the monitor. |
| |
| """ |
| self.type = monitor_type |
| |
| |
| def update_rssi(self, monitor_rssi): |
| """Update the RSSI filter values. |
| |
| @param rssi: the list of rssi threshold and timeout values. |
| |
| """ |
| self.rssi = monitor_rssi |
| |
| |
| def update_patterns(self, monitor_patterns): |
| """Update the content filter patterns. |
| |
| @param patterns: the list of start position, ad type and patterns. |
| |
| """ |
| # Convert string patterns to byte array, if any. |
| for pattern in monitor_patterns: |
| if isinstance(pattern[self.PATTERN_DATA_IDX], str): |
| pattern[self.PATTERN_DATA_IDX] = self._bytes( |
| pattern[self.PATTERN_DATA_IDX]) |
| |
| self.patterns = monitor_patterns |
| |
| |
| def update_monitor_id(self, monitor_id): |
| """Store the monitor id returned by add_monitor(). |
| |
| @param monitor_id: the monitor id. |
| |
| """ |
| self.monitor_id = monitor_id |
| |
| |
| def get_monitor_data(self): |
| """Return the monitor parameters. |
| |
| @returns: List containing the monitor data. |
| |
| """ |
| return [self.type, self.rssi, self.patterns] |
| |
| |
| def get_monitor_id(self): |
| """Return the monitor id. |
| |
| @returns: monitor id if monitor is already added, None otherwise. |
| |
| """ |
| return self.monitor_id |
| |
| |
| def get_app_id(self): |
| """Return the application id. |
| |
| @returns: app id associated to the monitor object. |
| |
| """ |
| return self.app_id |
| |
| |
| class BluetoothAdapterAdvMonitorTests( |
| bluetooth_adapter_tests.BluetoothAdapterTests): |
| """Server side bluetooth adapter advertising Test. |
| |
| This class comprises a number of test cases to verify bluetooth |
| Advertisement Monitor API. |
| |
| Refer to the test plan doc for more details: go/bt-advmon-api-test-plan |
| |
| """ |
| |
| ADD_MONITOR_POLLING_TIMEOUT_SECS = 3 |
| ADD_MONITOR_POLLING_SLEEP_SECS = 1 |
| PAIR_TEST_SLEEP_SECS = 5 |
| |
| # Non-zero count value is used to indicate the case where multiple |
| # DeviceFound/DeviceLost events are expected to occur. |
| MULTIPLE_EVENTS = -1 |
| |
| # Number of cycle to observe during a test |
| INTERLEAVE_SCAN_TEST_CYCLE = 10 |
| # Acceptable extra delay of interleave scan duration, in sec |
| INTERLEAVE_SCAN_DURATION_TOLERANCE = 0.1 |
| # Acceptable delay of cancelling interleave scan, in sec |
| INTERLEAVE_SCAN_CANCEL_TOLERANCE = 2 |
| # Acceptable extra/missing cycles in interleave scan |
| INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE = 2 |
| |
| # Duration of kernel perform 'start discovery', in sec |
| DISCOVERY_DURATION = 10.24 |
| |
| test_case_log = bluetooth_adapter_tests.test_case_log |
| test_retry_and_log = bluetooth_adapter_tests.test_retry_and_log |
| |
| |
| def advmon_check_manager_interface_exist(self): |
| """Check if AdvertisementMonitorManager1 interface is available. |
| |
| @returns: True if Manager interface is available, False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_check_manager_interface_exist() |
| |
| |
| def read_supported_types(self): |
| """Read the Advertisement Monitor supported monitor types. |
| |
| @returns: List of supported advertisement monitor types. |
| |
| """ |
| return self.bluetooth_facade.advmon_read_supported_types() |
| |
| |
| def read_supported_features(self): |
| """Read the Advertisement Monitor supported features. |
| |
| @returns: List of supported advertisement monitor features. |
| |
| """ |
| return self.bluetooth_facade.advmon_read_supported_features() |
| |
| |
| def create_app(self): |
| """Create an advertisement monitor app. |
| |
| @returns: app id, once the app is created. |
| |
| """ |
| return self.bluetooth_facade.advmon_create_app() |
| |
| |
| def exit_app(self, app_id): |
| """Exit an advertisement monitor app. |
| |
| @param app_id: the app id. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_exit_app(app_id) |
| |
| |
| def kill_app(self, app_id): |
| """Kill an advertisement monitor app by sending SIGKILL. |
| |
| @param app_id: the app id. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_kill_app(app_id) |
| |
| |
| def register_app(self, app_id): |
| """Register an advertisement monitor app. |
| |
| @param app_id: the app id. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_register_app(app_id) |
| |
| |
| def unregister_app(self, app_id): |
| """Unregister an advertisement monitor app. |
| |
| @param app_id: the app id. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_unregister_app(app_id) |
| |
| |
| def add_monitor(self, app_id, monitor_data): |
| """Create an Advertisement Monitor object. |
| |
| @param app_id: the app id. |
| @param monitor_data: the list containing monitor type, RSSI filter |
| values and patterns. |
| |
| @returns: monitor id, once the monitor is created, None otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_add_monitor(app_id, monitor_data) |
| |
| |
| def remove_monitor(self, app_id, monitor_id): |
| """Remove the Advertisement Monitor object. |
| |
| @param app_id: the app id. |
| @param monitor_id: the monitor id. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_remove_monitor(app_id, monitor_id) |
| |
| |
| def get_event_count(self, app_id, monitor_id, event='All'): |
| """Read the count of a particular event on the given monitor. |
| |
| @param app_id: the app id. |
| @param monitor_id: the monitor id. |
| @param event: name of the specific event or 'All' for all events. |
| |
| @returns: count of the specific event or dict of counts of all events. |
| |
| """ |
| return self.bluetooth_facade.advmon_get_event_count(app_id, |
| monitor_id, |
| event) |
| |
| |
| def reset_event_count(self, app_id, monitor_id, event='All'): |
| """Reset the count of a particular event on the given monitor. |
| |
| @param app_id: the app id. |
| @param monitor_id: the monitor id. |
| @param event: name of the specific event or 'All' for all events. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_reset_event_count(app_id, |
| monitor_id, |
| event) |
| |
| |
| def interleave_logger_start(self): |
| """ Start interleave logger recording |
| """ |
| self.bluetooth_facade.advmon_interleave_scan_logger_start() |
| |
| def interleave_logger_stop(self): |
| """ Stop interleave logger recording |
| |
| @returns: True if logs were successfully collected, |
| False otherwise. |
| |
| """ |
| return self.bluetooth_facade.advmon_interleave_scan_logger_stop() |
| |
| def interleave_logger_get_records(self): |
| """ Get records in previous log collections |
| |
| @returns: a list of records, where each item is a record of |
| interleave |state| and the |time| the state starts. |
| |state| could be {'no filter', 'allowlist'} |
| |time| is system time in sec |
| |
| """ |
| return self.bluetooth_facade.\ |
| advmon_interleave_scan_logger_get_records() |
| |
| def interleave_logger_get_cancel_event(self): |
| """ Get cancel event in previous log collections |
| |
| @returns: the first cancel event in the collections, |
| None if no cancel event was found |
| |
| """ |
| events = self.bluetooth_facade.\ |
| advmon_interleave_scan_logger_get_cancel_events() |
| if len(events) == 0: |
| event = None |
| else: |
| event = events[0] |
| if len(events) > 1: |
| logging.warning('More than one cancel events found %s', events) |
| return event |
| |
| @test_retry_and_log(False) |
| def test_supported_types(self): |
| """Test supported monitor types. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| supported_types = self.read_supported_types() |
| for supported_type in supported_types: |
| logging.info('type: %s', supported_type) |
| |
| # TODO(b/169658213) - add check for supported types. |
| return True |
| |
| |
| @test_retry_and_log(False) |
| def test_supported_features(self): |
| """Test supported features. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| supported_features = self.read_supported_features() |
| for supported_feature in supported_features: |
| logging.info('feature: %s', supported_feature) |
| |
| # TODO(b/169658213) - add check for supported features. |
| return True |
| |
| |
| def test_is_controller_offloading_supported(self): |
| """Check if controller based RSSI filtering is supported. |
| |
| By default the LE_SCAN_FILTER_DUP flag is enabled on all platforms. |
| Due to this, the host does not receive as many advertisements during |
| passive scanning, which causes SW based RSSI filtering not to work |
| as intended. So, if the controller offloading is not supported, skip |
| the tests that involves RSSI filtering and raise TEST_NA. |
| |
| @raises: TestNA if controller based RSSI filtering is not supported. |
| |
| """ |
| supported_features = self.read_supported_features() |
| if not supported_features: |
| logging.info('Controller offloading not supported') |
| raise error.TestNAError('Controller offloading not supported') |
| |
| |
| def test_is_adv_monitoring_supported(self, require_rssi_filtering = False): |
| """Check if Adv Monitor API is supported. |
| |
| If AdvMonitor API is not supported by the platform, |
| AdvertisementMonitorManager1 interface won't be exposed by |
| bluetoothd. In such case, skip the test and raise TestNA. |
| |
| @param require_rssi_filtering: True if test requires RSSI filtering. |
| |
| @raises: TestNA if Adv Monitor API is not supported or if controller |
| based RSSI filtering is not supported. |
| |
| """ |
| if not self.advmon_check_manager_interface_exist(): |
| logging.info('Advertisement Monitor API not supported') |
| raise error.TestNAError('Advertisement Monitor API not supported') |
| |
| if require_rssi_filtering: |
| self.test_is_controller_offloading_supported() |
| |
| |
| @test_retry_and_log(False) |
| def test_exit_app(self, app_id): |
| """Test exit application. |
| |
| @param app_id: the app id. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.exit_app(app_id) |
| |
| |
| @test_retry_and_log(False) |
| def test_kill_app(self, app_id): |
| """Test kill application. |
| |
| @param app_id: the app id. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.kill_app(app_id) |
| |
| |
| @test_retry_and_log(False) |
| def test_register_app(self, app_id, expected=True): |
| """Test register application. |
| |
| @param app_id: the app id. |
| @param expected: expected result of the RegisterMonitor method. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.register_app(app_id) == expected |
| |
| |
| @test_retry_and_log(False) |
| def test_unregister_app(self, app_id, expected=True): |
| """Test unregister application. |
| |
| @param app_id: the app id. |
| @param expected: expected result of the UnregisterMonitor method. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.unregister_app(app_id) == expected |
| |
| |
| @test_retry_and_log(False) |
| def test_monitor_activate(self, monitor, expected): |
| """Test if the Activate method on the monitor has been invoked or not. |
| |
| @param monitor: the local monitor object. |
| @param expected: expected state of the Activate event. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| app_id = monitor.get_app_id() |
| monitor_id = monitor.get_monitor_id() |
| if monitor_id is None: |
| return False |
| |
| def _check_activate(): |
| """Handler for the activate event.""" |
| return self.get_event_count(app_id, monitor_id, 'Activate') == 1 |
| |
| activated = False |
| try: |
| utils.poll_for_condition( |
| condition=_check_activate, |
| timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS, |
| sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS, |
| desc='Waiting for activate') |
| activated = True |
| except utils.TimeoutError as e: |
| logging.error('activate: %s', e) |
| except: |
| logging.error('activate: unexpected error') |
| |
| return expected == activated |
| |
| |
| @test_retry_and_log(False) |
| def test_monitor_release(self, monitor, expected): |
| """Test if the Release method on the monitor has been invoked or not. |
| |
| @param monitor: the local monitor object. |
| @param expected: expected state of the Release event. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| app_id = monitor.get_app_id() |
| monitor_id = monitor.get_monitor_id() |
| if monitor_id is None: |
| return False |
| |
| def _check_release(): |
| """Handler for the release event.""" |
| return self.get_event_count(app_id, monitor_id, 'Release') == 1 |
| |
| released = False |
| try: |
| utils.poll_for_condition( |
| condition=_check_release, |
| timeout=self.ADD_MONITOR_POLLING_TIMEOUT_SECS, |
| sleep_interval=self.ADD_MONITOR_POLLING_SLEEP_SECS, |
| desc='Waiting for release') |
| released = True |
| except utils.TimeoutError as e: |
| logging.error('release: %s', e) |
| except Exception as e: |
| logging.error('release: %s', e) |
| except: |
| logging.error('release: unexpected error') |
| |
| return expected == released |
| |
| |
| @test_retry_and_log(True) |
| def test_device_found(self, monitor, count, delay=0): |
| """Test if the DeviceFound method on a monitor has been invoked or not. |
| |
| @param monitor: the local monitor object. |
| @param count: expected count of the DeviceFound events. |
| @param delay: wait until 'delay' seconds before reading the event count. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| app_id = monitor.get_app_id() |
| monitor_id = monitor.get_monitor_id() |
| if monitor_id is None: |
| return False |
| |
| if delay: |
| time.sleep(delay) |
| |
| checked_count = self.get_event_count(app_id, monitor_id, 'DeviceFound') |
| |
| if count == self.MULTIPLE_EVENTS: |
| return checked_count > 0 |
| |
| return checked_count == count |
| |
| |
| @test_retry_and_log(False) |
| def test_device_lost(self, monitor, count, delay=0): |
| """Test if the DeviceLost method on a monitor has been invoked or not. |
| |
| @param monitor: the local monitor object. |
| @param count: expected count of the DeviceLost events. |
| @param delay: wait until 'delay' seconds before reading the event count. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| app_id = monitor.get_app_id() |
| monitor_id = monitor.get_monitor_id() |
| if monitor_id is None: |
| return False |
| |
| if delay: |
| time.sleep(delay) |
| |
| checked_count = self.get_event_count(app_id, monitor_id, 'DeviceLost') |
| |
| if count == self.MULTIPLE_EVENTS: |
| return checked_count > 1 |
| |
| return checked_count == count |
| |
| |
| @test_retry_and_log(False) |
| def test_reset_event_count(self, monitor, event='All'): |
| """Test resetting count of a particular event on the given monitor. |
| |
| @param monitor: the local monitor object. |
| @param event: name of the specific event or 'All' for all events. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| return self.reset_event_count(monitor.get_app_id(), |
| monitor.get_monitor_id(), |
| event) |
| |
| |
| @test_retry_and_log(False) |
| def test_add_monitor(self, monitor, expected_activate=None, |
| expected_release=None): |
| """Test adding a monitor. |
| |
| @param monitor: the local monitor object. |
| @param expected_activate: expected state of the Activate event. |
| @param expected_release: expected state of the Release event. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| app_id = monitor.get_app_id() |
| monitor_id = self.add_monitor(app_id, monitor.get_monitor_data()) |
| if monitor_id is None: |
| return False |
| monitor.update_monitor_id(monitor_id) |
| |
| checked_activate = True |
| if expected_activate is not None: |
| checked_activate = self.test_monitor_activate( |
| monitor, expected_activate) |
| |
| checked_release = True |
| if expected_release is not None: |
| checked_release = self.test_monitor_release( |
| monitor, expected_release) |
| |
| if self.get_event_count(app_id, monitor_id, 'Release') != 0: |
| self.remove_monitor(app_id, monitor_id) |
| monitor.update_monitor_id(None) |
| |
| self.results = { |
| 'activated': checked_activate, |
| 'released': checked_release |
| } |
| return all(self.results.values()) |
| |
| |
| @test_retry_and_log(False) |
| def test_remove_monitor(self, monitor): |
| """Test removing a monitor. |
| |
| @param monitor: the local monitor object. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| app_id = monitor.get_app_id() |
| monitor_id = monitor.get_monitor_id() |
| if monitor_id is None: |
| return False |
| |
| ret = self.remove_monitor(app_id, monitor_id) |
| monitor.update_monitor_id(None) |
| |
| if ret is None: |
| return False |
| |
| return True |
| |
| |
| @test_retry_and_log(False) |
| def test_setup_peer_devices(self): |
| """Test availability of the peer devices. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| self.peer_keybd = None |
| self.peer_mouse = None |
| |
| for device_type, device_list in self.devices.items(): |
| for device in device_list: |
| if device_type is 'BLE_KEYBOARD': |
| self.peer_keybd = device |
| elif device_type is 'BLE_MOUSE': |
| self.peer_mouse = device |
| |
| if self.peer_keybd is not None and self.peer_mouse is not None: |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| |
| self.results = { |
| 'keybd': self.peer_keybd is not None, |
| 'mouse': self.peer_mouse is not None |
| } |
| return all(self.results.values()) |
| |
| |
| @test_retry_and_log(False) |
| def test_start_peer_device_adv(self, device, duration=0): |
| """Test enabling the peer device advertisements. |
| |
| @param device: the device object. |
| @param duration: the duration of the advertisement. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| ret = self.test_device_set_discoverable(device, True) |
| |
| if duration: |
| time.sleep(duration) |
| |
| return ret |
| |
| |
| @test_retry_and_log(False) |
| def test_stop_peer_device_adv(self, device, duration=0): |
| """Test disabling the peer device advertisements. |
| |
| @param device: the device object. |
| @param duration: the duration of the advertisement disable. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| ret = self.test_device_set_discoverable(device, False) |
| |
| if duration: |
| time.sleep(duration) |
| |
| return ret |
| |
| def check_records_interleaving(self, durations, records, expect_cycles): |
| """ Check the state of records is interleaving and also the duration is |
| as expected. |
| |
| @param durations: a dict of {'allowlist': allowlist_duration, |
| 'no filter': no_filter_duration} |
| @param records: a list of records |
| |
| @returns: a dict of {'Interleaved': record_state_is_interleaved, |
| 'Span within range': duration_is_expected} |
| |
| """ |
| |
| actual_cycle = len(records) / len(durations.keys()) |
| offset = self.INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE |
| expect_cycle_lowerbound = max(1, expect_cycles - offset) |
| expect_cycle_upperbound = expect_cycles + offset |
| enough_cycle_num = (actual_cycle >= expect_cycle_lowerbound |
| and actual_cycle <= expect_cycle_upperbound) |
| interleaved = True |
| span_within_range = True |
| expected_state = None |
| |
| def _next_state(state): |
| if state == 'allowlist': |
| return 'no filter' |
| elif state == 'no filter': |
| return 'allowlist' |
| else: |
| logging.warning('Unexpected state %s', state) |
| return None |
| |
| for i, record in enumerate(records): |
| state = record['state'] |
| nstate = _next_state(state) |
| |
| # We can't count span on single data point and expected_state |
| # hasn't set |
| if i != 0: |
| span = (record['time'] - records[i - 1]['time']) |
| |
| if state != expected_state: |
| interleaved = False |
| |
| if span < durations[nstate] -\ |
| self.INTERLEAVE_SCAN_DURATION_TOLERANCE: |
| span_within_range = False |
| |
| if span > durations[nstate] +\ |
| self.INTERLEAVE_SCAN_DURATION_TOLERANCE: |
| span_within_range = False |
| |
| expected_state = nstate |
| |
| return { |
| 'Enough cycle number': enough_cycle_num, |
| 'Interleaved': interleaved, |
| 'Span within range': span_within_range |
| } |
| |
| def check_records_paused(self, records, cancel_event, expect_paused_time, |
| expect_resume): |
| """ Check if the interleave scan is paused |
| |
| @param records: a list of records |
| @param cancel_event: the timestamp interleave was canceled |
| @param expect_paused_time: minimum duration of interleave scan paused |
| @param expect_resume: True if interleave scan should restart, |
| False if ***we don't care*** |
| |
| @returns: a dict of {'Cancel event': (bool), |
| 'Non-empty records before paused': (bool), |
| 'Non-empty records after paused': (bool), |
| 'Paused enough time': (bool) |
| } |
| Note: some entries might not exist if it doesn't make sense |
| in that case. |
| |
| """ |
| |
| result = {} |
| |
| result.update({'Cancel event': cancel_event is not None}) |
| if cancel_event is None: |
| return result |
| |
| canceled_time = cancel_event + self.INTERLEAVE_SCAN_CANCEL_TOLERANCE |
| |
| before_paused_rec = [r for r in records if r['time'] < canceled_time] |
| after_paused_rec = [r for r in records if r['time'] >= canceled_time] |
| |
| result.update({ |
| 'Non-empty records before paused': |
| len(before_paused_rec) != 0 |
| }) |
| |
| if expect_resume: |
| result.update({ |
| 'Non-empty records after paused': |
| len(after_paused_rec) != 0 |
| }) |
| |
| if len(before_paused_rec) > 0 and len(after_paused_rec) > 0: |
| # Records are stored chronologically. |
| last_time_before_paused = before_paused_rec[-1]['time'] |
| first_time_after_paused = after_paused_rec[0]['time'] |
| paused_time = first_time_after_paused - last_time_before_paused |
| result.update( |
| {'Paused enough time': paused_time >= expect_paused_time}) |
| |
| return result |
| |
| def get_interleave_scan_durations(self): |
| """ Get interleave scan duration. |
| |
| @returns: a dict of {'allowlist': allowlist_duration, |
| 'no filter': no_filter_duration} |
| |
| """ |
| |
| # TODO(b/171844106): get this parameters via |
| # MGMT_OP_READ_DEF_SYSTEM_CONFIG |
| durations = {'allowlist': 300, 'no filter': 500} |
| |
| # Change the unit from msec to second for future convenience. |
| durations = {key: value * 0.001 for key, value in durations.items()} |
| return durations |
| |
| @test_retry_and_log(False) |
| def test_interleaving_state(self, |
| expect_true, |
| cycles=INTERLEAVE_SCAN_TEST_CYCLE): |
| """ Test for checking if kernel is doing interleave scan or not. |
| |
| @params expect_true: True if kernel should be running interleave scan |
| False if kernel shouldn't. |
| @params cycles: number of cycles to collect logs |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| durations = self.get_interleave_scan_durations() |
| interleave_period = sum(durations.values()) |
| log_time = interleave_period * cycles |
| self.interleave_logger_start() |
| time.sleep(log_time) |
| self.interleave_logger_stop() |
| records = self.interleave_logger_get_records() |
| |
| logging.debug(records) |
| |
| if not expect_true: |
| self.results = {'No records': len(records) == 0} |
| else: |
| self.results = self.check_records_interleaving( |
| durations, records, cycles) |
| |
| return all(self.results.values()) |
| |
| @test_retry_and_log(False) |
| def test_interleaving_suspend_resume(self): |
| """ Test for checking if kernel paused interleave scan during system |
| suspended. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| durations = self.get_interleave_scan_durations() |
| interleave_period = sum(durations.values()) |
| |
| # make sure suspend time is long enough to verify there is no |
| # interleaving during suspended |
| expect_suspend_time = max(self.SUSPEND_TIME_SECS, |
| 2 * interleave_period) |
| |
| # make sure we'll get some records before/after system suspended |
| extra_sleep_time = 2 * interleave_period |
| |
| self.interleave_logger_start() |
| time.sleep(extra_sleep_time) |
| self.suspend_resume(suspend_time=expect_suspend_time) |
| time.sleep(extra_sleep_time) |
| self.interleave_logger_stop() |
| records = self.interleave_logger_get_records() |
| cancel_event = self.interleave_logger_get_cancel_event() |
| |
| logging.debug(records) |
| logging.debug(cancel_event) |
| |
| # Currently resume time is not very reliable. It is likely the actual |
| # time in sleeping is less than expect_suspend_time. |
| # Check the interleave scan paused for at least one cycle long instead. |
| self.results = self.check_records_paused(records, cancel_event, |
| interleave_period, True) |
| return all(self.results.values()) |
| |
| @test_retry_and_log(False) |
| def test_interleaving_active_scan_cycle(self): |
| """ Test for checking if kernel paused interleave scan during active |
| scan. |
| |
| @returns: True on success, False otherwise. |
| |
| """ |
| durations = self.get_interleave_scan_durations() |
| interleave_period = sum(durations.values()) |
| |
| # make sure we'll get some records before/after active scan |
| extra_sleep_time = 2 * interleave_period |
| |
| self.interleave_logger_start() |
| time.sleep(extra_sleep_time) |
| self.test_start_discovery() |
| time.sleep(extra_sleep_time + self.INTERLEAVE_SCAN_CANCEL_TOLERANCE) |
| self.interleave_logger_stop() |
| records = self.interleave_logger_get_records() |
| cancel_event = self.interleave_logger_get_cancel_event() |
| |
| logging.debug(records) |
| logging.debug(cancel_event) |
| |
| # BlueZ pauses discovery for every DISCOVERY_DURATION then restarts it |
| # 5 seconds later. Interleave scan also get restarted during the paused |
| # time. |
| self.results = self.check_records_paused(records, cancel_event, |
| self.DISCOVERY_DURATION, |
| False) |
| self.test_stop_discovery() |
| return all(self.results.values()) |
| |
| def advmon_test_monitor_creation(self): |
| """Test case: MONITOR_CREATION |
| |
| Validate register/unregister app and create/remove monitor. |
| |
| """ |
| self.test_is_adv_monitoring_supported() |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_rssi([-40, 5, -60, 5]) |
| monitor1.update_patterns([ |
| [0, 0x19, [0xc2, 0x03]], |
| ]) |
| |
| monitor2 = TestMonitor(app1) |
| monitor2.update_type('or_patterns') |
| monitor2.update_rssi([-40, 10, -60, 10]) |
| monitor2.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| |
| # Read supported types and features, should not fail. |
| self.test_supported_types() |
| self.test_supported_features() |
| |
| # Activate/Release should not get called. |
| self.test_add_monitor(monitor1, |
| expected_activate=False, |
| expected_release=False) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| # Already registered app path, should fail with AlreadyExists. |
| self.test_register_app(app1, expected=False) |
| |
| # Activate should get called for the monitor added before register app. |
| self.test_monitor_activate(monitor1, expected=True) |
| |
| # Correct monitor parameters, activate should get called. |
| self.test_add_monitor(monitor2, expected_activate=True) |
| |
| # Remove a monitor, should not fail. |
| self.test_remove_monitor(monitor1) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Already unregistered app path, should fail with DoesNotExists. |
| self.test_unregister_app(app1, expected=False) |
| |
| # Release should get called for a monitor not removed before unregister. |
| self.test_monitor_release(monitor2, expected=True) |
| |
| # Remove another monitor, should not fail. |
| self.test_remove_monitor(monitor2) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |
| |
| |
| def advmon_test_monitor_validity(self): |
| """Test case: MONITOR_VALIDITY |
| |
| Validate monitor parameters - monitor type, patterns, RSSI filter |
| values. |
| |
| """ |
| self.test_is_adv_monitoring_supported() |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('incorrect_pattern') |
| monitor1.update_rssi([-40, 5, -60, 5]) |
| monitor1.update_patterns([ |
| [0, 0x19, [0xc2, 0x03]], |
| ]) |
| |
| monitor2 = TestMonitor(app1) |
| monitor2.update_type('or_patterns') |
| monitor2.update_rssi([-40, 10, -60, 10]) |
| monitor2.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| # Incorrect monitor type, release should get called. |
| self.test_add_monitor(monitor1, expected_release=True) |
| |
| # Incorrect rssi parameters, release should get called. |
| monitor2.update_rssi([-40, 0, -60, 10]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_rssi([-40, 10, -60, 0]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_rssi([40, 10, -60, 10]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_rssi([-140, 10, -60, 10]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_rssi([-40, 10, 60, 10]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_rssi([-40, 10, -160, 10]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_rssi([-60, 10, -40, 10]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| # Unset the rssi filter parameters. |
| monitor2.update_rssi([127, 0, 127, 0]) |
| |
| # Incorrect pattern parameters, release should get called. |
| monitor2.update_patterns([ |
| [32, 0x09, 'MOUSE'], |
| ]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_patterns([ |
| [0, 0x00, 'MOUSE'], |
| ]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_patterns([ |
| [0, 0x40, 'MOUSE'], |
| ]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_patterns([ |
| [0, 0x09, '0123456789ABCDEF0123456789ABCDEF0'], |
| ]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_patterns([ |
| [32, 0x09, [0xc2, 0x03]], |
| [0, 3, [0x12, 0x18]], |
| ]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| monitor2.update_patterns([ |
| [0, 0x19, [0xc2, 0x03]], |
| [0, 0x00, [0x12, 0x18]], |
| ]) |
| self.test_add_monitor(monitor2, expected_release=True) |
| |
| # Correct pattern parameters, activate should get called. |
| monitor2.update_patterns([ |
| [0, 0x09, 'MOUSE'], |
| ]) |
| self.test_add_monitor(monitor2, expected_activate=True) |
| self.test_remove_monitor(monitor2) |
| |
| monitor2.update_rssi([-40, 10, -60, 10]) |
| monitor2.update_patterns([ |
| [0, 0x19, [0xc2, 0x03]], |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| self.test_add_monitor(monitor2, expected_activate=True) |
| self.test_remove_monitor(monitor2) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |
| |
| |
| def advmon_test_pattern_filter_only(self): |
| """Test case: PATTERN_FILTER_ONLY |
| |
| Verify matching of advertisements w.r.t. various pattern values and |
| different AD Data Types - Local Name Service UUID and Device Type. |
| Test working of patterns filter matching with multiple clients, |
| multiple monitors and suspend/resume, without RSSI filtering. |
| |
| """ |
| self.test_is_adv_monitoring_supported() |
| self.test_setup_peer_devices() |
| |
| # Create two test app instances. |
| app1 = self.create_app() |
| app2 = self.create_app() |
| |
| # Register both apps, should not fail. |
| self.test_register_app(app1) |
| self.test_register_app(app2) |
| |
| # Add monitors in both apps. |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ |
| [5, 0x09, '_REF'], |
| ]) |
| monitor1.update_rssi([127, 0, 127, 0]) |
| |
| monitor2 = TestMonitor(app1) |
| monitor2.update_type('or_patterns') |
| monitor2.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| monitor2.update_rssi([127, 0, 127, 0]) |
| |
| monitor3 = TestMonitor(app2) |
| monitor3.update_type('or_patterns') |
| monitor3.update_patterns([ |
| [0, 0x19, [0xc1, 0x03]], |
| [0, 0x09, 'MOUSE'], |
| ]) |
| monitor3.update_rssi([127, 0, 127, 0]) |
| |
| monitor4 = TestMonitor(app2) |
| monitor4.update_type('or_patterns') |
| monitor4.update_patterns([ |
| [0, 0x19, [0xc1, 0x03]], |
| [0, 0x19, [0xc3, 0x03]], |
| ]) |
| monitor4.update_rssi([127, 0, 127, 0]) |
| |
| # Activate should get invoked. |
| self.test_add_monitor(monitor1, expected_activate=True) |
| self.test_add_monitor(monitor2, expected_activate=True) |
| self.test_add_monitor(monitor3, expected_activate=True) |
| self.test_add_monitor(monitor4, expected_activate=True) |
| |
| # DeviceFound for mouse should get triggered only for monitors |
| # matching the adv pattern filter. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) |
| self.test_device_found(monitor2, count=self.MULTIPLE_EVENTS) |
| self.test_device_found(monitor3, count=self.MULTIPLE_EVENTS) |
| # Device type 0xc203 should not match. |
| self.test_device_found(monitor4, count=0) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| |
| # Initiate suspend/resume. |
| self.suspend_resume() |
| |
| # Remove a monitor from one app, shouldn't affect working of other |
| # monitors or apps. |
| self.test_remove_monitor(monitor1) |
| |
| # Reset event counts before next test. |
| self.test_reset_event_count(monitor2) |
| self.test_reset_event_count(monitor3) |
| |
| # DeviceFound for mouse should get triggered again for monitors |
| # matching the adv pattern filter. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor2, count=self.MULTIPLE_EVENTS) |
| self.test_device_found(monitor3, count=self.MULTIPLE_EVENTS) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| |
| # Terminate an app, shouldn't affect working of monitors in other apps. |
| self.test_exit_app(app1) |
| |
| # Reset event counts before next test. |
| self.test_reset_event_count(monitor3) |
| |
| # DeviceFound should get triggered for keyboard. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor3, count=self.MULTIPLE_EVENTS) |
| self.test_device_found(monitor4, count=self.MULTIPLE_EVENTS) |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| |
| # Unregister the running app, should not fail. |
| self.test_unregister_app(app2) |
| |
| # Terminate the running test app instance. |
| self.test_exit_app(app2) |
| |
| |
| def advmon_test_pattern_filter_1(self): |
| """Test case: PATTERN_FILTER_1 |
| |
| Verify matching of advertisements w.r.t. various pattern values and |
| different AD Data Types - Local Name Service UUID and Device Type. |
| |
| """ |
| self.test_is_adv_monitoring_supported(require_rssi_filtering = True) |
| self.test_setup_peer_devices() |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_rssi([-60, 3, -80, 3]) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| monitor1.update_patterns([ |
| [5, 0x09, '_REF'], |
| ]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Local name 'KEYBD_REF' should match. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=1) |
| |
| # Local name 'MOUSE_REF' should match. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor1, count=2) |
| |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| self.test_remove_monitor(monitor1) |
| |
| monitor1.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Service UUID 0x1812 should match. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=1) |
| |
| # Service UUID 0x1812 should match. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor1, count=2) |
| |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| self.test_remove_monitor(monitor1) |
| |
| monitor1.update_patterns([ |
| [0, 0x19, [0xc1, 0x03]], |
| [0, 0x09, 'MOUSE'], |
| ]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Device type 0xc103 should match. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=1) |
| |
| # Local name 'MOUSE_REF' should match. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor1, count=2) |
| |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| self.test_remove_monitor(monitor1) |
| |
| monitor1.update_patterns([ |
| [0, 0x19, [0xc1, 0x03]], |
| [0, 0x19, [0xc3, 0x03]], |
| ]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Device type 0xc103 should match. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=1) |
| |
| # Device type 0xc203 should not match. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor1, count=1) |
| |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| self.test_remove_monitor(monitor1) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |
| |
| |
| def advmon_test_rssi_filter_1(self): |
| """Test case: RSSI_FILTER_1 |
| |
| Verify unset RSSI filter and filter with no matching RSSI values. |
| |
| """ |
| self.test_is_adv_monitoring_supported(require_rssi_filtering = True) |
| self.test_setup_peer_devices() |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| monitor1.update_rssi([127, 0, 127, 0]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Unset RSSI filter, adv should match multiple times. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) |
| |
| # Unset RSSI filter, DeviceLost should not get triggered. |
| self.test_stop_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_lost(monitor1, count=0) |
| |
| self.test_remove_monitor(monitor1) |
| |
| monitor1.update_rssi([-10, 5, -20, 5]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Adv RSSI lower than RSSI filter, DeviceFound should not get triggered. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=10) |
| self.test_device_found(monitor1, count=0) |
| |
| # No device was found earlier, so DeviceLost should not get triggered. |
| self.test_stop_peer_device_adv(self.peer_keybd, duration=10) |
| self.test_device_lost(monitor1, count=0) |
| |
| self.test_remove_monitor(monitor1) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |
| |
| |
| def advmon_test_rssi_filter_2(self): |
| """Test case: RSSI_FILTER_2 |
| |
| Verify RSSI filter matching with multiple peer devices. |
| |
| """ |
| self.test_is_adv_monitoring_supported(require_rssi_filtering = True) |
| self.test_setup_peer_devices() |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| monitor1.update_rssi([-60, 3, -80, 3]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # DeviceFound should get triggered only once per device. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=10) |
| self.test_device_found(monitor1, count=1) |
| |
| # DeviceFound should get triggered for another device. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=10) |
| self.test_device_found(monitor1, count=2) |
| |
| # DeviceLost should get triggered only once per device. |
| self.test_stop_peer_device_adv(self.peer_keybd, duration=10) |
| self.test_device_lost(monitor1, count=1) |
| |
| # DeviceLost should get triggered for another device. |
| self.test_stop_peer_device_adv(self.peer_mouse, duration=10) |
| self.test_device_lost(monitor1, count=2) |
| |
| self.test_remove_monitor(monitor1) |
| |
| monitor1.update_rssi([-60, 10, -80, 10]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Device was online for short period of time, so DeviceFound should |
| # not get triggered. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=0) |
| |
| # Device did not come back online, DeviceFound should not get triggered. |
| # No device was found earlier, so DeviceLost should not get triggered. |
| self.test_stop_peer_device_adv(self.peer_keybd, duration=15) |
| self.test_device_found(monitor1, count=0) |
| self.test_device_lost(monitor1, count=0) |
| |
| self.test_remove_monitor(monitor1) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |
| |
| |
| def advmon_test_rssi_filter_3(self): |
| """Test case: RSSI_FILTER_3 |
| |
| Verify reset of RSSI timers based on advertisements. |
| |
| """ |
| self.test_is_adv_monitoring_supported(require_rssi_filtering = True) |
| self.test_setup_peer_devices() |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| monitor1.update_rssi([-60, 10, -80, 10]) |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # DeviceFound should not get triggered before timeout. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=0) |
| |
| # DeviceFound should not get triggered as device went offline. |
| # No device was found earlier, so DeviceLost should not get triggered. |
| self.test_stop_peer_device_adv(self.peer_keybd, duration=10) |
| self.test_device_found(monitor1, count=0) |
| self.test_device_lost(monitor1, count=0) |
| |
| # Timer should get reset, so DeviceFound should not get triggered. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=0) |
| |
| # DeviceFound should get triggered once timer completes. |
| self.test_device_found(monitor1, count=1, delay=10) |
| |
| # DeviceLost should not get triggered before timeout. |
| self.test_stop_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_lost(monitor1, count=0) |
| |
| # Timer should get reset, so DeviceLost should not get triggered. |
| # DeviceFound should not get triggered as device is not lost yet. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_lost(monitor1, count=0) |
| self.test_device_found(monitor1, count=1) |
| |
| # Timer should get reset, so DeviceLost should not get triggered. |
| self.test_stop_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_lost(monitor1, count=0) |
| |
| # DeviceLost should get triggered once timer completes. |
| self.test_device_lost(monitor1, count=1, delay=10) |
| |
| self.test_remove_monitor(monitor1) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |
| |
| |
| def advmon_test_multi_client(self): |
| """Test case: MULTI_CLIENT |
| |
| Verify working of patterns filter and RSSI filters with multiple |
| clients and multiple monitors. |
| |
| """ |
| self.test_is_adv_monitoring_supported(require_rssi_filtering = True) |
| self.test_setup_peer_devices() |
| |
| # Create two test app instances. |
| app1 = self.create_app() |
| app2 = self.create_app() |
| |
| # Register both apps, should not fail. |
| self.test_register_app(app1) |
| self.test_register_app(app2) |
| |
| # Monitors with same pattern and RSSI filter values in both apps. |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| [0, 0x19, [0xc1, 0x03]], |
| ]) |
| monitor1.update_rssi([-60, 3, -80, 3]) |
| |
| monitor2 = TestMonitor(app2) |
| monitor2.update_type('or_patterns') |
| monitor2.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| [0, 0x19, [0xc1, 0x03]], |
| ]) |
| monitor2.update_rssi([-60, 3, -80, 3]) |
| |
| # Activate should get invoked. |
| self.test_add_monitor(monitor1, expected_activate=True) |
| self.test_add_monitor(monitor2, expected_activate=True) |
| |
| # DeviceFound should get triggered for keyboard. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=1) |
| self.test_device_found(monitor2, count=1) |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| |
| # Remove a monitor from one app. |
| self.test_remove_monitor(monitor1) |
| |
| # Monitors with same pattern but different RSSI filter values. |
| monitor3 = TestMonitor(app1) |
| monitor3.update_type('or_patterns') |
| monitor3.update_patterns([ |
| [0, 0x19, [0xc2, 0x03]], |
| ]) |
| monitor3.update_rssi([-60, 3, -80, 3]) |
| |
| monitor4 = TestMonitor(app2) |
| monitor4.update_type('or_patterns') |
| monitor4.update_patterns([ |
| [0, 0x19, [0xc2, 0x03]], |
| ]) |
| monitor4.update_rssi([-60, 10, -80, 10]) |
| |
| # Activate should get invoked. |
| self.test_add_monitor(monitor3, expected_activate=True) |
| self.test_add_monitor(monitor4, expected_activate=True) |
| |
| # DeviceFound should get triggered for mouse. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor2, count=2) |
| self.test_device_found(monitor3, count=1) |
| |
| # Since the RSSI timeouts are different for monitor4, DeviceFound |
| # event should get triggered after total of 10 seconds. |
| self.test_device_found(monitor4, count=0) |
| self.test_device_found(monitor4, count=1, delay=5) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| |
| # Unregister both apps, should not fail. |
| self.test_unregister_app(app1) |
| self.test_unregister_app(app2) |
| |
| # Terminate the both test app instances. |
| self.test_exit_app(app1) |
| self.test_exit_app(app2) |
| |
| |
| def advmon_test_fg_bg_combination(self): |
| """Test case: FG_BG_COMBINATION |
| |
| Verify background scanning and foreground scanning do not interfere |
| working of each other. |
| |
| """ |
| self.test_is_adv_monitoring_supported() |
| self.test_setup_peer_devices() |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| monitor1.update_rssi([127, 0, 127, 0]) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| # Activate should get invoked. |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Pair/connect LE Mouse. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| time.sleep(self.PAIR_TEST_SLEEP_SECS) |
| self.test_discover_device(self.peer_mouse.address) |
| time.sleep(self.PAIR_TEST_SLEEP_SECS) |
| self.test_pairing(self.peer_mouse.address, self.peer_mouse.pin) |
| time.sleep(self.PAIR_TEST_SLEEP_SECS) |
| self.test_connection_by_adapter(self.peer_mouse.address) |
| self.test_connection_by_device(self.peer_mouse) |
| |
| # DeviceFound should get triggered for keyboard. |
| self.test_reset_event_count(monitor1) |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| |
| # Start foreground scanning. |
| self.test_start_discovery() |
| |
| # Disconnect LE mouse. |
| self.test_disconnection_by_device(self.peer_mouse) |
| |
| # Remove the monitor. |
| self.test_remove_monitor(monitor1) |
| |
| # Activate should get invoked. |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # Connect LE mouse. |
| self.test_connection_by_device(self.peer_mouse) |
| |
| # DeviceFound should get triggered for keyboard. |
| self.test_reset_event_count(monitor1) |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| |
| # Stop foreground scanning. |
| self.test_stop_discovery() |
| |
| # Disconnect LE mouse. |
| self.test_disconnection_by_device(self.peer_mouse) |
| |
| # DeviceFound should get triggered for keyboard. |
| self.test_reset_event_count(monitor1) |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor1, count=self.MULTIPLE_EVENTS) |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| |
| # Remove the monitor. |
| self.test_remove_monitor(monitor1) |
| |
| # Connect LE mouse. |
| self.test_connection_by_device(self.peer_mouse) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |
| |
| |
| def advmon_test_suspend_resume(self): |
| """Test case: SUSPEND_RESUME |
| |
| Verify working of background scanning with suspend/resume. |
| |
| """ |
| self.test_is_adv_monitoring_supported(require_rssi_filtering = True) |
| self.test_setup_peer_devices() |
| |
| # Create two test app instances. |
| app1 = self.create_app() |
| app2 = self.create_app() |
| |
| # Register both apps, should not fail. |
| self.test_register_app(app1) |
| self.test_register_app(app2) |
| |
| # Add monitors in both apps. |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) |
| monitor1.update_rssi([-60, 3, -80, 3]) |
| |
| monitor2 = TestMonitor(app1) |
| monitor2.update_type('or_patterns') |
| monitor2.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) |
| monitor2.update_rssi([-60, 10, -80, 10]) |
| |
| monitor3 = TestMonitor(app2) |
| monitor3.update_type('or_patterns') |
| monitor3.update_patterns([ [0, 0x03, [0x12, 0x18]], ]) |
| monitor3.update_rssi([-60, 3, -80, 3]) |
| |
| monitor4 = TestMonitor(app2) |
| monitor4.update_type('or_patterns') |
| monitor4.update_patterns([ [0, 0x19, [0xc2, 0x03]], ]) |
| monitor4.update_rssi([-60, 15, -80, 15]) |
| |
| # Activate should get invoked. |
| self.test_add_monitor(monitor1, expected_activate=True) |
| self.test_add_monitor(monitor2, expected_activate=True) |
| self.test_add_monitor(monitor3, expected_activate=True) |
| self.test_add_monitor(monitor4, expected_activate=True) |
| |
| # DeviceFound for mouse should get triggered only for monitors |
| # satisfying the RSSI timers. |
| self.test_start_peer_device_adv(self.peer_mouse, duration=5) |
| self.test_device_found(monitor1, count=1) |
| self.test_device_found(monitor2, count=0) |
| self.test_device_found(monitor3, count=1) |
| self.test_device_found(monitor4, count=0) |
| |
| # Initiate suspend/resume. |
| self.suspend_resume() |
| |
| # Remove a monitor from one app, shouldn't affect working of other |
| # monitors or apps. |
| self.test_remove_monitor(monitor1) |
| |
| # DeviceFound should get triggered for monitors with higher RSSI timers. |
| self.test_device_found(monitor2, count=1, delay=10) |
| self.test_device_found(monitor4, count=1, delay=5) |
| self.test_stop_peer_device_adv(self.peer_mouse) |
| |
| # Terminate an app, shouldn't affect working of monitors in other apps. |
| self.test_exit_app(app1) |
| |
| # DeviceFound should get triggered for keyboard. |
| self.test_start_peer_device_adv(self.peer_keybd, duration=5) |
| self.test_device_found(monitor3, count=2) |
| self.test_stop_peer_device_adv(self.peer_keybd) |
| |
| # Unregister the running app, should not fail. |
| self.test_unregister_app(app2) |
| |
| # Terminate the running test app instance. |
| self.test_exit_app(app2) |
| |
| |
| def advmon_test_interleaved_scan(self): |
| """ Test cases for verifying interleave scan """ |
| |
| self.test_is_adv_monitoring_supported() |
| |
| # cycles to collect logs for tests expect no interleave scan |
| EXPECT_FALSE_TEST_CYCLE = 3 |
| |
| # Create a test app instance. |
| app1 = self.create_app() |
| |
| monitor1 = TestMonitor(app1) |
| monitor1.update_type('or_patterns') |
| monitor1.update_patterns([ |
| [0, 0x03, [0x12, 0x18]], |
| ]) |
| monitor1.update_rssi([127, 0, 127, 0]) |
| |
| # Register the app, should not fail. |
| self.test_register_app(app1) |
| |
| # Activate should get invoked. |
| self.test_add_monitor(monitor1, expected_activate=True) |
| |
| # No device in allowlist, interleave with idle |
| self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) |
| |
| # No device in allowlist, interleave with idle, interrupted by active |
| # scan |
| self.test_start_discovery() |
| self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) |
| self.test_stop_discovery() |
| self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) |
| |
| # No device in allowlist, interleave with idle, interrupted by suspend |
| # resume |
| self.suspend_resume() |
| self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) |
| |
| # Pair/connect LE Mouse. |
| device = self.devices['BLE_MOUSE'][0] |
| self.test_discover_device(device.address) |
| time.sleep(self.PAIR_TEST_SLEEP_SECS) |
| self.test_pairing(device.address, device.pin, trusted=True) |
| time.sleep(self.PAIR_TEST_SLEEP_SECS) |
| |
| # BLE_MOUSE in allowlist, interleave with allowlist passive scan |
| self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) |
| device.AdapterPowerOff() |
| # Make sure the peer is disconnected |
| self.test_device_is_not_connected(device.address) |
| self.test_interleaving_state(True) |
| |
| # Interleaving with allowlist should get paused during active scan |
| self.test_interleaving_active_scan_cycle() |
| |
| # Interleaving with allowlist should get resumed after stopping scan |
| self.test_interleaving_state(True) |
| |
| # Interleaving with allowlist should get paused during system suspend, |
| # get resumed after system awake |
| self.test_interleaving_suspend_resume() |
| self.test_interleaving_state(True) |
| |
| self.test_remove_monitor(monitor1) |
| self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE) |
| |
| # Unregister the app, should not fail. |
| self.test_unregister_app(app1) |
| |
| # Terminate the test app instance. |
| self.test_exit_app(app1) |