| # Copyright 2018 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| ___all___ = ( |
| 'ALL_COMPLETED', |
| 'FIRST_COMPLETED', |
| 'FIRST_EXCEPTION', |
| 'wait', |
| ) |
| |
| try: |
| from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION |
| except ImportError: |
| ALL_COMPLETED = 'ALL_COMPLETED' |
| FIRST_COMPLETED ='FIRST_COMPLETED' |
| FIRST_EXCEPTION = 'FIRST_EXCEPTION' |
| |
| import portage |
| portage.proxy.lazyimport.lazyimport(globals(), |
| 'portage.util.futures:asyncio', |
| ) |
| from portage.util._eventloop.global_event_loop import ( |
| global_event_loop as _global_event_loop, |
| ) |
| |
| |
| def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED): |
| """ |
| Use portage's internal EventLoop to emulate asyncio.wait: |
| https://docs.python.org/3/library/asyncio-task.html#asyncio.wait |
| |
| @param futures: futures to wait for |
| @type futures: asyncio.Future (or compatible) |
| @param timeout: number of seconds to wait (wait indefinitely if |
| not specified) |
| @type timeout: int or float |
| @param return_when: indicates when this function should return, must |
| be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or |
| FIRST_EXCEPTION (default is ALL_COMPLETED) |
| @type return_when: object |
| @param loop: event loop |
| @type loop: EventLoop |
| @return: tuple of (done, pending). |
| @rtype: asyncio.Future (or compatible) |
| """ |
| loop = asyncio._wrap_loop(loop) |
| result_future = loop.create_future() |
| _Waiter(futures, timeout, return_when, result_future, loop) |
| return result_future |
| |
| |
| class _Waiter(object): |
| def __init__(self, futures, timeout, return_when, result_future, loop): |
| self._futures = futures |
| self._completed = set() |
| self._exceptions = set() |
| self._return_when = return_when |
| self._result_future = result_future |
| self._loop = loop |
| self._ready = False |
| self._timeout = None |
| result_future.add_done_callback(self._cancel_callback) |
| for future in self._futures: |
| future.add_done_callback(self._done_callback) |
| if timeout is not None: |
| self._timeout = loop.call_later(timeout, self._timeout_callback) |
| |
| def _cancel_callback(self, future): |
| if future.cancelled(): |
| self._ready_callback() |
| |
| def _timeout_callback(self): |
| if not self._ready: |
| self._ready = True |
| self._ready_callback() |
| |
| def _done_callback(self, future): |
| if future.cancelled() or future.exception() is None: |
| self._completed.add(id(future)) |
| else: |
| self._exceptions.add(id(future)) |
| if not self._ready and ( |
| (self._return_when is FIRST_COMPLETED and self._completed) or |
| (self._return_when is FIRST_EXCEPTION and self._exceptions) or |
| (len(self._futures) == len(self._completed) + len(self._exceptions))): |
| self._ready = True |
| # use call_soon in case multiple callbacks complete in quick succession |
| self._loop.call_soon(self._ready_callback) |
| |
| def _ready_callback(self): |
| if self._timeout is not None: |
| self._timeout.cancel() |
| self._timeout = None |
| if self._result_future.cancelled(): |
| return |
| done = [] |
| pending = [] |
| done_ids = self._completed.union(self._exceptions) |
| for future in self._futures: |
| if id(future) in done_ids: |
| done.append(future) |
| else: |
| pending.append(future) |
| future.remove_done_callback(self._done_callback) |
| self._result_future.set_result((set(done), set(pending))) |