diff options
author | shadchin <shadchin@yandex-team.com> | 2024-02-12 07:53:52 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@ydb.tech> | 2024-02-14 14:26:16 +0000 |
commit | 31f2a419764a8ba77c2a970cfc80056c6cd06756 (patch) | |
tree | c1995d239eba8571cefc640f6648e1d5dd4ce9e2 /contrib/tools/python3/src/Lib/asyncio/selector_events.py | |
parent | fe2ef02b38d9c85d80060963b265a1df9f38c3bb (diff) | |
download | ydb-31f2a419764a8ba77c2a970cfc80056c6cd06756.tar.gz |
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/selector_events.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/selector_events.py | 99 |
1 files changed, 87 insertions, 12 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/selector_events.py b/contrib/tools/python3/src/Lib/asyncio/selector_events.py index 40df1b7da5..790711f834 100644 --- a/contrib/tools/python3/src/Lib/asyncio/selector_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/selector_events.py @@ -9,6 +9,8 @@ __all__ = 'BaseSelectorEventLoop', import collections import errno import functools +import itertools +import os import selectors import socket import warnings @@ -28,6 +30,14 @@ from . import transports from . import trsock from .log import logger +_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') + +if _HAS_SENDMSG: + try: + SC_IOV_MAX = os.sysconf('SC_IOV_MAX') + except OSError: + # Fallback to send + _HAS_SENDMSG = False def _test_selector_event(selector, fd, event): # Test if the selector is monitoring 'event' events @@ -58,6 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def _make_socket_transport(self, sock, protocol, waiter=None, *, extra=None, server=None): + self._ensure_fd_no_transport(sock) return _SelectorSocketTransport(self, sock, protocol, waiter, extra, server) @@ -68,6 +79,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, ): + self._ensure_fd_no_transport(rawsock) ssl_protocol = sslproto.SSLProtocol( self, protocol, sslcontext, waiter, server_side, server_hostname, @@ -80,6 +92,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def _make_datagram_transport(self, sock, protocol, address=None, waiter=None, extra=None): + self._ensure_fd_no_transport(sock) return _SelectorDatagramTransport(self, sock, protocol, address, waiter, extra) @@ -758,8 +771,6 @@ class _SelectorTransport(transports._FlowControlMixin, max_size = 256 * 1024 # Buffer size passed to recv(). - _buffer_factory = bytearray # Constructs initial value for self._buffer. - # Attribute used in the destructor: it must be set even if the constructor # is not called (see _SelectorSslTransport which may start by raising an # exception) @@ -784,7 +795,7 @@ class _SelectorTransport(transports._FlowControlMixin, self.set_protocol(protocol) self._server = server - self._buffer = self._buffer_factory() + self._buffer = collections.deque() self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. self._paused = False # Set when pause_reading() called @@ -909,7 +920,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._server = None def get_write_buffer_size(self): - return len(self._buffer) + return sum(map(len, self._buffer)) def _add_reader(self, fd, callback, *args): if not self.is_reading(): @@ -929,7 +940,10 @@ class _SelectorSocketTransport(_SelectorTransport): super().__init__(loop, sock, protocol, extra, server) self._eof = False self._empty_waiter = None - + if _HAS_SENDMSG: + self._write_ready = self._write_sendmsg + else: + self._write_ready = self._write_send # Disable the Nagle algorithm -- small writes will be # sent without waiting for the TCP ACK. This generally # decreases the latency (in some cases significantly.) @@ -1067,23 +1081,68 @@ class _SelectorSocketTransport(_SelectorTransport): self._fatal_error(exc, 'Fatal write error on socket transport') return else: - data = data[n:] + data = memoryview(data)[n:] if not data: return # Not all was written; register write handler. self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. - self._buffer.extend(data) + self._buffer.append(data) self._maybe_pause_protocol() - def _write_ready(self): + def _get_sendmsg_buffer(self): + return itertools.islice(self._buffer, SC_IOV_MAX) + + def _write_sendmsg(self): assert self._buffer, 'Data should not be empty' + if self._conn_lost: + return + try: + nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) + self._adjust_leftover_buffer(nbytes) + except (BlockingIOError, InterruptedError): + pass + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + self._loop._remove_writer(self._sock_fd) + self._buffer.clear() + self._fatal_error(exc, 'Fatal write error on socket transport') + if self._empty_waiter is not None: + self._empty_waiter.set_exception(exc) + else: + self._maybe_resume_protocol() # May append to buffer. + if not self._buffer: + self._loop._remove_writer(self._sock_fd) + if self._empty_waiter is not None: + self._empty_waiter.set_result(None) + if self._closing: + self._call_connection_lost(None) + elif self._eof: + self._sock.shutdown(socket.SHUT_WR) + def _adjust_leftover_buffer(self, nbytes: int) -> None: + buffer = self._buffer + while nbytes: + b = buffer.popleft() + b_len = len(b) + if b_len <= nbytes: + nbytes -= b_len + else: + buffer.appendleft(b[nbytes:]) + break + + def _write_send(self): + assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: - n = self._sock.send(self._buffer) + buffer = self._buffer.popleft() + n = self._sock.send(buffer) + if n != len(buffer): + # Not all data was written + self._buffer.appendleft(buffer[n:]) except (BlockingIOError, InterruptedError): pass except (SystemExit, KeyboardInterrupt): @@ -1095,8 +1154,6 @@ class _SelectorSocketTransport(_SelectorTransport): if self._empty_waiter is not None: self._empty_waiter.set_exception(exc) else: - if n: - del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: self._loop._remove_writer(self._sock_fd) @@ -1114,6 +1171,19 @@ class _SelectorSocketTransport(_SelectorTransport): if not self._buffer: self._sock.shutdown(socket.SHUT_WR) + def writelines(self, list_of_data): + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('unable to writelines; sendfile is in progress') + if not list_of_data: + return + self._buffer.extend([memoryview(data) for data in list_of_data]) + self._write_ready() + # If the entire buffer couldn't be written, register a write handler + if self._buffer: + self._loop._add_writer(self._sock_fd, self._write_ready) + def can_write_eof(self): return True @@ -1134,8 +1204,13 @@ class _SelectorSocketTransport(_SelectorTransport): def _reset_empty_waiter(self): self._empty_waiter = None + def close(self): + self._read_ready_cb = None + self._write_ready = None + super().close() + -class _SelectorDatagramTransport(_SelectorTransport): +class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport): _buffer_factory = collections.deque |