| #!/usr/bin/env python |
| |
| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import glib |
| import logging |
| import random |
| |
| DEFAULT_MAX_RANDOM_DELAY_MS = 10000 |
| |
| _instance = None |
| |
| def get_instance(): |
| """ |
| Return the singleton instance of the TaskLoop class. |
| |
| """ |
| global _instance |
| if _instance is None: |
| _instance = TaskLoop() |
| return _instance |
| |
| |
| class TaskLoop(object): |
| """ |
| The context to place asynchronous calls. |
| |
| This is a wrapper around the GLIB mainloop interface, exposing methods to |
| place (delayed) asynchronous calls. In addition to wrapping around the GLIB |
| API, this provides switches to control how delays are incorporated in method |
| calls globally. |
| |
| This class is meant to be a singleton. |
| Do not create an instance directly, use the module level function |
| get_instance() instead. |
| |
| Running the TaskLoop is blocking for the caller. So use this class like so: |
| |
| tl = task_loop.get_instance() |
| # Setup other things. |
| # Add initial tasks to tl to do stuff, post more tasks, and make the world a |
| # better place. |
| tl.start() |
| # This thread is now blocked. Some task should eventually call tl.stop() to |
| continue here. |
| |
| @var ignore_delays: Flag to control if delayed tasks are posted immediately. |
| |
| @var random_delays: Flag to control if arbitrary delays are inserted between |
| posted tasks. |
| |
| @var max_random_delay_ms: When random_delays is True, the maximum delay |
| inserted between posted tasks. |
| |
| """ |
| |
| |
| def __init__(self): |
| self._logger = logging.getLogger(__name__) |
| |
| # Initialize properties |
| self._ignore_delays = False |
| self._random_delays = False |
| self._max_random_delay_ms = DEFAULT_MAX_RANDOM_DELAY_MS |
| |
| # Get the mainloop so that tasks can be posted even before running the |
| # task loop. |
| self._mainloop = glib.MainLoop() |
| |
| # Initialize dictionary to track posted tasks. |
| self._next_post_id = 0 |
| self._posted_tasks = {} |
| |
| |
| @property |
| def ignore_delays(self): |
| """ |
| Boolean flag to control if delayed tasks are posted immediately. |
| |
| If True, all tasks posted henceforth are immediately marked active |
| ignoring any delay requested. With this switch, all other delay related |
| switches are ignored. |
| |
| """ |
| return self._ignore_delays |
| |
| |
| @ignore_delays.setter |
| def ignore_delays(self, value): |
| """ |
| Set |ignore_delays|. |
| |
| @param value: Boolean value for the |ignore_delays| flag |
| |
| """ |
| self._logger.debug('Turning %s delays ignored mode.', ('on' if value |
| else 'off')) |
| self._ignore_delays = value |
| |
| |
| @property |
| def random_delays(self): |
| """ |
| Boolean flag to control if random delays are inserted in posted tasks. |
| |
| If True, arbitrary delays in range [0, |max_random_delay_ms|] are |
| inserted in all posted tasks henceforth, ignoring the actual delay |
| requested. |
| |
| """ |
| return self._random_delays |
| |
| |
| @random_delays.setter |
| def random_delays(self, value): |
| """ |
| Set |random_delays|. |
| |
| @param value: Boolean value for the random_delays flag. |
| |
| """ |
| self._logger.debug('Turning %s random delays.', ('on' if value else |
| 'off')) |
| self._random_delays = value |
| |
| |
| @property |
| def max_random_delay_ms(self): |
| """ |
| The maximum arbitrary delay inserted in posted tasks in milliseconds. |
| Type: int |
| |
| """ |
| return self._max_random_delay_ms |
| |
| |
| @max_random_delay_ms.setter |
| def max_random_delay_ms(self, value): |
| """ |
| Set |max_random_delay_ms|. |
| |
| @param value: Non-negative int value for |max_random_delay_ms|. Negative |
| values are clamped to 0. |
| |
| """ |
| if value < 0: |
| self._logger.warning( |
| 'Can not set max_random_delay_ms to negative value %s. ' |
| 'Setting to 0 instead.', |
| value) |
| value = 0 |
| self._logger.debug('Set max random delay to %d. Random delay is %s', |
| value, ('on' if self.random_delays else 'off')) |
| self._max_random_delay_ms = value |
| |
| |
| def start(self): |
| """ |
| Run the task loop. |
| |
| This call is blocking. The thread that calls TaskLoop.start(...) becomes |
| the task loop itself and is blocked as such till TaskLoop.stop(...) is |
| called. |
| |
| """ |
| self._logger.info('Task Loop is now processing tasks...') |
| self._mainloop.run() |
| |
| |
| def stop(self): |
| """ |
| Stop the task loop. |
| |
| """ |
| self._logger.info('Task Loop quitting.') |
| self._mainloop.quit() |
| |
| |
| def post_repeated_task(self, callback, delay_ms=0): |
| """ |
| Post the given callback repeatedly forever until cancelled. |
| |
| The posted callback must not expect any arguments. It likely does not |
| make sense to provide fixed data parameters to a repeated task. Use the |
| object reference to provide context. |
| |
| In the |ignore_delays| mode, the task is reposted immediately after |
| dispatch. |
| In the |random_delays| mode, a new arbitrary delay is inserted before |
| each call to |callback|. |
| |
| @param callback: The function to call repeatedly. |callback| must expect |
| an object reference as the only argument. The return value from |
| |callback| is ignored. |
| |
| @param delay_ms: The delay between repeated calls to |callback|. The |
| first call is also delayed by this amount. Default: 0 |
| |
| @return: An integer ID that can be used to cancel the posted task. |
| |
| """ |
| assert callback is not None |
| |
| post_id = self._next_post_id |
| self._next_post_id += 1 |
| |
| next_delay_ms = self._next_delay_ms(delay_ms) |
| self._posted_tasks[post_id] = glib.timeout_add( |
| next_delay_ms, |
| TaskLoop._execute_repeated_task, |
| self, |
| post_id, |
| callback, |
| delay_ms) |
| return post_id |
| |
| |
| def post_task_after_delay(self, callback, delay_ms, *args, **kwargs): |
| """ |
| Post the given callback once to be dispatched after |delay_ms|. |
| |
| @param callback: The function to call. The function may expect arbitrary |
| number of arguments, passed in as |*args| and |**kwargs|. The |
| return value from |callback| is ignored. |
| |
| @param delay_ms: The delay before the call to |callback|. Default: 0 |
| |
| @return: An integer ID that can be used to cancel the posted task. |
| |
| """ |
| assert callback is not None |
| post_id = self._next_post_id |
| self._next_post_id = self._next_post_id + 1 |
| delay_ms = self._next_delay_ms(delay_ms) |
| self._posted_tasks[post_id] = glib.timeout_add(delay_ms, callback, |
| *args, **kwargs) |
| return post_id |
| |
| |
| def post_task(self, callback, *args, **kwargs): |
| """ |
| Post the given callback once. |
| |
| In |random_delays| mode, this function is equivalent to |
| |post_task_after_delay|. |
| |
| @param callback: The function to call. The function may expect arbitrary |
| number of arguments, passed in as |*args| and |**kwargs|. The |
| return value from |callback| is ignored. |
| |
| @return: An integer ID that can be used to cancel the posted task. |
| |
| """ |
| self._logger.debug('Task posted: %s', repr(callback)) |
| self._logger.debug('Arguments: %s, Keyword arguments: %s', |
| repr(args), repr(kwargs)) |
| return self.post_task_after_delay(callback, 0, *args, **kwargs) |
| |
| |
| def cancel_posted_task(self, post_id): |
| """ |
| Cancels a previously posted task that is yet to be dispatched. |
| |
| @param post_id: The |post_id| of the task to cancel, as returned by one |
| of the functions that post a task. |
| |
| @return: True if the posted task was removed. |
| |
| """ |
| if post_id in self._posted_tasks: |
| retval = glib.source_remove(self._posted_tasks[post_id]) |
| if retval: |
| del self._posted_tasks[post_id] |
| return retval |
| else: |
| return False |
| |
| |
| def _next_delay_ms(self, user_delay_ms): |
| """ |
| Determine the actual delay to post the next task. |
| |
| The actual delay posted may be different from the user requested delay |
| based on what mode we're in. |
| |
| @param user_delay_ms: The delay requested by the user. |
| |
| @return The actual delay to be posted. |
| |
| """ |
| next_delay_ms = user_delay_ms |
| if self.ignore_delays: |
| next_delay_ms = 0 |
| elif self.random_delays: |
| next_delay_ms = random.randint(0, self.max_random_delay_ms) |
| return next_delay_ms |
| |
| |
| def _execute_repeated_task(self, post_id, callback, delay_ms): |
| """ |
| A wrapper to repost an executed task, and return False. |
| |
| We need this to be able to repost the task at arbitrary intervals. |
| |
| @param post_id: The private post_id tracking this repeated task. |
| |
| @param callback: The user callback that must be called. |
| |
| @param delay_ms: The user requested delay between calls. |
| |
| """ |
| retval = callback() |
| self._logger.debug('Ignored return value from repeated task: %s', |
| repr(retval)) |
| |
| next_delay_ms = self._next_delay_ms(delay_ms) |
| self._posted_tasks[post_id] = glib.timeout_add( |
| next_delay_ms, |
| TaskLoop._execute_repeated_task, |
| self, |
| post_id, |
| callback, |
| delay_ms) |
| return False |