| #!/usr/bin/env python |
| |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import task_loop |
| |
| import glib |
| import logging |
| import mox |
| import time |
| import unittest |
| |
| class TaskLoopTestCase(unittest.TestCase): |
| """ |
| Test fixture for TaskLoop class. |
| |
| These unit-tests have a trade-off between speed and stability. The whole |
| suite takes about half a minute to run, and probably could be speeded up a |
| little bit. But, making the numbers too small might make the tests flaky. |
| """ |
| |
| def setUp(self): |
| self._mox = mox.Mox() |
| self._callback_mocker = self._mox.CreateMock(TaskLoopTestCase) |
| self._task_loop = task_loop.get_instance() |
| |
| # ########################################################################## |
| # Tests |
| |
| def test_post_task_simple(self): |
| """Post a simple task and expect it gets dispatched.""" |
| self._callback_mocker._callback() |
| |
| self._mox.ReplayAll() |
| self._task_loop.post_task(self._callback_mocker._callback) |
| self._run_task_loop(2) |
| self._mox.VerifyAll() |
| |
| |
| def test_post_task_set_attribute(self): |
| """Post a task that accesses an attribute from the context object.""" |
| self.flag = False |
| self._task_loop.post_task(self._callback_set_attribute) |
| self._run_task_loop(2) |
| self.assertTrue(self.flag) |
| |
| |
| def test_post_task_with_argument(self): |
| """Post task with some argument.""" |
| arg = True |
| self._callback_mocker._callback_with_arguments(arg) |
| |
| self._mox.ReplayAll() |
| self._task_loop.post_task( |
| self._callback_mocker._callback_with_arguments, arg) |
| self._run_task_loop(2) |
| self._mox.VerifyAll() |
| |
| |
| def test_post_task_after_delay(self): |
| """Post a task with some delay and check that the delay is respected.""" |
| start_time = time.time() |
| self.time = start_time |
| self._task_loop.post_task_after_delay(self._callback_set_time, 3000) |
| self._run_task_loop(5) |
| delayed_time = self.time - start_time |
| self.assertGreaterEqual(delayed_time, 3) |
| |
| |
| def test_post_repeated_task(self): |
| """Post a repeated task and check it gets dispatched multiple times.""" |
| self.count = 0 |
| self._task_loop.post_repeated_task(self._callback_increment_count, 1000) |
| self._run_task_loop(5) |
| self.assertGreaterEqual(self.count, 3) |
| |
| |
| def test_ignore_delays(self): |
| """Post a task and test ignore_delays mode.""" |
| self._task_loop.ignore_delays = False |
| |
| self._task_loop.post_task_after_delay(self._callback_mocker._callback, |
| 10000) |
| # Not enough time to dispatch the posted task |
| self._run_task_loop(1) |
| self._mox.VerifyAll() |
| |
| |
| def test_cancel_posted_task(self): |
| """Test that a cancelled task is not dispatched.""" |
| post_id = self._task_loop.post_task_after_delay( |
| self._callback_mocker._callback, |
| 2000) |
| self._task_loop.post_task(self._callback_cancel_task, post_id) |
| self._run_task_loop(3) |
| self._mox.VerifyAll() |
| |
| |
| def test_multiple_cancels(self): |
| """Test that successive cancels after a successful cancel fail.""" |
| post_id = self._task_loop.post_task_after_delay( |
| self._callback_mocker._callback, |
| 2000) |
| self._task_loop.post_task(self._callback_cancel_task, post_id) |
| self._task_loop.post_task(self._callback_cancel_cancelled_task, post_id) |
| self._run_task_loop(3) |
| self._mox.VerifyAll() |
| |
| |
| def test_random_delays(self): |
| """Test that random delays works (sort of). This test could be flaky.""" |
| # Warning: This test could be flaky. Add more differences? |
| self.count = 0 |
| self.times = {} |
| self._task_loop.random_delays = True |
| self._task_loop.max_random_delay_ms = 1000 |
| self._task_loop.post_repeated_task(self._callback_record_times, 500) |
| self._run_task_loop(5) |
| self.assertGreaterEqual(self.count, 4) |
| # Test that not all time gaps are almost the same |
| diff1 = round(self.times[1] - self.times[0], 3) |
| diff2 = round(self.times[2] - self.times[1], 3) |
| diff3 = round(self.times[3] - self.times[2], 3) |
| self.assertTrue(diff1 != diff2 or diff2 != diff3 or diff3 != diff1) |
| |
| # ########################################################################## |
| # Helper functions |
| |
| def _stop_task_loop(self): |
| print('Stopping task_loop.') |
| self._task_loop.stop() |
| |
| def _run_task_loop(self, run_for_seconds): |
| """ |
| Runs the task loop for |run_for_seconds| seconds. This function is |
| blocking, so the main thread will return only after |run_for_seconds|. |
| """ |
| # post a task to stop the task loop. |
| glib.timeout_add(run_for_seconds*1000, self._stop_task_loop) |
| self._task_loop.start() |
| # We will continue only when the stop task has been executed. |
| |
| # ########################################################################## |
| # Callbacks for tests |
| |
| def _callback(self): |
| print('Actual TaskLoopTestCase._callback called!') |
| |
| |
| def _callback_set_attribute(self): |
| self.flag = True |
| |
| |
| def _callback_set_time(self): |
| self.time = time.time() |
| |
| |
| def _callback_increment_count(self): |
| self.count = self.count + 1 |
| |
| |
| def _callback_record_times(self): |
| self.times[self.count] = time.time() |
| self.count = self.count + 1 |
| |
| |
| def _callback_with_arguments(self, arg): |
| pass |
| |
| |
| def _callback_cancel_task(self, post_id): |
| self._task_loop.cancel_posted_task(post_id) |
| |
| |
| def _callback_cancel_cancelled_task(self, post_id): |
| self.assertFalse(self._task_loop.cancel_posted_task(post_id)) |
| |
| |
| if __name__ == '__main__': |
| logging.basicConfig(level=logging.DEBUG) |
| unittest.main() |