| # Copyright 2020-2021 Gentoo Authors |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import sys |
| |
| import portage |
| from portage import os |
| from portage.tests import TestCase |
| from portage.util._async.AsyncFunction import AsyncFunction |
| from portage.util.futures import asyncio |
| from portage.util.futures._asyncio.streams import _writer |
| from portage.util.futures.unix_events import _set_nonblocking |
| |
| |
| class AsyncFunctionTestCase(TestCase): |
| |
| @staticmethod |
| def _read_from_stdin(pw): |
| os.close(pw) |
| return ''.join(sys.stdin) |
| |
| async def _testAsyncFunctionStdin(self, loop): |
| test_string = '1\n2\n3\n' |
| pr, pw = os.pipe() |
| fd_pipes = {0:pr} |
| reader = AsyncFunction(scheduler=loop, fd_pipes=fd_pipes, target=self._read_from_stdin, args=(pw,)) |
| reader.start() |
| os.close(pr) |
| _set_nonblocking(pw) |
| with open(pw, mode='wb', buffering=0) as pipe_write: |
| await _writer(pipe_write, test_string.encode('utf_8')) |
| self.assertEqual((await reader.async_wait()), os.EX_OK) |
| self.assertEqual(reader.result, test_string) |
| |
| def testAsyncFunctionStdin(self): |
| loop = asyncio._wrap_loop() |
| loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop)) |
| |
| def _test_getpid_fork(self): |
| """ |
| Verify that portage.getpid() cache is updated in a forked child process. |
| """ |
| loop = asyncio._wrap_loop() |
| proc = AsyncFunction(scheduler=loop, target=portage.getpid) |
| proc.start() |
| proc.wait() |
| self.assertEqual(proc.pid, proc.result) |
| |
| def test_getpid_fork(self): |
| self._test_getpid_fork() |
| |
| def test_getpid_double_fork(self): |
| """ |
| Verify that portage.getpid() cache is updated correctly after |
| two forks. |
| """ |
| loop = asyncio._wrap_loop() |
| proc = AsyncFunction(scheduler=loop, target=self._test_getpid_fork) |
| proc.start() |
| self.assertEqual(proc.wait(), 0) |