# blob: b20765b7a7446fd63b6d12dd366b4623da7929df
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
# Public names exported by this module. The name must be spelled with
# exactly two leading and two trailing underscores for Python to honor it
# during "from ... import *".
__all__ = (
    'ALL_COMPLETED',
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'wait',
)
try:
    # Use the constants exported by the asyncio module when it can be
    # imported.
    from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
except ImportError:
    # asyncio is not importable: define same-named string constants
    # locally so the rest of this module can still refer to them.
    ALL_COMPLETED = 'ALL_COMPLETED'
    FIRST_COMPLETED ='FIRST_COMPLETED'
    FIRST_EXCEPTION = 'FIRST_EXCEPTION'
import portage
portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures:asyncio',
)
from portage.util._eventloop.global_event_loop import (
global_event_loop as _global_event_loop,
)
def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """
    Emulate asyncio.wait on top of portage's internal EventLoop:

    https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

    The call itself does not block. It returns a future immediately, and
    that future receives its result once the return_when condition has
    been met or the timeout has expired.

    @param futures: the futures to wait for
    @type futures: asyncio.Future (or compatible)
    @param loop: event loop (passed through asyncio._wrap_loop before use)
    @type loop: EventLoop
    @param timeout: maximum number of seconds to wait; None (the default)
        means wait indefinitely
    @type timeout: int or float
    @param return_when: condition that ends the wait, one of the constants
        ALL_COMPLETED, FIRST_COMPLETED, or FIRST_EXCEPTION (default is
        ALL_COMPLETED)
    @type return_when: object
    @return: a future whose result is a tuple of two sets, (done, pending)
    @rtype: asyncio.Future (or compatible)
    """
    event_loop = asyncio._wrap_loop(loop)
    outcome = event_loop.create_future()
    # The waiter hooks itself up through done-callbacks registered in its
    # constructor, so the instance does not need to be kept here.
    _Waiter(futures, timeout, return_when, outcome, event_loop)
    return outcome
class _Waiter(object):
    """
    Helper behind wait(): watches a group of futures and resolves
    result_future with a (done, pending) tuple of sets as soon as the
    return_when condition holds or the timeout expires.
    """

    def __init__(self, futures, timeout, return_when, result_future, loop):
        """
        @param futures: iterable of futures to watch
        @param timeout: seconds before giving up, or None for no limit
        @param return_when: ALL_COMPLETED, FIRST_COMPLETED, or
            FIRST_EXCEPTION
        @param result_future: future that receives the (done, pending)
            result
        @param loop: event loop providing call_soon and call_later
        """
        # Materialize as a set: one-shot iterables stay usable for the
        # later passes, and duplicates cannot make the "everything is
        # done" count unreachable.
        self._futures = set(futures)
        self._completed = set()
        self._exceptions = set()
        self._return_when = return_when
        self._result_future = result_future
        self._loop = loop
        self._ready = False
        self._timeout = None
        result_future.add_done_callback(self._cancel_callback)
        if not self._futures:
            # Nothing to wait for: no done-callback would ever fire, so
            # resolve on the next loop iteration instead of hanging.
            self._ready = True
            loop.call_soon(self._ready_callback)
            return
        for future in self._futures:
            future.add_done_callback(self._done_callback)
        if timeout is not None:
            self._timeout = loop.call_later(timeout, self._timeout_callback)

    def _cancel_callback(self, future):
        """Clean up when the result future is cancelled by its consumer."""
        if future.cancelled():
            self._ready_callback()

    def _timeout_callback(self):
        """Resolve with whatever has finished once the timeout expires."""
        if not self._ready:
            self._ready = True
            self._ready_callback()

    def _done_callback(self, future):
        """Record a finished future and check the return_when condition."""
        if future.cancelled() or future.exception() is None:
            self._completed.add(future)
        else:
            self._exceptions.add(future)
        if self._ready:
            return
        finished = len(self._completed) + len(self._exceptions)
        # FIRST_COMPLETED is satisfied by any finished future, including
        # one that finished by raising an exception. Compare with ==
        # rather than identity since the constants are plain strings.
        if (self._return_when == FIRST_COMPLETED or
                (self._return_when == FIRST_EXCEPTION and self._exceptions) or
                finished == len(self._futures)):
            self._ready = True
            # use call_soon in case multiple callbacks complete in quick
            # succession
            self._loop.call_soon(self._ready_callback)

    def _ready_callback(self):
        """Detach from the pending futures and publish (done, pending)."""
        self._ready = True
        if self._timeout is not None:
            self._timeout.cancel()
            self._timeout = None
        done = self._completed | self._exceptions
        pending = self._futures - done
        # Always detach, even when the result future was cancelled, so
        # that no callbacks are left behind on the pending futures.
        for future in pending:
            future.remove_done_callback(self._done_callback)
        if self._result_future.done():
            # Cancelled (or otherwise already resolved): nothing to publish.
            return
        self._result_future.set_result((done, pending))