| # Copyright 1998-2018 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import functools |
| import pty |
| import shutil |
| import socket |
| import sys |
| import subprocess |
| import tempfile |
| |
| from portage import os |
| from portage.tests import TestCase |
| from portage.util._async.PopenProcess import PopenProcess |
| from portage.util._eventloop.global_event_loop import global_event_loop |
| from _emerge.PipeReader import PipeReader |
| |
| class PipeReaderTestCase(TestCase): |
| |
| _use_array = False |
| _echo_cmd = "echo -n '%s'" |
| |
| def test_pipe(self): |
| def make_pipes(): |
| return os.pipe(), None |
| self._do_test(make_pipes) |
| |
| def test_pty_device(self): |
| def make_pipes(): |
| try: |
| return pty.openpty(), None |
| except EnvironmentError: |
| self.skipTest('pty not available') |
| self._do_test(make_pipes) |
| |
| def test_domain_socket(self): |
| def make_pipes(): |
| if sys.version_info >= (3, 2): |
| read_end, write_end = socket.socketpair() |
| return (read_end.detach(), write_end.detach()), None |
| else: |
| self.skipTest('socket detach not supported') |
| self._do_test(make_pipes) |
| |
| def test_named_pipe(self): |
| def make_pipes(): |
| tempdir = tempfile.mkdtemp() |
| fifo_path = os.path.join(tempdir, 'fifo') |
| os.mkfifo(fifo_path) |
| return ((os.open(fifo_path, os.O_NONBLOCK|os.O_RDONLY), |
| os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY)), |
| functools.partial(shutil.rmtree, tempdir)) |
| self._do_test(make_pipes) |
| |
| def _testPipeReader(self, master_fd, slave_fd, test_string): |
| """ |
| Use a poll loop to read data from a pipe and assert that |
| the data written to the pipe is identical to the data |
| read from the pipe. |
| """ |
| |
| # WARNING: It is very important to use unbuffered mode here, |
| # in order to avoid issue 5380 with python3. |
| master_file = os.fdopen(master_fd, 'rb', 0) |
| scheduler = global_event_loop() |
| |
| consumer = PipeReader( |
| input_files={"producer" : master_file}, |
| _use_array=self._use_array, |
| scheduler=scheduler) |
| |
| producer = PopenProcess( |
| pipe_reader=consumer, |
| proc=subprocess.Popen(["bash", "-c", self._echo_cmd % test_string], |
| stdout=slave_fd), |
| scheduler=scheduler) |
| |
| producer.start() |
| os.close(slave_fd) |
| producer.wait() |
| consumer.wait() |
| |
| self.assertEqual(producer.returncode, os.EX_OK) |
| self.assertEqual(consumer.returncode, os.EX_OK) |
| |
| return consumer.getvalue().decode('ascii', 'replace') |
| |
| def _do_test(self, make_pipes): |
| for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): |
| test_string = x * "a" |
| (read_end, write_end), cleanup = make_pipes() |
| try: |
| output = self._testPipeReader(read_end, write_end, test_string) |
| self.assertEqual(test_string, output, |
| "x = %s, len(output) = %s" % (x, len(output))) |
| finally: |
| if cleanup is not None: |
| cleanup() |
| |
| |
| class PipeReaderArrayTestCase(PipeReaderTestCase): |
| |
| _use_array = True |
| # sleep allows reliable triggering of the failure mode on fast computers |
| _echo_cmd = "sleep 0.1 ; echo -n '%s'" |
| |
| def __init__(self, *args, **kwargs): |
| super(PipeReaderArrayTestCase, self).__init__(*args, **kwargs) |
| # https://bugs.python.org/issue5380 |
| # https://bugs.pypy.org/issue956 |
| self.todo = True |