aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/asyncio
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.com>2024-02-12 07:53:52 +0300
committershadchin <shadchin@yandex-team.com>2024-02-12 08:07:36 +0300
commitce1b7ca3171f9158180640c6a02a74b4afffedea (patch)
treee47c1e8391b1b0128262c1e9b1e6ed4c8fff2348 /contrib/tools/python3/src/Lib/asyncio
parent57350d96f030db90f220ce50ee591d5c5d403df7 (diff)
downloadydb-ce1b7ca3171f9158180640c6a02a74b4afffedea.tar.gz
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio')
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/__init__.py1
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_events.py63
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_futures.py1
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_tasks.py10
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/constants.py3
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/coroutines.py4
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/events.py44
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/futures.py4
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/locks.py1
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/proactor_events.py3
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/runners.py18
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/selector_events.py99
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/streams.py14
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/subprocess.py11
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/taskgroups.py12
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/tasks.py267
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/unix_events.py108
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/windows_events.py82
18 files changed, 479 insertions, 266 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/__init__.py b/contrib/tools/python3/src/Lib/asyncio/__init__.py
index fed16ec7c6..03165a425e 100644
--- a/contrib/tools/python3/src/Lib/asyncio/__init__.py
+++ b/contrib/tools/python3/src/Lib/asyncio/__init__.py
@@ -34,6 +34,7 @@ __all__ = (base_events.__all__ +
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
+ taskgroups.__all__ +
threads.__all__ +
timeouts.__all__ +
transports.__all__)
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_events.py b/contrib/tools/python3/src/Lib/asyncio/base_events.py
index 5fa28cae9c..c16c445bde 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_events.py
@@ -306,7 +306,7 @@ class Server(events.AbstractServer):
self._waiters = None
for waiter in waiters:
if not waiter.done():
- waiter.set_result(waiter)
+ waiter.set_result(None)
def _start_serving(self):
if self._serving:
@@ -378,7 +378,27 @@ class Server(events.AbstractServer):
self._serving_forever_fut = None
async def wait_closed(self):
- if self._sockets is None or self._waiters is None:
+ """Wait until server is closed and all connections are dropped.
+
+ - If the server is not closed, wait.
+ - If it is closed, but there are still active connections, wait.
+
+ Anyone waiting here will be unblocked once both conditions
+ (server is closed and all connections have been dropped)
+ have become true, in either order.
+
+ Historical note: In 3.11 and before, this was broken, returning
+ immediately if the server was already closed, even if there
+ were still active connections. An attempted fix in 3.12.0 was
+ still broken, returning immediately if the server was still
+ open and there were no active connections. Hopefully in 3.12.1
+ we have it right.
+ """
+ # Waiters are unblocked by self._wakeup(), which is called
+ # from two places: self.close() and self._detach(), but only
+ # when both conditions have become true. To signal that this
+ # has happened, self._wakeup() sets self._waiters to None.
+ if self._waiters is None:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
@@ -562,8 +582,13 @@ class BaseEventLoop(events.AbstractEventLoop):
'asyncgen': agen
})
- async def shutdown_default_executor(self):
- """Schedule the shutdown of the default executor."""
+ async def shutdown_default_executor(self, timeout=None):
+ """Schedule the shutdown of the default executor.
+
+ The timeout parameter specifies the amount of time the executor will
+ be given to finish joining. The default value is None, which means
+ that the executor will be given an unlimited amount of time.
+ """
self._executor_shutdown_called = True
if self._default_executor is None:
return
@@ -573,7 +598,13 @@ class BaseEventLoop(events.AbstractEventLoop):
try:
await future
finally:
- thread.join()
+ thread.join(timeout)
+
+ if thread.is_alive():
+                warnings.warn("The executor did not finish joining "
+ f"its threads within {timeout} seconds.",
+ RuntimeWarning, stacklevel=2)
+ self._default_executor.shutdown(wait=False)
def _do_shutdown(self, future):
try:
@@ -992,7 +1023,8 @@ class BaseEventLoop(events.AbstractEventLoop):
local_addr=None, server_hostname=None,
ssl_handshake_timeout=None,
ssl_shutdown_timeout=None,
- happy_eyeballs_delay=None, interleave=None):
+ happy_eyeballs_delay=None, interleave=None,
+ all_errors=False):
"""Connect to a TCP server.
Create a streaming transport connection to a given internet host and
@@ -1082,6 +1114,8 @@ class BaseEventLoop(events.AbstractEventLoop):
if sock is None:
exceptions = [exc for sub in exceptions for exc in sub]
try:
+ if all_errors:
+ raise ExceptionGroup("create_connection failed", exceptions)
if len(exceptions) == 1:
raise exceptions[0]
else:
@@ -1819,7 +1853,22 @@ class BaseEventLoop(events.AbstractEventLoop):
exc_info=True)
else:
try:
- self._exception_handler(self, context)
+ ctx = None
+ thing = context.get("task")
+ if thing is None:
+ # Even though Futures don't have a context,
+ # Task is a subclass of Future,
+ # and sometimes the 'future' key holds a Task.
+ thing = context.get("future")
+ if thing is None:
+ # Handles also have a context.
+ thing = context.get("handle")
+ if thing is not None and hasattr(thing, "get_context"):
+ ctx = thing.get_context()
+ if ctx is not None and hasattr(ctx, "run"):
+ ctx.run(self._exception_handler, self, context)
+ else:
+ self._exception_handler(self, context)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_futures.py b/contrib/tools/python3/src/Lib/asyncio/base_futures.py
index cd811a788c..7987963bd9 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_futures.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_futures.py
@@ -1,7 +1,6 @@
__all__ = ()
import reprlib
-from _thread import get_ident
from . import format_helpers
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py
index 26298e638c..c907b68341 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py
@@ -15,11 +15,13 @@ def _task_repr_info(task):
info.insert(1, 'name=%r' % task.get_name())
- coro = coroutines._format_coroutine(task._coro)
- info.insert(2, f'coro=<{coro}>')
-
if task._fut_waiter is not None:
- info.insert(3, f'wait_for={task._fut_waiter!r}')
+ info.insert(2, f'wait_for={task._fut_waiter!r}')
+
+ if task._coro:
+ coro = coroutines._format_coroutine(task._coro)
+ info.insert(2, f'coro=<{coro}>')
+
return info
diff --git a/contrib/tools/python3/src/Lib/asyncio/constants.py b/contrib/tools/python3/src/Lib/asyncio/constants.py
index 0ad997a921..b60c1e4236 100644
--- a/contrib/tools/python3/src/Lib/asyncio/constants.py
+++ b/contrib/tools/python3/src/Lib/asyncio/constants.py
@@ -30,6 +30,9 @@ SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 256
FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB
FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB
+# Default timeout for joining the threads in the threadpool
+THREAD_JOIN_TIMEOUT = 300
+
# The enum should be here to break circular dependencies between
# base_events and sslproto
class _SendfileMode(enum.Enum):
diff --git a/contrib/tools/python3/src/Lib/asyncio/coroutines.py b/contrib/tools/python3/src/Lib/asyncio/coroutines.py
index 0e4b489f30..ab4f30eb51 100644
--- a/contrib/tools/python3/src/Lib/asyncio/coroutines.py
+++ b/contrib/tools/python3/src/Lib/asyncio/coroutines.py
@@ -4,7 +4,6 @@ import collections.abc
import inspect
import os
import sys
-import traceback
import types
@@ -26,8 +25,7 @@ def iscoroutinefunction(func):
# Prioritize native coroutine check to speed-up
# asyncio.iscoroutine.
-_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
- collections.abc.Coroutine)
+_COROUTINE_TYPES = (types.CoroutineType, collections.abc.Coroutine)
_iscoroutine_typecache = set()
diff --git a/contrib/tools/python3/src/Lib/asyncio/events.py b/contrib/tools/python3/src/Lib/asyncio/events.py
index cfc62156e4..016852880c 100644
--- a/contrib/tools/python3/src/Lib/asyncio/events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/events.py
@@ -17,6 +17,7 @@ __all__ = (
import contextvars
import os
+import signal
import socket
import subprocess
import sys
@@ -65,6 +66,9 @@ class Handle:
info = self._repr_info()
return '<{}>'.format(' '.join(info))
+ def get_context(self):
+ return self._context
+
def cancel(self):
if not self._cancelled:
self._cancelled = True
@@ -675,6 +679,23 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
if (self._local._loop is None and
not self._local._set_called and
threading.current_thread() is threading.main_thread()):
+ stacklevel = 2
+ try:
+ f = sys._getframe(1)
+ except AttributeError:
+ pass
+ else:
+ # Move up the call stack so that the warning is attached
+ # to the line outside asyncio itself.
+ while f:
+ module = f.f_globals.get('__name__')
+ if not (module == 'asyncio' or module.startswith('asyncio.')):
+ break
+ f = f.f_back
+ stacklevel += 1
+ import warnings
+ warnings.warn('There is no current event loop',
+ DeprecationWarning, stacklevel=stacklevel)
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
@@ -786,14 +807,6 @@ def get_event_loop():
the result of `get_event_loop_policy().get_event_loop()` call.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
- return _py__get_event_loop()
-
-
-def _get_event_loop(stacklevel=3):
- # This internal method is going away in Python 3.12, left here only for
- # backwards compatibility with 3.10.0 - 3.10.8 and 3.11.0.
- # Similarly, this method's C equivalent in _asyncio is going away as well.
- # See GH-99949 for more details.
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
@@ -826,7 +839,6 @@ _py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
-_py__get_event_loop = _get_event_loop
try:
@@ -834,7 +846,7 @@ try:
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
- get_running_loop, get_event_loop, _get_event_loop)
+ get_running_loop, get_event_loop)
except ImportError:
pass
else:
@@ -843,4 +855,14 @@ else:
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
- _c__get_event_loop = _get_event_loop
+
+
+if hasattr(os, 'fork'):
+ def on_fork():
+ # Reset the loop and wakeupfd in the forked child process.
+ if _event_loop_policy is not None:
+ _event_loop_policy._local = BaseDefaultEventLoopPolicy._Local()
+ _set_running_loop(None)
+ signal.set_wakeup_fd(-1)
+
+ os.register_at_fork(after_in_child=on_fork)
diff --git a/contrib/tools/python3/src/Lib/asyncio/futures.py b/contrib/tools/python3/src/Lib/asyncio/futures.py
index 3a6b44a091..97fc4e3fcb 100644
--- a/contrib/tools/python3/src/Lib/asyncio/futures.py
+++ b/contrib/tools/python3/src/Lib/asyncio/futures.py
@@ -77,7 +77,7 @@ class Future:
the default event loop.
"""
if loop is None:
- self._loop = events._get_event_loop()
+ self._loop = events.get_event_loop()
else:
self._loop = loop
self._callbacks = []
@@ -413,7 +413,7 @@ def wrap_future(future, *, loop=None):
assert isinstance(future, concurrent.futures.Future), \
f'concurrent.futures.Future is expected, got {future!r}'
if loop is None:
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
new_future = loop.create_future()
_chain_future(future, new_future)
return new_future
diff --git a/contrib/tools/python3/src/Lib/asyncio/locks.py b/contrib/tools/python3/src/Lib/asyncio/locks.py
index fd41dfd3f4..ce5d8d5bfb 100644
--- a/contrib/tools/python3/src/Lib/asyncio/locks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/locks.py
@@ -8,7 +8,6 @@ import enum
from . import exceptions
from . import mixins
-from . import tasks
class _ContextManagerMixin:
async def __aenter__(self):
diff --git a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
index c6aab408fc..1e2a730cf3 100644
--- a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
@@ -288,7 +288,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
# we got end-of-file so no need to reschedule a new read
return
- data = self._data[:length]
+ # It's a new slice so make it immutable so protocols upstream don't have problems
+ data = bytes(memoryview(self._data)[:length])
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
diff --git a/contrib/tools/python3/src/Lib/asyncio/runners.py b/contrib/tools/python3/src/Lib/asyncio/runners.py
index b3e0c44b7f..1b89236599 100644
--- a/contrib/tools/python3/src/Lib/asyncio/runners.py
+++ b/contrib/tools/python3/src/Lib/asyncio/runners.py
@@ -5,12 +5,11 @@ import enum
import functools
import threading
import signal
-import sys
from . import coroutines
from . import events
from . import exceptions
from . import tasks
-
+from . import constants
class _State(enum.Enum):
CREATED = "created"
@@ -70,7 +69,8 @@ class Runner:
loop = self._loop
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
- loop.run_until_complete(loop.shutdown_default_executor())
+ loop.run_until_complete(
+ loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
finally:
if self._set_event_loop:
events.set_event_loop(None)
@@ -157,12 +157,12 @@ class Runner:
raise KeyboardInterrupt()
-def run(main, *, debug=None):
+def run(main, *, debug=None, loop_factory=None):
"""Execute the coroutine and return the result.
This function runs the passed coroutine, taking care of
- managing the asyncio event loop and finalizing asynchronous
- generators.
+ managing the asyncio event loop, finalizing asynchronous
+ generators and closing the default executor.
This function cannot be called when another asyncio event loop is
running in the same thread.
@@ -173,6 +173,10 @@ def run(main, *, debug=None):
It should be used as a main entry point for asyncio programs, and should
ideally only be called once.
+ The executor is given a timeout duration of 5 minutes to shutdown.
+ If the executor hasn't finished within that duration, a warning is
+ emitted and the executor is closed.
+
Example:
async def main():
@@ -186,7 +190,7 @@ def run(main, *, debug=None):
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
- with Runner(debug=debug) as runner:
+ with Runner(debug=debug, loop_factory=loop_factory) as runner:
return runner.run(main)
diff --git a/contrib/tools/python3/src/Lib/asyncio/selector_events.py b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
index 40df1b7da5..790711f834 100644
--- a/contrib/tools/python3/src/Lib/asyncio/selector_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
@@ -9,6 +9,8 @@ __all__ = 'BaseSelectorEventLoop',
import collections
import errno
import functools
+import itertools
+import os
import selectors
import socket
import warnings
@@ -28,6 +30,14 @@ from . import transports
from . import trsock
from .log import logger
+_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg')
+
+if _HAS_SENDMSG:
+ try:
+ SC_IOV_MAX = os.sysconf('SC_IOV_MAX')
+ except OSError:
+ # Fallback to send
+ _HAS_SENDMSG = False
def _test_selector_event(selector, fd, event):
# Test if the selector is monitoring 'event' events
@@ -58,6 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
+ self._ensure_fd_no_transport(sock)
return _SelectorSocketTransport(self, sock, protocol, waiter,
extra, server)
@@ -68,6 +79,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
):
+ self._ensure_fd_no_transport(rawsock)
ssl_protocol = sslproto.SSLProtocol(
self, protocol, sslcontext, waiter,
server_side, server_hostname,
@@ -80,6 +92,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def _make_datagram_transport(self, sock, protocol,
address=None, waiter=None, extra=None):
+ self._ensure_fd_no_transport(sock)
return _SelectorDatagramTransport(self, sock, protocol,
address, waiter, extra)
@@ -758,8 +771,6 @@ class _SelectorTransport(transports._FlowControlMixin,
max_size = 256 * 1024 # Buffer size passed to recv().
- _buffer_factory = bytearray # Constructs initial value for self._buffer.
-
# Attribute used in the destructor: it must be set even if the constructor
# is not called (see _SelectorSslTransport which may start by raising an
# exception)
@@ -784,7 +795,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self.set_protocol(protocol)
self._server = server
- self._buffer = self._buffer_factory()
+ self._buffer = collections.deque()
self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called.
self._paused = False # Set when pause_reading() called
@@ -909,7 +920,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._server = None
def get_write_buffer_size(self):
- return len(self._buffer)
+ return sum(map(len, self._buffer))
def _add_reader(self, fd, callback, *args):
if not self.is_reading():
@@ -929,7 +940,10 @@ class _SelectorSocketTransport(_SelectorTransport):
super().__init__(loop, sock, protocol, extra, server)
self._eof = False
self._empty_waiter = None
-
+ if _HAS_SENDMSG:
+ self._write_ready = self._write_sendmsg
+ else:
+ self._write_ready = self._write_send
# Disable the Nagle algorithm -- small writes will be
# sent without waiting for the TCP ACK. This generally
# decreases the latency (in some cases significantly.)
@@ -1067,23 +1081,68 @@ class _SelectorSocketTransport(_SelectorTransport):
self._fatal_error(exc, 'Fatal write error on socket transport')
return
else:
- data = data[n:]
+ data = memoryview(data)[n:]
if not data:
return
# Not all was written; register write handler.
self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
- self._buffer.extend(data)
+ self._buffer.append(data)
self._maybe_pause_protocol()
- def _write_ready(self):
+ def _get_sendmsg_buffer(self):
+ return itertools.islice(self._buffer, SC_IOV_MAX)
+
+ def _write_sendmsg(self):
assert self._buffer, 'Data should not be empty'
+ if self._conn_lost:
+ return
+ try:
+ nbytes = self._sock.sendmsg(self._get_sendmsg_buffer())
+ self._adjust_leftover_buffer(nbytes)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
+ self._loop._remove_writer(self._sock_fd)
+ self._buffer.clear()
+ self._fatal_error(exc, 'Fatal write error on socket transport')
+ if self._empty_waiter is not None:
+ self._empty_waiter.set_exception(exc)
+ else:
+ self._maybe_resume_protocol() # May append to buffer.
+ if not self._buffer:
+ self._loop._remove_writer(self._sock_fd)
+ if self._empty_waiter is not None:
+ self._empty_waiter.set_result(None)
+ if self._closing:
+ self._call_connection_lost(None)
+ elif self._eof:
+ self._sock.shutdown(socket.SHUT_WR)
+ def _adjust_leftover_buffer(self, nbytes: int) -> None:
+ buffer = self._buffer
+ while nbytes:
+ b = buffer.popleft()
+ b_len = len(b)
+ if b_len <= nbytes:
+ nbytes -= b_len
+ else:
+ buffer.appendleft(b[nbytes:])
+ break
+
+ def _write_send(self):
+ assert self._buffer, 'Data should not be empty'
if self._conn_lost:
return
try:
- n = self._sock.send(self._buffer)
+ buffer = self._buffer.popleft()
+ n = self._sock.send(buffer)
+ if n != len(buffer):
+ # Not all data was written
+ self._buffer.appendleft(buffer[n:])
except (BlockingIOError, InterruptedError):
pass
except (SystemExit, KeyboardInterrupt):
@@ -1095,8 +1154,6 @@ class _SelectorSocketTransport(_SelectorTransport):
if self._empty_waiter is not None:
self._empty_waiter.set_exception(exc)
else:
- if n:
- del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
self._loop._remove_writer(self._sock_fd)
@@ -1114,6 +1171,19 @@ class _SelectorSocketTransport(_SelectorTransport):
if not self._buffer:
self._sock.shutdown(socket.SHUT_WR)
+ def writelines(self, list_of_data):
+ if self._eof:
+ raise RuntimeError('Cannot call writelines() after write_eof()')
+ if self._empty_waiter is not None:
+ raise RuntimeError('unable to writelines; sendfile is in progress')
+ if not list_of_data:
+ return
+ self._buffer.extend([memoryview(data) for data in list_of_data])
+ self._write_ready()
+ # If the entire buffer couldn't be written, register a write handler
+ if self._buffer:
+ self._loop._add_writer(self._sock_fd, self._write_ready)
+
def can_write_eof(self):
return True
@@ -1134,8 +1204,13 @@ class _SelectorSocketTransport(_SelectorTransport):
def _reset_empty_waiter(self):
self._empty_waiter = None
+ def close(self):
+ self._read_ready_cb = None
+ self._write_ready = None
+ super().close()
+
-class _SelectorDatagramTransport(_SelectorTransport):
+class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport):
_buffer_factory = collections.deque
diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py
index 26ffc86584..f310aa2f36 100644
--- a/contrib/tools/python3/src/Lib/asyncio/streams.py
+++ b/contrib/tools/python3/src/Lib/asyncio/streams.py
@@ -125,7 +125,7 @@ class FlowControlMixin(protocols.Protocol):
def __init__(self, loop=None):
if loop is None:
- self._loop = events._get_event_loop(stacklevel=4)
+ self._loop = events.get_event_loop()
else:
self._loop = loop
self._paused = False
@@ -393,7 +393,8 @@ class StreamWriter:
async def start_tls(self, sslcontext, *,
server_hostname=None,
- ssl_handshake_timeout=None):
+ ssl_handshake_timeout=None,
+ ssl_shutdown_timeout=None):
"""Upgrade an existing stream-based connection to TLS."""
server_side = self._protocol._client_connected_cb is not None
protocol = self._protocol
@@ -401,7 +402,8 @@ class StreamWriter:
new_transport = await self._loop.start_tls( # type: ignore
self._transport, protocol, sslcontext,
server_side=server_side, server_hostname=server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout)
+ ssl_handshake_timeout=ssl_handshake_timeout,
+ ssl_shutdown_timeout=ssl_shutdown_timeout)
self._transport = new_transport
protocol._replace_writer(self)
@@ -426,7 +428,7 @@ class StreamReader:
self._limit = limit
if loop is None:
- self._loop = events._get_event_loop()
+ self._loop = events.get_event_loop()
else:
self._loop = loop
self._buffer = bytearray()
@@ -711,7 +713,7 @@ class StreamReader:
await self._wait_for_data('read')
# This will work right even if buffer is less than n bytes
- data = bytes(self._buffer[:n])
+ data = bytes(memoryview(self._buffer)[:n])
del self._buffer[:n]
self._maybe_resume_transport()
@@ -753,7 +755,7 @@ class StreamReader:
data = bytes(self._buffer)
self._buffer.clear()
else:
- data = bytes(self._buffer[:n])
+ data = bytes(memoryview(self._buffer)[:n])
del self._buffer[:n]
self._maybe_resume_transport()
return data
diff --git a/contrib/tools/python3/src/Lib/asyncio/subprocess.py b/contrib/tools/python3/src/Lib/asyncio/subprocess.py
index da4f00a4a0..043359bbd0 100644
--- a/contrib/tools/python3/src/Lib/asyncio/subprocess.py
+++ b/contrib/tools/python3/src/Lib/asyncio/subprocess.py
@@ -148,10 +148,11 @@ class Process:
async def _feed_stdin(self, input):
debug = self._loop.get_debug()
try:
- self.stdin.write(input)
- if debug:
- logger.debug(
- '%r communicate: feed stdin (%s bytes)', self, len(input))
+ if input is not None:
+ self.stdin.write(input)
+ if debug:
+ logger.debug(
+ '%r communicate: feed stdin (%s bytes)', self, len(input))
await self.stdin.drain()
except (BrokenPipeError, ConnectionResetError) as exc:
@@ -185,7 +186,7 @@ class Process:
return output
async def communicate(self, input=None):
- if input is not None:
+ if self.stdin is not None:
stdin = self._feed_stdin(input)
else:
stdin = self._noop()
diff --git a/contrib/tools/python3/src/Lib/asyncio/taskgroups.py b/contrib/tools/python3/src/Lib/asyncio/taskgroups.py
index bfdbe63049..d264e51f1f 100644
--- a/contrib/tools/python3/src/Lib/asyncio/taskgroups.py
+++ b/contrib/tools/python3/src/Lib/asyncio/taskgroups.py
@@ -2,7 +2,7 @@
# license: PSFL.
-__all__ = ["TaskGroup"]
+__all__ = ("TaskGroup",)
from . import events
from . import exceptions
@@ -162,8 +162,14 @@ class TaskGroup:
else:
task = self._loop.create_task(coro, context=context)
tasks._set_task_name(task, name)
- task.add_done_callback(self._on_task_done)
- self._tasks.add(task)
+ # optimization: Immediately call the done callback if the task is
+ # already done (e.g. if the coro was able to complete eagerly),
+ # and skip scheduling a done callback
+ if task.done():
+ self._on_task_done(task)
+ else:
+ self._tasks.add(task)
+ task.add_done_callback(self._on_task_done)
return task
# Since Python 3.8 Tasks propagate all exceptions correctly,
diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py
index 6ca545e30a..65f2a6ef80 100644
--- a/contrib/tools/python3/src/Lib/asyncio/tasks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py
@@ -6,6 +6,7 @@ __all__ = (
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
'current_task', 'all_tasks',
+ 'create_eager_task_factory', 'eager_task_factory',
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
@@ -24,7 +25,7 @@ from . import coroutines
from . import events
from . import exceptions
from . import futures
-from .coroutines import _is_coroutine
+from . import timeouts
# Helper to generate new task names
# This uses itertools.count() instead of a "+= 1" operation because the latter
@@ -43,22 +44,26 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_running_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
+ # capturing the set of eager tasks first, so if an eager task "graduates"
+ # to a regular task in another thread, we don't risk missing it.
+ eager_tasks = list(_eager_tasks)
+ # Looping over the WeakSet isn't safe as it can be updated from another
+ # thread, therefore we cast it to list prior to filtering. The list cast
+ # itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur).
+ # See issues 34970 and 36607 for details.
+ scheduled_tasks = None
i = 0
while True:
try:
- tasks = list(_all_tasks)
+ scheduled_tasks = list(_scheduled_tasks)
except RuntimeError:
i += 1
if i >= 1000:
raise
else:
break
- return {t for t in tasks
+ return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
if futures._get_loop(t) is loop and not t.done()}
@@ -103,7 +108,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True
- def __init__(self, coro, *, loop=None, name=None, context=None):
+ def __init__(self, coro, *, loop=None, name=None, context=None,
+ eager_start=False):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -127,8 +133,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
else:
self._context = context
- self._loop.call_soon(self.__step, context=self._context)
- _register_task(self)
+ if eager_start and self._loop.is_running():
+ self.__eager_start()
+ else:
+ self._loop.call_soon(self.__step, context=self._context)
+ _register_task(self)
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
@@ -149,6 +158,9 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
def get_coro(self):
return self._coro
+ def get_context(self):
+ return self._context
+
def get_name(self):
return self._name
@@ -257,6 +269,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._num_cancels_requested -= 1
return self._num_cancels_requested
+ def __eager_start(self):
+ prev_task = _swap_current_task(self._loop, self)
+ try:
+ _register_eager_task(self)
+ try:
+ self._context.run(self.__step_run_and_handle_result, None)
+ finally:
+ _unregister_eager_task(self)
+ finally:
+ try:
+ curtask = _swap_current_task(self._loop, prev_task)
+ assert curtask is self
+ finally:
+ if self.done():
+ self._coro = None
+ self = None # Needed to break cycles when an exception occurs.
+ else:
+ _register_task(self)
+
def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
@@ -265,11 +296,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
- coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
- # Call either coro.throw(exc) or coro.send(None).
+ try:
+ self.__step_run_and_handle_result(exc)
+ finally:
+ _leave_task(self._loop, self)
+ self = None # Needed to break cycles when an exception occurs.
+
+ def __step_run_and_handle_result(self, exc):
+ coro = self._coro
try:
if exc is None:
# We use the `send` method directly, because coroutines
@@ -341,7 +378,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
- _leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):
@@ -444,65 +480,44 @@ async def wait_for(fut, timeout):
If the wait is cancelled, the task is also cancelled.
+    If the task suppresses the cancellation and returns a value instead,
+ that value is returned.
+
This function is a coroutine.
"""
- loop = events.get_running_loop()
+ # The special case for timeout <= 0 is for the following case:
+ #
+ # async def test_waitfor():
+ # func_started = False
+ #
+ # async def func():
+ # nonlocal func_started
+ # func_started = True
+ #
+ # try:
+ # await asyncio.wait_for(func(), 0)
+ # except asyncio.TimeoutError:
+ # assert not func_started
+ # else:
+ # assert False
+ #
+ # asyncio.run(test_waitfor())
- if timeout is None:
- return await fut
- if timeout <= 0:
- fut = ensure_future(fut, loop=loop)
+ if timeout is not None and timeout <= 0:
+ fut = ensure_future(fut)
if fut.done():
return fut.result()
- await _cancel_and_wait(fut, loop=loop)
+ await _cancel_and_wait(fut)
try:
return fut.result()
except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
-
- waiter = loop.create_future()
- timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
- cb = functools.partial(_release_waiter, waiter)
-
- fut = ensure_future(fut, loop=loop)
- fut.add_done_callback(cb)
-
- try:
- # wait until the future completes or the timeout
- try:
- await waiter
- except exceptions.CancelledError:
- if fut.done():
- return fut.result()
- else:
- fut.remove_done_callback(cb)
- # We must ensure that the task is not running
- # after wait_for() returns.
- # See https://bugs.python.org/issue32751
- await _cancel_and_wait(fut, loop=loop)
- raise
-
- if fut.done():
- return fut.result()
- else:
- fut.remove_done_callback(cb)
- # We must ensure that the task is not running
- # after wait_for() returns.
- # See https://bugs.python.org/issue32751
- await _cancel_and_wait(fut, loop=loop)
- # In case task cancellation failed with some
- # exception, we should re-raise it
- # See https://bugs.python.org/issue40607
- try:
- return fut.result()
- except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
- finally:
- timeout_handle.cancel()
+ raise TimeoutError from exc
+ async with timeouts.timeout(timeout):
+ return await fut
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
@@ -548,9 +563,10 @@ async def _wait(fs, timeout, return_when, loop):
return done, pending
-async def _cancel_and_wait(fut, loop):
+async def _cancel_and_wait(fut):
"""Cancel the *fut* future or task and wait until it completes."""
+ loop = events.get_running_loop()
waiter = loop.create_future()
cb = functools.partial(_release_waiter, waiter)
fut.add_done_callback(cb)
@@ -589,7 +605,7 @@ def as_completed(fs, *, timeout=None):
from .queues import Queue # Import here to avoid circular import problem.
done = Queue()
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
@@ -656,46 +672,33 @@ def ensure_future(coro_or_future, *, loop=None):
If the argument is a Future, it is returned directly.
"""
- return _ensure_future(coro_or_future, loop=loop)
-
-
-def _ensure_future(coro_or_future, *, loop=None):
if futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
'the one specified as the loop argument')
return coro_or_future
- called_wrap_awaitable = False
+ should_close = True
if not coroutines.iscoroutine(coro_or_future):
if inspect.isawaitable(coro_or_future):
+ async def _wrap_awaitable(awaitable):
+ return await awaitable
+
coro_or_future = _wrap_awaitable(coro_or_future)
- called_wrap_awaitable = True
+ should_close = False
else:
raise TypeError('An asyncio.Future, a coroutine or an awaitable '
'is required')
if loop is None:
- loop = events._get_event_loop(stacklevel=4)
+ loop = events.get_event_loop()
try:
return loop.create_task(coro_or_future)
except RuntimeError:
- if not called_wrap_awaitable:
+ if should_close:
coro_or_future.close()
raise
-@types.coroutine
-def _wrap_awaitable(awaitable):
- """Helper for asyncio.ensure_future().
-
- Wraps awaitable (an object with __await__) into a coroutine
- that will later be wrapped in a Task by ensure_future().
- """
- return (yield from awaitable.__await__())
-
-_wrap_awaitable._is_coroutine = _is_coroutine
-
-
class _GatheringFuture(futures.Future):
"""Helper for gather().
@@ -756,7 +759,7 @@ def gather(*coros_or_futures, return_exceptions=False):
gather won't cancel any other awaitables.
"""
if not coros_or_futures:
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
@@ -820,11 +823,12 @@ def gather(*coros_or_futures, return_exceptions=False):
children = []
nfuts = 0
nfinished = 0
+ done_futs = []
loop = None
outer = None # bpo-46672
for arg in coros_or_futures:
if arg not in arg_to_fut:
- fut = _ensure_future(arg, loop=loop)
+ fut = ensure_future(arg, loop=loop)
if loop is None:
loop = futures._get_loop(fut)
if fut is not arg:
@@ -836,7 +840,10 @@ def gather(*coros_or_futures, return_exceptions=False):
nfuts += 1
arg_to_fut[arg] = fut
- fut.add_done_callback(_done_callback)
+ if fut.done():
+ done_futs.append(fut)
+ else:
+ fut.add_done_callback(_done_callback)
else:
# There's a duplicate Future object in coros_or_futures.
@@ -845,6 +852,13 @@ def gather(*coros_or_futures, return_exceptions=False):
children.append(fut)
outer = _GatheringFuture(children, loop=loop)
+ # Run done callbacks after GatheringFuture created so any post-processing
+ # can be performed at this point
+ # optimization: in the special case that *all* futures finished eagerly,
+ # this will effectively complete the gather eagerly, with the last
+ # callback setting the result (or exception) on outer before returning it
+ for fut in done_futs:
+ _done_callback(fut)
return outer
@@ -881,7 +895,7 @@ def shield(arg):
weak references to tasks. A task that isn't referenced elsewhere
may get garbage collected at any time, even before it's done.
"""
- inner = _ensure_future(arg)
+ inner = ensure_future(arg)
if inner.done():
# Shortcut.
return inner
@@ -937,8 +951,40 @@ def run_coroutine_threadsafe(coro, loop):
return future
-# WeakSet containing all alive tasks.
-_all_tasks = weakref.WeakSet()
+def create_eager_task_factory(custom_task_constructor):
+ """Create a function suitable for use as a task factory on an event-loop.
+
+ Example usage:
+
+ loop.set_task_factory(
+ asyncio.create_eager_task_factory(my_task_constructor))
+
+ Now, tasks created will be started immediately (rather than being first
+ scheduled to an event loop). The constructor argument can be any callable
+ that returns a Task-compatible object and has a signature compatible
+ with `Task.__init__`; it must have the `eager_start` keyword argument.
+
+ Most applications will use `Task` for `custom_task_constructor` and in
+ this case there's no need to call `create_eager_task_factory()`
+ directly. Instead the global `eager_task_factory` instance can be
+ used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
+ """
+
+ def factory(loop, coro, *, name=None, context=None):
+ return custom_task_constructor(
+ coro, loop=loop, name=name, context=context, eager_start=True)
+
+ return factory
+
+
+eager_task_factory = create_eager_task_factory(Task)
+
+
+# Collectively these two sets hold references to the complete set of active
+# tasks. Eagerly executed tasks use a faster regular set as an optimization
+# but may graduate to a WeakSet if the task blocks on IO.
+_scheduled_tasks = weakref.WeakSet()
+_eager_tasks = set()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
@@ -946,8 +992,13 @@ _current_tasks = {}
def _register_task(task):
- """Register a new task in asyncio as executed by loop."""
- _all_tasks.add(task)
+ """Register an asyncio Task scheduled to run on an event loop."""
+ _scheduled_tasks.add(task)
+
+
+def _register_eager_task(task):
+ """Register an asyncio Task about to be eagerly executed."""
+ _eager_tasks.add(task)
def _enter_task(loop, task):
@@ -966,25 +1017,49 @@ def _leave_task(loop, task):
del _current_tasks[loop]
+def _swap_current_task(loop, task):
+ prev_task = _current_tasks.get(loop)
+ if task is None:
+ del _current_tasks[loop]
+ else:
+ _current_tasks[loop] = task
+ return prev_task
+
+
def _unregister_task(task):
- """Unregister a task."""
- _all_tasks.discard(task)
+ """Unregister a completed, scheduled Task."""
+ _scheduled_tasks.discard(task)
+
+
+def _unregister_eager_task(task):
+ """Unregister a task which finished its first eager step."""
+ _eager_tasks.discard(task)
+_py_current_task = current_task
_py_register_task = _register_task
+_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
+_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
+_py_swap_current_task = _swap_current_task
try:
- from _asyncio import (_register_task, _unregister_task,
- _enter_task, _leave_task,
- _all_tasks, _current_tasks)
+ from _asyncio import (_register_task, _register_eager_task,
+ _unregister_task, _unregister_eager_task,
+ _enter_task, _leave_task, _swap_current_task,
+ _scheduled_tasks, _eager_tasks, _current_tasks,
+ current_task)
except ImportError:
pass
else:
+ _c_current_task = current_task
_c_register_task = _register_task
+ _c_register_eager_task = _register_eager_task
_c_unregister_task = _unregister_task
+ _c_unregister_eager_task = _unregister_eager_task
_c_enter_task = _enter_task
_c_leave_task = _leave_task
+ _c_swap_current_task = _swap_current_task
diff --git a/contrib/tools/python3/src/Lib/asyncio/unix_events.py b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
index 77d2670f83..f2e920ada4 100644
--- a/contrib/tools/python3/src/Lib/asyncio/unix_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
@@ -195,22 +195,25 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
async def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
- with events.get_child_watcher() as watcher:
+ with warnings.catch_warnings():
+ warnings.simplefilter('ignore', DeprecationWarning)
+ watcher = events.get_child_watcher()
+
+ with watcher:
if not watcher.is_active():
# Check early.
# Raising exception before process creation
# prevents subprocess execution if the watcher
# is not ready to handle it.
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
- "subprocess support is not installed.")
+ "subprocess support is not installed.")
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
- stdin, stdout, stderr, bufsize,
- waiter=waiter, extra=extra,
- **kwargs)
-
+ stdin, stdout, stderr, bufsize,
+ waiter=waiter, extra=extra,
+ **kwargs)
watcher.add_child_handler(transp.get_pid(),
- self._child_watcher_callback, transp)
+ self._child_watcher_callback, transp)
try:
await waiter
except (SystemExit, KeyboardInterrupt):
@@ -223,8 +226,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
return transp
def _child_watcher_callback(self, pid, returncode, transp):
- # Skip one iteration for callbacks to be executed
- self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
+ self.call_soon_threadsafe(transp._process_exited, returncode)
async def create_unix_connection(
self, protocol_factory, path=None, *,
@@ -851,6 +853,13 @@ class AbstractChildWatcher:
waitpid(-1), there should be only one active object per process.
"""
+ def __init_subclass__(cls) -> None:
+ if cls.__module__ != __name__:
+ warnings._deprecated("AbstractChildWatcher",
+ "{name!r} is deprecated as of Python 3.12 and will be "
+ "removed in Python {remove}.",
+ remove=(3, 14))
+
def add_child_handler(self, pid, callback, *args):
"""Register a new child handler.
@@ -919,10 +928,6 @@ class PidfdChildWatcher(AbstractChildWatcher):
recent (5.3+) kernels.
"""
- def __init__(self):
- self._loop = None
- self._callbacks = {}
-
def __enter__(self):
return self
@@ -930,35 +935,22 @@ class PidfdChildWatcher(AbstractChildWatcher):
pass
def is_active(self):
- return self._loop is not None and self._loop.is_running()
+ return True
def close(self):
- self.attach_loop(None)
+ pass
def attach_loop(self, loop):
- if self._loop is not None and loop is None and self._callbacks:
- warnings.warn(
- 'A loop is being detached '
- 'from a child watcher with pending handlers',
- RuntimeWarning)
- for pidfd, _, _ in self._callbacks.values():
- self._loop._remove_reader(pidfd)
- os.close(pidfd)
- self._callbacks.clear()
- self._loop = loop
+ pass
def add_child_handler(self, pid, callback, *args):
- existing = self._callbacks.get(pid)
- if existing is not None:
- self._callbacks[pid] = existing[0], callback, args
- else:
- pidfd = os.pidfd_open(pid)
- self._loop._add_reader(pidfd, self._do_wait, pid)
- self._callbacks[pid] = pidfd, callback, args
+ loop = events.get_running_loop()
+ pidfd = os.pidfd_open(pid)
+ loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args)
- def _do_wait(self, pid):
- pidfd, callback, args = self._callbacks.pop(pid)
- self._loop._remove_reader(pidfd)
+ def _do_wait(self, pid, pidfd, callback, args):
+ loop = events.get_running_loop()
+ loop._remove_reader(pidfd)
try:
_, status = os.waitpid(pid, 0)
except ChildProcessError:
@@ -976,12 +968,9 @@ class PidfdChildWatcher(AbstractChildWatcher):
callback(pid, returncode, *args)
def remove_child_handler(self, pid):
- try:
- pidfd, _, _ = self._callbacks.pop(pid)
- except KeyError:
- return False
- self._loop._remove_reader(pidfd)
- os.close(pidfd)
+ # asyncio never calls remove_child_handler() !!!
+ # The method is no-op but is implemented because
+ # abstract base classes require it.
return True
@@ -1049,6 +1038,13 @@ class SafeChildWatcher(BaseChildWatcher):
big number of children (O(n) each time SIGCHLD is raised)
"""
+ def __init__(self):
+ super().__init__()
+ warnings._deprecated("SafeChildWatcher",
+ "{name!r} is deprecated as of Python 3.12 and will be "
+ "removed in Python {remove}.",
+ remove=(3, 14))
+
def close(self):
self._callbacks.clear()
super().close()
@@ -1127,6 +1123,10 @@ class FastChildWatcher(BaseChildWatcher):
self._lock = threading.Lock()
self._zombies = {}
self._forks = 0
+ warnings._deprecated("FastChildWatcher",
+ "{name!r} is deprecated as of Python 3.12 and will be "
+ "removed in Python {remove}.",
+ remove=(3, 14))
def close(self):
self._callbacks.clear()
@@ -1239,6 +1239,10 @@ class MultiLoopChildWatcher(AbstractChildWatcher):
def __init__(self):
self._callbacks = {}
self._saved_sighandler = None
+ warnings._deprecated("MultiLoopChildWatcher",
+ "{name!r} is deprecated as of Python 3.12 and will be "
+ "removed in Python {remove}.",
+ remove=(3, 14))
def is_active(self):
return self._saved_sighandler is not None
@@ -1423,6 +1427,17 @@ class ThreadedChildWatcher(AbstractChildWatcher):
self._threads.pop(expected_pid)
+def can_use_pidfd():
+ if not hasattr(os, 'pidfd_open'):
+ return False
+ try:
+ pid = os.getpid()
+ os.close(os.pidfd_open(pid, 0))
+ except OSError:
+ # blocked by security policy like SECCOMP
+ return False
+ return True
+
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes."""
@@ -1435,7 +1450,10 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
def _init_watcher(self):
with events._lock:
if self._watcher is None: # pragma: no branch
- self._watcher = ThreadedChildWatcher()
+ if can_use_pidfd():
+ self._watcher = PidfdChildWatcher()
+ else:
+ self._watcher = ThreadedChildWatcher()
def set_event_loop(self, loop):
"""Set the event loop.
@@ -1459,6 +1477,9 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
if self._watcher is None:
self._init_watcher()
+ warnings._deprecated("get_child_watcher",
+ "{name!r} is deprecated as of Python 3.12 and will be "
+ "removed in Python {remove}.", remove=(3, 14))
return self._watcher
def set_child_watcher(self, watcher):
@@ -1470,6 +1491,9 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
self._watcher.close()
self._watcher = watcher
+ warnings._deprecated("set_child_watcher",
+ "{name!r} is deprecated as of Python 3.12 and will be "
+ "removed in Python {remove}.", remove=(3, 14))
SelectorEventLoop = _UnixSelectorEventLoop
diff --git a/contrib/tools/python3/src/Lib/asyncio/windows_events.py b/contrib/tools/python3/src/Lib/asyncio/windows_events.py
index eb33551b41..c9a5fb841c 100644
--- a/contrib/tools/python3/src/Lib/asyncio/windows_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/windows_events.py
@@ -455,6 +455,17 @@ class IocpProactor:
fut.set_result(value)
return fut
+ @staticmethod
+ def finish_socket_func(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
+ _overlapped.ERROR_OPERATION_ABORTED):
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+
def recv(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
@@ -466,17 +477,7 @@ class IocpProactor:
except BrokenPipeError:
return self._result(b'')
- def finish_recv(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_recv)
+ return self._register(ov, conn, self.finish_socket_func)
def recv_into(self, conn, buf, flags=0):
self._register_with_iocp(conn)
@@ -489,17 +490,7 @@ class IocpProactor:
except BrokenPipeError:
return self._result(0)
- def finish_recv(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_recv)
+ return self._register(ov, conn, self.finish_socket_func)
def recvfrom(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
@@ -509,17 +500,7 @@ class IocpProactor:
except BrokenPipeError:
return self._result((b'', None))
- def finish_recv(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_recv)
+ return self._register(ov, conn, self.finish_socket_func)
def recvfrom_into(self, conn, buf, flags=0):
self._register_with_iocp(conn)
@@ -547,17 +528,7 @@ class IocpProactor:
ov.WSASendTo(conn.fileno(), buf, flags, addr)
- def finish_send(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_send)
+ return self._register(ov, conn, self.finish_socket_func)
def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
@@ -567,17 +538,7 @@ class IocpProactor:
else:
ov.WriteFile(conn.fileno(), buf)
- def finish_send(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_send)
+ return self._register(ov, conn, self.finish_socket_func)
def accept(self, listener):
self._register_with_iocp(listener)
@@ -648,16 +609,7 @@ class IocpProactor:
offset_low, offset_high,
count, 0, 0)
- def finish_sendfile(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
- return self._register(ov, sock, finish_sendfile)
+ return self._register(ov, sock, self.finish_socket_func)
def accept_pipe(self, pipe):
self._register_with_iocp(pipe)