aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/asyncio/tasks.py
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:30 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:30 +0300
commit2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)
tree012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/asyncio/tasks.py
parent6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)
downloadydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio/tasks.py')
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/tasks.py408
1 files changed, 204 insertions, 204 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py
index 27a3c8c5a8..d378a369ba 100644
--- a/contrib/tools/python3/src/Lib/asyncio/tasks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py
@@ -13,7 +13,7 @@ import concurrent.futures
import contextvars
import functools
import inspect
-import itertools
+import itertools
import types
import warnings
import weakref
@@ -21,16 +21,16 @@ import weakref
from . import base_tasks
from . import coroutines
from . import events
-from . import exceptions
+from . import exceptions
from . import futures
-from .coroutines import _is_coroutine
-
-# Helper to generate new task names
-# This uses itertools.count() instead of a "+= 1" operation because the latter
-# is not thread safe. See bpo-11866 for a longer explanation.
-_task_name_counter = itertools.count(1).__next__
+from .coroutines import _is_coroutine
+# Helper to generate new task names
+# This uses itertools.count() instead of a "+= 1" operation because the latter
+# is not thread safe. See bpo-11866 for a longer explanation.
+_task_name_counter = itertools.count(1).__next__
+
def current_task(loop=None):
"""Return a currently executed task."""
if loop is None:
@@ -42,22 +42,22 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_running_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
- i = 0
- while True:
- try:
- tasks = list(_all_tasks)
- except RuntimeError:
- i += 1
- if i >= 1000:
- raise
- else:
- break
- return {t for t in tasks
+ # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
+ # thread while we do so. Therefore we cast it to list prior to filtering. The list
+ # cast itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
+ # details.
+ i = 0
+ while True:
+ try:
+ tasks = list(_all_tasks)
+ except RuntimeError:
+ i += 1
+ if i >= 1000:
+ raise
+ else:
+ break
+ return {t for t in tasks
if futures._get_loop(t) is loop and not t.done()}
@@ -67,34 +67,34 @@ def _all_tasks_compat(loop=None):
# method.
if loop is None:
loop = events.get_event_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
- i = 0
- while True:
- try:
- tasks = list(_all_tasks)
- except RuntimeError:
- i += 1
- if i >= 1000:
- raise
- else:
- break
- return {t for t in tasks if futures._get_loop(t) is loop}
-
-
-def _set_task_name(task, name):
- if name is not None:
- try:
- set_name = task.set_name
- except AttributeError:
- pass
- else:
- set_name(name)
-
-
+ # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
+ # thread while we do so. Therefore we cast it to list prior to filtering. The list
+ # cast itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
+ # details.
+ i = 0
+ while True:
+ try:
+ tasks = list(_all_tasks)
+ except RuntimeError:
+ i += 1
+ if i >= 1000:
+ raise
+ else:
+ break
+ return {t for t in tasks if futures._get_loop(t) is loop}
+
+
+def _set_task_name(task, name):
+ if name is not None:
+ try:
+ set_name = task.set_name
+ except AttributeError:
+ pass
+ else:
+ set_name(name)
+
+
class Task(futures._PyFuture): # Inherit Python Task implementation
# from a Python Future implementation.
@@ -113,7 +113,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True
- def __init__(self, coro, *, loop=None, name=None):
+ def __init__(self, coro, *, loop=None, name=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -123,11 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")
- if name is None:
- self._name = f'Task-{_task_name_counter()}'
- else:
- self._name = str(name)
-
+ if name is None:
+ self._name = f'Task-{_task_name_counter()}'
+ else:
+ self._name = str(name)
+
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
@@ -147,21 +147,21 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_exception_handler(context)
super().__del__()
- def __class_getitem__(cls, type):
- return cls
-
+ def __class_getitem__(cls, type):
+ return cls
+
def _repr_info(self):
return base_tasks._task_repr_info(self)
- def get_coro(self):
- return self._coro
-
- def get_name(self):
- return self._name
-
- def set_name(self, value):
- self._name = str(value)
-
+ def get_coro(self):
+ return self._coro
+
+ def get_name(self):
+ return self._name
+
+ def set_name(self, value):
+ self._name = str(value)
+
def set_result(self, result):
raise RuntimeError('Task does not support set_result operation')
@@ -202,7 +202,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
"""
return base_tasks._task_print_stack(self, limit, file)
- def cancel(self, msg=None):
+ def cancel(self, msg=None):
"""Request that this task cancel itself.
This arranges for a CancelledError to be thrown into the
@@ -226,23 +226,23 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if self.done():
return False
if self._fut_waiter is not None:
- if self._fut_waiter.cancel(msg=msg):
+ if self._fut_waiter.cancel(msg=msg):
# Leave self._fut_waiter; it may be a Task that
# catches and ignores the cancellation so we may have
# to cancel it again later.
return True
# It must be the case that self.__step is already scheduled.
self._must_cancel = True
- self._cancel_message = msg
+ self._cancel_message = msg
return True
def __step(self, exc=None):
if self.done():
- raise exceptions.InvalidStateError(
+ raise exceptions.InvalidStateError(
f'_step(): already done: {self!r}, {exc!r}')
if self._must_cancel:
- if not isinstance(exc, exceptions.CancelledError):
- exc = self._make_cancelled_error()
+ if not isinstance(exc, exceptions.CancelledError):
+ exc = self._make_cancelled_error()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None
@@ -260,16 +260,16 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
- super().cancel(msg=self._cancel_message)
+ super().cancel(msg=self._cancel_message)
else:
super().set_result(exc.value)
- except exceptions.CancelledError as exc:
- # Save the original exception so we can chain it later.
- self._cancelled_exc = exc
+ except exceptions.CancelledError as exc:
+ # Save the original exception so we can chain it later.
+ self._cancelled_exc = exc
super().cancel() # I.e., Future.cancel(self).
- except (KeyboardInterrupt, SystemExit) as exc:
+ except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
- raise
+ raise
except BaseException as exc:
super().set_exception(exc)
else:
@@ -294,8 +294,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
- if self._fut_waiter.cancel(
- msg=self._cancel_message):
+ if self._fut_waiter.cancel(
+ msg=self._cancel_message):
self._must_cancel = False
else:
new_exc = RuntimeError(
@@ -326,7 +326,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
def __wakeup(self, future):
try:
future.result()
- except BaseException as exc:
+ except BaseException as exc:
# This may also be a cancellation.
self.__step(exc)
else:
@@ -352,15 +352,15 @@ else:
Task = _CTask = _asyncio.Task
-def create_task(coro, *, name=None):
+def create_task(coro, *, name=None):
"""Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
"""
loop = events.get_running_loop()
- task = loop.create_task(coro)
- _set_task_name(task, name)
- return task
+ task = loop.create_task(coro)
+ _set_task_name(task, name)
+ return task
# wait() and as_completed() similar to those in PEP 3148.
@@ -373,7 +373,7 @@ ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
- The fs iterable must not be empty.
+ The fs iterable must not be empty.
Coroutines will be wrapped in Tasks.
@@ -394,22 +394,22 @@ async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
- fs = set(fs)
-
- if any(coroutines.iscoroutine(f) for f in fs):
- warnings.warn("The explicit passing of coroutine objects to "
- "asyncio.wait() is deprecated since Python 3.8, and "
- "scheduled for removal in Python 3.11.",
- DeprecationWarning, stacklevel=2)
-
- fs = {ensure_future(f, loop=loop) for f in fs}
-
+ loop = events.get_running_loop()
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
+ fs = set(fs)
+
+ if any(coroutines.iscoroutine(f) for f in fs):
+ warnings.warn("The explicit passing of coroutine objects to "
+ "asyncio.wait() is deprecated since Python 3.8, and "
+ "scheduled for removal in Python 3.11.",
+ DeprecationWarning, stacklevel=2)
+
+ fs = {ensure_future(f, loop=loop) for f in fs}
+
return await _wait(fs, timeout, return_when, loop)
@@ -432,11 +432,11 @@ async def wait_for(fut, timeout, *, loop=None):
This function is a coroutine.
"""
if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
@@ -447,11 +447,11 @@ async def wait_for(fut, timeout, *, loop=None):
if fut.done():
return fut.result()
- await _cancel_and_wait(fut, loop=loop)
- try:
- return fut.result()
- except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
+ await _cancel_and_wait(fut, loop=loop)
+ try:
+ return fut.result()
+ except exceptions.CancelledError as exc:
+ raise exceptions.TimeoutError() from exc
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
@@ -464,16 +464,16 @@ async def wait_for(fut, timeout, *, loop=None):
# wait until the future completes or the timeout
try:
await waiter
- except exceptions.CancelledError:
- if fut.done():
- return fut.result()
- else:
- fut.remove_done_callback(cb)
- # We must ensure that the task is not running
- # after wait_for() returns.
- # See https://bugs.python.org/issue32751
- await _cancel_and_wait(fut, loop=loop)
- raise
+ except exceptions.CancelledError:
+ if fut.done():
+ return fut.result()
+ else:
+ fut.remove_done_callback(cb)
+ # We must ensure that the task is not running
+ # after wait_for() returns.
+ # See https://bugs.python.org/issue32751
+ await _cancel_and_wait(fut, loop=loop)
+ raise
if fut.done():
return fut.result()
@@ -483,13 +483,13 @@ async def wait_for(fut, timeout, *, loop=None):
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
- # In case task cancellation failed with some
- # exception, we should re-raise it
- # See https://bugs.python.org/issue40607
- try:
- return fut.result()
- except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
+ # In case task cancellation failed with some
+ # exception, we should re-raise it
+ # See https://bugs.python.org/issue40607
+ try:
+ return fut.result()
+ except exceptions.CancelledError as exc:
+ raise exceptions.TimeoutError() from exc
finally:
timeout_handle.cancel()
@@ -526,8 +526,8 @@ async def _wait(fs, timeout, return_when, loop):
finally:
if timeout_handle is not None:
timeout_handle.cancel()
- for f in fs:
- f.remove_done_callback(_on_completion)
+ for f in fs:
+ f.remove_done_callback(_on_completion)
done, pending = set(), set()
for f in fs:
@@ -574,19 +574,19 @@ def as_completed(fs, *, loop=None, timeout=None):
Note: The futures 'f' are not necessarily members of fs.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
- raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
-
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
+
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
from .queues import Queue # Import here to avoid circular import problem.
done = Queue(loop=loop)
-
- if loop is None:
- loop = events.get_event_loop()
- todo = {ensure_future(f, loop=loop) for f in set(fs)}
+
+ if loop is None:
+ loop = events.get_event_loop()
+ todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
def _on_timeout():
@@ -607,7 +607,7 @@ def as_completed(fs, *, loop=None, timeout=None):
f = await done.get()
if f is None:
# Dummy value from _on_timeout().
- raise exceptions.TimeoutError
+ raise exceptions.TimeoutError
return f.result() # May raise f.exception().
for f in todo:
@@ -632,18 +632,18 @@ def __sleep0():
async def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
if delay <= 0:
await __sleep0()
return result
if loop is None:
- loop = events.get_running_loop()
-
+ loop = events.get_running_loop()
+
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
@@ -668,8 +668,8 @@ def ensure_future(coro_or_future, *, loop=None):
return task
elif futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
- raise ValueError('The future belongs to a different loop than '
- 'the one specified as the loop argument')
+ raise ValueError('The future belongs to a different loop than '
+ 'the one specified as the loop argument')
return coro_or_future
elif inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
@@ -678,7 +678,7 @@ def ensure_future(coro_or_future, *, loop=None):
'required')
-@types.coroutine
+@types.coroutine
def _wrap_awaitable(awaitable):
"""Helper for asyncio.ensure_future().
@@ -687,9 +687,9 @@ def _wrap_awaitable(awaitable):
"""
return (yield from awaitable.__await__())
-_wrap_awaitable._is_coroutine = _is_coroutine
-
+_wrap_awaitable._is_coroutine = _is_coroutine
+
class _GatheringFuture(futures.Future):
"""Helper for gather().
@@ -703,12 +703,12 @@ class _GatheringFuture(futures.Future):
self._children = children
self._cancel_requested = False
- def cancel(self, msg=None):
+ def cancel(self, msg=None):
if self.done():
return False
ret = False
for child in self._children:
- if child.cancel(msg=msg):
+ if child.cancel(msg=msg):
ret = True
if ret:
# If any child tasks were actually cancelled, we should
@@ -740,23 +740,23 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
the outer Future is *not* cancelled in this case. (This is to
prevent the cancellation of one child to cause other children to
be cancelled.)
-
- If *return_exceptions* is False, cancelling gather() after it
- has been marked done won't cancel any submitted awaitables.
- For instance, gather can be marked done after propagating an
- exception to the caller, therefore, calling ``gather.cancel()``
- after catching an exception (raised by one of the awaitables) from
- gather won't cancel any other awaitables.
+
+ If *return_exceptions* is False, cancelling gather() after it
+ has been marked done won't cancel any submitted awaitables.
+ For instance, gather can be marked done after propagating an
+ exception to the caller, therefore, calling ``gather.cancel()``
+ after catching an exception (raised by one of the awaitables) from
+ gather won't cancel any other awaitables.
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
- return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions)
-
-
-def _gather(*coros_or_futures, loop=None, return_exceptions=False):
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
+ return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions)
+
+
+def _gather(*coros_or_futures, loop=None, return_exceptions=False):
if not coros_or_futures:
if loop is None:
loop = events.get_event_loop()
@@ -779,7 +779,7 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
# Check if 'fut' is cancelled first, as
# 'fut.exception()' will *raise* a CancelledError
# instead of returning it.
- exc = fut._make_cancelled_error()
+ exc = fut._make_cancelled_error()
outer.set_exception(exc)
return
else:
@@ -795,15 +795,15 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
for fut in children:
if fut.cancelled():
- # Check if 'fut' is cancelled first, as 'fut.exception()'
- # will *raise* a CancelledError instead of returning it.
- # Also, since we're adding the exception return value
- # to 'results' instead of raising it, don't bother
- # setting __context__. This also lets us preserve
- # calling '_make_cancelled_error()' at most once.
- res = exceptions.CancelledError(
- '' if fut._cancel_message is None else
- fut._cancel_message)
+ # Check if 'fut' is cancelled first, as 'fut.exception()'
+ # will *raise* a CancelledError instead of returning it.
+ # Also, since we're adding the exception return value
+ # to 'results' instead of raising it, don't bother
+ # setting __context__. This also lets us preserve
+ # calling '_make_cancelled_error()' at most once.
+ res = exceptions.CancelledError(
+ '' if fut._cancel_message is None else
+ fut._cancel_message)
else:
res = fut.exception()
if res is None:
@@ -814,8 +814,8 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
# If gather is being cancelled we must propagate the
# cancellation regardless of *return_exceptions* argument.
# See issue 32684.
- exc = fut._make_cancelled_error()
- outer.set_exception(exc)
+ exc = fut._make_cancelled_error()
+ outer.set_exception(exc)
else:
outer.set_result(results)
@@ -875,10 +875,10 @@ def shield(arg, *, loop=None):
except CancelledError:
res = None
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
inner = ensure_future(arg, loop=loop)
if inner.done():
# Shortcut.
@@ -886,7 +886,7 @@ def shield(arg, *, loop=None):
loop = futures._get_loop(inner)
outer = loop.create_future()
- def _inner_done_callback(inner):
+ def _inner_done_callback(inner):
if outer.cancelled():
if not inner.cancelled():
# Mark inner's result as retrieved.
@@ -902,13 +902,13 @@ def shield(arg, *, loop=None):
else:
outer.set_result(inner.result())
-
- def _outer_done_callback(outer):
- if not inner.done():
- inner.remove_done_callback(_inner_done_callback)
-
- inner.add_done_callback(_inner_done_callback)
- outer.add_done_callback(_outer_done_callback)
+
+ def _outer_done_callback(outer):
+ if not inner.done():
+ inner.remove_done_callback(_inner_done_callback)
+
+ inner.add_done_callback(_inner_done_callback)
+ outer.add_done_callback(_outer_done_callback)
return outer
@@ -924,9 +924,9 @@ def run_coroutine_threadsafe(coro, loop):
def callback():
try:
futures._chain_future(ensure_future(coro, loop=loop), future)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
if future.set_running_or_notify_cancel():
future.set_exception(exc)
raise