aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.com>2024-02-12 07:53:52 +0300
committershadchin <shadchin@yandex-team.com>2024-02-12 08:07:36 +0300
commitce1b7ca3171f9158180640c6a02a74b4afffedea (patch)
treee47c1e8391b1b0128262c1e9b1e6ed4c8fff2348 /contrib/tools/python3/src/Lib/asyncio/tasks.py
parent57350d96f030db90f220ce50ee591d5c5d403df7 (diff)
downloadydb-ce1b7ca3171f9158180640c6a02a74b4afffedea.tar.gz
Update Python from 3.11.8 to 3.12.2
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/tasks.py')
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/tasks.py267
1 files changed, 171 insertions, 96 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py
index 6ca545e30a..65f2a6ef80 100644
--- a/contrib/tools/python3/src/Lib/asyncio/tasks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py
@@ -6,6 +6,7 @@ __all__ = (
'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
'current_task', 'all_tasks',
+ 'create_eager_task_factory', 'eager_task_factory',
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
@@ -24,7 +25,7 @@ from . import coroutines
from . import events
from . import exceptions
from . import futures
-from .coroutines import _is_coroutine
+from . import timeouts
# Helper to generate new task names
# This uses itertools.count() instead of a "+= 1" operation because the latter
@@ -43,22 +44,26 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_running_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
+ # capturing the set of eager tasks first, so if an eager task "graduates"
+ # to a regular task in another thread, we don't risk missing it.
+ eager_tasks = list(_eager_tasks)
+ # Looping over the WeakSet isn't safe as it can be updated from another
+ # thread, therefore we cast it to list prior to filtering. The list cast
+ # itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur).
+ # See issues 34970 and 36607 for details.
+ scheduled_tasks = None
i = 0
while True:
try:
- tasks = list(_all_tasks)
+ scheduled_tasks = list(_scheduled_tasks)
except RuntimeError:
i += 1
if i >= 1000:
raise
else:
break
- return {t for t in tasks
+ return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
if futures._get_loop(t) is loop and not t.done()}
@@ -103,7 +108,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True
- def __init__(self, coro, *, loop=None, name=None, context=None):
+ def __init__(self, coro, *, loop=None, name=None, context=None,
+ eager_start=False):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -127,8 +133,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
else:
self._context = context
- self._loop.call_soon(self.__step, context=self._context)
- _register_task(self)
+ if eager_start and self._loop.is_running():
+ self.__eager_start()
+ else:
+ self._loop.call_soon(self.__step, context=self._context)
+ _register_task(self)
def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending:
@@ -149,6 +158,9 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
def get_coro(self):
return self._coro
+ def get_context(self):
+ return self._context
+
def get_name(self):
return self._name
@@ -257,6 +269,25 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._num_cancels_requested -= 1
return self._num_cancels_requested
+ def __eager_start(self):
+ prev_task = _swap_current_task(self._loop, self)
+ try:
+ _register_eager_task(self)
+ try:
+ self._context.run(self.__step_run_and_handle_result, None)
+ finally:
+ _unregister_eager_task(self)
+ finally:
+ try:
+ curtask = _swap_current_task(self._loop, prev_task)
+ assert curtask is self
+ finally:
+ if self.done():
+ self._coro = None
+ self = None # Needed to break cycles when an exception occurs.
+ else:
+ _register_task(self)
+
def __step(self, exc=None):
if self.done():
raise exceptions.InvalidStateError(
@@ -265,11 +296,17 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
- coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
- # Call either coro.throw(exc) or coro.send(None).
+ try:
+ self.__step_run_and_handle_result(exc)
+ finally:
+ _leave_task(self._loop, self)
+ self = None # Needed to break cycles when an exception occurs.
+
+ def __step_run_and_handle_result(self, exc):
+ coro = self._coro
try:
if exc is None:
# We use the `send` method directly, because coroutines
@@ -341,7 +378,6 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_soon(
self.__step, new_exc, context=self._context)
finally:
- _leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
def __wakeup(self, future):
@@ -444,65 +480,44 @@ async def wait_for(fut, timeout):
If the wait is cancelled, the task is also cancelled.
+ If the task supresses the cancellation and returns a value instead,
+ that value is returned.
+
This function is a coroutine.
"""
- loop = events.get_running_loop()
+ # The special case for timeout <= 0 is for the following case:
+ #
+ # async def test_waitfor():
+ # func_started = False
+ #
+ # async def func():
+ # nonlocal func_started
+ # func_started = True
+ #
+ # try:
+ # await asyncio.wait_for(func(), 0)
+ # except asyncio.TimeoutError:
+ # assert not func_started
+ # else:
+ # assert False
+ #
+ # asyncio.run(test_waitfor())
- if timeout is None:
- return await fut
- if timeout <= 0:
- fut = ensure_future(fut, loop=loop)
+ if timeout is not None and timeout <= 0:
+ fut = ensure_future(fut)
if fut.done():
return fut.result()
- await _cancel_and_wait(fut, loop=loop)
+ await _cancel_and_wait(fut)
try:
return fut.result()
except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
-
- waiter = loop.create_future()
- timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
- cb = functools.partial(_release_waiter, waiter)
-
- fut = ensure_future(fut, loop=loop)
- fut.add_done_callback(cb)
-
- try:
- # wait until the future completes or the timeout
- try:
- await waiter
- except exceptions.CancelledError:
- if fut.done():
- return fut.result()
- else:
- fut.remove_done_callback(cb)
- # We must ensure that the task is not running
- # after wait_for() returns.
- # See https://bugs.python.org/issue32751
- await _cancel_and_wait(fut, loop=loop)
- raise
-
- if fut.done():
- return fut.result()
- else:
- fut.remove_done_callback(cb)
- # We must ensure that the task is not running
- # after wait_for() returns.
- # See https://bugs.python.org/issue32751
- await _cancel_and_wait(fut, loop=loop)
- # In case task cancellation failed with some
- # exception, we should re-raise it
- # See https://bugs.python.org/issue40607
- try:
- return fut.result()
- except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
- finally:
- timeout_handle.cancel()
+ raise TimeoutError from exc
+ async with timeouts.timeout(timeout):
+ return await fut
async def _wait(fs, timeout, return_when, loop):
"""Internal helper for wait().
@@ -548,9 +563,10 @@ async def _wait(fs, timeout, return_when, loop):
return done, pending
-async def _cancel_and_wait(fut, loop):
+async def _cancel_and_wait(fut):
"""Cancel the *fut* future or task and wait until it completes."""
+ loop = events.get_running_loop()
waiter = loop.create_future()
cb = functools.partial(_release_waiter, waiter)
fut.add_done_callback(cb)
@@ -589,7 +605,7 @@ def as_completed(fs, *, timeout=None):
from .queues import Queue # Import here to avoid circular import problem.
done = Queue()
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
@@ -656,46 +672,33 @@ def ensure_future(coro_or_future, *, loop=None):
If the argument is a Future, it is returned directly.
"""
- return _ensure_future(coro_or_future, loop=loop)
-
-
-def _ensure_future(coro_or_future, *, loop=None):
if futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
'the one specified as the loop argument')
return coro_or_future
- called_wrap_awaitable = False
+ should_close = True
if not coroutines.iscoroutine(coro_or_future):
if inspect.isawaitable(coro_or_future):
+ async def _wrap_awaitable(awaitable):
+ return await awaitable
+
coro_or_future = _wrap_awaitable(coro_or_future)
- called_wrap_awaitable = True
+ should_close = False
else:
raise TypeError('An asyncio.Future, a coroutine or an awaitable '
'is required')
if loop is None:
- loop = events._get_event_loop(stacklevel=4)
+ loop = events.get_event_loop()
try:
return loop.create_task(coro_or_future)
except RuntimeError:
- if not called_wrap_awaitable:
+ if should_close:
coro_or_future.close()
raise
-@types.coroutine
-def _wrap_awaitable(awaitable):
- """Helper for asyncio.ensure_future().
-
- Wraps awaitable (an object with __await__) into a coroutine
- that will later be wrapped in a Task by ensure_future().
- """
- return (yield from awaitable.__await__())
-
-_wrap_awaitable._is_coroutine = _is_coroutine
-
-
class _GatheringFuture(futures.Future):
"""Helper for gather().
@@ -756,7 +759,7 @@ def gather(*coros_or_futures, return_exceptions=False):
gather won't cancel any other awaitables.
"""
if not coros_or_futures:
- loop = events._get_event_loop()
+ loop = events.get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
@@ -820,11 +823,12 @@ def gather(*coros_or_futures, return_exceptions=False):
children = []
nfuts = 0
nfinished = 0
+ done_futs = []
loop = None
outer = None # bpo-46672
for arg in coros_or_futures:
if arg not in arg_to_fut:
- fut = _ensure_future(arg, loop=loop)
+ fut = ensure_future(arg, loop=loop)
if loop is None:
loop = futures._get_loop(fut)
if fut is not arg:
@@ -836,7 +840,10 @@ def gather(*coros_or_futures, return_exceptions=False):
nfuts += 1
arg_to_fut[arg] = fut
- fut.add_done_callback(_done_callback)
+ if fut.done():
+ done_futs.append(fut)
+ else:
+ fut.add_done_callback(_done_callback)
else:
# There's a duplicate Future object in coros_or_futures.
@@ -845,6 +852,13 @@ def gather(*coros_or_futures, return_exceptions=False):
children.append(fut)
outer = _GatheringFuture(children, loop=loop)
+ # Run done callbacks after GatheringFuture created so any post-processing
+ # can be performed at this point
+ # optimization: in the special case that *all* futures finished eagerly,
+ # this will effectively complete the gather eagerly, with the last
+ # callback setting the result (or exception) on outer before returning it
+ for fut in done_futs:
+ _done_callback(fut)
return outer
@@ -881,7 +895,7 @@ def shield(arg):
weak references to tasks. A task that isn't referenced elsewhere
may get garbage collected at any time, even before it's done.
"""
- inner = _ensure_future(arg)
+ inner = ensure_future(arg)
if inner.done():
# Shortcut.
return inner
@@ -937,8 +951,40 @@ def run_coroutine_threadsafe(coro, loop):
return future
-# WeakSet containing all alive tasks.
-_all_tasks = weakref.WeakSet()
+def create_eager_task_factory(custom_task_constructor):
+ """Create a function suitable for use as a task factory on an event-loop.
+
+ Example usage:
+
+ loop.set_task_factory(
+ asyncio.create_eager_task_factory(my_task_constructor))
+
+ Now, tasks created will be started immediately (rather than being first
+ scheduled to an event loop). The constructor argument can be any callable
+ that returns a Task-compatible object and has a signature compatible
+ with `Task.__init__`; it must have the `eager_start` keyword argument.
+
+ Most applications will use `Task` for `custom_task_constructor` and in
+ this case there's no need to call `create_eager_task_factory()`
+ directly. Instead the global `eager_task_factory` instance can be
+ used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
+ """
+
+ def factory(loop, coro, *, name=None, context=None):
+ return custom_task_constructor(
+ coro, loop=loop, name=name, context=context, eager_start=True)
+
+ return factory
+
+
+eager_task_factory = create_eager_task_factory(Task)
+
+
+# Collectively these two sets hold references to the complete set of active
+# tasks. Eagerly executed tasks use a faster regular set as an optimization
+# but may graduate to a WeakSet if the task blocks on IO.
+_scheduled_tasks = weakref.WeakSet()
+_eager_tasks = set()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
@@ -946,8 +992,13 @@ _current_tasks = {}
def _register_task(task):
- """Register a new task in asyncio as executed by loop."""
- _all_tasks.add(task)
+ """Register an asyncio Task scheduled to run on an event loop."""
+ _scheduled_tasks.add(task)
+
+
+def _register_eager_task(task):
+ """Register an asyncio Task about to be eagerly executed."""
+ _eager_tasks.add(task)
def _enter_task(loop, task):
@@ -966,25 +1017,49 @@ def _leave_task(loop, task):
del _current_tasks[loop]
+def _swap_current_task(loop, task):
+ prev_task = _current_tasks.get(loop)
+ if task is None:
+ del _current_tasks[loop]
+ else:
+ _current_tasks[loop] = task
+ return prev_task
+
+
def _unregister_task(task):
- """Unregister a task."""
- _all_tasks.discard(task)
+ """Unregister a completed, scheduled Task."""
+ _scheduled_tasks.discard(task)
+
+
+def _unregister_eager_task(task):
+ """Unregister a task which finished its first eager step."""
+ _eager_tasks.discard(task)
+_py_current_task = current_task
_py_register_task = _register_task
+_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
+_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
+_py_swap_current_task = _swap_current_task
try:
- from _asyncio import (_register_task, _unregister_task,
- _enter_task, _leave_task,
- _all_tasks, _current_tasks)
+ from _asyncio import (_register_task, _register_eager_task,
+ _unregister_task, _unregister_eager_task,
+ _enter_task, _leave_task, _swap_current_task,
+ _scheduled_tasks, _eager_tasks, _current_tasks,
+ current_task)
except ImportError:
pass
else:
+ _c_current_task = current_task
_c_register_task = _register_task
+ _c_register_eager_task = _register_eager_task
_c_unregister_task = _unregister_task
+ _c_unregister_eager_task = _unregister_eager_task
_c_enter_task = _enter_task
_c_leave_task = _leave_task
+ _c_swap_current_task = _swap_current_task