diff options
author | shadchin <shadchin@yandex-team.com> | 2024-02-12 07:53:52 +0300 |
---|---|---|
committer | shadchin <shadchin@yandex-team.com> | 2024-02-12 08:07:36 +0300 |
commit | ce1b7ca3171f9158180640c6a02a74b4afffedea (patch) | |
tree | e47c1e8391b1b0128262c1e9b1e6ed4c8fff2348 /contrib/tools/python3/src/Lib/asyncio | |
parent | 57350d96f030db90f220ce50ee591d5c5d403df7 (diff) | |
download | ydb-ce1b7ca3171f9158180640c6a02a74b4afffedea.tar.gz |
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio')
18 files changed, 479 insertions, 266 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/__init__.py b/contrib/tools/python3/src/Lib/asyncio/__init__.py index fed16ec7c6..03165a425e 100644 --- a/contrib/tools/python3/src/Lib/asyncio/__init__.py +++ b/contrib/tools/python3/src/Lib/asyncio/__init__.py @@ -34,6 +34,7 @@ __all__ = (base_events.__all__ + streams.__all__ + subprocess.__all__ + tasks.__all__ + + taskgroups.__all__ + threads.__all__ + timeouts.__all__ + transports.__all__) diff --git a/contrib/tools/python3/src/Lib/asyncio/base_events.py b/contrib/tools/python3/src/Lib/asyncio/base_events.py index 5fa28cae9c..c16c445bde 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_events.py @@ -306,7 +306,7 @@ class Server(events.AbstractServer): self._waiters = None for waiter in waiters: if not waiter.done(): - waiter.set_result(waiter) + waiter.set_result(None) def _start_serving(self): if self._serving: @@ -378,7 +378,27 @@ class Server(events.AbstractServer): self._serving_forever_fut = None async def wait_closed(self): - if self._sockets is None or self._waiters is None: + """Wait until server is closed and all connections are dropped. + + - If the server is not closed, wait. + - If it is closed, but there are still active connections, wait. + + Anyone waiting here will be unblocked once both conditions + (server is closed and all connections have been dropped) + have become true, in either order. + + Historical note: In 3.11 and before, this was broken, returning + immediately if the server was already closed, even if there + were still active connections. An attempted fix in 3.12.0 was + still broken, returning immediately if the server was still + open and there were no active connections. Hopefully in 3.12.1 + we have it right. + """ + # Waiters are unblocked by self._wakeup(), which is called + # from two places: self.close() and self._detach(), but only + # when both conditions have become true. To signal that this + # has happened, self._wakeup() sets self._waiters to None. + if self._waiters is None: return waiter = self._loop.create_future() self._waiters.append(waiter) @@ -562,8 +582,13 @@ class BaseEventLoop(events.AbstractEventLoop): 'asyncgen': agen }) - async def shutdown_default_executor(self): - """Schedule the shutdown of the default executor.""" + async def shutdown_default_executor(self, timeout=None): + """Schedule the shutdown of the default executor. + + The timeout parameter specifies the amount of time the executor will + be given to finish joining. The default value is None, which means + that the executor will be given an unlimited amount of time. + """ self._executor_shutdown_called = True if self._default_executor is None: return @@ -573,7 +598,13 @@ class BaseEventLoop(events.AbstractEventLoop): try: await future finally: - thread.join() + thread.join(timeout) + + if thread.is_alive(): + warnings.warn("The executor did not finishing joining " + f"its threads within {timeout} seconds.", + RuntimeWarning, stacklevel=2) + self._default_executor.shutdown(wait=False) def _do_shutdown(self, future): try: @@ -992,7 +1023,8 @@ class BaseEventLoop(events.AbstractEventLoop): local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, - happy_eyeballs_delay=None, interleave=None): + happy_eyeballs_delay=None, interleave=None, + all_errors=False): """Connect to a TCP server. Create a streaming transport connection to a given internet host and @@ -1082,6 +1114,8 @@ class BaseEventLoop(events.AbstractEventLoop): if sock is None: exceptions = [exc for sub in exceptions for exc in sub] try: + if all_errors: + raise ExceptionGroup("create_connection failed", exceptions) if len(exceptions) == 1: raise exceptions[0] else: @@ -1819,7 +1853,22 @@ class BaseEventLoop(events.AbstractEventLoop): exc_info=True) else: try: - self._exception_handler(self, context) + ctx = None + thing = context.get("task") + if thing is None: + # Even though Futures don't have a context, + # Task is a subclass of Future, + # and sometimes the 'future' key holds a Task. + thing = context.get("future") + if thing is None: + # Handles also have a context. + thing = context.get("handle") + if thing is not None and hasattr(thing, "get_context"): + ctx = thing.get_context() + if ctx is not None and hasattr(ctx, "run"): + ctx.run(self._exception_handler, self, context) + else: + self._exception_handler(self, context) except (SystemExit, KeyboardInterrupt): raise except BaseException as exc: diff --git a/contrib/tools/python3/src/Lib/asyncio/base_futures.py b/contrib/tools/python3/src/Lib/asyncio/base_futures.py index cd811a788c..7987963bd9 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_futures.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_futures.py @@ -1,7 +1,6 @@ __all__ = () import reprlib -from _thread import get_ident from . import format_helpers diff --git a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py index 26298e638c..c907b68341 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py @@ -15,11 +15,13 @@ def _task_repr_info(task): info.insert(1, 'name=%r' % task.get_name()) - coro = coroutines._format_coroutine(task._coro) - info.insert(2, f'coro=<{coro}>') - if task._fut_waiter is not None: - info.insert(3, f'wait_for={task._fut_waiter!r}') + info.insert(2, f'wait_for={task._fut_waiter!r}') + + if task._coro: + coro = coroutines._format_coroutine(task._coro) + info.insert(2, f'coro=<{coro}>') + return info diff --git a/contrib/tools/python3/src/Lib/asyncio/constants.py b/contrib/tools/python3/src/Lib/asyncio/constants.py index 0ad997a921..b60c1e4236 100644 --- a/contrib/tools/python3/src/Lib/asyncio/constants.py +++ b/contrib/tools/python3/src/Lib/asyncio/constants.py @@ -30,6 +30,9 @@ SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 256 FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB +# Default timeout for joining the threads in the threadpool +THREAD_JOIN_TIMEOUT = 300 + # The enum should be here to break circular dependencies between # base_events and sslproto class _SendfileMode(enum.Enum): diff --git a/contrib/tools/python3/src/Lib/asyncio/coroutines.py b/contrib/tools/python3/src/Lib/asyncio/coroutines.py index 0e4b489f30..ab4f30eb51 100644 --- a/contrib/tools/python3/src/Lib/asyncio/coroutines.py +++ b/contrib/tools/python3/src/Lib/asyncio/coroutines.py @@ -4,7 +4,6 @@ import collections.abc import inspect import os import sys -import traceback import types @@ -26,8 +25,7 @@ def iscoroutinefunction(func): # Prioritize native coroutine check to speed-up # asyncio.iscoroutine. -_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType, - collections.abc.Coroutine) +_COROUTINE_TYPES = (types.CoroutineType, collections.abc.Coroutine) _iscoroutine_typecache = set() diff --git a/contrib/tools/python3/src/Lib/asyncio/events.py b/contrib/tools/python3/src/Lib/asyncio/events.py index cfc62156e4..016852880c 100644 --- a/contrib/tools/python3/src/Lib/asyncio/events.py +++ b/contrib/tools/python3/src/Lib/asyncio/events.py @@ -17,6 +17,7 @@ __all__ = ( import contextvars import os +import signal import socket import subprocess import sys @@ -65,6 +66,9 @@ class Handle: info = self._repr_info() return '<{}>'.format(' '.join(info)) + def get_context(self): + return self._context + def cancel(self): if not self._cancelled: self._cancelled = True @@ -675,6 +679,23 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): if (self._local._loop is None and not self._local._set_called and threading.current_thread() is threading.main_thread()): + stacklevel = 2 + try: + f = sys._getframe(1) + except AttributeError: + pass + else: + # Move up the call stack so that the warning is attached + # to the line outside asyncio itself. + while f: + module = f.f_globals.get('__name__') + if not (module == 'asyncio' or module.startswith('asyncio.')): + break + f = f.f_back + stacklevel += 1 + import warnings + warnings.warn('There is no current event loop', + DeprecationWarning, stacklevel=stacklevel) self.set_event_loop(self.new_event_loop()) if self._local._loop is None: @@ -786,14 +807,6 @@ def get_event_loop(): the result of `get_event_loop_policy().get_event_loop()` call. """ # NOTE: this function is implemented in C (see _asynciomodule.c) - return _py__get_event_loop() - - -def _get_event_loop(stacklevel=3): - # This internal method is going away in Python 3.12, left here only for - # backwards compatibility with 3.10.0 - 3.10.8 and 3.11.0. - # Similarly, this method's C equivalent in _asyncio is going away as well. - # See GH-99949 for more details. current_loop = _get_running_loop() if current_loop is not None: return current_loop @@ -826,7 +839,6 @@ _py__get_running_loop = _get_running_loop _py__set_running_loop = _set_running_loop _py_get_running_loop = get_running_loop _py_get_event_loop = get_event_loop -_py__get_event_loop = _get_event_loop try: @@ -834,7 +846,7 @@ try: # functions in asyncio. Pure Python implementation is # about 4 times slower than C-accelerated. from _asyncio import (_get_running_loop, _set_running_loop, - get_running_loop, get_event_loop, _get_event_loop) + get_running_loop, get_event_loop) except ImportError: pass else: @@ -843,4 +855,14 @@ else: _c__set_running_loop = _set_running_loop _c_get_running_loop = get_running_loop _c_get_event_loop = get_event_loop - _c__get_event_loop = _get_event_loop + + +if hasattr(os, 'fork'): + def on_fork(): + # Reset the loop and wakeupfd in the forked child process. + if _event_loop_policy is not None: + _event_loop_policy._local = BaseDefaultEventLoopPolicy._Local() + _set_running_loop(None) + signal.set_wakeup_fd(-1) + + os.register_at_fork(after_in_child=on_fork) diff --git a/contrib/tools/python3/src/Lib/asyncio/futures.py b/contrib/tools/python3/src/Lib/asyncio/futures.py index 3a6b44a091..97fc4e3fcb 100644 --- a/contrib/tools/python3/src/Lib/asyncio/futures.py +++ b/contrib/tools/python3/src/Lib/asyncio/futures.py @@ -77,7 +77,7 @@ class Future: the default event loop. """ if loop is None: - self._loop = events._get_event_loop() + self._loop = events.get_event_loop() else: self._loop = loop self._callbacks = [] @@ -413,7 +413,7 @@ def wrap_future(future, *, loop=None): assert isinstance(future, concurrent.futures.Future), \ f'concurrent.futures.Future is expected, got {future!r}' if loop is None: - loop = events._get_event_loop() + loop = events.get_event_loop() new_future = loop.create_future() _chain_future(future, new_future) return new_future diff --git a/contrib/tools/python3/src/Lib/asyncio/locks.py b/contrib/tools/python3/src/Lib/asyncio/locks.py index fd41dfd3f4..ce5d8d5bfb 100644 --- a/contrib/tools/python3/src/Lib/asyncio/locks.py +++ b/contrib/tools/python3/src/Lib/asyncio/locks.py @@ -8,7 +8,6 @@ import enum from . import exceptions from . import mixins -from . import tasks class _ContextManagerMixin: async def __aenter__(self): diff --git a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py index c6aab408fc..1e2a730cf3 100644 --- a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py @@ -288,7 +288,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, # we got end-of-file so no need to reschedule a new read return - data = self._data[:length] + # It's a new slice so make it immutable so protocols upstream don't have problems + data = bytes(memoryview(self._data)[:length]) else: # the future will be replaced by next proactor.recv call fut.cancel() diff --git a/contrib/tools/python3/src/Lib/asyncio/runners.py b/contrib/tools/python3/src/Lib/asyncio/runners.py index b3e0c44b7f..1b89236599 100644 --- a/contrib/tools/python3/src/Lib/asyncio/runners.py +++ b/contrib/tools/python3/src/Lib/asyncio/runners.py @@ -5,12 +5,11 @@ import enum import functools import threading import signal -import sys from . import coroutines from . import events from . import exceptions from . import tasks - +from . import constants class _State(enum.Enum): CREATED = "created" @@ -70,7 +69,8 @@ class Runner: loop = self._loop _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) - loop.run_until_complete(loop.shutdown_default_executor()) + loop.run_until_complete( + loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT)) finally: if self._set_event_loop: events.set_event_loop(None) @@ -157,12 +157,12 @@ class Runner: raise KeyboardInterrupt() -def run(main, *, debug=None): +def run(main, *, debug=None, loop_factory=None): """Execute the coroutine and return the result. This function runs the passed coroutine, taking care of - managing the asyncio event loop and finalizing asynchronous - generators. + managing the asyncio event loop, finalizing asynchronous + generators and closing the default executor. This function cannot be called when another asyncio event loop is running in the same thread. @@ -173,6 +173,10 @@ def run(main, *, debug=None): It should be used as a main entry point for asyncio programs, and should ideally only be called once. + The executor is given a timeout duration of 5 minutes to shutdown. + If the executor hasn't finished within that duration, a warning is + emitted and the executor is closed. + Example: async def main(): @@ -186,7 +190,7 @@ def run(main, *, debug=None): raise RuntimeError( "asyncio.run() cannot be called from a running event loop") - with Runner(debug=debug) as runner: + with Runner(debug=debug, loop_factory=loop_factory) as runner: return runner.run(main) diff --git a/contrib/tools/python3/src/Lib/asyncio/selector_events.py b/contrib/tools/python3/src/Lib/asyncio/selector_events.py index 40df1b7da5..790711f834 100644 --- a/contrib/tools/python3/src/Lib/asyncio/selector_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/selector_events.py @@ -9,6 +9,8 @@ __all__ = 'BaseSelectorEventLoop', import collections import errno import functools +import itertools +import os import selectors import socket import warnings @@ -28,6 +30,14 @@ from . import transports from . import trsock from .log import logger +_HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') + +if _HAS_SENDMSG: + try: + SC_IOV_MAX = os.sysconf('SC_IOV_MAX') + except OSError: + # Fallback to send + _HAS_SENDMSG = False def _test_selector_event(selector, fd, event): # Test if the selector is monitoring 'event' events @@ -58,6 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def _make_socket_transport(self, sock, protocol, waiter=None, *, extra=None, server=None): + self._ensure_fd_no_transport(sock) return _SelectorSocketTransport(self, sock, protocol, waiter, extra, server) @@ -68,6 +79,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, ): + self._ensure_fd_no_transport(rawsock) ssl_protocol = sslproto.SSLProtocol( self, protocol, sslcontext, waiter, server_side, server_hostname, @@ -80,6 +92,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def _make_datagram_transport(self, sock, protocol, address=None, waiter=None, extra=None): + self._ensure_fd_no_transport(sock) return _SelectorDatagramTransport(self, sock, protocol, address, waiter, extra) @@ -758,8 +771,6 @@ class _SelectorTransport(transports._FlowControlMixin, max_size = 256 * 1024 # Buffer size passed to recv(). - _buffer_factory = bytearray # Constructs initial value for self._buffer. - # Attribute used in the destructor: it must be set even if the constructor # is not called (see _SelectorSslTransport which may start by raising an # exception) @@ -784,7 +795,7 @@ class _SelectorTransport(transports._FlowControlMixin, self.set_protocol(protocol) self._server = server - self._buffer = self._buffer_factory() + self._buffer = collections.deque() self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. self._paused = False # Set when pause_reading() called @@ -909,7 +920,7 @@ class _SelectorTransport(transports._FlowControlMixin, self._server = None def get_write_buffer_size(self): - return len(self._buffer) + return sum(map(len, self._buffer)) def _add_reader(self, fd, callback, *args): if not self.is_reading(): @@ -929,7 +940,10 @@ class _SelectorSocketTransport(_SelectorTransport): super().__init__(loop, sock, protocol, extra, server) self._eof = False self._empty_waiter = None - + if _HAS_SENDMSG: + self._write_ready = self._write_sendmsg + else: + self._write_ready = self._write_send # Disable the Nagle algorithm -- small writes will be # sent without waiting for the TCP ACK. This generally # decreases the latency (in some cases significantly.) @@ -1067,23 +1081,68 @@ class _SelectorSocketTransport(_SelectorTransport): self._fatal_error(exc, 'Fatal write error on socket transport') return else: - data = data[n:] + data = memoryview(data)[n:] if not data: return # Not all was written; register write handler. self._loop._add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. - self._buffer.extend(data) + self._buffer.append(data) self._maybe_pause_protocol() - def _write_ready(self): + def _get_sendmsg_buffer(self): + return itertools.islice(self._buffer, SC_IOV_MAX) + + def _write_sendmsg(self): assert self._buffer, 'Data should not be empty' + if self._conn_lost: + return + try: + nbytes = self._sock.sendmsg(self._get_sendmsg_buffer()) + self._adjust_leftover_buffer(nbytes) + except (BlockingIOError, InterruptedError): + pass + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + self._loop._remove_writer(self._sock_fd) + self._buffer.clear() + self._fatal_error(exc, 'Fatal write error on socket transport') + if self._empty_waiter is not None: + self._empty_waiter.set_exception(exc) + else: + self._maybe_resume_protocol() # May append to buffer. + if not self._buffer: + self._loop._remove_writer(self._sock_fd) + if self._empty_waiter is not None: + self._empty_waiter.set_result(None) + if self._closing: + self._call_connection_lost(None) + elif self._eof: + self._sock.shutdown(socket.SHUT_WR) + def _adjust_leftover_buffer(self, nbytes: int) -> None: + buffer = self._buffer + while nbytes: + b = buffer.popleft() + b_len = len(b) + if b_len <= nbytes: + nbytes -= b_len + else: + buffer.appendleft(b[nbytes:]) + break + + def _write_send(self): + assert self._buffer, 'Data should not be empty' if self._conn_lost: return try: - n = self._sock.send(self._buffer) + buffer = self._buffer.popleft() + n = self._sock.send(buffer) + if n != len(buffer): + # Not all data was written + self._buffer.appendleft(buffer[n:]) except (BlockingIOError, InterruptedError): pass except (SystemExit, KeyboardInterrupt): @@ -1095,8 +1154,6 @@ class _SelectorSocketTransport(_SelectorTransport): if self._empty_waiter is not None: self._empty_waiter.set_exception(exc) else: - if n: - del self._buffer[:n] self._maybe_resume_protocol() # May append to buffer. if not self._buffer: self._loop._remove_writer(self._sock_fd) @@ -1114,6 +1171,19 @@ class _SelectorSocketTransport(_SelectorTransport): if not self._buffer: self._sock.shutdown(socket.SHUT_WR) + def writelines(self, list_of_data): + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._empty_waiter is not None: + raise RuntimeError('unable to writelines; sendfile is in progress') + if not list_of_data: + return + self._buffer.extend([memoryview(data) for data in list_of_data]) + self._write_ready() + # If the entire buffer couldn't be written, register a write handler + if self._buffer: + self._loop._add_writer(self._sock_fd, self._write_ready) + def can_write_eof(self): return True @@ -1134,8 +1204,13 @@ class _SelectorSocketTransport(_SelectorTransport): def _reset_empty_waiter(self): self._empty_waiter = None + def close(self): + self._read_ready_cb = None + self._write_ready = None + super().close() + -class _SelectorDatagramTransport(_SelectorTransport): +class _SelectorDatagramTransport(_SelectorTransport, transports.DatagramTransport): _buffer_factory = collections.deque diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py index 26ffc86584..f310aa2f36 100644 --- a/contrib/tools/python3/src/Lib/asyncio/streams.py +++ b/contrib/tools/python3/src/Lib/asyncio/streams.py @@ -125,7 +125,7 @@ class FlowControlMixin(protocols.Protocol): def __init__(self, loop=None): if loop is None: - self._loop = events._get_event_loop(stacklevel=4) + self._loop = events.get_event_loop() else: self._loop = loop self._paused = False @@ -393,7 +393,8 @@ class StreamWriter: async def start_tls(self, sslcontext, *, server_hostname=None, - ssl_handshake_timeout=None): + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): """Upgrade an existing stream-based connection to TLS.""" server_side = self._protocol._client_connected_cb is not None protocol = self._protocol @@ -401,7 +402,8 @@ class StreamWriter: new_transport = await self._loop.start_tls( # type: ignore self._transport, protocol, sslcontext, server_side=server_side, server_hostname=server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout) + ssl_handshake_timeout=ssl_handshake_timeout, + ssl_shutdown_timeout=ssl_shutdown_timeout) self._transport = new_transport protocol._replace_writer(self) @@ -426,7 +428,7 @@ class StreamReader: self._limit = limit if loop is None: - self._loop = events._get_event_loop() + self._loop = events.get_event_loop() else: self._loop = loop self._buffer = bytearray() @@ -711,7 +713,7 @@ class StreamReader: await self._wait_for_data('read') # This will work right even if buffer is less than n bytes - data = bytes(self._buffer[:n]) + data = bytes(memoryview(self._buffer)[:n]) del self._buffer[:n] self._maybe_resume_transport() @@ -753,7 +755,7 @@ class StreamReader: data = bytes(self._buffer) self._buffer.clear() else: - data = bytes(self._buffer[:n]) + data = bytes(memoryview(self._buffer)[:n]) del self._buffer[:n] self._maybe_resume_transport() return data diff --git a/contrib/tools/python3/src/Lib/asyncio/subprocess.py b/contrib/tools/python3/src/Lib/asyncio/subprocess.py index da4f00a4a0..043359bbd0 100644 --- a/contrib/tools/python3/src/Lib/asyncio/subprocess.py +++ b/contrib/tools/python3/src/Lib/asyncio/subprocess.py @@ -148,10 +148,11 @@ class Process: async def _feed_stdin(self, input): debug = self._loop.get_debug() try: - self.stdin.write(input) - if debug: - logger.debug( - '%r communicate: feed stdin (%s bytes)', self, len(input)) + if input is not None: + self.stdin.write(input) + if debug: + logger.debug( + '%r communicate: feed stdin (%s bytes)', self, len(input)) await self.stdin.drain() except (BrokenPipeError, ConnectionResetError) as exc: @@ -185,7 +186,7 @@ class Process: return output async def communicate(self, input=None): - if input is not None: + if self.stdin is not None: stdin = self._feed_stdin(input) else: stdin = self._noop() diff --git a/contrib/tools/python3/src/Lib/asyncio/taskgroups.py b/contrib/tools/python3/src/Lib/asyncio/taskgroups.py index bfdbe63049..d264e51f1f 100644 --- a/contrib/tools/python3/src/Lib/asyncio/taskgroups.py +++ b/contrib/tools/python3/src/Lib/asyncio/taskgroups.py @@ -2,7 +2,7 @@ # license: PSFL. -__all__ = ["TaskGroup"] +__all__ = ("TaskGroup",) from . import events from . import exceptions @@ -162,8 +162,14 @@ class TaskGroup: else: task = self._loop.create_task(coro, context=context) tasks._set_task_name(task, name) - task.add_done_callback(self._on_task_done) - self._tasks.add(task) + # optimization: Immediately call the done callback if the task is + # already done (e.g. if the coro was able to complete eagerly), + # and skip scheduling a done callback + if task.done(): + self._on_task_done(task) + else: + self._tasks.add(task) + task.add_done_callback(self._on_task_done) return task # Since Python 3.8 Tasks propagate all exceptions correctly, diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py index 6ca545e30a..65f2a6ef80 100644 --- a/contrib/tools/python3/src/Lib/asyncio/tasks.py +++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py @@ -6,6 +6,7 @@ __all__ = ( 'wait', 'wait_for', 'as_completed', 'sleep', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 'current_task', 'all_tasks', + 'create_eager_task_factory', 'eager_task_factory', '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) @@ -24,7 +25,7 @@ from . import coroutines from . import events from . import exceptions from . import futures -from .coroutines import _is_coroutine +from . import timeouts # Helper to generate new task names # This uses itertools.count() instead of a "+= 1" operation because the latter @@ -43,22 +44,26 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. + # capturing the set of eager tasks first, so if an eager task "graduates" + # to a regular task in another thread, we don't risk missing it. + eager_tasks = list(_eager_tasks) + # Looping over the WeakSet isn't safe as it can be updated from another + # thread, therefore we cast it to list prior to filtering. The list cast + # itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). + # See issues 34970 and 36607 for details. + scheduled_tasks = None i = 0 while True: try: - tasks = list(_all_tasks) + scheduled_tasks = list(_scheduled_tasks) except RuntimeError: i += 1 if i >= 1000: raise else: break - return {t for t in tasks + return {t for t in itertools.chain(scheduled_tasks, eager_tasks) if futures._get_loop(t) is loop and not t.done()} @@ -103,7 +108,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation # status is still pending _log_destroy_pending = True - def __init__(self, coro, *, loop=None, name=None, context=None): + def __init__(self, coro, *, loop=None, name=None, context=None, + eager_start=False): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -127,8 +133,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation else: self._context = context - self._loop.call_soon(self.__step, context=self._context) - _register_task(self) + if eager_start and self._loop.is_running(): + self.__eager_start() + else: + self._loop.call_soon(self.__step, context=self._context) + _register_task(self) def __del__(self): if self._state == futures._PENDING and self._log_destroy_pending: @@ -149,6 +158,9 @@ class Task(futures._PyFuture): # Inherit Python Task implementation def get_coro(self): return self._coro + def get_context(self): + return self._context + def get_name(self): return self._name @@ -257,6 +269,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._num_cancels_requested -= 1 return self._num_cancels_requested + def __eager_start(self): + prev_task = _swap_current_task(self._loop, self) + try: + _register_eager_task(self) + try: + self._context.run(self.__step_run_and_handle_result, None) + finally: + _unregister_eager_task(self) + finally: + try: + curtask = _swap_current_task(self._loop, prev_task) + assert curtask is self + finally: + if self.done(): + self._coro = None + self = None # Needed to break cycles when an exception occurs. + else: + _register_task(self) + def __step(self, exc=None): if self.done(): raise exceptions.InvalidStateError( @@ -265,11 +296,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if not isinstance(exc, exceptions.CancelledError): exc = self._make_cancelled_error() self._must_cancel = False - coro = self._coro self._fut_waiter = None _enter_task(self._loop, self) - # Call either coro.throw(exc) or coro.send(None). + try: + self.__step_run_and_handle_result(exc) + finally: + _leave_task(self._loop, self) + self = None # Needed to break cycles when an exception occurs. + + def __step_run_and_handle_result(self, exc): + coro = self._coro try: if exc is None: # We use the `send` method directly, because coroutines @@ -341,7 +378,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._loop.call_soon( self.__step, new_exc, context=self._context) finally: - _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): @@ -444,65 +480,44 @@ async def wait_for(fut, timeout): If the wait is cancelled, the task is also cancelled. + If the task supresses the cancellation and returns a value instead, + that value is returned. + This function is a coroutine. """ - loop = events.get_running_loop() + # The special case for timeout <= 0 is for the following case: + # + # async def test_waitfor(): + # func_started = False + # + # async def func(): + # nonlocal func_started + # func_started = True + # + # try: + # await asyncio.wait_for(func(), 0) + # except asyncio.TimeoutError: + # assert not func_started + # else: + # assert False + # + # asyncio.run(test_waitfor()) - if timeout is None: - return await fut - if timeout <= 0: - fut = ensure_future(fut, loop=loop) + if timeout is not None and timeout <= 0: + fut = ensure_future(fut) if fut.done(): return fut.result() - await _cancel_and_wait(fut, loop=loop) + await _cancel_and_wait(fut) try: return fut.result() except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc - - waiter = loop.create_future() - timeout_handle = loop.call_later(timeout, _release_waiter, waiter) - cb = functools.partial(_release_waiter, waiter) - - fut = ensure_future(fut, loop=loop) - fut.add_done_callback(cb) - - try: - # wait until the future completes or the timeout - try: - await waiter - except exceptions.CancelledError: - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - raise - - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - # In case task cancellation failed with some - # exception, we should re-raise it - # See https://bugs.python.org/issue40607 - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc - finally: - timeout_handle.cancel() + raise TimeoutError from exc + async with timeouts.timeout(timeout): + return await fut async def _wait(fs, timeout, return_when, loop): """Internal helper for wait(). @@ -548,9 +563,10 @@ async def _wait(fs, timeout, return_when, loop): return done, pending -async def _cancel_and_wait(fut, loop): +async def _cancel_and_wait(fut): """Cancel the *fut* future or task and wait until it completes.""" + loop = events.get_running_loop() waiter = loop.create_future() cb = functools.partial(_release_waiter, waiter) fut.add_done_callback(cb) @@ -589,7 +605,7 @@ def as_completed(fs, *, timeout=None): from .queues import Queue # Import here to avoid circular import problem. done = Queue() - loop = events._get_event_loop() + loop = events.get_event_loop() todo = {ensure_future(f, loop=loop) for f in set(fs)} timeout_handle = None @@ -656,46 +672,33 @@ def ensure_future(coro_or_future, *, loop=None): If the argument is a Future, it is returned directly. """ - return _ensure_future(coro_or_future, loop=loop) - - -def _ensure_future(coro_or_future, *, loop=None): if futures.isfuture(coro_or_future): if loop is not None and loop is not futures._get_loop(coro_or_future): raise ValueError('The future belongs to a different loop than ' 'the one specified as the loop argument') return coro_or_future - called_wrap_awaitable = False + should_close = True if not coroutines.iscoroutine(coro_or_future): if inspect.isawaitable(coro_or_future): + async def _wrap_awaitable(awaitable): + return await awaitable + coro_or_future = _wrap_awaitable(coro_or_future) - called_wrap_awaitable = True + should_close = False else: raise TypeError('An asyncio.Future, a coroutine or an awaitable ' 'is required') if loop is None: - loop = events._get_event_loop(stacklevel=4) + loop = events.get_event_loop() try: return loop.create_task(coro_or_future) except RuntimeError: - if not called_wrap_awaitable: + if should_close: coro_or_future.close() raise -@types.coroutine -def _wrap_awaitable(awaitable): - """Helper for asyncio.ensure_future(). - - Wraps awaitable (an object with __await__) into a coroutine - that will later be wrapped in a Task by ensure_future(). - """ - return (yield from awaitable.__await__()) - -_wrap_awaitable._is_coroutine = _is_coroutine - - class _GatheringFuture(futures.Future): """Helper for gather(). @@ -756,7 +759,7 @@ def gather(*coros_or_futures, return_exceptions=False): gather won't cancel any other awaitables. """ if not coros_or_futures: - loop = events._get_event_loop() + loop = events.get_event_loop() outer = loop.create_future() outer.set_result([]) return outer @@ -820,11 +823,12 @@ def gather(*coros_or_futures, return_exceptions=False): children = [] nfuts = 0 nfinished = 0 + done_futs = [] loop = None outer = None # bpo-46672 for arg in coros_or_futures: if arg not in arg_to_fut: - fut = _ensure_future(arg, loop=loop) + fut = ensure_future(arg, loop=loop) if loop is None: loop = futures._get_loop(fut) if fut is not arg: @@ -836,7 +840,10 @@ def gather(*coros_or_futures, return_exceptions=False): nfuts += 1 arg_to_fut[arg] = fut - fut.add_done_callback(_done_callback) + if fut.done(): + done_futs.append(fut) + else: + fut.add_done_callback(_done_callback) else: # There's a duplicate Future object in coros_or_futures. @@ -845,6 +852,13 @@ def gather(*coros_or_futures, return_exceptions=False): children.append(fut) outer = _GatheringFuture(children, loop=loop) + # Run done callbacks after GatheringFuture created so any post-processing + # can be performed at this point + # optimization: in the special case that *all* futures finished eagerly, + # this will effectively complete the gather eagerly, with the last + # callback setting the result (or exception) on outer before returning it + for fut in done_futs: + _done_callback(fut) return outer @@ -881,7 +895,7 @@ def shield(arg): weak references to tasks. A task that isn't referenced elsewhere may get garbage collected at any time, even before it's done. """ - inner = _ensure_future(arg) + inner = ensure_future(arg) if inner.done(): # Shortcut. return inner @@ -937,8 +951,40 @@ def run_coroutine_threadsafe(coro, loop): return future -# WeakSet containing all alive tasks. -_all_tasks = weakref.WeakSet() +def create_eager_task_factory(custom_task_constructor): + """Create a function suitable for use as a task factory on an event-loop. + + Example usage: + + loop.set_task_factory( + asyncio.create_eager_task_factory(my_task_constructor)) + + Now, tasks created will be started immediately (rather than being first + scheduled to an event loop). The constructor argument can be any callable + that returns a Task-compatible object and has a signature compatible + with `Task.__init__`; it must have the `eager_start` keyword argument. + + Most applications will use `Task` for `custom_task_constructor` and in + this case there's no need to call `create_eager_task_factory()` + directly. Instead the global `eager_task_factory` instance can be + used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`. + """ + + def factory(loop, coro, *, name=None, context=None): + return custom_task_constructor( + coro, loop=loop, name=name, context=context, eager_start=True) + + return factory + + +eager_task_factory = create_eager_task_factory(Task) + + +# Collectively these two sets hold references to the complete set of active +# tasks. Eagerly executed tasks use a faster regular set as an optimization +# but may graduate to a WeakSet if the task blocks on IO. +_scheduled_tasks = weakref.WeakSet() +_eager_tasks = set() # Dictionary containing tasks that are currently active in # all running event loops. {EventLoop: Task} @@ -946,8 +992,13 @@ _current_tasks = {} def _register_task(task): - """Register a new task in asyncio as executed by loop.""" - _all_tasks.add(task) + """Register an asyncio Task scheduled to run on an event loop.""" + _scheduled_tasks.add(task) + + +def _register_eager_task(task): + """Register an asyncio Task about to be eagerly executed.""" + _eager_tasks.add(task) def _enter_task(loop, task): @@ -966,25 +1017,49 @@ def _leave_task(loop, task): del _current_tasks[loop] +def _swap_current_task(loop, task): + prev_task = _current_tasks.get(loop) + if task is None: + del _current_tasks[loop] + else: + _current_tasks[loop] = task + return prev_task + + def _unregister_task(task): - """Unregister a task.""" - _all_tasks.discard(task) + """Unregister a completed, scheduled Task.""" + _scheduled_tasks.discard(task) + + +def _unregister_eager_task(task): + """Unregister a task which finished its first eager step.""" + _eager_tasks.discard(task) +_py_current_task = current_task _py_register_task = _register_task +_py_register_eager_task = _register_eager_task _py_unregister_task = _unregister_task +_py_unregister_eager_task = _unregister_eager_task _py_enter_task = _enter_task _py_leave_task = _leave_task +_py_swap_current_task = _swap_current_task try: - from _asyncio import (_register_task, _unregister_task, - _enter_task, _leave_task, - _all_tasks, _current_tasks) + from _asyncio import (_register_task, _register_eager_task, + _unregister_task, _unregister_eager_task, + _enter_task, _leave_task, _swap_current_task, + _scheduled_tasks, _eager_tasks, _current_tasks, + current_task) except ImportError: pass else: + _c_current_task = current_task _c_register_task = _register_task + _c_register_eager_task = _register_eager_task _c_unregister_task = _unregister_task + _c_unregister_eager_task = _unregister_eager_task _c_enter_task = _enter_task _c_leave_task = _leave_task + _c_swap_current_task = _swap_current_task diff --git a/contrib/tools/python3/src/Lib/asyncio/unix_events.py b/contrib/tools/python3/src/Lib/asyncio/unix_events.py index 77d2670f83..f2e920ada4 100644 --- a/contrib/tools/python3/src/Lib/asyncio/unix_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/unix_events.py @@ -195,22 +195,25 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): async def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): - with events.get_child_watcher() as watcher: + with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + watcher = events.get_child_watcher() + + with watcher: if not watcher.is_active(): # Check early. # Raising exception before process creation # prevents subprocess execution if the watcher # is not ready to handle it. raise RuntimeError("asyncio.get_child_watcher() is not activated, " - "subprocess support is not installed.") + "subprocess support is not installed.") waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, - stdin, stdout, stderr, bufsize, - waiter=waiter, extra=extra, - **kwargs) - + stdin, stdout, stderr, bufsize, + waiter=waiter, extra=extra, + **kwargs) watcher.add_child_handler(transp.get_pid(), - self._child_watcher_callback, transp) + self._child_watcher_callback, transp) try: await waiter except (SystemExit, KeyboardInterrupt): @@ -223,8 +226,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): return transp def _child_watcher_callback(self, pid, returncode, transp): - # Skip one iteration for callbacks to be executed - self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode) + self.call_soon_threadsafe(transp._process_exited, returncode) async def create_unix_connection( self, protocol_factory, path=None, *, @@ -851,6 +853,13 @@ class AbstractChildWatcher: waitpid(-1), there should be only one active object per process. """ + def __init_subclass__(cls) -> None: + if cls.__module__ != __name__: + warnings._deprecated("AbstractChildWatcher", + "{name!r} is deprecated as of Python 3.12 and will be " + "removed in Python {remove}.", + remove=(3, 14)) + def add_child_handler(self, pid, callback, *args): """Register a new child handler. @@ -919,10 +928,6 @@ class PidfdChildWatcher(AbstractChildWatcher): recent (5.3+) kernels. """ - def __init__(self): - self._loop = None - self._callbacks = {} - def __enter__(self): return self @@ -930,35 +935,22 @@ class PidfdChildWatcher(AbstractChildWatcher): pass def is_active(self): - return self._loop is not None and self._loop.is_running() + return True def close(self): - self.attach_loop(None) + pass def attach_loop(self, loop): - if self._loop is not None and loop is None and self._callbacks: - warnings.warn( - 'A loop is being detached ' - 'from a child watcher with pending handlers', - RuntimeWarning) - for pidfd, _, _ in self._callbacks.values(): - self._loop._remove_reader(pidfd) - os.close(pidfd) - self._callbacks.clear() - self._loop = loop + pass def add_child_handler(self, pid, callback, *args): - existing = self._callbacks.get(pid) - if existing is not None: - self._callbacks[pid] = existing[0], callback, args - else: - pidfd = os.pidfd_open(pid) - self._loop._add_reader(pidfd, self._do_wait, pid) - self._callbacks[pid] = pidfd, callback, args + loop = events.get_running_loop() + pidfd = os.pidfd_open(pid) + loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args) - def _do_wait(self, pid): - pidfd, callback, args = self._callbacks.pop(pid) - self._loop._remove_reader(pidfd) + def _do_wait(self, pid, pidfd, callback, args): + loop = events.get_running_loop() + loop._remove_reader(pidfd) try: _, status = os.waitpid(pid, 0) except ChildProcessError: @@ -976,12 +968,9 @@ class PidfdChildWatcher(AbstractChildWatcher): callback(pid, returncode, *args) def remove_child_handler(self, pid): - try: - pidfd, _, _ = self._callbacks.pop(pid) - except KeyError: - return False - self._loop._remove_reader(pidfd) - os.close(pidfd) + # asyncio never calls remove_child_handler() !!! + # The method is no-op but is implemented because + # abstract base classes require it. return True @@ -1049,6 +1038,13 @@ class SafeChildWatcher(BaseChildWatcher): big number of children (O(n) each time SIGCHLD is raised) """ + def __init__(self): + super().__init__() + warnings._deprecated("SafeChildWatcher", + "{name!r} is deprecated as of Python 3.12 and will be " + "removed in Python {remove}.", + remove=(3, 14)) + def close(self): self._callbacks.clear() super().close() @@ -1127,6 +1123,10 @@ class FastChildWatcher(BaseChildWatcher): self._lock = threading.Lock() self._zombies = {} self._forks = 0 + warnings._deprecated("FastChildWatcher", + "{name!r} is deprecated as of Python 3.12 and will be " + "removed in Python {remove}.", + remove=(3, 14)) def close(self): self._callbacks.clear() @@ -1239,6 +1239,10 @@ class MultiLoopChildWatcher(AbstractChildWatcher): def __init__(self): self._callbacks = {} self._saved_sighandler = None + warnings._deprecated("MultiLoopChildWatcher", + "{name!r} is deprecated as of Python 3.12 and will be " + "removed in Python {remove}.", + remove=(3, 14)) def is_active(self): return self._saved_sighandler is not None @@ -1423,6 +1427,17 @@ class ThreadedChildWatcher(AbstractChildWatcher): self._threads.pop(expected_pid) +def can_use_pidfd(): + if not hasattr(os, 'pidfd_open'): + return False + try: + pid = os.getpid() + os.close(os.pidfd_open(pid, 0)) + except OSError: + # blocked by security policy like SECCOMP + return False + return True + class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): """UNIX event loop policy with a watcher for child processes.""" @@ -1435,7 +1450,10 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def _init_watcher(self): with events._lock: if self._watcher is None: # pragma: no branch - self._watcher = ThreadedChildWatcher() + if can_use_pidfd(): + self._watcher = PidfdChildWatcher() + else: + self._watcher = ThreadedChildWatcher() def set_event_loop(self, loop): """Set the event loop. @@ -1459,6 +1477,9 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): if self._watcher is None: self._init_watcher() + warnings._deprecated("get_child_watcher", + "{name!r} is deprecated as of Python 3.12 and will be " + "removed in Python {remove}.", remove=(3, 14)) return self._watcher def set_child_watcher(self, watcher): @@ -1470,6 +1491,9 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): self._watcher.close() self._watcher = watcher + warnings._deprecated("set_child_watcher", + "{name!r} is deprecated as of Python 3.12 and will be " + "removed in Python {remove}.", remove=(3, 14)) SelectorEventLoop = _UnixSelectorEventLoop diff --git a/contrib/tools/python3/src/Lib/asyncio/windows_events.py b/contrib/tools/python3/src/Lib/asyncio/windows_events.py index eb33551b41..c9a5fb841c 100644 --- a/contrib/tools/python3/src/Lib/asyncio/windows_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/windows_events.py @@ -455,6 +455,17 @@ class IocpProactor: fut.set_result(value) return fut + @staticmethod + def finish_socket_func(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): + raise ConnectionResetError(*exc.args) + else: + raise + def recv(self, conn, nbytes, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) @@ -466,17 +477,7 @@ class IocpProactor: except BrokenPipeError: return self._result(b'') - def finish_recv(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_recv) + return self._register(ov, conn, self.finish_socket_func) def recv_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -489,17 +490,7 @@ class IocpProactor: except BrokenPipeError: return self._result(0) - def finish_recv(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_recv) + return self._register(ov, conn, self.finish_socket_func) def recvfrom(self, conn, nbytes, flags=0): self._register_with_iocp(conn) @@ -509,17 +500,7 @@ class IocpProactor: except BrokenPipeError: return self._result((b'', None)) - def finish_recv(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_recv) + return self._register(ov, conn, self.finish_socket_func) def recvfrom_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -547,17 +528,7 @@ class IocpProactor: ov.WSASendTo(conn.fileno(), buf, flags, addr) - def finish_send(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_send) + return self._register(ov, conn, self.finish_socket_func) def send(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -567,17 +538,7 @@ class IocpProactor: else: ov.WriteFile(conn.fileno(), buf) - def finish_send(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_send) + return self._register(ov, conn, self.finish_socket_func) def accept(self, listener): self._register_with_iocp(listener) @@ -648,16 +609,7 @@ class IocpProactor: offset_low, offset_high, count, 0, 0) - def finish_sendfile(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - return self._register(ov, sock, finish_sendfile) + return self._register(ov, sock, self.finish_socket_func) def accept_pipe(self, pipe): self._register_with_iocp(pipe) |