aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/asyncio/selector_events.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.com>2024-02-12 07:53:52 +0300
committerDaniil Cherednik <dcherednik@ydb.tech>2024-02-14 14:26:16 +0000
commit31f2a419764a8ba77c2a970cfc80056c6cd06756 (patch)
treec1995d239eba8571cefc640f6648e1d5dd4ce9e2 /contrib/tools/python3/src/Lib/asyncio/selector_events.py
parentfe2ef02b38d9c85d80060963b265a1df9f38c3bb (diff)
downloadydb-31f2a419764a8ba77c2a970cfc80056c6cd06756.tar.gz
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/selector_events.py')
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/selector_events.py99
1 files changed, 87 insertions, 12 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/selector_events.py b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
index 40df1b7da5..790711f834 100644
--- a/contrib/tools/python3/src/Lib/asyncio/selector_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
@@ -9,6 +9,8 @@ __all__ = 'BaseSelectorEventLoop',
import collections
import errno
import functools
+import itertools
+import os
import selectors
import socket
import warnings
@@ -28,6 +30,14 @@ from . import transports
from . import trsock
from .log import logger
+_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')
+
+if _HAS_SENDMSG:
+ try:
+ SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
+ except OSError:
+ # Fallback to send
+ _HAS_SENDMSG = False
def _test_selector_event(selector, fd, event):
# Test if the selector is monitoring 'event' events
@@ -58,6 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
+ self._ensure_fd_no_transport(sock)
return _SelectorSocketTransport(self, sock, protocol, waiter,
extra, server)
@@ -68,6 +79,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
):
+ self._ensure_fd_no_transport(rawsock)
ssl_protocol = sslproto.SSLProtocol(
self, protocol, sslcontext, waiter,
server_side, server_hostname,
@@ -80,6 +92,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def _make_datagram_transport(self, sock, protocol,
address=None, waiter=None, extra=None):
+ self._ensure_fd_no_transport(sock)
return _SelectorDatagramTransport(self, sock, protocol,
address, waiter, extra)
@@ -758,8 +771,6 @@ class _SelectorTransport(transports._FlowControlMixin,
max_size = 256 * 1024 # Buffer size passed to recv().
- _buffer_factory = bytearray # Constructs initial value for self._buffer.
-
# Attribute used in the destructor: it must be set even if the constructor
# is not called (see _SelectorSslTransport which may start by raising an
# exception)
@@ -784,7 +795,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self.set_protocol(protocol)
self._server = server
- self._buffer = self._buffer_factory()
+ self._buffer = collections.deque()
self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called.
self._paused = False # Set when pause_reading() called
@@ -909,7 +920,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._server = None
def get_write_buffer_size(self):
- return len(self._buffer)
+ return sum(map(len, self._buffer))
def _add_reader(self, fd, callback, *args):
if not self.is_reading():
@@ -929,7 +940,10 @@ class _SelectorSocketTransport(_SelectorTransport):
super().__init__(loop, sock, protocol, extra, server)
self._eof = False
self._empty_waiter = None
-
+ if _HAS_SENDMSG:
+ self._write_ready = self._write_sendmsg
+ else:
+ self._write_ready = self._write_send
# Disable the Nagle algorithm -- small writes will be
# sent without waiting for the TCP ACK. This generally
# decreases the latency (in some cases significantly.)
@@ -1067,23 +1081,68 @@ class _SelectorSocketTransport(_SelectorTransport):
self._fatal_error(exc, 'Fatal write error on socket transport')
return
else:
- data = data[n:]
+ data = memoryview(data)[n:]
if not data:
return
# Not all was written; register write handler.
self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
- self._buffer.extend(data)
+ self._buffer.append(data)
self._maybe_pause_protocol()
- def _write_ready(self):
+ def _get_sendmsg_buffer(self):
+ return itertools.islice(self._buffer, SC_IOV_MAX)
+
+ def _write_sendmsg(self):
assert self._buffer, 'Data should not be empty'
+ if self._conn_lost:
+ return
+ try:
+ nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
+ self._adjust_leftover_buffer(nbytes)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ self._loop._remove_writer(self._sock_fd)
+ self._buffer.clear()
+ self._fatal_error(exc, 'Fatal write error on socket transport')
+ if self._empty_waiter is not None:
+ self._empty_waiter.set_exception(exc)
+ else:
+ self._maybe_resume_protocol() # May append to buffer.
+ if not self._buffer:
+ self._loop._remove_writer(self._sock_fd)
+ if self._empty_waiter is not None:
+ self._empty_waiter.set_result(None)
+ if self._closing:
+ self._call_connection_lost(None)
+ elif self._eof:
+ self._sock.shutdown(socket.SHUT_WR)
+ def _adjust_leftover_buffer(self, nbytes: int) -> None:
+ buffer = self._buffer
+ while nbytes:
+ b = buffer.popleft()
+ b_len = len(b)
+ if b_len <= nbytes:
+ nbytes -= b_len
+ else:
+ buffer.appendleft(b[nbytes:])
+ break
+
+ def _write_send(self):
+ assert self._buffer, 'Data should not be empty'
if self._conn_lost:
return
try:
- n = self._sock.send(self._buffer)
+ buffer = self._buffer.popleft()
+ n = self._sock.send(buffer)
+ if n != len(buffer):
+ # Not all data was written
+ self._buffer.appendleft(buffer[n:])
except (BlockingIOError, InterruptedError):
pass
except (SystemExit, KeyboardInterrupt):
@@ -1095,8 +1154,6 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._empty_waiter is not None:
self._empty_waiter.set_exception(exc)
else:
- if n:
- del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop._remove_writer(self._sock_fd)
@@ -1114,6 +1171,19 @@ class _SelectorSocketTransport(_SelectorTransport):
if not self._buffer:
self._sock.shutdown(socket.SHUT_WR)
+ def writelines(self, list_of_data):
+ if self._eof:
+ raise RuntimeError('Cannot call writelines() after write_eof()')
+ if self._empty_waiter is not None:
+ raise RuntimeError('unable to writelines; sendfile is in progress')
+ if not list_of_data:
+ return
+ self._buffer.extend([memoryview(data) for data in list_of_data])
+ self._write_ready()
+ # If the entire buffer couldn't be written, register a write handler
+ if self._buffer:
+ self._loop._add_writer(self._sock_fd, self._write_ready)
+
def can_write_eof(self):
return True
@@ -1134,8 +1204,13 @@ class _SelectorSocketTransport(_SelectorTransport):
def _reset_empty_waiter(self):
self._empty_waiter = None
+ def close(self):
+ self._read_ready_cb = None
+ self._write_ready = None
+ super().close()
+
-class _SelectorDatagramTransport(_SelectorTransport):
+class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
_buffer_factory = collections.deque