| # Copyright 2018 Gentoo Foundation |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| try: |
| from asyncio.transports import Transport as _Transport |
| except ImportError: |
| _Transport = object |
| |
| |
| class _FlowControlMixin(_Transport): |
| """ |
| This is identical to the standard library's private |
| asyncio.transports._FlowControlMixin class. |
| |
| All the logic for (write) flow control in a mix-in base class. |
| |
| The subclass must implement get_write_buffer_size(). It must call |
| _maybe_pause_protocol() whenever the write buffer size increases, |
| and _maybe_resume_protocol() whenever it decreases. It may also |
| override set_write_buffer_limits() (e.g. to specify different |
| defaults). |
| |
| The subclass constructor must call super().__init__(extra). This |
| will call set_write_buffer_limits(). |
| |
| The user may call set_write_buffer_limits() and |
| get_write_buffer_size(), and their protocol's pause_writing() and |
| resume_writing() may be called. |
| """ |
| |
| def __init__(self, extra=None, loop=None): |
| super().__init__(extra) |
| assert loop is not None |
| self._loop = loop |
| self._protocol_paused = False |
| self._set_write_buffer_limits() |
| |
| def _maybe_pause_protocol(self): |
| size = self.get_write_buffer_size() |
| if size <= self._high_water: |
| return |
| if not self._protocol_paused: |
| self._protocol_paused = True |
| try: |
| self._protocol.pause_writing() |
| except Exception as exc: |
| self._loop.call_exception_handler({ |
| 'message': 'protocol.pause_writing() failed', |
| 'exception': exc, |
| 'transport': self, |
| 'protocol': self._protocol, |
| }) |
| |
| def _maybe_resume_protocol(self): |
| if (self._protocol_paused and |
| self.get_write_buffer_size() <= self._low_water): |
| self._protocol_paused = False |
| try: |
| self._protocol.resume_writing() |
| except Exception as exc: |
| self._loop.call_exception_handler({ |
| 'message': 'protocol.resume_writing() failed', |
| 'exception': exc, |
| 'transport': self, |
| 'protocol': self._protocol, |
| }) |
| |
| def get_write_buffer_limits(self): |
| return (self._low_water, self._high_water) |
| |
| def _set_write_buffer_limits(self, high=None, low=None): |
| if high is None: |
| if low is None: |
| high = 64*1024 |
| else: |
| high = 4*low |
| if low is None: |
| low = high // 4 |
| if not high >= low >= 0: |
| raise ValueError('high (%r) must be >= low (%r) must be >= 0' % |
| (high, low)) |
| self._high_water = high |
| self._low_water = low |
| |
| def set_write_buffer_limits(self, high=None, low=None): |
| self._set_write_buffer_limits(high=high, low=low) |
| self._maybe_pause_protocol() |
| |
| def get_write_buffer_size(self): |
| raise NotImplementedError |