| # Copyright 2018 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
# Names exported by "from <this module> import *".
__all__ = ('ForkExecutor',)
| |
| import collections |
| import functools |
| import multiprocessing |
| import os |
| import sys |
| import traceback |
| |
| from portage.util._async.AsyncFunction import AsyncFunction |
| from portage.util.futures import asyncio |
| |
| |
class ForkExecutor(object):
    """
    An implementation of concurrent.futures.Executor that forks a
    new process for each task, with support for cancellation of tasks.

    This is entirely driven by an event loop.

    At most max_workers tasks run at the same time; further submissions
    wait in a FIFO queue until a running task exits. The executor can be
    used as a context manager, in which case shutdown(wait=True) is
    called on exit from the with block.
    """
    def __init__(self, max_workers=None, loop=None):
        """Initializes an empty executor.

        Args:
            max_workers: Maximum number of tasks running at once. Any
                false value (None, 0) selects multiprocessing.cpu_count().
            loop: Event loop handed to asyncio._wrap_loop(); the wrapped
                loop creates all futures and is assigned to each task
                as its scheduler.
        """
        self._max_workers = max_workers or multiprocessing.cpu_count()
        self._loop = asyncio._wrap_loop(loop)
        # (future, proc) pairs that have not been started yet, FIFO order.
        self._submit_queue = collections.deque()
        # Started tasks that have not exited yet, keyed by id(proc).
        self._running_tasks = {}
        self._shutdown = False
        # Completed once shutdown was requested and no task is running.
        self._shutdown_future = self._loop.create_future()

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Note that a task submitted after shutdown() is queued but never
        started, so its future stays pending.

        Returns:
            A Future representing the given call.
        """
        future = self._loop.create_future()
        proc = AsyncFunction(target=functools.partial(
            self._guarded_fn_call, fn, args, kwargs))
        self._submit_queue.append((future, proc))
        self._schedule()
        return future

    def _schedule(self):
        """Starts queued tasks while worker slots are available.

        Nothing is started once shutdown() has been called. Tasks whose
        future was cancelled while they were still queued are dropped
        without being started.
        """
        while (not self._shutdown and self._submit_queue and
                len(self._running_tasks) < self._max_workers):
            future, proc = self._submit_queue.popleft()
            if future.cancelled():
                # Nothing has been started for this task yet, so there
                # is nothing to cancel: skip it instead of forking a
                # process only to cancel it on the next loop iteration.
                continue
            future.add_done_callback(functools.partial(self._cancel_cb, proc))
            proc.addExitListener(functools.partial(self._proc_exit, future))
            proc.scheduler = self._loop
            proc.start()
            self._running_tasks[id(proc)] = proc

    def _cancel_cb(self, proc, future):
        """Done callback of a started task's future.

        Cancels proc when the future was cancelled; any other way of
        completing the future leaves proc alone.
        """
        if future.cancelled():
            # async, handle the rest in _proc_exit
            proc.cancel()

    @staticmethod
    def _guarded_fn_call(fn, args, kwargs):
        """Calls fn(*args, **kwargs) and reports the outcome as data.

        This is the target given to AsyncFunction by submit().

        Returns:
            A (result, exception) tuple. On success exception is None;
            if fn raised an Exception, result is None and exception is
            that exception wrapped by _ExceptionWithTraceback.
        """
        try:
            result = fn(*args, **kwargs)
            exception = None
        except Exception as e:
            result = None
            exception = _ExceptionWithTraceback(e)

        return result, exception

    def _proc_exit(self, future, proc):
        """Exit listener of a started task.

        Transfers the outcome of proc to future, releases the worker
        slot, starts queued tasks and completes the shutdown future once
        shutdown was requested and no task is running anymore.
        """
        # The future may already be done: cancelled (see _cancel_cb) or
        # completed by its holder. Setting a result then would raise
        # InvalidStateError and skip the bookkeeping below.
        if not future.done():
            if proc.returncode == os.EX_OK:
                result, exception = proc.result
                if exception is not None:
                    future.set_exception(exception)
                else:
                    future.set_result(result)
            else:
                # TODO: add special exception class for this, maybe
                # distinguish between kill and crash
                future.set_exception(Exception(
                    'pid {} crashed or killed, exitcode {}'.format(
                        proc.pid, proc.returncode)))

        del self._running_tasks[id(proc)]
        self._schedule()
        if (self._shutdown and not self._running_tasks and
                not self._shutdown_future.done()):
            self._shutdown_future.set_result(None)

    def shutdown(self, wait=True):
        """Stops starting new tasks.

        Tasks that are already running are left to finish. Tasks still
        waiting in the queue are not started anymore.

        Args:
            wait: If true, run the event loop until every running task
                has exited.
        """
        self._shutdown = True
        if not self._running_tasks and not self._shutdown_future.done():
            self._shutdown_future.set_result(None)
        if wait:
            self._loop.run_until_complete(self._shutdown_future)

    def __enter__(self):
        """Returns the executor itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shuts down with wait=True; never suppresses an exception."""
        self.shutdown(wait=True)
        return False
| |
| |
class _ExceptionWithTraceback:
    """Pairs an exception with the text of its formatted traceback.

    Pickling goes through __reduce__, so unpickling produces whatever
    _rebuild_exc(exc, tb) returns instead of an instance of this class.
    """
    def __init__(self, exc):
        formatted = ''.join(
            traceback.format_exception(type(exc), exc, exc.__traceback__))
        self.exc = exc
        # The traceback text is kept between triple-quote marker lines.
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        rebuild_args = (self.exc, self.tb)
        return _rebuild_exc, rebuild_args
| |
| |
class _RemoteTraceback(Exception):
    """Exception that carries traceback text and prints as that text."""
    def __init__(self, tb):
        # tb is stored as given; __str__ returns it unchanged.
        self.tb = tb

    def __str__(self):
        return self.tb
| |
| |
def _rebuild_exc(exc, tb):
    """Returns exc with a _RemoteTraceback built from tb as its __cause__.

    exc is modified in place and is the object that is returned.
    """
    remote = _RemoteTraceback(tb)
    exc.__cause__ = remote
    return exc
| |
| |
if sys.version_info[0] < 3:
    # Python 2 does not support exception chaining, so
    # don't bother to preserve the traceback.
    def _ExceptionWithTraceback(exc):
        """Returns exc unchanged (Python 2 stand-in for the class)."""
        return exc