| # Copyright 2018 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| import errno |
| import os |
| |
| import portage |
| portage.proxy.lazyimport.lazyimport(globals(), |
| '_emerge.PipeReader:PipeReader', |
| 'portage.util.futures:asyncio', |
| 'portage.util.futures.unix_events:_set_nonblocking', |
| ) |
| from portage.util.futures.compat_coroutine import coroutine |
| |
| |
| def _reader(input_file, loop=None): |
| """ |
| Asynchronously read a binary input file, and close it when |
| it reaches EOF. |
| |
| @param input_file: binary input file descriptor |
| @type input_file: file or int |
| @param loop: asyncio.AbstractEventLoop (or compatible) |
| @type loop: event loop |
| @return: bytes |
| @rtype: asyncio.Future (or compatible) |
| """ |
| loop = asyncio._wrap_loop(loop) |
| future = loop.create_future() |
| _Reader(future, input_file, loop) |
| return future |
| |
| |
| class _Reader(object): |
| def __init__(self, future, input_file, loop): |
| self._future = future |
| self._pipe_reader = PipeReader( |
| input_files={'input_file':input_file}, scheduler=loop) |
| |
| self._future.add_done_callback(self._cancel_callback) |
| self._pipe_reader.addExitListener(self._eof) |
| self._pipe_reader.start() |
| |
| def _cancel_callback(self, future): |
| if future.cancelled(): |
| self._cancel() |
| |
| def _eof(self, pipe_reader): |
| self._pipe_reader = None |
| self._future.set_result(pipe_reader.getvalue()) |
| |
| def _cancel(self): |
| if self._pipe_reader is not None and self._pipe_reader.poll() is None: |
| self._pipe_reader.removeExitListener(self._eof) |
| self._pipe_reader.cancel() |
| self._pipe_reader = None |
| |
| |
| @coroutine |
| def _writer(output_file, content, loop=None): |
| """ |
| Asynchronously write bytes to output file, and close it when |
| done. If an EnvironmentError other than EAGAIN is encountered, |
| which typically indicates that the other end of the pipe has |
| close, the error is raised. This function is a coroutine. |
| |
| @param output_file: output file descriptor |
| @type output_file: file or int |
| @param content: content to write |
| @type content: bytes |
| @param loop: asyncio.AbstractEventLoop (or compatible) |
| @type loop: event loop |
| """ |
| fd = output_file if isinstance(output_file, int) else output_file.fileno() |
| _set_nonblocking(fd) |
| loop = asyncio._wrap_loop(loop) |
| try: |
| while content: |
| waiter = loop.create_future() |
| loop.add_writer(fd, lambda: waiter.set_result(None)) |
| try: |
| yield waiter |
| while content: |
| try: |
| content = content[os.write(fd, content):] |
| except EnvironmentError as e: |
| if e.errno == errno.EAGAIN: |
| break |
| else: |
| raise |
| finally: |
| loop.remove_writer(fd) |
| except GeneratorExit: |
| raise |
| finally: |
| os.close(output_file) if isinstance(output_file, int) else output_file.close() |