| # Copyright 2012-2013 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import subprocess |
| import tempfile |
| |
| from portage import os |
| from portage.tests import TestCase |
| from portage.util._async.PipeLogger import PipeLogger |
| from portage.util._async.PopenProcess import PopenProcess |
| from portage.util._eventloop.global_event_loop import global_event_loop |
| from _emerge.PipeReader import PipeReader |
| |
| class PopenPipeTestCase(TestCase): |
| """ |
| Test PopenProcess, which can be useful for Jython support, since it |
| uses the subprocess.Popen instead of os.fork(). |
| """ |
| |
| _echo_cmd = "echo -n '%s'" |
| |
| def _testPipeReader(self, test_string): |
| """ |
| Use a poll loop to read data from a pipe and assert that |
| the data written to the pipe is identical to the data |
| read from the pipe. |
| """ |
| |
| producer = PopenProcess(proc=subprocess.Popen( |
| ["bash", "-c", self._echo_cmd % test_string], |
| stdout=subprocess.PIPE, stderr=subprocess.STDOUT), |
| pipe_reader=PipeReader(), scheduler=global_event_loop()) |
| |
| consumer = producer.pipe_reader |
| consumer.input_files = {"producer" : producer.proc.stdout} |
| |
| producer.start() |
| producer.wait() |
| |
| self.assertEqual(producer.returncode, os.EX_OK) |
| self.assertEqual(consumer.returncode, os.EX_OK) |
| |
| return consumer.getvalue().decode('ascii', 'replace') |
| |
| def _testPipeLogger(self, test_string): |
| |
| producer = PopenProcess(proc=subprocess.Popen( |
| ["bash", "-c", self._echo_cmd % test_string], |
| stdout=subprocess.PIPE, stderr=subprocess.STDOUT), |
| scheduler=global_event_loop()) |
| |
| fd, log_file_path = tempfile.mkstemp() |
| try: |
| |
| consumer = PipeLogger(background=True, |
| input_fd=producer.proc.stdout, |
| log_file_path=log_file_path) |
| |
| producer.pipe_reader = consumer |
| |
| producer.start() |
| producer.wait() |
| |
| self.assertEqual(producer.returncode, os.EX_OK) |
| self.assertEqual(consumer.returncode, os.EX_OK) |
| |
| with open(log_file_path, 'rb') as f: |
| content = f.read() |
| |
| finally: |
| os.close(fd) |
| os.unlink(log_file_path) |
| |
| return content.decode('ascii', 'replace') |
| |
| def testPopenPipe(self): |
| for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): |
| test_string = x * "a" |
| output = self._testPipeReader(test_string) |
| self.assertEqual(test_string, output, |
| "x = %s, len(output) = %s" % (x, len(output))) |
| |
| output = self._testPipeLogger(test_string) |
| self.assertEqual(test_string, output, |
| "x = %s, len(output) = %s" % (x, len(output))) |