# blob: 31b5e0c786d1902238ceb80a22dda85961ee3b1e [file] [log] [blame]
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import functools
import multiprocessing
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.TaskScheduler import TaskScheduler
from portage.util.futures import asyncio
def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
    """
    Yield input futures one by one as they finish, blocking in between.

    Comparable to asyncio.as_completed, except that the input is an
    iterator of futures and the max_jobs and max_load parameters are
    supported. Every batch produced by async_iter_completed is driven
    to completion with loop.run_until_complete, and the done futures
    contained in that batch are then yielded individually.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @param loop: event loop
    @type loop: EventLoop
    @return: iterator of futures that are done
    @rtype: iterator
    """
    loop = asyncio._wrap_loop(loop)
    limits = dict(max_jobs=max_jobs, max_load=max_load, loop=loop)

    # The inner generator is intentionally not bound to a local name, so
    # that it is referenced only by this loop statement.
    for pending_done_set in async_iter_completed(futures, **limits):
        # Run the event loop until the next batch of done futures is ready.
        finished = loop.run_until_complete(pending_done_set)
        for done_future in finished:
            yield done_future
def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
    """
    Non-blocking variant of iter_completed.

    Each item yielded is a future which, once done, has a set of done
    input futures as its result. Internally this wraps portage's
    TaskScheduler class and exposes it through standard asyncio
    interfaces. The consumer is expected to let each yielded future
    finish (or cancel it) before requesting the next one.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @param loop: event loop
    @type loop: EventLoop
    @return: iterator of futures, which when done, result in a set of
        input futures that are done
    @rtype: iterator
    """
    loop = asyncio._wrap_loop(loop)
    if not max_jobs:
        max_jobs = multiprocessing.cpu_count()
    if not max_load:
        max_load = multiprocessing.cpu_count()

    # Input futures pulled from the iterator that have not yet been
    # reported as done, keyed by id() of the future.
    unreported = {}

    def wrap_inputs():
        """Register each input future and wrap it as a scheduler task."""
        for item in futures:
            unreported[id(item)] = item
            yield AsyncTaskFuture(future=item)

    scheduler = TaskScheduler(
        wrap_inputs(),
        max_jobs=max_jobs,
        max_load=max_load,
        event_loop=loop,
    )

    def publish_done(done_set_future, waiter):
        """Copy the done set from waiter into done_set_future."""
        if done_set_future.cancelled():
            # Nobody is interested in the result anymore.
            return
        finished, _still_pending = waiter.result()
        for item in finished:
            del unreported[id(item)]
        done_set_future.set_result(finished)

    def cancel_waiter(waiter, done_set_future):
        """Cancel waiter when done_set_future was cancelled first."""
        if not done_set_future.cancelled():
            return
        if not waiter.done():
            waiter.cancel()

    try:
        scheduler.start()

        # scheduler should ensure that the map of unreported futures is
        # non-empty until the input iterator is exhausted
        while unreported:
            snapshot = list(unreported.values())
            waiter = asyncio.ensure_future(
                asyncio.wait(
                    snapshot,
                    return_when=asyncio.FIRST_COMPLETED,
                    loop=loop,
                ),
                loop=loop,
            )
            done_set_future = loop.create_future()
            # Link the pair in both directions: cancellation flows from
            # done_set_future to waiter, results flow the other way.
            done_set_future.add_done_callback(
                functools.partial(cancel_waiter, waiter))
            waiter.add_done_callback(
                functools.partial(publish_done, done_set_future))
            yield done_set_future
    finally:
        # cleanup in case of interruption by SIGINT, etc
        scheduler.cancel()
        scheduler.wait()
def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
    """
    Collect all input futures into a single future.

    Comparable to asyncio.gather, except that the input is an iterator
    of futures and the max_jobs and max_load parameters are supported.
    Cancelling the returned future cancels the batch future that is
    currently being waited on, if it is not done yet.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @param loop: event loop
    @type loop: EventLoop
    @return: a Future resulting in a list of done input futures, in the
        same order that they were yielded from the input iterator
    @rtype: asyncio.Future (or compatible)
    """
    loop = asyncio._wrap_loop(loop)
    gathered = loop.create_future()
    ordered_inputs = []

    def record_order():
        """Pass input futures through while remembering their order."""
        for item in futures:
            ordered_inputs.append(item)
            yield item

    completed_iter = async_iter_completed(
        record_order(),
        max_jobs=max_jobs,
        max_load=max_load,
        loop=loop,
    )

    # Mutable holder for the batch future currently being waited on
    # (None until the first batch has been obtained).
    state = {"current_task": None}

    def schedule_next():
        """Advance completed_iter, finishing gathered when exhausted."""
        try:
            upcoming = next(completed_iter)
        except StopIteration:
            gathered.set_result(ordered_inputs)
        else:
            state["current_task"] = upcoming
            upcoming.add_done_callback(on_batch_done)

    def on_batch_done(done_set_future):
        """React to a finished (or cancelled) batch future."""
        if not gathered.cancelled():
            schedule_next()
            return

        if done_set_future.cancelled():
            return

        # All exceptions must be consumed from done_set_future, in order
        # to avoid triggering the event loop's exception handler.
        for finished in done_set_future.result():
            if not finished.cancelled():
                finished.exception()

    # Start the chain by requesting the first batch.
    schedule_next()

    def propagate_cancel(gather_future):
        """Cancel the batch being waited on when gathered is cancelled."""
        if not gather_future.cancelled():
            return
        active = state["current_task"]
        if active is not None and not active.done():
            active.cancel()

    gathered.add_done_callback(propagate_cancel)

    return gathered