diff options
author | thegeorg <thegeorg@yandex-team.com> | 2024-02-19 02:38:52 +0300 |
---|---|---|
committer | thegeorg <thegeorg@yandex-team.com> | 2024-02-19 02:50:43 +0300 |
commit | d96fa07134c06472bfee6718b5cfd1679196fc99 (patch) | |
tree | 31ec344fa9d3ff8dc038692516b6438dfbdb8a2d /contrib/tools/python3/Lib/asyncio/tasks.py | |
parent | 452cf9e068aef7110e35e654c5d47eb80111ef89 (diff) | |
download | ydb-d96fa07134c06472bfee6718b5cfd1679196fc99.tar.gz |
Sync contrib/tools/python3 layout with upstream
* Move src/ subdir contents to the top of the layout
* Rename self-written lib -> lib2 to avoid CaseFolding warning from the VCS
* Regenerate contrib/libs/python proxy-headers accordingly
4ccc62ac1511abcf0fed14ccade38e984e088f1e
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio/tasks.py')
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/tasks.py | 1065 |
1 files changed, 1065 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/tasks.py b/contrib/tools/python3/Lib/asyncio/tasks.py new file mode 100644 index 0000000000..65f2a6ef80 --- /dev/null +++ b/contrib/tools/python3/Lib/asyncio/tasks.py @@ -0,0 +1,1065 @@ +"""Support for tasks, coroutines and the scheduler.""" + +__all__ = ( + 'Task', 'create_task', + 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', + 'wait', 'wait_for', 'as_completed', 'sleep', + 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', + 'current_task', 'all_tasks', + 'create_eager_task_factory', 'eager_task_factory', + '_register_task', '_unregister_task', '_enter_task', '_leave_task', +) + +import concurrent.futures +import contextvars +import functools +import inspect +import itertools +import types +import warnings +import weakref +from types import GenericAlias + +from . import base_tasks +from . import coroutines +from . import events +from . import exceptions +from . import futures +from . import timeouts + +# Helper to generate new task names +# This uses itertools.count() instead of a "+= 1" operation because the latter +# is not thread safe. See bpo-11866 for a longer explanation. +_task_name_counter = itertools.count(1).__next__ + + +def current_task(loop=None): + """Return a currently executed task.""" + if loop is None: + loop = events.get_running_loop() + return _current_tasks.get(loop) + + +def all_tasks(loop=None): + """Return a set of all tasks for the loop.""" + if loop is None: + loop = events.get_running_loop() + # capturing the set of eager tasks first, so if an eager task "graduates" + # to a regular task in another thread, we don't risk missing it. + eager_tasks = list(_eager_tasks) + # Looping over the WeakSet isn't safe as it can be updated from another + # thread, therefore we cast it to list prior to filtering. The list cast + # itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). + # See issues 34970 and 36607 for details. + scheduled_tasks = None + i = 0 + while True: + try: + scheduled_tasks = list(_scheduled_tasks) + except RuntimeError: + i += 1 + if i >= 1000: + raise + else: + break + return {t for t in itertools.chain(scheduled_tasks, eager_tasks) + if futures._get_loop(t) is loop and not t.done()} + + +def _set_task_name(task, name): + if name is not None: + try: + set_name = task.set_name + except AttributeError: + warnings.warn("Task.set_name() was added in Python 3.8, " + "the method support will be mandatory for third-party " + "task implementations since 3.13.", + DeprecationWarning, stacklevel=3) + else: + set_name(name) + + +class Task(futures._PyFuture): # Inherit Python Task implementation + # from a Python Future implementation. + + """A coroutine wrapped in a Future.""" + + # An important invariant maintained while a Task not done: + # _fut_waiter is either None or a Future. The Future + # can be either done() or not done(). + # The task can be in any of 3 states: + # + # - 1: _fut_waiter is not None and not _fut_waiter.done(): + # __step() is *not* scheduled and the Task is waiting for _fut_waiter. + # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled: + # the Task is waiting for __step() to be executed. + # - 3: _fut_waiter is None and __step() is *not* scheduled: + # the Task is currently executing (in __step()). + # + # * In state 1, one of the callbacks of __fut_waiter must be __wakeup(). + # * The transition from 1 to 2 happens when _fut_waiter becomes done(), + # as it schedules __wakeup() to be called (which calls __step() so + # we way that __step() is scheduled). + # * It transitions from 2 to 3 when __step() is executed, and it clears + # _fut_waiter to None. + + # If False, don't log a message if the task is destroyed while its + # status is still pending + _log_destroy_pending = True + + def __init__(self, coro, *, loop=None, name=None, context=None, + eager_start=False): + super().__init__(loop=loop) + if self._source_traceback: + del self._source_traceback[-1] + if not coroutines.iscoroutine(coro): + # raise after Future.__init__(), attrs are required for __del__ + # prevent logging for pending task in __del__ + self._log_destroy_pending = False + raise TypeError(f"a coroutine was expected, got {coro!r}") + + if name is None: + self._name = f'Task-{_task_name_counter()}' + else: + self._name = str(name) + + self._num_cancels_requested = 0 + self._must_cancel = False + self._fut_waiter = None + self._coro = coro + if context is None: + self._context = contextvars.copy_context() + else: + self._context = context + + if eager_start and self._loop.is_running(): + self.__eager_start() + else: + self._loop.call_soon(self.__step, context=self._context) + _register_task(self) + + def __del__(self): + if self._state == futures._PENDING and self._log_destroy_pending: + context = { + 'task': self, + 'message': 'Task was destroyed but it is pending!', + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + super().__del__() + + __class_getitem__ = classmethod(GenericAlias) + + def __repr__(self): + return base_tasks._task_repr(self) + + def get_coro(self): + return self._coro + + def get_context(self): + return self._context + + def get_name(self): + return self._name + + def set_name(self, value): + self._name = str(value) + + def set_result(self, result): + raise RuntimeError('Task does not support set_result operation') + + def set_exception(self, exception): + raise RuntimeError('Task does not support set_exception operation') + + def get_stack(self, *, limit=None): + """Return the list of stack frames for this task's coroutine. + + If the coroutine is not done, this returns the stack where it is + suspended. If the coroutine has completed successfully or was + cancelled, this returns an empty list. If the coroutine was + terminated by an exception, this returns the list of traceback + frames. + + The frames are always ordered from oldest to newest. + + The optional limit gives the maximum number of frames to + return; by default all available frames are returned. Its + meaning differs depending on whether a stack or a traceback is + returned: the newest frames of a stack are returned, but the + oldest frames of a traceback are returned. (This matches the + behavior of the traceback module.) + + For reasons beyond our control, only one stack frame is + returned for a suspended coroutine. + """ + return base_tasks._task_get_stack(self, limit) + + def print_stack(self, *, limit=None, file=None): + """Print the stack or traceback for this task's coroutine. + + This produces output similar to that of the traceback module, + for the frames retrieved by get_stack(). The limit argument + is passed to get_stack(). The file argument is an I/O stream + to which the output is written; by default output is written + to sys.stderr. + """ + return base_tasks._task_print_stack(self, limit, file) + + def cancel(self, msg=None): + """Request that this task cancel itself. + + This arranges for a CancelledError to be thrown into the + wrapped coroutine on the next cycle through the event loop. + The coroutine then has a chance to clean up or even deny + the request using try/except/finally. + + Unlike Future.cancel, this does not guarantee that the + task will be cancelled: the exception might be caught and + acted upon, delaying cancellation of the task or preventing + cancellation completely. The task may also return a value or + raise a different exception. + + Immediately after this method is called, Task.cancelled() will + not return True (unless the task was already cancelled). A + task will be marked as cancelled when the wrapped coroutine + terminates with a CancelledError exception (even if cancel() + was not called). + + This also increases the task's count of cancellation requests. + """ + self._log_traceback = False + if self.done(): + return False + self._num_cancels_requested += 1 + # These two lines are controversial. See discussion starting at + # https://github.com/python/cpython/pull/31394#issuecomment-1053545331 + # Also remember that this is duplicated in _asynciomodule.c. + # if self._num_cancels_requested > 1: + # return False + if self._fut_waiter is not None: + if self._fut_waiter.cancel(msg=msg): + # Leave self._fut_waiter; it may be a Task that + # catches and ignores the cancellation so we may have + # to cancel it again later. + return True + # It must be the case that self.__step is already scheduled. + self._must_cancel = True + self._cancel_message = msg + return True + + def cancelling(self): + """Return the count of the task's cancellation requests. + + This count is incremented when .cancel() is called + and may be decremented using .uncancel(). + """ + return self._num_cancels_requested + + def uncancel(self): + """Decrement the task's count of cancellation requests. + + This should be called by the party that called `cancel()` on the task + beforehand. + + Returns the remaining number of cancellation requests. + """ + if self._num_cancels_requested > 0: + self._num_cancels_requested -= 1 + return self._num_cancels_requested + + def __eager_start(self): + prev_task = _swap_current_task(self._loop, self) + try: + _register_eager_task(self) + try: + self._context.run(self.__step_run_and_handle_result, None) + finally: + _unregister_eager_task(self) + finally: + try: + curtask = _swap_current_task(self._loop, prev_task) + assert curtask is self + finally: + if self.done(): + self._coro = None + self = None # Needed to break cycles when an exception occurs. + else: + _register_task(self) + + def __step(self, exc=None): + if self.done(): + raise exceptions.InvalidStateError( + f'_step(): already done: {self!r}, {exc!r}') + if self._must_cancel: + if not isinstance(exc, exceptions.CancelledError): + exc = self._make_cancelled_error() + self._must_cancel = False + self._fut_waiter = None + + _enter_task(self._loop, self) + try: + self.__step_run_and_handle_result(exc) + finally: + _leave_task(self._loop, self) + self = None # Needed to break cycles when an exception occurs. + + def __step_run_and_handle_result(self, exc): + coro = self._coro + try: + if exc is None: + # We use the `send` method directly, because coroutines + # don't have `__iter__` and `__next__` methods. + result = coro.send(None) + else: + result = coro.throw(exc) + except StopIteration as exc: + if self._must_cancel: + # Task is cancelled right before coro stops. + self._must_cancel = False + super().cancel(msg=self._cancel_message) + else: + super().set_result(exc.value) + except exceptions.CancelledError as exc: + # Save the original exception so we can chain it later. + self._cancelled_exc = exc + super().cancel() # I.e., Future.cancel(self). + except (KeyboardInterrupt, SystemExit) as exc: + super().set_exception(exc) + raise + except BaseException as exc: + super().set_exception(exc) + else: + blocking = getattr(result, '_asyncio_future_blocking', None) + if blocking is not None: + # Yielded Future must come from Future.__iter__(). + if futures._get_loop(result) is not self._loop: + new_exc = RuntimeError( + f'Task {self!r} got Future ' + f'{result!r} attached to a different loop') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + elif blocking: + if result is self: + new_exc = RuntimeError( + f'Task cannot await on itself: {self!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + else: + result._asyncio_future_blocking = False + result.add_done_callback( + self.__wakeup, context=self._context) + self._fut_waiter = result + if self._must_cancel: + if self._fut_waiter.cancel( + msg=self._cancel_message): + self._must_cancel = False + else: + new_exc = RuntimeError( + f'yield was used instead of yield from ' + f'in task {self!r} with {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + + elif result is None: + # Bare yield relinquishes control for one event loop iteration. + self._loop.call_soon(self.__step, context=self._context) + elif inspect.isgenerator(result): + # Yielding a generator is just wrong. + new_exc = RuntimeError( + f'yield was used instead of yield from for ' + f'generator in task {self!r} with {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + else: + # Yielding something else is an error. + new_exc = RuntimeError(f'Task got bad yield: {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + finally: + self = None # Needed to break cycles when an exception occurs. + + def __wakeup(self, future): + try: + future.result() + except BaseException as exc: + # This may also be a cancellation. + self.__step(exc) + else: + # Don't pass the value of `future.result()` explicitly, + # as `Future.__iter__` and `Future.__await__` don't need it. + # If we call `_step(value, None)` instead of `_step()`, + # Python eval loop would use `.send(value)` method call, + # instead of `__next__()`, which is slower for futures + # that return non-generator iterators from their `__iter__`. + self.__step() + self = None # Needed to break cycles when an exception occurs. + + +_PyTask = Task + + +try: + import _asyncio +except ImportError: + pass +else: + # _CTask is needed for tests. + Task = _CTask = _asyncio.Task + + +def create_task(coro, *, name=None, context=None): + """Schedule the execution of a coroutine object in a spawn task. + + Return a Task object. + """ + loop = events.get_running_loop() + if context is None: + # Use legacy API if context is not needed + task = loop.create_task(coro) + else: + task = loop.create_task(coro, context=context) + + _set_task_name(task, name) + return task + + +# wait() and as_completed() similar to those in PEP 3148. + +FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED +FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION +ALL_COMPLETED = concurrent.futures.ALL_COMPLETED + + +async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): + """Wait for the Futures or Tasks given by fs to complete. + + The fs iterable must not be empty. + + Coroutines will be wrapped in Tasks. + + Returns two sets of Future: (done, pending). + + Usage: + + done, pending = await asyncio.wait(fs) + + Note: This does not raise TimeoutError! Futures that aren't done + when the timeout occurs are returned in the second set. + """ + if futures.isfuture(fs) or coroutines.iscoroutine(fs): + raise TypeError(f"expect a list of futures, not {type(fs).__name__}") + if not fs: + raise ValueError('Set of Tasks/Futures is empty.') + if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): + raise ValueError(f'Invalid return_when value: {return_when}') + + fs = set(fs) + + if any(coroutines.iscoroutine(f) for f in fs): + raise TypeError("Passing coroutines is forbidden, use tasks explicitly.") + + loop = events.get_running_loop() + return await _wait(fs, timeout, return_when, loop) + + +def _release_waiter(waiter, *args): + if not waiter.done(): + waiter.set_result(None) + + +async def wait_for(fut, timeout): + """Wait for the single Future or coroutine to complete, with timeout. + + Coroutine will be wrapped in Task. + + Returns result of the Future or coroutine. When a timeout occurs, + it cancels the task and raises TimeoutError. To avoid the task + cancellation, wrap it in shield(). + + If the wait is cancelled, the task is also cancelled. + + If the task supresses the cancellation and returns a value instead, + that value is returned. + + This function is a coroutine. + """ + # The special case for timeout <= 0 is for the following case: + # + # async def test_waitfor(): + # func_started = False + # + # async def func(): + # nonlocal func_started + # func_started = True + # + # try: + # await asyncio.wait_for(func(), 0) + # except asyncio.TimeoutError: + # assert not func_started + # else: + # assert False + # + # asyncio.run(test_waitfor()) + + + if timeout is not None and timeout <= 0: + fut = ensure_future(fut) + + if fut.done(): + return fut.result() + + await _cancel_and_wait(fut) + try: + return fut.result() + except exceptions.CancelledError as exc: + raise TimeoutError from exc + + async with timeouts.timeout(timeout): + return await fut + +async def _wait(fs, timeout, return_when, loop): + """Internal helper for wait(). + + The fs argument must be a collection of Futures. + """ + assert fs, 'Set of Futures is empty.' + waiter = loop.create_future() + timeout_handle = None + if timeout is not None: + timeout_handle = loop.call_later(timeout, _release_waiter, waiter) + counter = len(fs) + + def _on_completion(f): + nonlocal counter + counter -= 1 + if (counter <= 0 or + return_when == FIRST_COMPLETED or + return_when == FIRST_EXCEPTION and (not f.cancelled() and + f.exception() is not None)): + if timeout_handle is not None: + timeout_handle.cancel() + if not waiter.done(): + waiter.set_result(None) + + for f in fs: + f.add_done_callback(_on_completion) + + try: + await waiter + finally: + if timeout_handle is not None: + timeout_handle.cancel() + for f in fs: + f.remove_done_callback(_on_completion) + + done, pending = set(), set() + for f in fs: + if f.done(): + done.add(f) + else: + pending.add(f) + return done, pending + + +async def _cancel_and_wait(fut): + """Cancel the *fut* future or task and wait until it completes.""" + + loop = events.get_running_loop() + waiter = loop.create_future() + cb = functools.partial(_release_waiter, waiter) + fut.add_done_callback(cb) + + try: + fut.cancel() + # We cannot wait on *fut* directly to make + # sure _cancel_and_wait itself is reliably cancellable. + await waiter + finally: + fut.remove_done_callback(cb) + + +# This is *not* a @coroutine! It is just an iterator (yielding Futures). +def as_completed(fs, *, timeout=None): + """Return an iterator whose values are coroutines. + + When waiting for the yielded coroutines you'll get the results (or + exceptions!) of the original Futures (or coroutines), in the order + in which and as soon as they complete. + + This differs from PEP 3148; the proper way to use this is: + + for f in as_completed(fs): + result = await f # The 'await' may raise. + # Use result. + + If a timeout is specified, the 'await' will raise + TimeoutError when the timeout occurs before all Futures are done. + + Note: The futures 'f' are not necessarily members of fs. + """ + if futures.isfuture(fs) or coroutines.iscoroutine(fs): + raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") + + from .queues import Queue # Import here to avoid circular import problem. + done = Queue() + + loop = events.get_event_loop() + todo = {ensure_future(f, loop=loop) for f in set(fs)} + timeout_handle = None + + def _on_timeout(): + for f in todo: + f.remove_done_callback(_on_completion) + done.put_nowait(None) # Queue a dummy value for _wait_for_one(). + todo.clear() # Can't do todo.remove(f) in the loop. + + def _on_completion(f): + if not todo: + return # _on_timeout() was here first. + todo.remove(f) + done.put_nowait(f) + if not todo and timeout_handle is not None: + timeout_handle.cancel() + + async def _wait_for_one(): + f = await done.get() + if f is None: + # Dummy value from _on_timeout(). + raise exceptions.TimeoutError + return f.result() # May raise f.exception(). + + for f in todo: + f.add_done_callback(_on_completion) + if todo and timeout is not None: + timeout_handle = loop.call_later(timeout, _on_timeout) + for _ in range(len(todo)): + yield _wait_for_one() + + +@types.coroutine +def __sleep0(): + """Skip one event loop run cycle. + + This is a private helper for 'asyncio.sleep()', used + when the 'delay' is set to 0. It uses a bare 'yield' + expression (which Task.__step knows how to handle) + instead of creating a Future object. + """ + yield + + +async def sleep(delay, result=None): + """Coroutine that completes after a given time (in seconds).""" + if delay <= 0: + await __sleep0() + return result + + loop = events.get_running_loop() + future = loop.create_future() + h = loop.call_later(delay, + futures._set_result_unless_cancelled, + future, result) + try: + return await future + finally: + h.cancel() + + +def ensure_future(coro_or_future, *, loop=None): + """Wrap a coroutine or an awaitable in a future. + + If the argument is a Future, it is returned directly. + """ + if futures.isfuture(coro_or_future): + if loop is not None and loop is not futures._get_loop(coro_or_future): + raise ValueError('The future belongs to a different loop than ' + 'the one specified as the loop argument') + return coro_or_future + should_close = True + if not coroutines.iscoroutine(coro_or_future): + if inspect.isawaitable(coro_or_future): + async def _wrap_awaitable(awaitable): + return await awaitable + + coro_or_future = _wrap_awaitable(coro_or_future) + should_close = False + else: + raise TypeError('An asyncio.Future, a coroutine or an awaitable ' + 'is required') + + if loop is None: + loop = events.get_event_loop() + try: + return loop.create_task(coro_or_future) + except RuntimeError: + if should_close: + coro_or_future.close() + raise + + +class _GatheringFuture(futures.Future): + """Helper for gather(). + + This overrides cancel() to cancel all the children and act more + like Task.cancel(), which doesn't immediately mark itself as + cancelled. + """ + + def __init__(self, children, *, loop): + assert loop is not None + super().__init__(loop=loop) + self._children = children + self._cancel_requested = False + + def cancel(self, msg=None): + if self.done(): + return False + ret = False + for child in self._children: + if child.cancel(msg=msg): + ret = True + if ret: + # If any child tasks were actually cancelled, we should + # propagate the cancellation request regardless of + # *return_exceptions* argument. See issue 32684. + self._cancel_requested = True + return ret + + +def gather(*coros_or_futures, return_exceptions=False): + """Return a future aggregating results from the given coroutines/futures. + + Coroutines will be wrapped in a future and scheduled in the event + loop. They will not necessarily be scheduled in the same order as + passed in. + + All futures must share the same event loop. If all the tasks are + done successfully, the returned future's result is the list of + results (in the order of the original sequence, not necessarily + the order of results arrival). If *return_exceptions* is True, + exceptions in the tasks are treated the same as successful + results, and gathered in the result list; otherwise, the first + raised exception will be immediately propagated to the returned + future. + + Cancellation: if the outer Future is cancelled, all children (that + have not completed yet) are also cancelled. If any child is + cancelled, this is treated as if it raised CancelledError -- + the outer Future is *not* cancelled in this case. (This is to + prevent the cancellation of one child to cause other children to + be cancelled.) + + If *return_exceptions* is False, cancelling gather() after it + has been marked done won't cancel any submitted awaitables. + For instance, gather can be marked done after propagating an + exception to the caller, therefore, calling ``gather.cancel()`` + after catching an exception (raised by one of the awaitables) from + gather won't cancel any other awaitables. + """ + if not coros_or_futures: + loop = events.get_event_loop() + outer = loop.create_future() + outer.set_result([]) + return outer + + def _done_callback(fut): + nonlocal nfinished + nfinished += 1 + + if outer is None or outer.done(): + if not fut.cancelled(): + # Mark exception retrieved. + fut.exception() + return + + if not return_exceptions: + if fut.cancelled(): + # Check if 'fut' is cancelled first, as + # 'fut.exception()' will *raise* a CancelledError + # instead of returning it. + exc = fut._make_cancelled_error() + outer.set_exception(exc) + return + else: + exc = fut.exception() + if exc is not None: + outer.set_exception(exc) + return + + if nfinished == nfuts: + # All futures are done; create a list of results + # and set it to the 'outer' future. + results = [] + + for fut in children: + if fut.cancelled(): + # Check if 'fut' is cancelled first, as 'fut.exception()' + # will *raise* a CancelledError instead of returning it. + # Also, since we're adding the exception return value + # to 'results' instead of raising it, don't bother + # setting __context__. This also lets us preserve + # calling '_make_cancelled_error()' at most once. + res = exceptions.CancelledError( + '' if fut._cancel_message is None else + fut._cancel_message) + else: + res = fut.exception() + if res is None: + res = fut.result() + results.append(res) + + if outer._cancel_requested: + # If gather is being cancelled we must propagate the + # cancellation regardless of *return_exceptions* argument. + # See issue 32684. + exc = fut._make_cancelled_error() + outer.set_exception(exc) + else: + outer.set_result(results) + + arg_to_fut = {} + children = [] + nfuts = 0 + nfinished = 0 + done_futs = [] + loop = None + outer = None # bpo-46672 + for arg in coros_or_futures: + if arg not in arg_to_fut: + fut = ensure_future(arg, loop=loop) + if loop is None: + loop = futures._get_loop(fut) + if fut is not arg: + # 'arg' was not a Future, therefore, 'fut' is a new + # Future created specifically for 'arg'. Since the caller + # can't control it, disable the "destroy pending task" + # warning. + fut._log_destroy_pending = False + + nfuts += 1 + arg_to_fut[arg] = fut + if fut.done(): + done_futs.append(fut) + else: + fut.add_done_callback(_done_callback) + + else: + # There's a duplicate Future object in coros_or_futures. + fut = arg_to_fut[arg] + + children.append(fut) + + outer = _GatheringFuture(children, loop=loop) + # Run done callbacks after GatheringFuture created so any post-processing + # can be performed at this point + # optimization: in the special case that *all* futures finished eagerly, + # this will effectively complete the gather eagerly, with the last + # callback setting the result (or exception) on outer before returning it + for fut in done_futs: + _done_callback(fut) + return outer + + +def shield(arg): + """Wait for a future, shielding it from cancellation. + + The statement + + task = asyncio.create_task(something()) + res = await shield(task) + + is exactly equivalent to the statement + + res = await something() + + *except* that if the coroutine containing it is cancelled, the + task running in something() is not cancelled. From the POV of + something(), the cancellation did not happen. But its caller is + still cancelled, so the yield-from expression still raises + CancelledError. Note: If something() is cancelled by other means + this will still cancel shield(). + + If you want to completely ignore cancellation (not recommended) + you can combine shield() with a try/except clause, as follows: + + task = asyncio.create_task(something()) + try: + res = await shield(task) + except CancelledError: + res = None + + Save a reference to tasks passed to this function, to avoid + a task disappearing mid-execution. The event loop only keeps + weak references to tasks. A task that isn't referenced elsewhere + may get garbage collected at any time, even before it's done. + """ + inner = ensure_future(arg) + if inner.done(): + # Shortcut. + return inner + loop = futures._get_loop(inner) + outer = loop.create_future() + + def _inner_done_callback(inner): + if outer.cancelled(): + if not inner.cancelled(): + # Mark inner's result as retrieved. + inner.exception() + return + + if inner.cancelled(): + outer.cancel() + else: + exc = inner.exception() + if exc is not None: + outer.set_exception(exc) + else: + outer.set_result(inner.result()) + + + def _outer_done_callback(outer): + if not inner.done(): + inner.remove_done_callback(_inner_done_callback) + + inner.add_done_callback(_inner_done_callback) + outer.add_done_callback(_outer_done_callback) + return outer + + +def run_coroutine_threadsafe(coro, loop): + """Submit a coroutine object to a given event loop. + + Return a concurrent.futures.Future to access the result. + """ + if not coroutines.iscoroutine(coro): + raise TypeError('A coroutine object is required') + future = concurrent.futures.Future() + + def callback(): + try: + futures._chain_future(ensure_future(coro, loop=loop), future) + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + if future.set_running_or_notify_cancel(): + future.set_exception(exc) + raise + + loop.call_soon_threadsafe(callback) + return future + + +def create_eager_task_factory(custom_task_constructor): + """Create a function suitable for use as a task factory on an event-loop. + + Example usage: + + loop.set_task_factory( + asyncio.create_eager_task_factory(my_task_constructor)) + + Now, tasks created will be started immediately (rather than being first + scheduled to an event loop). The constructor argument can be any callable + that returns a Task-compatible object and has a signature compatible + with `Task.__init__`; it must have the `eager_start` keyword argument. + + Most applications will use `Task` for `custom_task_constructor` and in + this case there's no need to call `create_eager_task_factory()` + directly. Instead the global `eager_task_factory` instance can be + used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`. + """ + + def factory(loop, coro, *, name=None, context=None): + return custom_task_constructor( + coro, loop=loop, name=name, context=context, eager_start=True) + + return factory + + +eager_task_factory = create_eager_task_factory(Task) + + +# Collectively these two sets hold references to the complete set of active +# tasks. Eagerly executed tasks use a faster regular set as an optimization +# but may graduate to a WeakSet if the task blocks on IO. +_scheduled_tasks = weakref.WeakSet() +_eager_tasks = set() + +# Dictionary containing tasks that are currently active in +# all running event loops. {EventLoop: Task} +_current_tasks = {} + + +def _register_task(task): + """Register an asyncio Task scheduled to run on an event loop.""" + _scheduled_tasks.add(task) + + +def _register_eager_task(task): + """Register an asyncio Task about to be eagerly executed.""" + _eager_tasks.add(task) + + +def _enter_task(loop, task): + current_task = _current_tasks.get(loop) + if current_task is not None: + raise RuntimeError(f"Cannot enter into task {task!r} while another " + f"task {current_task!r} is being executed.") + _current_tasks[loop] = task + + +def _leave_task(loop, task): + current_task = _current_tasks.get(loop) + if current_task is not task: + raise RuntimeError(f"Leaving task {task!r} does not match " + f"the current task {current_task!r}.") + del _current_tasks[loop] + + +def _swap_current_task(loop, task): + prev_task = _current_tasks.get(loop) + if task is None: + del _current_tasks[loop] + else: + _current_tasks[loop] = task + return prev_task + + +def _unregister_task(task): + """Unregister a completed, scheduled Task.""" + _scheduled_tasks.discard(task) + + +def _unregister_eager_task(task): + """Unregister a task which finished its first eager step.""" + _eager_tasks.discard(task) + + +_py_current_task = current_task +_py_register_task = _register_task +_py_register_eager_task = _register_eager_task +_py_unregister_task = _unregister_task +_py_unregister_eager_task = _unregister_eager_task +_py_enter_task = _enter_task +_py_leave_task = _leave_task +_py_swap_current_task = _swap_current_task + + +try: + from _asyncio import (_register_task, _register_eager_task, + _unregister_task, _unregister_eager_task, + _enter_task, _leave_task, _swap_current_task, + _scheduled_tasks, _eager_tasks, _current_tasks, + current_task) +except ImportError: + pass +else: + _c_current_task = current_task + _c_register_task = _register_task + _c_register_eager_task = _register_eager_task + _c_unregister_task = _unregister_task + _c_unregister_eager_task = _unregister_eager_task + _c_enter_task = _enter_task + _c_leave_task = _leave_task + _c_swap_current_task = _swap_current_task |