blob: 650a16491ff23f80dc93227e98e7a769fb9504c7 [file] [log] [blame]
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import errno
import os
import portage
portage.proxy.lazyimport.lazyimport(globals(),
'_emerge.PipeReader:PipeReader',
'portage.util.futures:asyncio',
'portage.util.futures.unix_events:_set_nonblocking',
)
from portage.util.futures.compat_coroutine import coroutine
def _reader(input_file, loop=None):
"""
Asynchronously read a binary input file, and close it when
it reaches EOF.
@param input_file: binary input file descriptor
@type input_file: file or int
@param loop: asyncio.AbstractEventLoop (or compatible)
@type loop: event loop
@return: bytes
@rtype: asyncio.Future (or compatible)
"""
loop = asyncio._wrap_loop(loop)
future = loop.create_future()
_Reader(future, input_file, loop)
return future
class _Reader(object):
def __init__(self, future, input_file, loop):
self._future = future
self._pipe_reader = PipeReader(
input_files={'input_file':input_file}, scheduler=loop)
self._future.add_done_callback(self._cancel_callback)
self._pipe_reader.addExitListener(self._eof)
self._pipe_reader.start()
def _cancel_callback(self, future):
if future.cancelled():
self._cancel()
def _eof(self, pipe_reader):
self._pipe_reader = None
self._future.set_result(pipe_reader.getvalue())
def _cancel(self):
if self._pipe_reader is not None and self._pipe_reader.poll() is None:
self._pipe_reader.removeExitListener(self._eof)
self._pipe_reader.cancel()
self._pipe_reader = None
@coroutine
def _writer(output_file, content, loop=None):
"""
Asynchronously write bytes to output file, and close it when
done. If an EnvironmentError other than EAGAIN is encountered,
which typically indicates that the other end of the pipe has
close, the error is raised. This function is a coroutine.
@param output_file: output file descriptor
@type output_file: file or int
@param content: content to write
@type content: bytes
@param loop: asyncio.AbstractEventLoop (or compatible)
@type loop: event loop
"""
fd = output_file if isinstance(output_file, int) else output_file.fileno()
_set_nonblocking(fd)
loop = asyncio._wrap_loop(loop)
try:
while content:
waiter = loop.create_future()
loop.add_writer(fd, lambda: waiter.set_result(None))
try:
yield waiter
while content:
try:
content = content[os.write(fd, content):]
except EnvironmentError as e:
if e.errno == errno.EAGAIN:
break
else:
raise
finally:
loop.remove_writer(fd)
except GeneratorExit:
raise
finally:
os.close(output_file) if isinstance(output_file, int) else output_file.close()