blob: b06adf6edb9943eee6f68328285c41d8c12a07a5 [file] [log] [blame]
# Copyright 2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
try:
import threading
except ImportError:
# dummy_threading will not suffice
threading = None
from portage import os
from _emerge.AbstractPollTask import AbstractPollTask
class PipeReaderBlockingIO(AbstractPollTask):
    """
    Reads output from one or more files and saves it in memory, for
    retrieval via the getvalue() method. This is driven by a thread
    for each input file, in order to support blocking IO. This may
    be useful for using threads to handle blocking IO with Jython,
    since Jython lacks the fcntl module which is needed for
    non-blocking IO (see http://bugs.jython.org/issue1074).

    Each reader thread appends the chunks it reads to a shared list
    while holding a re-entrant lock. When the last registered thread
    reaches EOF, _eof() is handed to self.scheduler via idle_add().
    """

    # input_files: mapping whose values are the file objects to read
    #     (only .values() is used here); not assigned by this class.
    # _read_data: list of chunks in the order they were appended, or
    #     None once close() has been called.
    # _terminate: threading.Event that _cancel() sets to stop readers.
    # _threads: maps each input file to its reader thread; an entry is
    #     removed when that file reaches EOF.
    # _thread_rlock: RLock held around accesses to _read_data and
    #     mutations of _threads.
    __slots__ = ("input_files", "_read_data", "_terminate",
                 "_threads", "_thread_rlock")

    def _start(self):
        """
        Initialize the shared state and start one daemon reader
        thread per file in self.input_files.
        """
        self._terminate = threading.Event()
        self._threads = {}
        self._read_data = []

        self._registered = True
        self._thread_rlock = threading.RLock()
        # Hold the lock until every thread is recorded in _threads.
        # A reader takes the same lock before it removes its own entry
        # and tests for emptiness, so it cannot look for an entry that
        # has not been stored yet, nor see an empty dict while other
        # readers are still waiting to be started.
        with self._thread_rlock:
            for f in self.input_files.values():
                t = threading.Thread(target=self._reader_thread, args=(f,))
                t.daemon = True
                t.start()
                self._threads[f] = t

    def _reader_thread(self, f):
        """
        Thread body: repeatedly call f.read(self._bufsize) and store
        each non-empty result, until a read returns an empty value
        (EOF) or the terminate event is set.

        The terminate event is only checked before and after each
        read, so a thread blocked inside f.read() does not notice a
        cancel until that read returns.

        f is always closed before the thread exits, including when a
        read or the bookkeeping raises.

        NOTE(review): when an exception escapes the loop, this
        thread's entry stays in self._threads, so _eof() is not
        scheduled on behalf of this file -- confirm whether callers
        need an error path for that.
        """
        try:
            terminated = self._terminate.is_set
        except AttributeError:
            # Jython 2.7.0a2
            terminated = self._terminate.isSet
        # _bufsize is not defined in this class (presumably inherited
        # from AbstractPollTask).
        bufsize = self._bufsize
        try:
            while not terminated():
                buf = f.read(bufsize)
                with self._thread_rlock:
                    if terminated():
                        # Cancelled while blocked in read(): drop buf.
                        break
                    elif buf:
                        self._read_data.append(buf)
                    else:
                        # EOF for this file.
                        del self._threads[f]
                        if not self._threads:
                            # Thread-safe callback to EventLoop
                            self.scheduler.idle_add(self._eof)
                        break
        finally:
            # Previously the file was closed only on a normal loop
            # exit, leaking it whenever the loop body raised.
            f.close()

    def _eof(self):
        """
        Callback that _reader_thread() schedules via
        self.scheduler.idle_add() when the last registered reader hits
        EOF. Clears _registered and, if no return code has been set
        yet, sets it to os.EX_OK and calls self.wait().

        Always returns False (presumably so that the scheduler does
        not invoke this idle callback again -- confirm against
        idle_add()).
        """
        self._registered = False
        if self.returncode is None:
            self.returncode = os.EX_OK
            self.wait()
        return False

    def _cancel(self):
        """
        Set the terminate event so the reader threads stop, clear
        _registered, set returncode to self._cancelled_returncode if
        it is still None, and then call self.wait().
        """
        self._terminate.set()
        self._registered = False
        if self.returncode is None:
            self.returncode = self._cancelled_returncode
        self.wait()

    def _wait(self):
        """
        Return self.returncode if it is already set. Otherwise call
        self._wait_loop() (not defined in this class), then set
        returncode to os.EX_OK and return it.
        """
        if self.returncode is not None:
            return self.returncode
        self._wait_loop()
        self.returncode = os.EX_OK
        return self.returncode

    def getvalue(self):
        """
        Retrieve the entire contents read so far, as the chunks
        joined with b''. Must not be called after close(), which
        replaces the chunk list with None.
        """
        with self._thread_rlock:
            return b''.join(self._read_data)

    def close(self):
        """Free the memory buffer."""
        with self._thread_rlock:
            self._read_data = None