diff options
author | shadchin <shadchin@yandex-team.com> | 2024-12-23 19:39:02 +0300 |
---|---|---|
committer | shadchin <shadchin@yandex-team.com> | 2024-12-23 19:54:20 +0300 |
commit | 65a5bf9d37a3b29eb394f560b9a09318196c40e8 (patch) | |
tree | e5cd68fb0682b2388e52d9806bb87adc348e21a8 /contrib/tools/python3/Lib/asyncio | |
parent | a1dd87a52878ab3e46e5fd2dba5ecbba6113d7e0 (diff) | |
download | ydb-65a5bf9d37a3b29eb394f560b9a09318196c40e8.tar.gz |
Update Python 3 to 3.12.8
commit_hash:c20045b8a987d8720e1f3328270357491d5530f3
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio')
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/__main__.py | 6 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/base_events.py | 18 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/futures.py | 6 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/sslproto.py | 5 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/staggered.py | 18 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/taskgroups.py | 41 |
6 files changed, 69 insertions, 25 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/__main__.py b/contrib/tools/python3/Lib/asyncio/__main__.py index 0465580115..29e528aeed 100644 --- a/contrib/tools/python3/Lib/asyncio/__main__.py +++ b/contrib/tools/python3/Lib/asyncio/__main__.py @@ -2,6 +2,7 @@ import ast import asyncio import code import concurrent.futures +import contextvars import inspect import sys import threading @@ -17,6 +18,7 @@ class AsyncIOInteractiveConsole(code.InteractiveConsole): super().__init__(locals) self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT self.loop = loop + self.context = contextvars.copy_context() def runcode(self, code): future = concurrent.futures.Future() @@ -46,12 +48,12 @@ class AsyncIOInteractiveConsole(code.InteractiveConsole): return try: - repl_future = self.loop.create_task(coro) + repl_future = self.loop.create_task(coro, context=self.context) futures._chain_future(repl_future, future) except BaseException as exc: future.set_exception(exc) - loop.call_soon_threadsafe(callback) + loop.call_soon_threadsafe(callback, context=self.context) try: return future.result() diff --git a/contrib/tools/python3/Lib/asyncio/base_events.py b/contrib/tools/python3/Lib/asyncio/base_events.py index cb037fd472..3146f7f3f6 100644 --- a/contrib/tools/python3/Lib/asyncio/base_events.py +++ b/contrib/tools/python3/Lib/asyncio/base_events.py @@ -17,7 +17,6 @@ import collections import collections.abc import concurrent.futures import errno -import functools import heapq import itertools import os @@ -1106,11 +1105,18 @@ class BaseEventLoop(events.AbstractEventLoop): except OSError: continue else: # using happy eyeballs - sock, _, _ = await staggered.staggered_race( - (functools.partial(self._connect_sock, - exceptions, addrinfo, laddr_infos) - for addrinfo in infos), - happy_eyeballs_delay, loop=self) + sock = (await staggered.staggered_race( + ( + # can't use functools.partial as it keeps a reference + # to exceptions + lambda addrinfo=addrinfo: self._connect_sock( + exceptions, addrinfo, laddr_infos + ) + for addrinfo in infos + ), + happy_eyeballs_delay, + loop=self, + ))[0] # can't use sock, _, _ as it keeks a reference to exceptions if sock is None: exceptions = [exc for sub in exceptions for exc in sub] diff --git a/contrib/tools/python3/Lib/asyncio/futures.py b/contrib/tools/python3/Lib/asyncio/futures.py index fd486f02c6..0c530bbdbc 100644 --- a/contrib/tools/python3/Lib/asyncio/futures.py +++ b/contrib/tools/python3/Lib/asyncio/futures.py @@ -194,8 +194,7 @@ class Future: the future is done and has an exception set, this exception is raised. """ if self._state == _CANCELLED: - exc = self._make_cancelled_error() - raise exc + raise self._make_cancelled_error() if self._state != _FINISHED: raise exceptions.InvalidStateError('Result is not ready.') self.__log_traceback = False @@ -212,8 +211,7 @@ class Future: InvalidStateError. """ if self._state == _CANCELLED: - exc = self._make_cancelled_error() - raise exc + raise self._make_cancelled_error() if self._state != _FINISHED: raise exceptions.InvalidStateError('Exception is not set.') self.__log_traceback = False diff --git a/contrib/tools/python3/Lib/asyncio/sslproto.py b/contrib/tools/python3/Lib/asyncio/sslproto.py index e51669a2ab..29e72b1fd9 100644 --- a/contrib/tools/python3/Lib/asyncio/sslproto.py +++ b/contrib/tools/python3/Lib/asyncio/sslproto.py @@ -101,7 +101,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin, return self._ssl_protocol._app_protocol def is_closing(self): - return self._closed + return self._closed or self._ssl_protocol._is_transport_closing() def close(self): """Close the transport. @@ -379,6 +379,9 @@ class SSLProtocol(protocols.BufferedProtocol): self._app_transport_created = True return self._app_transport + def _is_transport_closing(self): + return self._transport is not None and self._transport.is_closing() + def connection_made(self, transport): """Called when the low-level connection is made. diff --git a/contrib/tools/python3/Lib/asyncio/staggered.py b/contrib/tools/python3/Lib/asyncio/staggered.py index c3a7441a7b..0f4df8855a 100644 --- a/contrib/tools/python3/Lib/asyncio/staggered.py +++ b/contrib/tools/python3/Lib/asyncio/staggered.py @@ -69,7 +69,11 @@ async def staggered_race(coro_fns, delay, *, loop=None): exceptions = [] running_tasks = [] - async def run_one_coro(previous_failed) -> None: + async def run_one_coro(ok_to_start, previous_failed) -> None: + # in eager tasks this waits for the calling task to append this task + # to running_tasks, in regular tasks this wait is a no-op that does + # not yield a future. See gh-124309. + await ok_to_start.wait() # Wait for the previous task to finish, or for delay seconds if previous_failed is not None: with contextlib.suppress(exceptions_mod.TimeoutError): @@ -85,8 +89,12 @@ async def staggered_race(coro_fns, delay, *, loop=None): return # Start task that will run the next coroutine this_failed = locks.Event() - next_task = loop.create_task(run_one_coro(this_failed)) + next_ok_to_start = locks.Event() + next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed)) running_tasks.append(next_task) + # next_task has been appended to running_tasks so next_task is ok to + # start. + next_ok_to_start.set() assert len(running_tasks) == this_index + 2 # Prepare place to put this coroutine's exceptions if not won exceptions.append(None) @@ -116,8 +124,11 @@ async def staggered_race(coro_fns, delay, *, loop=None): if i != this_index: t.cancel() - first_task = loop.create_task(run_one_coro(None)) + ok_to_start = locks.Event() + first_task = loop.create_task(run_one_coro(ok_to_start, None)) running_tasks.append(first_task) + # first_task has been appended to running_tasks so first_task is ok to start. + ok_to_start.set() try: # Wait for a growing list of tasks to all finish: poor man's version of # curio's TaskGroup or trio's nursery @@ -133,6 +144,7 @@ async def staggered_race(coro_fns, delay, *, loop=None): raise d.exception() return winner_result, winner_index, exceptions finally: + del exceptions # Make sure no tasks are left running if we leave this function for t in running_tasks: t.cancel() diff --git a/contrib/tools/python3/Lib/asyncio/taskgroups.py b/contrib/tools/python3/Lib/asyncio/taskgroups.py index d264e51f1f..aada3ffa8e 100644 --- a/contrib/tools/python3/Lib/asyncio/taskgroups.py +++ b/contrib/tools/python3/Lib/asyncio/taskgroups.py @@ -66,6 +66,20 @@ class TaskGroup: return self async def __aexit__(self, et, exc, tb): + tb = None + try: + return await self._aexit(et, exc) + finally: + # Exceptions are heavy objects that can have object + # cycles (bad for GC); let's not keep a reference to + # a bunch of them. It would be nicer to use a try/finally + # in __aexit__ directly but that introduced some diff noise + self._parent_task = None + self._errors = None + self._base_error = None + exc = None + + async def _aexit(self, et, exc): self._exiting = True if (exc is not None and @@ -126,25 +140,34 @@ class TaskGroup: assert not self._tasks if self._base_error is not None: - raise self._base_error + try: + raise self._base_error + finally: + exc = None # Propagate CancelledError if there is one, except if there # are other errors -- those have priority. - if propagate_cancellation_error and not self._errors: - raise propagate_cancellation_error + try: + if propagate_cancellation_error and not self._errors: + try: + raise propagate_cancellation_error + finally: + exc = None + finally: + propagate_cancellation_error = None if et is not None and et is not exceptions.CancelledError: self._errors.append(exc) if self._errors: - # Exceptions are heavy objects that can have object - # cycles (bad for GC); let's not keep a reference to - # a bunch of them. try: - me = BaseExceptionGroup('unhandled errors in a TaskGroup', self._errors) - raise me from None + raise BaseExceptionGroup( + 'unhandled errors in a TaskGroup', + self._errors, + ) from None finally: - self._errors = None + exc = None + def create_task(self, coro, *, name=None, context=None): """Create a new task in this group and return it. |