blob: 3c9c6e22ba75404accebf014b32ed8ad931bf381 [file] [log] [blame]
# Copyright 2012-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import fcntl
import functools
import multiprocessing
import signal
import sys
import portage
from portage import os
from portage.util.futures import asyncio
from portage.util.futures.compat_coroutine import coroutine
from _emerge.SpawnProcess import SpawnProcess
class ForkProcess(SpawnProcess):
__slots__ = ('_proc', '_proc_join_task')
# Number of seconds between poll attempts for process exit status
# (after the sentinel has become ready).
_proc_join_interval = 0.1
def _spawn(self, args, fd_pipes=None, **kwargs):
"""
Override SpawnProcess._spawn to fork a subprocess that calls
self._run(). This uses multiprocessing.Process in order to leverage
any pre-fork and post-fork interpreter housekeeping that it provides,
promoting a healthy state for the forked interpreter.
"""
# Since multiprocessing.Process closes sys.__stdin__, create a
# temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
# be restored in the subprocess, in case this is needed for
# things like PROPERTIES=interactive support.
stdin_dup = None
try:
stdin_fd = fd_pipes.get(0)
if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno():
stdin_dup = os.dup(stdin_fd)
fcntl.fcntl(stdin_dup, fcntl.F_SETFD,
fcntl.fcntl(stdin_fd, fcntl.F_GETFD))
fd_pipes[0] = stdin_dup
self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,))
self._proc.start()
finally:
if stdin_dup is not None:
os.close(stdin_dup)
self._proc_join_task = asyncio.ensure_future(
self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler)
self._proc_join_task.add_done_callback(
functools.partial(self._proc_join_done, self._proc))
return [self._proc.pid]
def _cancel(self):
if self._proc is None:
super(ForkProcess, self)._cancel()
else:
self._proc.terminate()
def _async_wait(self):
if self._proc_join_task is None:
super(ForkProcess, self)._async_wait()
def _async_waitpid(self):
if self._proc_join_task is None:
super(ForkProcess, self)._async_waitpid()
@coroutine
def _proc_join(self, proc, loop=None):
sentinel_reader = self.scheduler.create_future()
self.scheduler.add_reader(proc.sentinel,
lambda: sentinel_reader.done() or sentinel_reader.set_result(None))
try:
yield sentinel_reader
finally:
# If multiprocessing.Process supports the close method, then
# access to proc.sentinel will raise ValueError if the
# sentinel has been closed. In this case it's not safe to call
# remove_reader, since the file descriptor may have been closed
# and then reallocated to a concurrent coroutine. When the
# close method is not supported, proc.sentinel remains open
# until proc's finalizer is called.
try:
self.scheduler.remove_reader(proc.sentinel)
except ValueError:
pass
# Now that proc.sentinel is ready, poll until process exit
# status has become available.
while True:
proc.join(0)
if proc.exitcode is not None:
break
yield asyncio.sleep(self._proc_join_interval, loop=loop)
def _proc_join_done(self, proc, future):
future.cancelled() or future.result()
self._was_cancelled()
if self.returncode is None:
self.returncode = proc.exitcode
self._proc = None
if hasattr(proc, 'close'):
proc.close()
self._proc_join_task = None
self._async_wait()
def _unregister(self):
super(ForkProcess, self)._unregister()
if self._proc is not None:
if self._proc.is_alive():
self._proc.terminate()
self._proc = None
if self._proc_join_task is not None:
self._proc_join_task.cancel()
self._proc_join_task = None
def _bootstrap(self, fd_pipes):
# Use default signal handlers in order to avoid problems
# killing subprocesses as reported in bug #353239.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# Unregister SIGCHLD handler and wakeup_fd for the parent
# process's event loop (bug 655656).
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
try:
wakeup_fd = signal.set_wakeup_fd(-1)
if wakeup_fd > 0:
os.close(wakeup_fd)
except (ValueError, OSError):
pass
portage.locks._close_fds()
# We don't exec, so use close_fds=False
# (see _setup_pipes docstring).
portage.process._setup_pipes(fd_pipes, close_fds=False)
# Since multiprocessing.Process closes sys.__stdin__ and
# makes sys.stdin refer to os.devnull, restore it when
# appropriate.
if 0 in fd_pipes:
# It's possible that sys.stdin.fileno() is already 0,
# and in that case the above _setup_pipes call will
# have already updated its identity via dup2. Otherwise,
# perform the dup2 call now, and also copy the file
# descriptor flags.
if sys.stdin.fileno() != 0:
os.dup2(0, sys.stdin.fileno())
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFD,
fcntl.fcntl(0, fcntl.F_GETFD))
sys.__stdin__ = sys.stdin
sys.exit(self._run())
def _run(self):
raise NotImplementedError(self)