blob: ce4b2a93b61d44eb2df815ebdaae60c2f7ca1ec8 [file] [log] [blame]
# Copyright 2020-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import sys
import portage
from portage import os
from portage.tests import TestCase
from portage.util._async.AsyncFunction import AsyncFunction
from portage.util.futures import asyncio
from portage.util.futures._asyncio.streams import _writer
from portage.util.futures.unix_events import _set_nonblocking
class AsyncFunctionTestCase(TestCase):
@staticmethod
def _read_from_stdin(pw):
os.close(pw)
return ''.join(sys.stdin)
async def _testAsyncFunctionStdin(self, loop):
test_string = '1\n2\n3\n'
pr, pw = os.pipe()
fd_pipes = {0:pr}
reader = AsyncFunction(scheduler=loop, fd_pipes=fd_pipes, target=self._read_from_stdin, args=(pw,))
reader.start()
os.close(pr)
_set_nonblocking(pw)
with open(pw, mode='wb', buffering=0) as pipe_write:
await _writer(pipe_write, test_string.encode('utf_8'))
self.assertEqual((await reader.async_wait()), os.EX_OK)
self.assertEqual(reader.result, test_string)
def testAsyncFunctionStdin(self):
loop = asyncio._wrap_loop()
loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop))
def _test_getpid_fork(self):
"""
Verify that portage.getpid() cache is updated in a forked child process.
"""
loop = asyncio._wrap_loop()
proc = AsyncFunction(scheduler=loop, target=portage.getpid)
proc.start()
proc.wait()
self.assertEqual(proc.pid, proc.result)
def test_getpid_fork(self):
self._test_getpid_fork()
def test_getpid_double_fork(self):
"""
Verify that portage.getpid() cache is updated correctly after
two forks.
"""
loop = asyncio._wrap_loop()
proc = AsyncFunction(scheduler=loop, target=self._test_getpid_fork)
proc.start()
self.assertEqual(proc.wait(), 0)