aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
authorthegeorg <thegeorg@yandex-team.com>2024-02-19 02:38:52 +0300
committerthegeorg <thegeorg@yandex-team.com>2024-02-19 02:50:43 +0300
commitd96fa07134c06472bfee6718b5cfd1679196fc99 (patch)
tree31ec344fa9d3ff8dc038692516b6438dfbdb8a2d /contrib/tools/python3/Lib/asyncio/tasks.py
parent452cf9e068aef7110e35e654c5d47eb80111ef89 (diff)
downloadydb-d96fa07134c06472bfee6718b5cfd1679196fc99.tar.gz
Sync contrib/tools/python3 layout with upstream
* Move src/ subdir contents to the top of the layout * Rename self-written lib -> lib2 to avoid CaseFolding warning from the VCS * Regenerate contrib/libs/python proxy-headers accordingly 4ccc62ac1511abcf0fed14ccade38e984e088f1e
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio/tasks.py')
-rw-r--r--contrib/tools/python3/Lib/asyncio/tasks.py1065
1 files changed, 1065 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/tasks.py b/contrib/tools/python3/Lib/asyncio/tasks.py
new file mode 100644
index 0000000000..65f2a6ef80
--- /dev/null
+++ b/contrib/tools/python3/Lib/asyncio/tasks.py
@@ -0,0 +1,1065 @@
+"""Support for tasks, coroutines and the scheduler."""
+
+__all__ = (
+ 'Task', 'create_task',
+ 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
+ 'wait', 'wait_for', 'as_completed', 'sleep',
+ 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
+ 'current_task', 'all_tasks',
+ 'create_eager_task_factory', 'eager_task_factory',
+ '_register_task', '_unregister_task', '_enter_task', '_leave_task',
+)
+
+import concurrent.futures
+import contextvars
+import functools
+import inspect
+import itertools
+import types
+import warnings
+import weakref
+from types import GenericAlias
+
+from . import base_tasks
+from . import coroutines
+from . import events
+from . import exceptions
+from . import futures
+from . import timeouts
+
+# Helper to generate new task names
+# This uses itertools.count() instead of a "+= 1" operation because the latter
+# is not thread safe. See bpo-11866 for a longer explanation.
+_task_name_counter = itertools.count(1).__next__
+
+
def current_task(loop=None):
    """Return the task currently being executed, or None if there is none.

    If *loop* is None, the running event loop is used (raises if called
    outside of a running loop).
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
+
+
def all_tasks(loop=None):
    """Return a set of all tasks for the loop.

    Only tasks that belong to *loop* (the running loop if None) and are
    not yet done are included.  Both eagerly-started tasks and regularly
    scheduled ones are considered.
    """
    if loop is None:
        loop = events.get_running_loop()
    # capturing the set of eager tasks first, so if an eager task "graduates"
    # to a regular task in another thread, we don't risk missing it.
    eager_tasks = list(_eager_tasks)
    # Looping over the WeakSet isn't safe as it can be updated from another
    # thread, therefore we cast it to list prior to filtering. The list cast
    # itself requires iteration, so we repeat it several times ignoring
    # RuntimeErrors (which are not very likely to occur).
    # See issues 34970 and 36607 for details.
    scheduled_tasks = None
    i = 0
    while True:
        try:
            scheduled_tasks = list(_scheduled_tasks)
        except RuntimeError:
            # Retry a bounded number of times; give up (re-raise) after 1000
            # consecutive concurrent-mutation failures.
            i += 1
            if i >= 1000:
                raise
        else:
            break
    return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
            if futures._get_loop(t) is loop and not t.done()}
+
+
def _set_task_name(task, name):
    """Apply *name* to *task* via its set_name() method, if any.

    A DeprecationWarning is emitted for third-party Task implementations
    that predate Task.set_name().  A None name is a no-op.
    """
    if name is None:
        return
    set_name = getattr(task, 'set_name', None)
    if set_name is None:
        warnings.warn("Task.set_name() was added in Python 3.8, "
                      "the method support will be mandatory for third-party "
                      "task implementations since 3.13.",
                      DeprecationWarning, stacklevel=3)
    else:
        set_name(name)
+
+
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    # _fut_waiter is either None or a Future.  The Future
    # can be either done() or not done().
    # The task can be in any of 3 states:
    #
    # - 1: _fut_waiter is not None and not _fut_waiter.done():
    #      __step() is *not* scheduled and the Task is waiting for _fut_waiter.
    # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled:
    #       the Task is waiting for __step() to be executed.
    # - 3: _fut_waiter is None and __step() is *not* scheduled:
    #       the Task is currently executing (in __step()).
    #
    # * In state 1, one of the callbacks of __fut_waiter must be __wakeup().
    # * The transition from 1 to 2 happens when _fut_waiter becomes done(),
    #   as it schedules __wakeup() to be called (which calls __step() so
    #   we say that __step() is scheduled).
    # * It transitions from 2 to 3 when __step() is executed, and it clears
    #   _fut_waiter to None.

    # If False, don't log a message if the task is destroyed while its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None, context=None,
                 eager_start=False):
        """Wrap *coro* in a Task and schedule (or eagerly start) it.

        If *eager_start* is true and the loop is already running, the
        coroutine's first step is executed synchronously, right here.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._num_cancels_requested = 0
        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        if context is None:
            self._context = contextvars.copy_context()
        else:
            self._context = context

        if eager_start and self._loop.is_running():
            self.__eager_start()
        else:
            self._loop.call_soon(self.__step, context=self._context)
            _register_task(self)

    def __del__(self):
        # Report (via the loop's exception handler) tasks that were garbage
        # collected while still pending -- usually a "forgot to await" bug.
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    __class_getitem__ = classmethod(GenericAlias)

    def __repr__(self):
        return base_tasks._task_repr(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_context(self):
        """Return the contextvars.Context the task runs its coroutine in."""
        return self._context

    def get_name(self):
        """Return the task's name."""
        return self._name

    def set_name(self, value):
        """Set the task's name (coerced to str)."""
        self._name = str(value)

    def set_result(self, result):
        # A Task's result is produced by its coroutine, never set directly.
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        # A Task's exception is produced by its coroutine, never set directly.
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        This also increases the task's count of cancellation requests.
        """
        self._log_traceback = False
        if self.done():
            return False
        self._num_cancels_requested += 1
        # These two lines are controversial.  See discussion starting at
        # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
        # Also remember that this is duplicated in _asynciomodule.c.
        # if self._num_cancels_requested > 1:
        #     return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def cancelling(self):
        """Return the count of the task's cancellation requests.

        This count is incremented when .cancel() is called
        and may be decremented using .uncancel().
        """
        return self._num_cancels_requested

    def uncancel(self):
        """Decrement the task's count of cancellation requests.

        This should be called by the party that called `cancel()` on the task
        beforehand.

        Returns the remaining number of cancellation requests.
        """
        if self._num_cancels_requested > 0:
            self._num_cancels_requested -= 1
        return self._num_cancels_requested

    def __eager_start(self):
        # Run the coroutine's first step synchronously on the current loop
        # (used by eager task factories).  The task only gets registered as
        # a scheduled task if the first step did not complete it.
        prev_task = _swap_current_task(self._loop, self)
        try:
            _register_eager_task(self)
            try:
                self._context.run(self.__step_run_and_handle_result, None)
            finally:
                _unregister_eager_task(self)
        finally:
            try:
                curtask = _swap_current_task(self._loop, prev_task)
                assert curtask is self
            finally:
                if self.done():
                    self._coro = None
                    self = None  # Needed to break cycles when an exception occurs.
                else:
                    _register_task(self)

    def __step(self, exc=None):
        # One event-loop step: bracket the coroutine step with the
        # current-task bookkeeping (_enter_task/_leave_task).
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
            self._fut_waiter = None

        _enter_task(self._loop, self)
        try:
            self.__step_run_and_handle_result(exc)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __step_run_and_handle_result(self, exc):
        # Advance the coroutine one step (send/throw) and interpret what it
        # yielded: a Future to wait on, a bare yield, or an error.
        coro = self._coro
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception but let it propagate to tear down the loop.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Suspend: wake up again when the yielded future is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        # Done-callback installed on the future this task is waiting on.
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
+
+
# Keep a reference to the pure-Python implementation; the C accelerator
# (if available) replaces the public name below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
+
+
def create_task(coro, *, name=None, context=None):
    """Schedule the execution of a coroutine object in a spawn task.

    Return a Task object.  Must be called from within a running event
    loop; *name* and *context* are forwarded to the new task.
    """
    loop = events.get_running_loop()
    if context is not None:
        task = loop.create_task(coro, context=context)
    else:
        # Use legacy API if context is not needed
        task = loop.create_task(coro)

    _set_task_name(task, name)
    return task
+
+
# wait() and as_completed() similar to those in PEP 3148.

# Re-exported sentinels for wait()'s *return_when* argument; these are the
# same objects as the concurrent.futures ones.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
+
+
async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures or Tasks given by fs to complete.

    The fs iterable must not be empty and must not contain coroutines.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError!  Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    # Validate the argument shape before doing any work.
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of Tasks/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    fs = set(fs)

    # Bare coroutines are rejected: they would be wrapped in tasks the
    # caller has no handle on.
    for f in fs:
        if coroutines.iscoroutine(f):
            raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")

    loop = events.get_running_loop()
    return await _wait(fs, timeout, return_when, loop)
+
+
def _release_waiter(waiter, *args):
    # Done-callback helper: complete *waiter* with None exactly once.
    # Extra positional arguments (from add_done_callback etc.) are ignored.
    if waiter.done():
        return
    waiter.set_result(None)
+
+
async def wait_for(fut, timeout):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    If the task suppresses the cancellation and returns a value instead,
    that value is returned.

    This function is a coroutine.
    """
    # The special case for timeout <= 0 is for the following case:
    #
    # async def test_waitfor():
    #     func_started = False
    #
    #     async def func():
    #         nonlocal func_started
    #         func_started = True
    #
    #     try:
    #         await asyncio.wait_for(func(), 0)
    #     except asyncio.TimeoutError:
    #         assert not func_started
    #     else:
    #         assert False
    #
    # asyncio.run(test_waitfor())


    if timeout is not None and timeout <= 0:
        # Zero/negative timeout: give the awaitable no chance to start;
        # only an already-completed future can yield a result here.
        fut = ensure_future(fut)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut)
        try:
            return fut.result()
        except exceptions.CancelledError as exc:
            # The task was cancelled by us, so surface it as a timeout.
            raise TimeoutError from exc

    # Positive (or no) timeout: delegate deadline handling to timeouts.timeout().
    async with timeouts.timeout(timeout):
        return await fut
+
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.
    Returns (done, pending) sets partitioned at the moment of wake-up.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # Runs once per completed future; releases the waiter as soon as
        # the *return_when* condition is met.
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        # Always detach callbacks and the timer, even if we were cancelled.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending
+
+
async def _cancel_and_wait(fut):
    """Cancel the *fut* future or task and wait until it completes."""

    loop = events.get_running_loop()
    waiter = loop.create_future()
    release = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(release)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await waiter
    finally:
        fut.remove_done_callback(release)
+
+
+# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue()  # Completed futures, consumed in completion order.

    loop = events.get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        # Deadline hit: detach all callbacks and push one sentinel so the
        # next _wait_for_one() raises TimeoutError.
        for f in todo:
            f.remove_done_callback(_on_completion)
        done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            # Everything finished before the deadline: stop the timer.
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # Yield exactly one awaiter per input future; each resolves to whichever
    # future completes next (not necessarily the "matching" one).
    for _ in range(len(todo)):
        yield _wait_for_one()
+
+
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield
+
+
async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds).

    Returns *result* (default None) once the delay has elapsed.
    """
    if delay <= 0:
        # Zero or negative delay: just yield once to the event loop.
        await __sleep0()
        return result

    loop = events.get_running_loop()
    future = loop.create_future()
    handle = loop.call_later(delay,
                             futures._set_result_unless_cancelled,
                             future, result)
    try:
        return await future
    finally:
        # Make sure the timer does not fire after a cancellation.
        handle.cancel()
+
+
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    # Case 1: already a Future/Task -- just validate the loop and return it.
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    # Case 2: a coroutine, or a generic awaitable we wrap in a coroutine.
    if coroutines.iscoroutine(coro_or_future):
        # A caller-supplied coroutine must be closed if we fail to schedule it.
        should_close = True
    elif inspect.isawaitable(coro_or_future):
        async def _wrap_awaitable(awaitable):
            return await awaitable

        coro_or_future = _wrap_awaitable(coro_or_future)
        should_close = False
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                        'is required')

    if loop is None:
        loop = events.get_event_loop()
    try:
        return loop.create_task(coro_or_future)
    except RuntimeError:
        if should_close:
            coro_or_future.close()
        raise
+
+
class _GatheringFuture(futures.Future):
    """Helper for gather().

    Overrides cancel() so that cancelling the aggregate future cancels
    all the children; like Task.cancel(), it does not immediately mark
    itself as cancelled.
    """

    def __init__(self, children, *, loop):
        assert loop is not None
        super().__init__(loop=loop)
        self._children = children  # Child futures being aggregated.
        self._cancel_requested = False  # Set once any child accepts a cancel.

    def cancel(self, msg=None):
        if self.done():
            return False
        # Cancel every child (no short-circuit), remembering whether at
        # least one of them accepted the request.
        accepted = [child.cancel(msg=msg) for child in self._children]
        if any(accepted):
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
            return True
        return False
+
+
def gather(*coros_or_futures, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        # Nothing to gather: return an already-completed future.
        loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        # Invoked once per child completion; sets the outer result when the
        # last child finishes (or the first error when not return_exceptions).
        nonlocal nfinished
        nfinished += 1

        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as 'fut.exception()'
                    # will *raise* a CancelledError instead of returning it.
                    # Also, since we're adding the exception return value
                    # to 'results' instead of raising it, don't bother
                    # setting __context__.  This also lets us preserve
                    # calling '_make_cancelled_error()' at most once.
                    res = exceptions.CancelledError(
                        '' if fut._cancel_message is None else
                        fut._cancel_message)
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    arg_to_fut = {}   # Deduplicates repeated awaitables.
    children = []     # Futures in caller order (drives result order).
    nfuts = 0         # Number of *unique* futures.
    nfinished = 0
    done_futs = []
    loop = None
    outer = None  # bpo-46672
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            if fut.done():
                done_futs.append(fut)
            else:
                fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    # Run done callbacks after GatheringFuture created so any post-processing
    # can be performed at this point
    # optimization: in the special case that *all* futures finished eagerly,
    # this will effectively complete the gather eagerly, with the last
    # callback setting the result (or exception) on outer before returning it
    for fut in done_futs:
        _done_callback(fut)
    return outer
+
+
def shield(arg):
    """Wait for a future, shielding it from cancellation.

    The statement

        task = asyncio.create_task(something())
        res = await shield(task)

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        task = asyncio.create_task(something())
        try:
            res = await shield(task)
        except CancelledError:
            res = None

    Save a reference to tasks passed to this function, to avoid
    a task disappearing mid-execution. The event loop only keeps
    weak references to tasks. A task that isn't referenced elsewhere
    may get garbage collected at any time, even before it's done.
    """
    inner = ensure_future(arg)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    # The caller awaits 'outer'; 'inner' keeps running independently.
    outer = loop.create_future()

    def _inner_done_callback(inner):
        # Mirror the inner outcome onto the outer future (unless the caller
        # already cancelled the outer one).
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())


    def _outer_done_callback(outer):
        # The caller stopped waiting; detach so 'inner' no longer reports
        # back to a dead future.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
+
+
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    Safe to call from a thread other than the loop's thread.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    cf_future = concurrent.futures.Future()

    def _do_submit():
        # Runs in the loop's thread; bridge the asyncio future into the
        # concurrent.futures.Future handed back to the caller.
        try:
            futures._chain_future(ensure_future(coro, loop=loop), cf_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report scheduling failures to the caller, unless the caller
            # already cancelled the returned future.
            if cf_future.set_running_or_notify_cancel():
                cf_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(_do_submit)
    return cf_future
+
+
def create_eager_task_factory(custom_task_constructor):
    """Create a function suitable for use as a task factory on an event-loop.

    Example usage:

        loop.set_task_factory(
            asyncio.create_eager_task_factory(my_task_constructor))

    Tasks created through the returned factory are started immediately
    rather than first being scheduled to the event loop.  The constructor
    argument can be any callable returning a Task-compatible object with
    a signature compatible with `Task.__init__`, including the
    `eager_start` keyword argument.

    Most applications will use `Task` for `custom_task_constructor`; for
    that case the ready-made global `eager_task_factory` instance can be
    used directly, e.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
    """

    def factory(loop, coro, *, name=None, context=None):
        # Delegate to the user-supplied constructor, forcing an eager start.
        return custom_task_constructor(
            coro,
            loop=loop,
            name=name,
            context=context,
            eager_start=True,
        )

    return factory
+
+
# Ready-made eager task factory for the common case of plain Task objects.
eager_task_factory = create_eager_task_factory(Task)


# Collectively these two sets hold references to the complete set of active
# tasks. Eagerly executed tasks use a faster regular set as an optimization
# but may graduate to a WeakSet if the task blocks on IO.
_scheduled_tasks = weakref.WeakSet()
_eager_tasks = set()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
_current_tasks = {}
+
+
def _register_task(task):
    """Register an asyncio Task scheduled to run on an event loop."""
    # WeakSet: the registry must not keep otherwise-unreferenced tasks alive.
    _scheduled_tasks.add(task)
+
+
def _register_eager_task(task):
    """Register an asyncio Task about to be eagerly executed."""
    # Plain set (faster than a WeakSet); the task is removed again as soon
    # as its first eager step finishes.
    _eager_tasks.add(task)
+
+
def _enter_task(loop, task):
    """Mark *task* as the task currently running on *loop*.

    Raises RuntimeError if another task is already marked as current.
    """
    running = _current_tasks.get(loop)
    if running is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {running!r} is being executed.")
    _current_tasks[loop] = task
+
+
def _leave_task(loop, task):
    """Clear the current-task marker for *loop*.

    Raises RuntimeError if *task* is not the task currently marked as
    running on *loop*.
    """
    running = _current_tasks.get(loop)
    if running is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {running!r}.")
    del _current_tasks[loop]
+
+
def _swap_current_task(loop, task):
    """Install *task* as the current task of *loop*, returning the old one.

    Passing None removes the loop's entry entirely.
    """
    prev_task = _current_tasks.get(loop)
    if task is not None:
        _current_tasks[loop] = task
    else:
        del _current_tasks[loop]
    return prev_task
+
+
def _unregister_task(task):
    """Unregister a completed, scheduled Task."""
    # discard(): tolerate tasks that were never (or already un-) registered.
    _scheduled_tasks.discard(task)
+
+
def _unregister_eager_task(task):
    """Unregister a task which finished its first eager step."""
    # discard(): tolerate tasks that were never (or already un-) registered.
    _eager_tasks.discard(task)
+
+
# Keep references to the pure-Python implementations under _py_* names;
# the public names may be rebound to the C accelerator below.
_py_current_task = current_task
_py_register_task = _register_task
_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
_py_swap_current_task = _swap_current_task


try:
    # Prefer the C-accelerated task bookkeeping when available; this also
    # replaces the registry containers so both implementations share state.
    from _asyncio import (_register_task, _register_eager_task,
                          _unregister_task, _unregister_eager_task,
                          _enter_task, _leave_task, _swap_current_task,
                          _scheduled_tasks, _eager_tasks, _current_tasks,
                          current_task)
except ImportError:
    pass
else:
    # _c_* aliases are needed for tests.
    _c_current_task = current_task
    _c_register_task = _register_task
    _c_register_eager_task = _register_eager_task
    _c_unregister_task = _unregister_task
    _c_unregister_eager_task = _unregister_eager_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
    _c_swap_current_task = _swap_current_task