diff options
author | shadchin <shadchin@yandex-team.com> | 2024-02-12 07:53:52 +0300 |
---|---|---|
committer | shadchin <shadchin@yandex-team.com> | 2024-02-12 08:07:36 +0300 |
commit | ce1b7ca3171f9158180640c6a02a74b4afffedea (patch) | |
tree | e47c1e8391b1b0128262c1e9b1e6ed4c8fff2348 /contrib/tools/python3/src/Lib/asyncio/tasks.py | |
parent | 57350d96f030db90f220ce50ee591d5c5d403df7 (diff) | |
download | ydb-ce1b7ca3171f9158180640c6a02a74b4afffedea.tar.gz |
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/tasks.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/tasks.py | 267 |
1 files changed, 171 insertions, 96 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py index 6ca545e30a..65f2a6ef80 100644 --- a/contrib/tools/python3/src/Lib/asyncio/tasks.py +++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py @@ -6,6 +6,7 @@ __all__ = ( 'wait', 'wait_for', 'as_completed', 'sleep', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 'current_task', 'all_tasks', + 'create_eager_task_factory', 'eager_task_factory', '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) @@ -24,7 +25,7 @@ from . import coroutines from . import events from . import exceptions from . import futures -from .coroutines import _is_coroutine +from . import timeouts # Helper to generate new task names # This uses itertools.count() instead of a "+= 1" operation because the latter @@ -43,22 +44,26 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. + # capturing the set of eager tasks first, so if an eager task "graduates" + # to a regular task in another thread, we don't risk missing it. + eager_tasks = list(_eager_tasks) + # Looping over the WeakSet isn't safe as it can be updated from another + # thread, therefore we cast it to list prior to filtering. The list cast + # itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). + # See issues 34970 and 36607 for details. + scheduled_tasks = None i = 0 while True: try: - tasks = list(_all_tasks) + scheduled_tasks = list(_scheduled_tasks) except RuntimeError: i += 1 if i >= 1000: raise else: break - return {t for t in tasks + return {t for t in itertools.chain(scheduled_tasks, eager_tasks) if futures._get_loop(t) is loop and not t.done()} @@ -103,7 +108,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation # status is still pending _log_destroy_pending = True - def __init__(self, coro, *, loop=None, name=None, context=None): + def __init__(self, coro, *, loop=None, name=None, context=None, + eager_start=False): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -127,8 +133,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation else: self._context = context - self._loop.call_soon(self.__step, context=self._context) - _register_task(self) + if eager_start and self._loop.is_running(): + self.__eager_start() + else: + self._loop.call_soon(self.__step, context=self._context) + _register_task(self) def __del__(self): if self._state == futures._PENDING and self._log_destroy_pending: @@ -149,6 +158,9 @@ class Task(futures._PyFuture): # Inherit Python Task implementation def get_coro(self): return self._coro + def get_context(self): + return self._context + def get_name(self): return self._name @@ -257,6 +269,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._num_cancels_requested -= 1 return self._num_cancels_requested + def __eager_start(self): + prev_task = _swap_current_task(self._loop, self) + try: + _register_eager_task(self) + try: + self._context.run(self.__step_run_and_handle_result, None) + finally: + _unregister_eager_task(self) + finally: + try: + curtask = _swap_current_task(self._loop, prev_task) + assert curtask is self + finally: + if self.done(): + self._coro = None + self = None # Needed to break cycles when an exception occurs. + else: + _register_task(self) + def __step(self, exc=None): if self.done(): raise exceptions.InvalidStateError( @@ -265,11 +296,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if not isinstance(exc, exceptions.CancelledError): exc = self._make_cancelled_error() self._must_cancel = False - coro = self._coro self._fut_waiter = None _enter_task(self._loop, self) - # Call either coro.throw(exc) or coro.send(None). + try: + self.__step_run_and_handle_result(exc) + finally: + _leave_task(self._loop, self) + self = None # Needed to break cycles when an exception occurs. + + def __step_run_and_handle_result(self, exc): + coro = self._coro try: if exc is None: # We use the `send` method directly, because coroutines @@ -341,7 +378,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._loop.call_soon( self.__step, new_exc, context=self._context) finally: - _leave_task(self._loop, self) self = None # Needed to break cycles when an exception occurs. def __wakeup(self, future): @@ -444,65 +480,44 @@ async def wait_for(fut, timeout): If the wait is cancelled, the task is also cancelled. + If the task supresses the cancellation and returns a value instead, + that value is returned. + This function is a coroutine. """ - loop = events.get_running_loop() + # The special case for timeout <= 0 is for the following case: + # + # async def test_waitfor(): + # func_started = False + # + # async def func(): + # nonlocal func_started + # func_started = True + # + # try: + # await asyncio.wait_for(func(), 0) + # except asyncio.TimeoutError: + # assert not func_started + # else: + # assert False + # + # asyncio.run(test_waitfor()) - if timeout is None: - return await fut - if timeout <= 0: - fut = ensure_future(fut, loop=loop) + if timeout is not None and timeout <= 0: + fut = ensure_future(fut) if fut.done(): return fut.result() - await _cancel_and_wait(fut, loop=loop) + await _cancel_and_wait(fut) try: return fut.result() except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc - - waiter = loop.create_future() - timeout_handle = loop.call_later(timeout, _release_waiter, waiter) - cb = functools.partial(_release_waiter, waiter) - - fut = ensure_future(fut, loop=loop) - fut.add_done_callback(cb) - - try: - # wait until the future completes or the timeout - try: - await waiter - except exceptions.CancelledError: - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - raise - - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - # In case task cancellation failed with some - # exception, we should re-raise it - # See https://bugs.python.org/issue40607 - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc - finally: - timeout_handle.cancel() + raise TimeoutError from exc + async with timeouts.timeout(timeout): + return await fut async def _wait(fs, timeout, return_when, loop): """Internal helper for wait(). @@ -548,9 +563,10 @@ async def _wait(fs, timeout, return_when, loop): return done, pending -async def _cancel_and_wait(fut, loop): +async def _cancel_and_wait(fut): """Cancel the *fut* future or task and wait until it completes.""" + loop = events.get_running_loop() waiter = loop.create_future() cb = functools.partial(_release_waiter, waiter) fut.add_done_callback(cb) @@ -589,7 +605,7 @@ def as_completed(fs, *, timeout=None): from .queues import Queue # Import here to avoid circular import problem. done = Queue() - loop = events._get_event_loop() + loop = events.get_event_loop() todo = {ensure_future(f, loop=loop) for f in set(fs)} timeout_handle = None @@ -656,46 +672,33 @@ def ensure_future(coro_or_future, *, loop=None): If the argument is a Future, it is returned directly. """ - return _ensure_future(coro_or_future, loop=loop) - - -def _ensure_future(coro_or_future, *, loop=None): if futures.isfuture(coro_or_future): if loop is not None and loop is not futures._get_loop(coro_or_future): raise ValueError('The future belongs to a different loop than ' 'the one specified as the loop argument') return coro_or_future - called_wrap_awaitable = False + should_close = True if not coroutines.iscoroutine(coro_or_future): if inspect.isawaitable(coro_or_future): + async def _wrap_awaitable(awaitable): + return await awaitable + coro_or_future = _wrap_awaitable(coro_or_future) - called_wrap_awaitable = True + should_close = False else: raise TypeError('An asyncio.Future, a coroutine or an awaitable ' 'is required') if loop is None: - loop = events._get_event_loop(stacklevel=4) + loop = events.get_event_loop() try: return loop.create_task(coro_or_future) except RuntimeError: - if not called_wrap_awaitable: + if should_close: coro_or_future.close() raise -@types.coroutine -def _wrap_awaitable(awaitable): - """Helper for asyncio.ensure_future(). - - Wraps awaitable (an object with __await__) into a coroutine - that will later be wrapped in a Task by ensure_future(). - """ - return (yield from awaitable.__await__()) - -_wrap_awaitable._is_coroutine = _is_coroutine - - class _GatheringFuture(futures.Future): """Helper for gather(). @@ -756,7 +759,7 @@ def gather(*coros_or_futures, return_exceptions=False): gather won't cancel any other awaitables. """ if not coros_or_futures: - loop = events._get_event_loop() + loop = events.get_event_loop() outer = loop.create_future() outer.set_result([]) return outer @@ -820,11 +823,12 @@ def gather(*coros_or_futures, return_exceptions=False): children = [] nfuts = 0 nfinished = 0 + done_futs = [] loop = None outer = None # bpo-46672 for arg in coros_or_futures: if arg not in arg_to_fut: - fut = _ensure_future(arg, loop=loop) + fut = ensure_future(arg, loop=loop) if loop is None: loop = futures._get_loop(fut) if fut is not arg: @@ -836,7 +840,10 @@ def gather(*coros_or_futures, return_exceptions=False): nfuts += 1 arg_to_fut[arg] = fut - fut.add_done_callback(_done_callback) + if fut.done(): + done_futs.append(fut) + else: + fut.add_done_callback(_done_callback) else: # There's a duplicate Future object in coros_or_futures. @@ -845,6 +852,13 @@ def gather(*coros_or_futures, return_exceptions=False): children.append(fut) outer = _GatheringFuture(children, loop=loop) + # Run done callbacks after GatheringFuture created so any post-processing + # can be performed at this point + # optimization: in the special case that *all* futures finished eagerly, + # this will effectively complete the gather eagerly, with the last + # callback setting the result (or exception) on outer before returning it + for fut in done_futs: + _done_callback(fut) return outer @@ -881,7 +895,7 @@ def shield(arg): weak references to tasks. A task that isn't referenced elsewhere may get garbage collected at any time, even before it's done. """ - inner = _ensure_future(arg) + inner = ensure_future(arg) if inner.done(): # Shortcut. return inner @@ -937,8 +951,40 @@ def run_coroutine_threadsafe(coro, loop): return future -# WeakSet containing all alive tasks. -_all_tasks = weakref.WeakSet() +def create_eager_task_factory(custom_task_constructor): + """Create a function suitable for use as a task factory on an event-loop. + + Example usage: + + loop.set_task_factory( + asyncio.create_eager_task_factory(my_task_constructor)) + + Now, tasks created will be started immediately (rather than being first + scheduled to an event loop). The constructor argument can be any callable + that returns a Task-compatible object and has a signature compatible + with `Task.__init__`; it must have the `eager_start` keyword argument. + + Most applications will use `Task` for `custom_task_constructor` and in + this case there's no need to call `create_eager_task_factory()` + directly. Instead the global `eager_task_factory` instance can be + used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`. + """ + + def factory(loop, coro, *, name=None, context=None): + return custom_task_constructor( + coro, loop=loop, name=name, context=context, eager_start=True) + + return factory + + +eager_task_factory = create_eager_task_factory(Task) + + +# Collectively these two sets hold references to the complete set of active +# tasks. Eagerly executed tasks use a faster regular set as an optimization +# but may graduate to a WeakSet if the task blocks on IO. +_scheduled_tasks = weakref.WeakSet() +_eager_tasks = set() # Dictionary containing tasks that are currently active in # all running event loops. {EventLoop: Task} @@ -946,8 +992,13 @@ _current_tasks = {} def _register_task(task): - """Register a new task in asyncio as executed by loop.""" - _all_tasks.add(task) + """Register an asyncio Task scheduled to run on an event loop.""" + _scheduled_tasks.add(task) + + +def _register_eager_task(task): + """Register an asyncio Task about to be eagerly executed.""" + _eager_tasks.add(task) def _enter_task(loop, task): @@ -966,25 +1017,49 @@ def _leave_task(loop, task): del _current_tasks[loop] +def _swap_current_task(loop, task): + prev_task = _current_tasks.get(loop) + if task is None: + del _current_tasks[loop] + else: + _current_tasks[loop] = task + return prev_task + + def _unregister_task(task): - """Unregister a task.""" - _all_tasks.discard(task) + """Unregister a completed, scheduled Task.""" + _scheduled_tasks.discard(task) + + +def _unregister_eager_task(task): + """Unregister a task which finished its first eager step.""" + _eager_tasks.discard(task) +_py_current_task = current_task _py_register_task = _register_task +_py_register_eager_task = _register_eager_task _py_unregister_task = _unregister_task +_py_unregister_eager_task = _unregister_eager_task _py_enter_task = _enter_task _py_leave_task = _leave_task +_py_swap_current_task = _swap_current_task try: - from _asyncio import (_register_task, _unregister_task, - _enter_task, _leave_task, - _all_tasks, _current_tasks) + from _asyncio import (_register_task, _register_eager_task, + _unregister_task, _unregister_eager_task, + _enter_task, _leave_task, _swap_current_task, + _scheduled_tasks, _eager_tasks, _current_tasks, + current_task) except ImportError: pass else: + _c_current_task = current_task _c_register_task = _register_task + _c_register_eager_task = _register_eager_task _c_unregister_task = _unregister_task + _c_unregister_eager_task = _unregister_eager_task _c_enter_task = _enter_task _c_leave_task = _leave_task + _c_swap_current_task = _swap_current_task |