diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
commit | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch) | |
tree | 64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/tools/python3/src/Lib/asyncio/tasks.py | |
parent | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff) | |
download | ydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/tasks.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/tasks.py | 408 |
1 files changed, 204 insertions, 204 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py index d378a369ba..27a3c8c5a8 100644 --- a/contrib/tools/python3/src/Lib/asyncio/tasks.py +++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py @@ -13,7 +13,7 @@ import concurrent.futures import contextvars import functools import inspect -import itertools +import itertools import types import warnings import weakref @@ -21,16 +21,16 @@ import weakref from . import base_tasks from . import coroutines from . import events -from . import exceptions +from . import exceptions from . import futures -from .coroutines import _is_coroutine +from .coroutines import _is_coroutine + +# Helper to generate new task names +# This uses itertools.count() instead of a "+= 1" operation because the latter +# is not thread safe. See bpo-11866 for a longer explanation. +_task_name_counter = itertools.count(1).__next__ -# Helper to generate new task names -# This uses itertools.count() instead of a "+= 1" operation because the latter -# is not thread safe. See bpo-11866 for a longer explanation. -_task_name_counter = itertools.count(1).__next__ - def current_task(loop=None): """Return a currently executed task.""" if loop is None: @@ -42,22 +42,22 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. - i = 0 - while True: - try: - tasks = list(_all_tasks) - except RuntimeError: - i += 1 - if i >= 1000: - raise - else: - break - return {t for t in tasks + # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another + # thread while we do so. Therefore we cast it to list prior to filtering. The list + # cast itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for + # details. + i = 0 + while True: + try: + tasks = list(_all_tasks) + except RuntimeError: + i += 1 + if i >= 1000: + raise + else: + break + return {t for t in tasks if futures._get_loop(t) is loop and not t.done()} @@ -67,34 +67,34 @@ def _all_tasks_compat(loop=None): # method. if loop is None: loop = events.get_event_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. - i = 0 - while True: - try: - tasks = list(_all_tasks) - except RuntimeError: - i += 1 - if i >= 1000: - raise - else: - break - return {t for t in tasks if futures._get_loop(t) is loop} - - -def _set_task_name(task, name): - if name is not None: - try: - set_name = task.set_name - except AttributeError: - pass - else: - set_name(name) - - + # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another + # thread while we do so. Therefore we cast it to list prior to filtering. The list + # cast itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for + # details. + i = 0 + while True: + try: + tasks = list(_all_tasks) + except RuntimeError: + i += 1 + if i >= 1000: + raise + else: + break + return {t for t in tasks if futures._get_loop(t) is loop} + + +def _set_task_name(task, name): + if name is not None: + try: + set_name = task.set_name + except AttributeError: + pass + else: + set_name(name) + + class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. @@ -113,7 +113,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation # status is still pending _log_destroy_pending = True - def __init__(self, coro, *, loop=None, name=None): + def __init__(self, coro, *, loop=None, name=None): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -123,11 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._log_destroy_pending = False raise TypeError(f"a coroutine was expected, got {coro!r}") - if name is None: - self._name = f'Task-{_task_name_counter()}' - else: - self._name = str(name) - + if name is None: + self._name = f'Task-{_task_name_counter()}' + else: + self._name = str(name) + self._must_cancel = False self._fut_waiter = None self._coro = coro @@ -147,21 +147,21 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._loop.call_exception_handler(context) super().__del__() - def __class_getitem__(cls, type): - return cls - + def __class_getitem__(cls, type): + return cls + def _repr_info(self): return base_tasks._task_repr_info(self) - def get_coro(self): - return self._coro - - def get_name(self): - return self._name - - def set_name(self, value): - self._name = str(value) - + def get_coro(self): + return self._coro + + def get_name(self): + return self._name + + def set_name(self, value): + self._name = str(value) + def set_result(self, result): raise RuntimeError('Task does not support set_result operation') @@ -202,7 +202,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation """ return base_tasks._task_print_stack(self, limit, file) - def cancel(self, msg=None): + def cancel(self, msg=None): """Request that this task cancel itself. This arranges for a CancelledError to be thrown into the @@ -226,23 +226,23 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if self.done(): return False if self._fut_waiter is not None: - if self._fut_waiter.cancel(msg=msg): + if self._fut_waiter.cancel(msg=msg): # Leave self._fut_waiter; it may be a Task that # catches and ignores the cancellation so we may have # to cancel it again later. return True # It must be the case that self.__step is already scheduled. self._must_cancel = True - self._cancel_message = msg + self._cancel_message = msg return True def __step(self, exc=None): if self.done(): - raise exceptions.InvalidStateError( + raise exceptions.InvalidStateError( f'_step(): already done: {self!r}, {exc!r}') if self._must_cancel: - if not isinstance(exc, exceptions.CancelledError): - exc = self._make_cancelled_error() + if not isinstance(exc, exceptions.CancelledError): + exc = self._make_cancelled_error() self._must_cancel = False coro = self._coro self._fut_waiter = None @@ -260,16 +260,16 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if self._must_cancel: # Task is cancelled right before coro stops. self._must_cancel = False - super().cancel(msg=self._cancel_message) + super().cancel(msg=self._cancel_message) else: super().set_result(exc.value) - except exceptions.CancelledError as exc: - # Save the original exception so we can chain it later. - self._cancelled_exc = exc + except exceptions.CancelledError as exc: + # Save the original exception so we can chain it later. + self._cancelled_exc = exc super().cancel() # I.e., Future.cancel(self). - except (KeyboardInterrupt, SystemExit) as exc: + except (KeyboardInterrupt, SystemExit) as exc: super().set_exception(exc) - raise + raise except BaseException as exc: super().set_exception(exc) else: @@ -294,8 +294,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self.__wakeup, context=self._context) self._fut_waiter = result if self._must_cancel: - if self._fut_waiter.cancel( - msg=self._cancel_message): + if self._fut_waiter.cancel( + msg=self._cancel_message): self._must_cancel = False else: new_exc = RuntimeError( @@ -326,7 +326,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation def __wakeup(self, future): try: future.result() - except BaseException as exc: + except BaseException as exc: # This may also be a cancellation. self.__step(exc) else: @@ -352,15 +352,15 @@ else: Task = _CTask = _asyncio.Task -def create_task(coro, *, name=None): +def create_task(coro, *, name=None): """Schedule the execution of a coroutine object in a spawn task. Return a Task object. """ loop = events.get_running_loop() - task = loop.create_task(coro) - _set_task_name(task, name) - return task + task = loop.create_task(coro) + _set_task_name(task, name) + return task # wait() and as_completed() similar to those in PEP 3148. @@ -373,7 +373,7 @@ ALL_COMPLETED = concurrent.futures.ALL_COMPLETED async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): """Wait for the Futures and coroutines given by fs to complete. - The fs iterable must not be empty. + The fs iterable must not be empty. Coroutines will be wrapped in Tasks. @@ -394,22 +394,22 @@ async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): raise ValueError(f'Invalid return_when value: {return_when}') if loop is None: - loop = events.get_running_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - - fs = set(fs) - - if any(coroutines.iscoroutine(f) for f in fs): - warnings.warn("The explicit passing of coroutine objects to " - "asyncio.wait() is deprecated since Python 3.8, and " - "scheduled for removal in Python 3.11.", - DeprecationWarning, stacklevel=2) - - fs = {ensure_future(f, loop=loop) for f in fs} - + loop = events.get_running_loop() + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + + fs = set(fs) + + if any(coroutines.iscoroutine(f) for f in fs): + warnings.warn("The explicit passing of coroutine objects to " + "asyncio.wait() is deprecated since Python 3.8, and " + "scheduled for removal in Python 3.11.", + DeprecationWarning, stacklevel=2) + + fs = {ensure_future(f, loop=loop) for f in fs} + return await _wait(fs, timeout, return_when, loop) @@ -432,11 +432,11 @@ async def wait_for(fut, timeout, *, loop=None): This function is a coroutine. """ if loop is None: - loop = events.get_running_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) if timeout is None: return await fut @@ -447,11 +447,11 @@ async def wait_for(fut, timeout, *, loop=None): if fut.done(): return fut.result() - await _cancel_and_wait(fut, loop=loop) - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc + await _cancel_and_wait(fut, loop=loop) + try: + return fut.result() + except exceptions.CancelledError as exc: + raise exceptions.TimeoutError() from exc waiter = loop.create_future() timeout_handle = loop.call_later(timeout, _release_waiter, waiter) @@ -464,16 +464,16 @@ async def wait_for(fut, timeout, *, loop=None): # wait until the future completes or the timeout try: await waiter - except exceptions.CancelledError: - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - raise + except exceptions.CancelledError: + if fut.done(): + return fut.result() + else: + fut.remove_done_callback(cb) + # We must ensure that the task is not running + # after wait_for() returns. + # See https://bugs.python.org/issue32751 + await _cancel_and_wait(fut, loop=loop) + raise if fut.done(): return fut.result() @@ -483,13 +483,13 @@ async def wait_for(fut, timeout, *, loop=None): # after wait_for() returns. # See https://bugs.python.org/issue32751 await _cancel_and_wait(fut, loop=loop) - # In case task cancellation failed with some - # exception, we should re-raise it - # See https://bugs.python.org/issue40607 - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc + # In case task cancellation failed with some + # exception, we should re-raise it + # See https://bugs.python.org/issue40607 + try: + return fut.result() + except exceptions.CancelledError as exc: + raise exceptions.TimeoutError() from exc finally: timeout_handle.cancel() @@ -526,8 +526,8 @@ async def _wait(fs, timeout, return_when, loop): finally: if timeout_handle is not None: timeout_handle.cancel() - for f in fs: - f.remove_done_callback(_on_completion) + for f in fs: + f.remove_done_callback(_on_completion) done, pending = set(), set() for f in fs: @@ -574,19 +574,19 @@ def as_completed(fs, *, loop=None, timeout=None): Note: The futures 'f' are not necessarily members of fs. """ if futures.isfuture(fs) or coroutines.iscoroutine(fs): - raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") - - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - + raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") + + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + from .queues import Queue # Import here to avoid circular import problem. done = Queue(loop=loop) - - if loop is None: - loop = events.get_event_loop() - todo = {ensure_future(f, loop=loop) for f in set(fs)} + + if loop is None: + loop = events.get_event_loop() + todo = {ensure_future(f, loop=loop) for f in set(fs)} timeout_handle = None def _on_timeout(): @@ -607,7 +607,7 @@ def as_completed(fs, *, loop=None, timeout=None): f = await done.get() if f is None: # Dummy value from _on_timeout(). - raise exceptions.TimeoutError + raise exceptions.TimeoutError return f.result() # May raise f.exception(). for f in todo: @@ -632,18 +632,18 @@ def __sleep0(): async def sleep(delay, result=None, *, loop=None): """Coroutine that completes after a given time (in seconds).""" - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + if delay <= 0: await __sleep0() return result if loop is None: - loop = events.get_running_loop() - + loop = events.get_running_loop() + future = loop.create_future() h = loop.call_later(delay, futures._set_result_unless_cancelled, @@ -668,8 +668,8 @@ def ensure_future(coro_or_future, *, loop=None): return task elif futures.isfuture(coro_or_future): if loop is not None and loop is not futures._get_loop(coro_or_future): - raise ValueError('The future belongs to a different loop than ' - 'the one specified as the loop argument') + raise ValueError('The future belongs to a different loop than ' + 'the one specified as the loop argument') return coro_or_future elif inspect.isawaitable(coro_or_future): return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) @@ -678,7 +678,7 @@ def ensure_future(coro_or_future, *, loop=None): 'required') -@types.coroutine +@types.coroutine def _wrap_awaitable(awaitable): """Helper for asyncio.ensure_future(). @@ -687,9 +687,9 @@ def _wrap_awaitable(awaitable): """ return (yield from awaitable.__await__()) -_wrap_awaitable._is_coroutine = _is_coroutine +_wrap_awaitable._is_coroutine = _is_coroutine + - class _GatheringFuture(futures.Future): """Helper for gather(). @@ -703,12 +703,12 @@ class _GatheringFuture(futures.Future): self._children = children self._cancel_requested = False - def cancel(self, msg=None): + def cancel(self, msg=None): if self.done(): return False ret = False for child in self._children: - if child.cancel(msg=msg): + if child.cancel(msg=msg): ret = True if ret: # If any child tasks were actually cancelled, we should @@ -740,23 +740,23 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): the outer Future is *not* cancelled in this case. (This is to prevent the cancellation of one child to cause other children to be cancelled.) - - If *return_exceptions* is False, cancelling gather() after it - has been marked done won't cancel any submitted awaitables. - For instance, gather can be marked done after propagating an - exception to the caller, therefore, calling ``gather.cancel()`` - after catching an exception (raised by one of the awaitables) from - gather won't cancel any other awaitables. + + If *return_exceptions* is False, cancelling gather() after it + has been marked done won't cancel any submitted awaitables. + For instance, gather can be marked done after propagating an + exception to the caller, therefore, calling ``gather.cancel()`` + after catching an exception (raised by one of the awaitables) from + gather won't cancel any other awaitables. """ - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - - return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions) - - -def _gather(*coros_or_futures, loop=None, return_exceptions=False): + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + + return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions) + + +def _gather(*coros_or_futures, loop=None, return_exceptions=False): if not coros_or_futures: if loop is None: loop = events.get_event_loop() @@ -779,7 +779,7 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): # Check if 'fut' is cancelled first, as # 'fut.exception()' will *raise* a CancelledError # instead of returning it. - exc = fut._make_cancelled_error() + exc = fut._make_cancelled_error() outer.set_exception(exc) return else: @@ -795,15 +795,15 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): for fut in children: if fut.cancelled(): - # Check if 'fut' is cancelled first, as 'fut.exception()' - # will *raise* a CancelledError instead of returning it. - # Also, since we're adding the exception return value - # to 'results' instead of raising it, don't bother - # setting __context__. This also lets us preserve - # calling '_make_cancelled_error()' at most once. - res = exceptions.CancelledError( - '' if fut._cancel_message is None else - fut._cancel_message) + # Check if 'fut' is cancelled first, as 'fut.exception()' + # will *raise* a CancelledError instead of returning it. + # Also, since we're adding the exception return value + # to 'results' instead of raising it, don't bother + # setting __context__. This also lets us preserve + # calling '_make_cancelled_error()' at most once. + res = exceptions.CancelledError( + '' if fut._cancel_message is None else + fut._cancel_message) else: res = fut.exception() if res is None: @@ -814,8 +814,8 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): # If gather is being cancelled we must propagate the # cancellation regardless of *return_exceptions* argument. # See issue 32684. - exc = fut._make_cancelled_error() - outer.set_exception(exc) + exc = fut._make_cancelled_error() + outer.set_exception(exc) else: outer.set_result(results) @@ -875,10 +875,10 @@ def shield(arg, *, loop=None): except CancelledError: res = None """ - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) inner = ensure_future(arg, loop=loop) if inner.done(): # Shortcut. @@ -886,7 +886,7 @@ def shield(arg, *, loop=None): loop = futures._get_loop(inner) outer = loop.create_future() - def _inner_done_callback(inner): + def _inner_done_callback(inner): if outer.cancelled(): if not inner.cancelled(): # Mark inner's result as retrieved. @@ -902,13 +902,13 @@ def shield(arg, *, loop=None): else: outer.set_result(inner.result()) - - def _outer_done_callback(outer): - if not inner.done(): - inner.remove_done_callback(_inner_done_callback) - - inner.add_done_callback(_inner_done_callback) - outer.add_done_callback(_outer_done_callback) + + def _outer_done_callback(outer): + if not inner.done(): + inner.remove_done_callback(_inner_done_callback) + + inner.add_done_callback(_inner_done_callback) + outer.add_done_callback(_outer_done_callback) return outer @@ -924,9 +924,9 @@ def run_coroutine_threadsafe(coro, loop): def callback(): try: futures._chain_future(ensure_future(coro, loop=loop), future) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: if future.set_running_or_notify_cancel(): future.set_exception(exc) raise |