bluetooth: Add Advertisement Monitor Interleave Scan Tests

This adds tests to verify interleave scan in kernel is working expectly.

BUG=b:163702298
TEST=run bluetooth_AdapterAdvMonitor locally

Change-Id: I779593f746b8f17d05213ab8d157e52d9cfdd808
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/2490081
Tested-by: Yun-Hao Chung <howardchung@google.com>
Commit-Queue: Yun-Hao Chung <howardchung@google.com>
Reviewed-by: Manish Mandlik <mmandlik@chromium.org>
diff --git a/server/cros/bluetooth/bluetooth_adapter_adv_monitor_tests.py b/server/cros/bluetooth/bluetooth_adapter_adv_monitor_tests.py
index 592762b..6539498 100644
--- a/server/cros/bluetooth/bluetooth_adapter_adv_monitor_tests.py
+++ b/server/cros/bluetooth/bluetooth_adapter_adv_monitor_tests.py
@@ -135,6 +135,15 @@
     # DeviceFound/DeviceLost events are expected to occur.
     MULTIPLE_EVENTS = -1
 
+    # Number of cycle to observe during a test
+    INTERLEAVE_SCAN_TEST_CYCLE = 10
+    # Acceptable extra delay of interleave scan duration, in sec
+    INTERLEAVE_SCAN_DURATION_TOLERANCE = 0.1
+    # Acceptable delay of cancelling interleave scan, in sec
+    INTERLEAVE_SCAN_CANCEL_TOLERANCE = 2
+    # Acceptable extra/missing cycles in interleave scan
+    INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE = 2
+
     test_case_log = bluetooth_adapter_tests.test_case_log
     test_retry_and_log = bluetooth_adapter_tests.test_retry_and_log
 
@@ -265,6 +274,49 @@
                                                               event)
 
 
+    def interleave_logger_start(self):
+        """ Start interleave logger recording
+        """
+        self.bluetooth_facade.advmon_interleave_scan_logger_start()
+
+    def interleave_logger_stop(self):
+        """ Stop interleave logger recording
+
+        @returns: True if logs were successfully collected,
+                  False otherwise.
+
+        """
+        return self.bluetooth_facade.advmon_interleave_scan_logger_stop()
+
+    def interleave_logger_get_records(self):
+        """ Get records in previous log collections
+
+        @returns: a list of records, where each item is a record of
+                  interleave |state| and the |time| the state starts.
+                  |state| could be {'no filter', 'allowlist'}
+                  |time| is kernel time in sec
+
+        """
+        return self.bluetooth_facade.\
+                                advmon_interleave_scan_logger_get_records()
+
+    def interleave_logger_get_cancel_event(self):
+        """ Get cancel event in previous log collections
+
+        @returns: the first cancel event in the collections,
+                  None if no cancel event was found
+
+        """
+        events = self.bluetooth_facade.\
+                            advmon_interleave_scan_logger_get_cancel_events()
+        if len(events) == 0:
+            event = None
+        else:
+            event = events[0]
+            if len(events) > 1:
+                logging.warning('More than one cancel events found %s', events)
+        return event
+
     @test_retry_and_log(False)
     def test_supported_types(self):
         """Test supported monitor types.
@@ -611,6 +663,176 @@
 
         return ret
 
+    def check_records_interleaving(self, durations, records, expect_cycles):
+        """ Check the state of records is interleaving and also the duration is
+            as expected.
+
+        @param durations: a dict of {'allowlist': allowlist_duration,
+                                     'no filter': no_filter_duration}
+        @param records: a list of records
+
+        @returns: a dict of {'Interleaved': record_state_is_interleaved,
+                             'Span within range': duration_is_expected}
+
+        """
+
+        actual_cycle = len(records) / len(durations.keys())
+        offset = self.INTERLEAVE_SCAN_CYCLE_NUM_TOLERANCE
+        expect_cycle_lowerbound = max(1, expect_cycles - offset)
+        expect_cycle_upperbound = expect_cycles + offset
+        enough_cycle_num = (actual_cycle >= expect_cycle_lowerbound
+                            and actual_cycle <= expect_cycle_upperbound)
+        interleaved = True
+        span_within_range = True
+        expected_state = None
+
+        def _next_state(state):
+            if state == 'allowlist':
+                return 'no filter'
+            elif state == 'no filter':
+                return 'allowlist'
+            else:
+                logging.warning('Unexpected state %s', state)
+                return None
+
+        for i, record in enumerate(records):
+            state = record['state']
+            nstate = _next_state(state)
+
+            # We can't count span on single data point and expected_state
+            # hasn't set
+            if i != 0:
+                span = (record['time'] - records[i - 1]['time'])
+
+                if state != expected_state:
+                    interleaved = False
+
+                if span < durations[nstate] -\
+                                        self.INTERLEAVE_SCAN_DURATION_TOLERANCE:
+                    span_within_range = False
+
+                if span > durations[nstate] +\
+                                        self.INTERLEAVE_SCAN_DURATION_TOLERANCE:
+                    span_within_range = False
+
+            expected_state = nstate
+
+        return {
+                'Enough cycle number': enough_cycle_num,
+                'Interleaved': interleaved,
+                'Span within range': span_within_range
+        }
+
+    def get_interleave_scan_durations(self):
+        """ Get interleave scan duration.
+
+        @returns: a dict of {'allowlist': allowlist_duration,
+                             'no filter': no_filter_duration}
+
+        """
+
+        # TODO(b/171844106): get this parameters via
+        #                    MGMT_OP_READ_DEF_SYSTEM_CONFIG
+        durations = {'allowlist': 300, 'no filter': 500}
+
+        # Change the unit from msec to second for future convenience.
+        durations = {key: value * 0.001 for key, value in durations.items()}
+        return durations
+
+    @test_retry_and_log(False)
+    def test_interleaving_state(self,
+                                expect_true,
+                                cycles=INTERLEAVE_SCAN_TEST_CYCLE):
+        """ Test for checking if kernel is doing interleave scan or not.
+
+        @params expect_true: True if kernel should be running interleave scan
+                             False if kernel shouldn't.
+        @params cycles: number of cycles to collect logs
+
+        @returns: True on success, False otherwise.
+
+        """
+        durations = self.get_interleave_scan_durations()
+        interleave_period = sum(durations.values())
+        log_time = interleave_period * cycles
+        self.interleave_logger_start()
+        time.sleep(log_time)
+        self.interleave_logger_stop()
+        records = self.interleave_logger_get_records()
+
+        logging.debug(records)
+
+        if not expect_true:
+            self.results = {'No records': len(records) == 0}
+        else:
+            self.results = self.check_records_interleaving(
+                    durations, records, cycles)
+
+        return all(self.results.values())
+
+    @test_retry_and_log(False)
+    def test_interleaving_suspend_resume(self):
+        """ Test for checking if kernel paused interleave scan during system
+            suspended.
+
+        @returns: True on success, False otherwise.
+
+        """
+        durations = self.get_interleave_scan_durations()
+        interleave_period = sum(durations.values())
+
+        # make sure suspend time is long enough to verify there is no
+        # interleaving during suspended
+        expect_suspend_time = max(self.SUSPEND_TIME_SECS,
+                                  2 * interleave_period)
+
+        # make sure we'll get some records before/after system suspended
+        extra_sleep_time = 2 * interleave_period
+
+        self.interleave_logger_start()
+        time.sleep(extra_sleep_time)
+        self.suspend_resume(suspend_time=expect_suspend_time)
+        time.sleep(extra_sleep_time)
+        self.interleave_logger_stop()
+        records = self.interleave_logger_get_records()
+        cancel_event = self.interleave_logger_get_cancel_event()
+
+        logging.debug(records)
+        logging.debug(cancel_event)
+
+        self.results = {}
+
+        if cancel_event is None:
+            self.results = {'Cancel event': cancel_event}
+            return False
+
+        canceled_time = cancel_event + self.INTERLEAVE_SCAN_CANCEL_TOLERANCE
+
+        suspend_records = [r for r in records if r['time'] < canceled_time]
+        resume_records = [r for r in records if r['time'] >= canceled_time]
+
+        if len(suspend_records) == 0:
+            self.results = {'non-empty records before suspended': False}
+            return False
+
+        if len(resume_records) == 0:
+            self.results = {'non-empty records after resumed': False}
+            return False
+
+        # Records are stored chronologically.
+        # The first record is the earliest, while the last one is the oldest.
+        last_suspend = suspend_records[-1]['time']
+        first_resume = resume_records[0]['time']
+        suspend_time = first_resume - last_suspend
+
+        # Currently resume time is not very reliable. It is likely the actual
+        # time in sleeping is less than expect_suspend_time.
+        # Check the interleave scan paused for at least one cycle long instead.
+        self.results = {
+                'Paused enough time': suspend_time >= interleave_period
+        }
+
+        return all(self.results.values())
 
     def advmon_test_monitor_creation(self):
         """Test case: MONITOR_CREATION
@@ -1149,3 +1371,76 @@
 
         # Terminate the test app instance.
         self.test_exit_app(app1)
+
+    def advmon_test_interleaved_scan(self):
+        """ Test cases for verifying interleave scan """
+
+        # cycles to collect logs for tests expect no interleave scan
+        EXPECT_FALSE_TEST_CYCLE = 3
+
+        # Create a test app instance.
+        app1 = self.create_app()
+
+        monitor1 = TestMonitor(app1)
+        monitor1.update_type('or_patterns')
+        monitor1.update_patterns([
+                [0, 0x03, [0x12, 0x18]],
+        ])
+        monitor1.update_rssi([127, 0, 127, 0])
+
+        # Register the app, should not fail.
+        self.test_register_app(app1)
+
+        # Activate should get invoked.
+        self.test_add_monitor(monitor1, expected_activate=True)
+
+        # No device in allowlist, interleave with idle
+        self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE)
+
+        # No device in allowlist, interleave with idle, interrupted by active
+        # scan
+        self.test_start_discovery()
+        self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE)
+        self.test_stop_discovery()
+        self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE)
+
+        # No device in allowlist, interleave with idle, interrupted by suspend
+        # resume
+        self.suspend_resume()
+        self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE)
+
+        # Pair/connect LE Mouse.
+        device = self.devices['BLE_MOUSE'][0]
+        self.test_discover_device(device.address)
+        time.sleep(self.PAIR_TEST_SLEEP_SECS)
+        self.test_pairing(device.address, device.pin, trusted=True)
+        time.sleep(self.PAIR_TEST_SLEEP_SECS)
+
+        # BLE_MOUSE in allowlist, interleave with allowlist passive scan
+        self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE)
+        device.TurnOff()
+        # Make sure the peer is disconnected
+        self.test_device_is_not_connected(device.address)
+        self.test_interleaving_state(True)
+
+        # Interleaving with allowlist should get paused during active scan
+        self.test_start_discovery()
+        self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE)
+
+        # Interleaving with allowlist should get resumed after stopping scan
+        self.test_stop_discovery()
+        self.test_interleaving_state(True)
+
+        # Interleaving with allowlist should get paused during system suspend,
+        # get resumed after system awake
+        self.test_interleaving_suspend_resume()
+        self.test_interleaving_state(True)
+
+        self.test_remove_monitor(monitor1)
+        self.test_interleaving_state(False, cycles=EXPECT_FALSE_TEST_CYCLE)
+
+        # Unregister the app, should not fail.
+        self.test_unregister_app(app1)
+
+        # Terminate the test app instance.
+        self.test_exit_app(app1)
diff --git a/server/site_tests/bluetooth_AdapterAdvMonitor/bluetooth_AdapterAdvMonitor.py b/server/site_tests/bluetooth_AdapterAdvMonitor/bluetooth_AdapterAdvMonitor.py
index f4e9a20..892bda9 100644
--- a/server/site_tests/bluetooth_AdapterAdvMonitor/bluetooth_AdapterAdvMonitor.py
+++ b/server/site_tests/bluetooth_AdapterAdvMonitor/bluetooth_AdapterAdvMonitor.py
@@ -50,6 +50,11 @@
         self.advmon_test_fg_bg_combination()
 
 
+    @test_wrapper('Interleave Scan Tests', devices={'BLE_MOUSE': 1})
+    def advmon_interleaved_scan(self):
+        """Tests interleave scan."""
+        self.advmon_test_interleaved_scan()
+
     @batch_wrapper('Advertisement Monitor API')
     def advmon_batch_run(self, num_iterations=1, test_name=None):
         """Run the Advertisement Monitor test batch or a specific given test.
@@ -67,7 +72,7 @@
         self.advmon_monitor_health_tests()
         self.advmon_single_client_tests()
         self.advmon_fg_bg_combination_tests()
-
+        self.advmon_interleaved_scan()
 
     def run_once(self,
                  host,