blob: 9b96c6f36249d3f9d8f1e654e4f41c194dcf3c8b [file] [log] [blame]
# Copyright 2012-2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
from portage import os
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.PollScheduler import PollScheduler
class AsyncScheduler(AsynchronousTask, PollScheduler):
def __init__(self, max_jobs=None, max_load=None, **kwargs):
AsynchronousTask.__init__(self)
PollScheduler.__init__(self, **kwargs)
if max_jobs is None:
max_jobs = 1
self._max_jobs = max_jobs
self._max_load = max_load
self._error_count = 0
self._running_tasks = set()
self._remaining_tasks = True
self._term_check_id = None
self._loadavg_check_id = None
def _poll(self):
if not (self._is_work_scheduled() or self._keep_scheduling()):
self.wait()
return self.returncode
def _cancel(self):
self._terminated.set()
self._termination_check()
def _terminate_tasks(self):
for task in list(self._running_tasks):
task.cancel()
def _next_task(self):
raise NotImplementedError(self)
def _keep_scheduling(self):
return self._remaining_tasks and not self._terminated.is_set()
def _running_job_count(self):
return len(self._running_tasks)
def _schedule_tasks(self):
while self._keep_scheduling() and self._can_add_job():
try:
task = self._next_task()
except StopIteration:
self._remaining_tasks = False
else:
self._running_tasks.add(task)
task.scheduler = self._sched_iface
task.addExitListener(self._task_exit)
task.start()
# Triggers cleanup and exit listeners if there's nothing left to do.
self.poll()
def _task_exit(self, task):
self._running_tasks.discard(task)
if task.returncode != os.EX_OK:
self._error_count += 1
self._schedule()
def _start(self):
self._term_check_id = self._event_loop.idle_add(self._termination_check)
if self._max_load is not None and \
self._loadavg_latency is not None and \
(self._max_jobs is True or self._max_jobs > 1):
# We have to schedule periodically, in case the load
# average has changed since the last call.
self._loadavg_check_id = self._event_loop.timeout_add(
self._loadavg_latency, self._schedule)
self._schedule()
def _wait(self):
# Loop while there are jobs to be scheduled.
while self._keep_scheduling():
self._event_loop.iteration()
# Clean shutdown of previously scheduled jobs. In the
# case of termination, this allows for basic cleanup
# such as flushing of buffered output to logs.
while self._is_work_scheduled():
self._event_loop.iteration()
if self._term_check_id is not None:
self._event_loop.source_remove(self._term_check_id)
self._term_check_id = None
if self._loadavg_check_id is not None:
self._event_loop.source_remove(self._loadavg_check_id)
self._loadavg_check_id = None
if self._error_count > 0:
self.returncode = 1
else:
self.returncode = os.EX_OK
return self.returncode