From ce1b7ca3171f9158180640c6a02a74b4afffedea Mon Sep 17 00:00:00 2001
From: shadchin <shadchin@yandex-team.com>
Date: Mon, 12 Feb 2024 07:53:52 +0300
Subject: Update Python from 3.11.8 to 3.12.2

---
 .../python3/src/Lib/asyncio/selector_events.py     | 99 +++++++++++++++++++---
 1 file changed, 87 insertions(+), 12 deletions(-)

(limited to 'contrib/tools/python3/src/Lib/asyncio/selector_events.py')

diff --git a/contrib/tools/python3/src/Lib/asyncio/selector_events.py b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
index 40df1b7da5..790711f834 100644
--- a/contrib/tools/python3/src/Lib/asyncio/selector_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
@@ -9,6 +9,8 @@ __all__ = 'BaseSelectorEventLoop',
 import collections
 import errno
 import functools
+import itertools
+import os
 import selectors
 import socket
 import warnings
@@ -28,6 +30,14 @@ from . import transports
 from . import trsock
 from .log import logger
 
+_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')
+
+if _HAS_SENDMSG:
+    try:
+        SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
+    except OSError:
+        # Fallback to send
+        _HAS_SENDMSG = False
 
 def _test_selector_event(selector, fd, event):
     # Test if the selector is monitoring 'event' events
@@ -58,6 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
 
     def _make_socket_transport(self, sock, protocol, waiter=None, *,
                                extra=None, server=None):
+        self._ensure_fd_no_transport(sock)
         return _SelectorSocketTransport(self, sock, protocol, waiter,
                                         extra, server)
 
@@ -68,6 +79,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
             ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
             ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
     ):
+        self._ensure_fd_no_transport(rawsock)
         ssl_protocol = sslproto.SSLProtocol(
             self, protocol, sslcontext, waiter,
             server_side, server_hostname,
@@ -80,6 +92,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
 
     def _make_datagram_transport(self, sock, protocol,
                                  address=None, waiter=None, extra=None):
+        self._ensure_fd_no_transport(sock)
         return _SelectorDatagramTransport(self, sock, protocol,
                                           address, waiter, extra)
 
@@ -758,8 +771,6 @@ class _SelectorTransport(transports._FlowControlMixin,
 
     max_size = 256 * 1024  # Buffer size passed to recv().
 
-    _buffer_factory = bytearray  # Constructs initial value for self._buffer.
-
     # Attribute used in the destructor: it must be set even if the constructor
     # is not called (see _SelectorSslTransport which may start by raising an
     # exception)
@@ -784,7 +795,7 @@ class _SelectorTransport(transports._FlowControlMixin,
         self.set_protocol(protocol)
 
         self._server = server
-        self._buffer = self._buffer_factory()
+        self._buffer = collections.deque()
         self._conn_lost = 0  # Set when call to connection_lost scheduled.
         self._closing = False  # Set when close() called.
         self._paused = False  # Set when pause_reading() called
@@ -909,7 +920,7 @@ class _SelectorTransport(transports._FlowControlMixin,
                 self._server = None
 
     def get_write_buffer_size(self):
-        return len(self._buffer)
+        return sum(map(len, self._buffer))
 
     def _add_reader(self, fd, callback, *args):
         if not self.is_reading():
@@ -929,7 +940,10 @@ class _SelectorSocketTransport(_SelectorTransport):
         super().__init__(loop, sock, protocol, extra, server)
         self._eof = False
         self._empty_waiter = None
-
+        if _HAS_SENDMSG:
+            self._write_ready = self._write_sendmsg
+        else:
+            self._write_ready = self._write_send
         # Disable the Nagle algorithm -- small writes will be
         # sent without waiting for the TCP ACK.  This generally
         # decreases the latency (in some cases significantly.)
@@ -1067,23 +1081,68 @@ class _SelectorSocketTransport(_SelectorTransport):
                 self._fatal_error(exc, 'Fatal write error on socket transport')
                 return
             else:
-                data = data[n:]
+                data = memoryview(data)[n:]
                 if not data:
                     return
             # Not all was written; register write handler.
             self._loop._add_writer(self._sock_fd, self._write_ready)
 
         # Add it to the buffer.
-        self._buffer.extend(data)
+        self._buffer.append(data)
         self._maybe_pause_protocol()
 
-    def _write_ready(self):
+    def _get_sendmsg_buffer(self):
+        return itertools.islice(self._buffer, SC_IOV_MAX)
+
+    def _write_sendmsg(self):
         assert self._buffer, 'Data should not be empty'
+        if self._conn_lost:
+            return
+        try:
+            nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
+            self._adjust_leftover_buffer(nbytes)
+        except (BlockingIOError, InterruptedError):
+            pass
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except BaseException as exc:
+            self._loop._remove_writer(self._sock_fd)
+            self._buffer.clear()
+            self._fatal_error(exc, 'Fatal write error on socket transport')
+            if self._empty_waiter is not None:
+                self._empty_waiter.set_exception(exc)
+        else:
+            self._maybe_resume_protocol()  # May append to buffer.
+            if not self._buffer:
+                self._loop._remove_writer(self._sock_fd)
+                if self._empty_waiter is not None:
+                    self._empty_waiter.set_result(None)
+                if self._closing:
+                    self._call_connection_lost(None)
+                elif self._eof:
+                    self._sock.shutdown(socket.SHUT_WR)
 
+    def _adjust_leftover_buffer(self, nbytes: int) -> None:
+        buffer = self._buffer
+        while nbytes:
+            b = buffer.popleft()
+            b_len = len(b)
+            if b_len <= nbytes:
+                nbytes -= b_len
+            else:
+                buffer.appendleft(b[nbytes:])
+                break
+
+    def _write_send(self):
+        assert self._buffer, 'Data should not be empty'
         if self._conn_lost:
             return
         try:
-            n = self._sock.send(self._buffer)
+            buffer = self._buffer.popleft()
+            n = self._sock.send(buffer)
+            if n != len(buffer):
+                # Not all data was written
+                self._buffer.appendleft(buffer[n:])
         except (BlockingIOError, InterruptedError):
             pass
         except (SystemExit, KeyboardInterrupt):
@@ -1095,8 +1154,6 @@ class _SelectorSocketTransport(_SelectorTransport):
             if self._empty_waiter is not None:
                 self._empty_waiter.set_exception(exc)
         else:
-            if n:
-                del self._buffer[:n]
             self._maybe_resume_protocol()  # May append to buffer.
             if not self._buffer:
                 self._loop._remove_writer(self._sock_fd)
@@ -1114,6 +1171,19 @@ class _SelectorSocketTransport(_SelectorTransport):
         if not self._buffer:
             self._sock.shutdown(socket.SHUT_WR)
 
+    def writelines(self, list_of_data):
+        if self._eof:
+            raise RuntimeError('Cannot call writelines() after write_eof()')
+        if self._empty_waiter is not None:
+            raise RuntimeError('unable to writelines; sendfile is in progress')
+        if not list_of_data:
+            return
+        self._buffer.extend([memoryview(data) for data in list_of_data])
+        self._write_ready()
+        # If the entire buffer couldn't be written, register a write handler
+        if self._buffer:
+            self._loop._add_writer(self._sock_fd, self._write_ready)
+
     def can_write_eof(self):
         return True
 
@@ -1134,8 +1204,13 @@ class _SelectorSocketTransport(_SelectorTransport):
     def _reset_empty_waiter(self):
         self._empty_waiter = None
 
+    def close(self):
+        self._read_ready_cb = None
+        self._write_ready = None
+        super().close()
+
 
-class _SelectorDatagramTransport(_SelectorTransport):
+class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
 
     _buffer_factory = collections.deque
 
-- 
cgit v1.2.3