# blob: add7b3c9e816d26c698d5f78218cfdf08ae517ea [file] [log] [blame]
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
__all__ = (
'ForkExecutor',
)
import collections
import functools
import os
import sys
import traceback
from portage.util._async.AsyncFunction import AsyncFunction
from portage.util.futures import asyncio
from portage.util.cpuinfo import get_cpu_count
class ForkExecutor(object):
    """
    An implementation of concurrent.futures.Executor that forks a
    new process for each task, with support for cancellation of tasks.

    This is entirely driven by an event loop. Submitted tasks wait in a
    FIFO queue until fewer than ``max_workers`` processes are running.
    Once shutdown() has been called no further queued tasks are started,
    so futures that are still queued at that point are never resolved
    by this executor.
    """

    def __init__(self, max_workers=None, loop=None):
        """
        @param max_workers: maximum number of concurrently running
            processes; a false value (None or 0) selects get_cpu_count()
        @param loop: event loop, passed through asyncio._wrap_loop()
        """
        self._max_workers = max_workers or get_cpu_count()
        self._loop = asyncio._wrap_loop(loop)
        # Pending (future, proc) pairs that have not been started yet.
        self._submit_queue = collections.deque()
        # Started processes, keyed by id(proc).
        self._running_tasks = {}
        self._shutdown = False
        # Resolved once shutdown was requested and nothing is running.
        self._shutdown_future = self._loop.create_future()

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and
        returns a Future instance representing the execution of the
        callable. The future is created by the executor's event loop.

        Returns:
            A Future representing the given call.
        """
        future = self._loop.create_future()
        proc = AsyncFunction(target=functools.partial(
            self._guarded_fn_call, fn, args, kwargs))
        self._submit_queue.append((future, proc))
        self._schedule()
        return future

    def _schedule(self):
        """
        Start queued tasks while worker slots are free and the executor
        has not been shut down.
        """
        while (not self._shutdown and self._submit_queue and
                len(self._running_tasks) < self._max_workers):
            future, proc = self._submit_queue.popleft()
            if future.cancelled():
                # The task was cancelled while it was still queued, so
                # there is nothing to run. Starting it would fork a
                # process (occupying a worker slot) only for it to be
                # cancelled again by the done callback.
                continue
            future.add_done_callback(
                functools.partial(self._cancel_cb, proc))
            proc.addExitListener(functools.partial(self._proc_exit, future))
            proc.scheduler = self._loop
            proc.start()
            self._running_tasks[id(proc)] = proc

    def _cancel_cb(self, proc, future):
        """
        Done callback of a started task's future: propagate cancellation
        of the future to its process.
        """
        if future.cancelled():
            # async, handle the rest in _proc_exit
            proc.cancel()

    @staticmethod
    def _guarded_fn_call(fn, args, kwargs):
        """
        Call fn(*args, **kwargs) and capture the outcome instead of
        raising.

        Returns:
            A (result, exception) tuple. Exactly one side is meaningful:
            exception is None on success, and result is None when fn
            raised an Exception (which is wrapped with
            _ExceptionWithTraceback).
        """
        try:
            result = fn(*args, **kwargs)
            exception = None
        except Exception as e:
            result = None
            exception = _ExceptionWithTraceback(e)
        return result, exception

    def _proc_exit(self, future, proc):
        """
        Exit listener of a started process: transfer the outcome to the
        future, release the worker slot and start further queued tasks.
        """
        if not future.cancelled():
            if proc.returncode == os.EX_OK:
                result, exception = proc.result
                if exception is not None:
                    future.set_exception(exception)
                else:
                    future.set_result(result)
            else:
                # TODO: add special exception class for this, maybe
                # distinguish between kill and crash
                future.set_exception(
                    Exception('pid {} crashed or killed, exitcode {}'.
                        format(proc.pid, proc.returncode)))

        del self._running_tasks[id(proc)]
        self._schedule()
        # Same guard as in shutdown(), so that the future is never
        # resolved twice.
        if (self._shutdown and not self._running_tasks and
                not self._shutdown_future.done()):
            self._shutdown_future.set_result(None)

    def shutdown(self, wait=True):
        """
        Stop starting queued tasks.

        @param wait: if true, run the event loop until all processes
            that were already started have exited
        """
        self._shutdown = True
        if not self._running_tasks and not self._shutdown_future.done():
            self._shutdown_future.set_result(None)
        if wait:
            self._loop.run_until_complete(self._shutdown_future)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the with block.
        return False
class _ExceptionWithTraceback:
    """
    Pairs an exception with the text of its formatted traceback.

    Pickling an instance reduces it to a call of _rebuild_exc with the
    exception and the traceback text as arguments.
    """

    def __init__(self, exc):
        formatted = ''.join(
            traceback.format_exception(type(exc), exc, exc.__traceback__))
        self.exc = exc
        # Wrap the text in triple quotes so that it stands out when it
        # is displayed.
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        rebuild_args = (self.exc, self.tb)
        return _rebuild_exc, rebuild_args
class _RemoteTraceback(Exception):
    """Exception whose string form is a stored traceback text."""

    def __init__(self, tb):
        # tb is the already formatted traceback text.
        self.tb = tb

    def __str__(self):
        text = self.tb
        return text
def _rebuild_exc(exc, tb):
    """
    Attach the traceback text tb to exc as its __cause__ (wrapped in a
    _RemoteTraceback) and return exc.
    """
    remote_cause = _RemoteTraceback(tb)
    exc.__cause__ = remote_cause
    return exc
if sys.version_info[0] < 3:
    # Exception chaining is not available on Python 2, so there the
    # wrapper is replaced by an identity function and no traceback
    # text is preserved.
    def _ExceptionWithTraceback(exc):
        return exc