| # Copyright 1999-2011 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import time |
| |
| from _emerge.PollScheduler import PollScheduler |
| |
class QueueScheduler(PollScheduler):

    """
    Add instances of SequentialTaskQueue and then call run(). The
    run() method returns when no tasks remain.
    """

    def __init__(self, max_jobs=None, max_load=None):
        """
        @param max_jobs: job limit that _schedule_tasks() compares
            against the number of running tasks; None is treated as 1
        @param max_load: stored as self._max_load. It is not read
            anywhere in this class (presumably consulted by
            PollScheduler -- confirm against the base class).
        """
        PollScheduler.__init__(self)

        if max_jobs is None:
            max_jobs = 1

        self._max_jobs = max_jobs
        self._max_load = max_load

        self._queues = []
        # Initialized here but not referenced by any method of this class.
        self._schedule_listeners = []

    def add(self, q):
        """Append the queue q to the list of scheduled queues."""
        self._queues.append(q)

    def remove(self, q):
        """
        Remove the queue q from the list of scheduled queues.

        @raise ValueError: if q was never added (list.remove semantics)
        """
        self._queues.remove(q)

    def clear(self):
        """Call clear() on every queue; the queues themselves stay added."""
        for q in self._queues:
            q.clear()

    def run(self, timeout=None):
        """
        Wait while self._schedule() keeps returning a true value and
        then, unless the timeout expired, wait until no tasks are
        running.

        @param timeout: overall time budget for both phases, or None
            to wait without a limit. The elapsed wall-clock seconds are
            multiplied by 1000 before being subtracted from it, so the
            value is expressed in milliseconds.
        """
        start_time = None
        if timeout is not None:
            start_time = time.time()

        # Phase 1: wait for as long as _schedule() reports more work.
        timed_out, remaining_timeout = self._run_wait_loop(
            self._schedule, timeout, start_time, timeout)

        # Phase 2: drain the tasks that are still running. Both phases
        # share one budget, so the remaining time is carried over.
        # timed_out can only be True when a timeout was given.
        if not timed_out:
            self._run_wait_loop(self._running_job_count,
                timeout, start_time, remaining_timeout)

    def _run_wait_loop(self, condition, timeout, start_time,
        remaining_timeout):
        """
        Call self._schedule_wait() repeatedly for as long as
        condition() returns a true value and the timeout has not
        expired.

        @param condition: callable taking no arguments, evaluated
            before every wait
        @param timeout: total budget in milliseconds, or None for no
            limit
        @param start_time: time.time() value taken when the budget
            started; only used when timeout is not None
        @param remaining_timeout: value passed to the first
            _schedule_wait() call
        @rtype: tuple
        @returns: (timed_out, remaining_timeout), where timed_out is
            True if the loop ended because the budget was used up and
            remaining_timeout is the most recently computed remainder
        """
        while condition():
            self._schedule_wait(timeout=remaining_timeout)
            if timeout is None:
                continue
            elapsed_time = time.time() - start_time
            if elapsed_time < 0:
                # The system clock has changed such that start_time
                # is now in the future, so just assume that the
                # timeout has already elapsed.
                return True, remaining_timeout
            remaining_timeout = timeout - 1000 * elapsed_time
            if remaining_timeout <= 0:
                return True, remaining_timeout
        return False, remaining_timeout

    def _schedule_tasks(self):
        """
        @rtype: bool
        @returns: True if there may be remaining tasks to schedule,
            False otherwise.
        """
        if self._terminated_tasks:
            return False

        while self._can_add_job():
            n = self._max_jobs - self._running_job_count()
            if n < 1:
                break

            # Nothing could be started, so stop trying for now.
            if not self._start_next_job(n):
                return False

        # A queue that evaluates as true counts as having work left.
        return any(self._queues)

    def _running_job_count(self):
        """
        Count the running tasks of all queues and store the total in
        self._jobs as a side effect.

        @rtype: int
        @returns: sum of len(q.running_tasks) over all added queues
        """
        job_count = sum(len(q.running_tasks) for q in self._queues)
        self._jobs = job_count
        return job_count

    def _start_next_job(self, n=1):
        """
        Call schedule() on the queues in the order they were added,
        stopping once at least n tasks have been started.

        @param n: number of started tasks after which no further
            queues are visited
        @rtype: int
        @returns: number of tasks started, measured as the growth of
            each visited queue's running_tasks. This can exceed n if a
            single schedule() call starts several tasks.
        """
        started_count = 0
        for q in self._queues:
            initial_job_count = len(q.running_tasks)
            q.schedule()
            final_job_count = len(q.running_tasks)
            # Only count growth: tasks may also have finished meanwhile.
            if final_job_count > initial_job_count:
                started_count += (final_job_count - initial_job_count)
            if started_count >= n:
                break
        return started_count
| |