summaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/asyncio
diff options
context:
space:
mode:
authorshadchin <[email protected]>2022-04-18 12:39:32 +0300
committershadchin <[email protected]>2022-04-18 12:39:32 +0300
commitd4be68e361f4258cf0848fc70018dfe37a2acc24 (patch)
tree153e294cd97ac8b5d7a989612704a0c1f58e8ad4 /contrib/tools/python3/src/Lib/asyncio
parent260c02f5ccf242d9d9b8a873afaf6588c00237d6 (diff)
IGNIETFERRO-1816 Update Python 3 from 3.9.12 to 3.10.4
ref:9f96be6d02ee8044fdd6f124b799b270c20ce641
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio')
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/__init__.py4
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_events.py21
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/events.py25
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/futures.py6
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/locks.py72
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/mixins.py31
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/proactor_events.py40
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/queues.py21
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/runners.py3
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/streams.py51
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/subprocess.py30
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/tasks.py130
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/unix_events.py34
13 files changed, 192 insertions, 276 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/__init__.py b/contrib/tools/python3/src/Lib/asyncio/__init__.py
index eb84bfb189c..200b14c2a3f 100644
--- a/contrib/tools/python3/src/Lib/asyncio/__init__.py
+++ b/contrib/tools/python3/src/Lib/asyncio/__init__.py
@@ -20,10 +20,6 @@ from .tasks import *
from .threads import *
from .transports import *
-# Exposed for _asynciomodule.c to implement now deprecated
-# Task.all_tasks() method. This function will be removed in 3.9.
-from .tasks import _all_tasks_compat # NoQA
-
__all__ = (base_events.__all__ +
coroutines.__all__ +
events.__all__ +
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_events.py b/contrib/tools/python3/src/Lib/asyncio/base_events.py
index 7a14e5e139f..952da11064f 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_events.py
@@ -544,10 +544,9 @@ class BaseEventLoop(events.AbstractEventLoop):
closing_agens = list(self._asyncgens)
self._asyncgens.clear()
- results = await tasks._gather(
+ results = await tasks.gather(
*[ag.aclose() for ag in closing_agens],
- return_exceptions=True,
- loop=self)
+ return_exceptions=True)
for result, agen in zip(results, closing_agens):
if isinstance(result, Exception):
@@ -979,7 +978,7 @@ class BaseEventLoop(events.AbstractEventLoop):
happy_eyeballs_delay=None, interleave=None):
"""Connect to a TCP server.
- Create a streaming transport connection to a given Internet host and
+ Create a streaming transport connection to a given internet host and
port: socket family AF_INET or socket.AF_INET6 depending on host (or
family if specified), socket type SOCK_STREAM. protocol_factory must be
a callable returning a protocol instance.
@@ -1299,8 +1298,8 @@ class BaseEventLoop(events.AbstractEventLoop):
addr_infos = {} # Using order preserving dict
for idx, addr in ((0, local_addr), (1, remote_addr)):
if addr is not None:
- if not (isinstance(addr, tuple) and len(addr) == 2):
- raise TypeError('2-tuple is expected')
+ assert isinstance(addr, tuple) and len(addr) == 2, (
+ '2-tuple is expected')
infos = await self._ensure_resolved(
addr, family=family, type=socket.SOCK_DGRAM,
@@ -1469,7 +1468,7 @@ class BaseEventLoop(events.AbstractEventLoop):
fs = [self._create_server_getaddrinfo(host, port, family=family,
flags=flags)
for host in hosts]
- infos = await tasks._gather(*fs, loop=self)
+ infos = await tasks.gather(*fs)
infos = set(itertools.chain.from_iterable(infos))
completed = False
@@ -1537,14 +1536,6 @@ class BaseEventLoop(events.AbstractEventLoop):
self, protocol_factory, sock,
*, ssl=None,
ssl_handshake_timeout=None):
- """Handle an accepted connection.
-
- This is used by servers that accept connections outside of
- asyncio but that use asyncio to handle connections.
-
- This method is a coroutine. When completed, the coroutine
- returns a (transport, protocol) pair.
- """
if sock.type != socket.SOCK_STREAM:
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
diff --git a/contrib/tools/python3/src/Lib/asyncio/events.py b/contrib/tools/python3/src/Lib/asyncio/events.py
index 413ff2aaa6d..5ab1acc41bf 100644
--- a/contrib/tools/python3/src/Lib/asyncio/events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/events.py
@@ -418,6 +418,20 @@ class AbstractEventLoop:
"""
raise NotImplementedError
+ async def connect_accepted_socket(
+ self, protocol_factory, sock,
+ *, ssl=None,
+ ssl_handshake_timeout=None):
+ """Handle an accepted connection.
+
+ This is used by servers that accept connections outside of
+ asyncio, but use asyncio to handle connections.
+
+ This method is a coroutine. When completed, the coroutine
+ returns a (transport, protocol) pair.
+ """
+ raise NotImplementedError
+
async def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0,
@@ -745,9 +759,16 @@ def get_event_loop():
the result of `get_event_loop_policy().get_event_loop()` call.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
+ return _py__get_event_loop()
+
+
+def _get_event_loop(stacklevel=3):
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
+ import warnings
+ warnings.warn('There is no current event loop',
+ DeprecationWarning, stacklevel=stacklevel)
return get_event_loop_policy().get_event_loop()
@@ -777,6 +798,7 @@ _py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
+_py__get_event_loop = _get_event_loop
try:
@@ -784,7 +806,7 @@ try:
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
- get_running_loop, get_event_loop)
+ get_running_loop, get_event_loop, _get_event_loop)
except ImportError:
pass
else:
@@ -793,3 +815,4 @@ else:
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
+ _c__get_event_loop = _get_event_loop
diff --git a/contrib/tools/python3/src/Lib/asyncio/futures.py b/contrib/tools/python3/src/Lib/asyncio/futures.py
index aaab09c28e6..8e8cd876125 100644
--- a/contrib/tools/python3/src/Lib/asyncio/futures.py
+++ b/contrib/tools/python3/src/Lib/asyncio/futures.py
@@ -77,7 +77,7 @@ class Future:
the default event loop.
"""
if loop is None:
- self._loop = events.get_event_loop()
+ self._loop = events._get_event_loop()
else:
self._loop = loop
self._callbacks = []
@@ -115,7 +115,7 @@ class Future:
@_log_traceback.setter
def _log_traceback(self, val):
- if bool(val):
+ if val:
raise ValueError('_log_traceback can only be set to False')
self.__log_traceback = False
@@ -408,7 +408,7 @@ def wrap_future(future, *, loop=None):
assert isinstance(future, concurrent.futures.Future), \
f'concurrent.futures.Future is expected, got {future!r}'
if loop is None:
- loop = events.get_event_loop()
+ loop = events._get_event_loop()
new_future = loop.create_future()
_chain_future(future, new_future)
return new_future
diff --git a/contrib/tools/python3/src/Lib/asyncio/locks.py b/contrib/tools/python3/src/Lib/asyncio/locks.py
index d17d7ccd813..7b81c25b2d9 100644
--- a/contrib/tools/python3/src/Lib/asyncio/locks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/locks.py
@@ -3,10 +3,10 @@
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
import collections
-import warnings
-from . import events
from . import exceptions
+from . import mixins
+from . import tasks
class _ContextManagerMixin:
@@ -20,7 +20,7 @@ class _ContextManagerMixin:
self.release()
-class Lock(_ContextManagerMixin):
+class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
"""Primitive lock objects.
A primitive lock is a synchronization primitive that is not owned
@@ -74,16 +74,10 @@ class Lock(_ContextManagerMixin):
"""
- def __init__(self, *, loop=None):
+ def __init__(self, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._waiters = None
self._locked = False
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -109,7 +103,7 @@ class Lock(_ContextManagerMixin):
if self._waiters is None:
self._waiters = collections.deque()
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
# Finally block should be called before the CancelledError
@@ -161,7 +155,7 @@ class Lock(_ContextManagerMixin):
fut.set_result(True)
-class Event:
+class Event(mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Event.
Class implementing event objects. An event manages a flag that can be set
@@ -170,16 +164,10 @@ class Event:
false.
"""
- def __init__(self, *, loop=None):
+ def __init__(self, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._waiters = collections.deque()
self._value = False
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -220,7 +208,7 @@ class Event:
if self._value:
return True
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
@@ -229,7 +217,7 @@ class Event:
self._waiters.remove(fut)
-class Condition(_ContextManagerMixin):
+class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Condition.
This class implements condition variable objects. A condition variable
@@ -239,19 +227,10 @@ class Condition(_ContextManagerMixin):
A new Lock object is created and used as the underlying lock.
"""
- def __init__(self, lock=None, *, loop=None):
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ def __init__(self, lock=None, *, loop=mixins._marker):
+ super().__init__(loop=loop)
if lock is None:
- lock = Lock(loop=loop)
- elif lock._loop is not self._loop:
- raise ValueError("loop argument must agree with lock")
+ lock = Lock()
self._lock = lock
# Export the lock's locked(), acquire() and release() methods.
@@ -284,7 +263,7 @@ class Condition(_ContextManagerMixin):
self.release()
try:
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
@@ -351,7 +330,7 @@ class Condition(_ContextManagerMixin):
self.notify(len(self._waiters))
-class Semaphore(_ContextManagerMixin):
+class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
"""A Semaphore implementation.
A semaphore manages an internal counter which is decremented by each
@@ -366,18 +345,12 @@ class Semaphore(_ContextManagerMixin):
ValueError is raised.
"""
- def __init__(self, value=1, *, loop=None):
+ def __init__(self, value=1, *, loop=mixins._marker):
+ super().__init__(loop=loop)
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
self._wakeup_scheduled = False
def __repr__(self):
@@ -411,7 +384,7 @@ class Semaphore(_ContextManagerMixin):
# _wakeup_scheduled is set if *another* task is scheduled to wakeup
# but its acquire() is not resumed yet
while self._wakeup_scheduled or self._value <= 0:
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
@@ -439,12 +412,7 @@ class BoundedSemaphore(Semaphore):
above the initial value.
"""
- def __init__(self, value=1, *, loop=None):
- if loop:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ def __init__(self, value=1, *, loop=mixins._marker):
self._bound_value = value
super().__init__(value, loop=loop)
diff --git a/contrib/tools/python3/src/Lib/asyncio/mixins.py b/contrib/tools/python3/src/Lib/asyncio/mixins.py
new file mode 100644
index 00000000000..650df05ccc9
--- /dev/null
+++ b/contrib/tools/python3/src/Lib/asyncio/mixins.py
@@ -0,0 +1,31 @@
+"""Event loop mixins."""
+
+import threading
+from . import events
+
+_global_lock = threading.Lock()
+
+# Used as a sentinel for loop parameter
+_marker = object()
+
+
+class _LoopBoundMixin:
+ _loop = None
+
+ def __init__(self, *, loop=_marker):
+ if loop is not _marker:
+ raise TypeError(
+ f'As of 3.10, the *loop* parameter was removed from '
+ f'{type(self).__name__}() since it is no longer necessary'
+ )
+
+ def _get_loop(self):
+ loop = events._get_running_loop()
+
+ if self._loop is None:
+ with _global_lock:
+ if self._loop is None:
+ self._loop = loop
+ if loop is not self._loop:
+ raise RuntimeError(f'{self!r} is bound to a different event loop')
+ return loop
diff --git a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
index e3f95cf21d6..411685bbda5 100644
--- a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
@@ -179,11 +179,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
"""Transport for read pipes."""
def __init__(self, loop, sock, protocol, waiter=None,
- extra=None, server=None):
- self._pending_data = None
+ extra=None, server=None, buffer_size=65536):
+ self._pending_data_length = -1
self._paused = True
super().__init__(loop, sock, protocol, waiter, extra, server)
+ self._data = bytearray(buffer_size)
self._loop.call_soon(self._loop_reading)
self._paused = False
@@ -217,12 +218,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._read_fut is None:
self._loop.call_soon(self._loop_reading, None)
- data = self._pending_data
- self._pending_data = None
- if data is not None:
+ length = self._pending_data_length
+ self._pending_data_length = -1
+ if length > -1:
# Call the protocol methode after calling _loop_reading(),
# since the protocol can decide to pause reading again.
- self._loop.call_soon(self._data_received, data)
+ self._loop.call_soon(self._data_received, self._data[:length], length)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@@ -243,15 +244,15 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if not keep_open:
self.close()
- def _data_received(self, data):
+ def _data_received(self, data, length):
if self._paused:
# Don't call any protocol method while reading is paused.
# The protocol will be called on resume_reading().
- assert self._pending_data is None
- self._pending_data = data
+ assert self._pending_data_length == -1
+ self._pending_data_length = length
return
- if not data:
+ if length == 0:
self._eof_received()
return
@@ -269,6 +270,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self._protocol.data_received(data)
def _loop_reading(self, fut=None):
+ length = -1
data = None
try:
if fut is not None:
@@ -277,18 +279,18 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self._read_fut = None
if fut.done():
# deliver data later in "finally" clause
- data = fut.result()
+ length = fut.result()
+ if length == 0:
+ # we got end-of-file so no need to reschedule a new read
+ return
+
+ data = self._data[:length]
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
if self._closing:
# since close() has been called we ignore any read data
- data = None
- return
-
- if data == b'':
- # we got end-of-file so no need to reschedule a new read
return
# bpo-33694: buffer_updated() has currently no fast path because of
@@ -296,7 +298,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if not self._paused:
# reschedule a new read
- self._read_fut = self._loop._proactor.recv(self._sock, 32768)
+ self._read_fut = self._loop._proactor.recv_into(self._sock, self._data)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
@@ -314,8 +316,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if not self._paused:
self._read_fut.add_done_callback(self._loop_reading)
finally:
- if data is not None:
- self._data_received(data)
+ if length > -1:
+ self._data_received(data, length)
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
diff --git a/contrib/tools/python3/src/Lib/asyncio/queues.py b/contrib/tools/python3/src/Lib/asyncio/queues.py
index 14ae87e0a2d..10dd689bbb2 100644
--- a/contrib/tools/python3/src/Lib/asyncio/queues.py
+++ b/contrib/tools/python3/src/Lib/asyncio/queues.py
@@ -2,11 +2,10 @@ __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
import collections
import heapq
-import warnings
from types import GenericAlias
-from . import events
from . import locks
+from . import mixins
class QueueEmpty(Exception):
@@ -19,7 +18,7 @@ class QueueFull(Exception):
pass
-class Queue:
+class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.
If maxsize is less than or equal to zero, the queue size is infinite. If it
@@ -31,14 +30,8 @@ class Queue:
interrupted between calling qsize() and doing an operation on the Queue.
"""
- def __init__(self, maxsize=0, *, loop=None):
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ def __init__(self, maxsize=0, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._maxsize = maxsize
# Futures.
@@ -46,7 +39,7 @@ class Queue:
# Futures.
self._putters = collections.deque()
self._unfinished_tasks = 0
- self._finished = locks.Event(loop=loop)
+ self._finished = locks.Event()
self._finished.set()
self._init(maxsize)
@@ -122,7 +115,7 @@ class Queue:
slot is available before adding item.
"""
while self.full():
- putter = self._loop.create_future()
+ putter = self._get_loop().create_future()
self._putters.append(putter)
try:
await putter
@@ -160,7 +153,7 @@ class Queue:
If queue is empty, wait until an item is available.
"""
while self.empty():
- getter = self._loop.create_future()
+ getter = self._get_loop().create_future()
self._getters.append(getter)
try:
await getter
diff --git a/contrib/tools/python3/src/Lib/asyncio/runners.py b/contrib/tools/python3/src/Lib/asyncio/runners.py
index 6920acba381..9a5e9a48479 100644
--- a/contrib/tools/python3/src/Lib/asyncio/runners.py
+++ b/contrib/tools/python3/src/Lib/asyncio/runners.py
@@ -60,8 +60,7 @@ def _cancel_all_tasks(loop):
for task in to_cancel:
task.cancel()
- loop.run_until_complete(
- tasks._gather(*to_cancel, loop=loop, return_exceptions=True))
+ loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py
index 3c80bb88925..080d8a62cde 100644
--- a/contrib/tools/python3/src/Lib/asyncio/streams.py
+++ b/contrib/tools/python3/src/Lib/asyncio/streams.py
@@ -23,7 +23,7 @@ _DEFAULT_LIMIT = 2 ** 16 # 64 KiB
async def open_connection(host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""A wrapper for create_connection() returning a (reader, writer) pair.
The reader returned is a StreamReader instance; the writer is a
@@ -41,12 +41,7 @@ async def open_connection(host=None, port=None, *,
StreamReaderProtocol classes, just copy the code -- there's
really nothing special here except some convenience.)
"""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
@@ -56,7 +51,7 @@ async def open_connection(host=None, port=None, *,
async def start_server(client_connected_cb, host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""Start a socket server, call back for each client connected.
The first parameter, `client_connected_cb`, takes two parameters:
@@ -78,12 +73,7 @@ async def start_server(client_connected_cb, host=None, port=None, *,
The return value is the same as loop.create_server(), i.e. a
Server object which can be used to stop the service.
"""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
def factory():
reader = StreamReader(limit=limit, loop=loop)
@@ -98,14 +88,10 @@ if hasattr(socket, 'AF_UNIX'):
# UNIX Domain Sockets are supported on this platform
async def open_unix_connection(path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
+
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_unix_connection(
@@ -114,14 +100,9 @@ if hasattr(socket, 'AF_UNIX'):
return reader, writer
async def start_unix_server(client_connected_cb, path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `start_server` but works with UNIX Domain Sockets."""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
def factory():
reader = StreamReader(limit=limit, loop=loop)
@@ -144,7 +125,7 @@ class FlowControlMixin(protocols.Protocol):
def __init__(self, loop=None):
if loop is None:
- self._loop = events.get_event_loop()
+ self._loop = events._get_event_loop(stacklevel=4)
else:
self._loop = loop
self._paused = False
@@ -302,9 +283,13 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
def __del__(self):
# Prevent reports about unhandled exceptions.
# Better than self._closed._log_traceback = False hack
- closed = self._closed
- if closed.done() and not closed.cancelled():
- closed.exception()
+ try:
+ closed = self._closed
+ except AttributeError:
+ pass # failed constructor
+ else:
+ if closed.done() and not closed.cancelled():
+ closed.exception()
class StreamWriter:
@@ -400,7 +385,7 @@ class StreamReader:
self._limit = limit
if loop is None:
- self._loop = events.get_event_loop()
+ self._loop = events._get_event_loop()
else:
self._loop = loop
self._buffer = bytearray()
diff --git a/contrib/tools/python3/src/Lib/asyncio/subprocess.py b/contrib/tools/python3/src/Lib/asyncio/subprocess.py
index 820304eccaf..cd10231f710 100644
--- a/contrib/tools/python3/src/Lib/asyncio/subprocess.py
+++ b/contrib/tools/python3/src/Lib/asyncio/subprocess.py
@@ -1,7 +1,6 @@
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
import subprocess
-import warnings
from . import events
from . import protocols
@@ -193,24 +192,14 @@ class Process:
stderr = self._read_stream(2)
else:
stderr = self._noop()
- stdin, stdout, stderr = await tasks._gather(stdin, stdout, stderr,
- loop=self._loop)
+ stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
await self.wait()
return (stdout, stderr)
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
- loop=None, limit=streams._DEFAULT_LIMIT,
- **kwds):
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8 "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning,
- stacklevel=2
- )
-
+ limit=streams._DEFAULT_LIMIT, **kwds):
+ loop = events.get_running_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_shell(
@@ -221,16 +210,9 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
- stderr=None, loop=None,
- limit=streams._DEFAULT_LIMIT, **kwds):
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8 "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning,
- stacklevel=2
- )
+ stderr=None, limit=streams._DEFAULT_LIMIT,
+ **kwds):
+ loop = events.get_running_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_exec(
diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py
index 53252f2079d..c4bedb5c72b 100644
--- a/contrib/tools/python3/src/Lib/asyncio/tasks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py
@@ -62,30 +62,6 @@ def all_tasks(loop=None):
if futures._get_loop(t) is loop and not t.done()}
-def _all_tasks_compat(loop=None):
- # Different from "all_task()" by returning *all* Tasks, including
- # the completed ones. Used to implement deprecated "Tasks.all_task()"
- # method.
- if loop is None:
- loop = events.get_event_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
- i = 0
- while True:
- try:
- tasks = list(_all_tasks)
- except RuntimeError:
- i += 1
- if i >= 1000:
- raise
- else:
- break
- return {t for t in tasks if futures._get_loop(t) is loop}
-
-
def _set_task_name(task, name):
if name is not None:
try:
@@ -370,7 +346,7 @@ FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
-async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
+async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
The fs iterable must not be empty.
@@ -393,12 +369,7 @@ async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
fs = set(fs)
@@ -418,7 +389,7 @@ def _release_waiter(waiter, *args):
waiter.set_result(None)
-async def wait_for(fut, timeout, *, loop=None):
+async def wait_for(fut, timeout):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
@@ -431,12 +402,7 @@ async def wait_for(fut, timeout, *, loop=None):
This function is a coroutine.
"""
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
if timeout is None:
return await fut
@@ -555,7 +521,7 @@ async def _cancel_and_wait(fut, loop):
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
-def as_completed(fs, *, loop=None, timeout=None):
+def as_completed(fs, *, timeout=None):
"""Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
@@ -576,16 +542,10 @@ def as_completed(fs, *, loop=None, timeout=None):
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
from .queues import Queue # Import here to avoid circular import problem.
- done = Queue(loop=loop)
+ done = Queue()
- if loop is None:
- loop = events.get_event_loop()
+ loop = events._get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
@@ -630,20 +590,13 @@ def __sleep0():
yield
-async def sleep(delay, result=None, *, loop=None):
+async def sleep(delay, result=None):
"""Coroutine that completes after a given time (in seconds)."""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
if delay <= 0:
await __sleep0()
return result
- if loop is None:
- loop = events.get_running_loop()
-
+ loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
@@ -659,23 +612,32 @@ def ensure_future(coro_or_future, *, loop=None):
If the argument is a Future, it is returned directly.
"""
- if coroutines.iscoroutine(coro_or_future):
- if loop is None:
- loop = events.get_event_loop()
- task = loop.create_task(coro_or_future)
- if task._source_traceback:
- del task._source_traceback[-1]
- return task
- elif futures.isfuture(coro_or_future):
+ return _ensure_future(coro_or_future, loop=loop)
+
+
+def _ensure_future(coro_or_future, *, loop=None):
+ if futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
- 'the one specified as the loop argument')
+ 'the one specified as the loop argument')
return coro_or_future
- elif inspect.isawaitable(coro_or_future):
- return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
- else:
- raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
- 'required')
+ called_wrap_awaitable = False
+ if not coroutines.iscoroutine(coro_or_future):
+ if inspect.isawaitable(coro_or_future):
+ coro_or_future = _wrap_awaitable(coro_or_future)
+ called_wrap_awaitable = True
+ else:
+ raise TypeError('An asyncio.Future, a coroutine or an awaitable '
+ 'is required')
+
+ if loop is None:
+ loop = events._get_event_loop(stacklevel=4)
+ try:
+ return loop.create_task(coro_or_future)
+ except RuntimeError:
+ if not called_wrap_awaitable:
+ coro_or_future.close()
+ raise
@types.coroutine
@@ -698,7 +660,8 @@ class _GatheringFuture(futures.Future):
cancelled.
"""
- def __init__(self, children, *, loop=None):
+ def __init__(self, children, *, loop):
+ assert loop is not None
super().__init__(loop=loop)
self._children = children
self._cancel_requested = False
@@ -718,7 +681,7 @@ class _GatheringFuture(futures.Future):
return ret
-def gather(*coros_or_futures, loop=None, return_exceptions=False):
+def gather(*coros_or_futures, return_exceptions=False):
"""Return a future aggregating results from the given coroutines/futures.
Coroutines will be wrapped in a future and scheduled in the event
@@ -748,18 +711,8 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
after catching an exception (raised by one of the awaitables) from
gather won't cancel any other awaitables.
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
- return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions)
-
-
-def _gather(*coros_or_futures, loop=None, return_exceptions=False):
if not coros_or_futures:
- if loop is None:
- loop = events.get_event_loop()
+ loop = events._get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
@@ -823,10 +776,11 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
children = []
nfuts = 0
nfinished = 0
+ loop = None
outer = None # bpo-46672
for arg in coros_or_futures:
if arg not in arg_to_fut:
- fut = ensure_future(arg, loop=loop)
+ fut = _ensure_future(arg, loop=loop)
if loop is None:
loop = futures._get_loop(fut)
if fut is not arg:
@@ -850,7 +804,7 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
return outer
-def shield(arg, *, loop=None):
+def shield(arg):
"""Wait for a future, shielding it from cancellation.
The statement
@@ -876,11 +830,7 @@ def shield(arg, *, loop=None):
except CancelledError:
res = None
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
- inner = ensure_future(arg, loop=loop)
+ inner = _ensure_future(arg)
if inner.done():
# Shortcut.
return inner
diff --git a/contrib/tools/python3/src/Lib/asyncio/unix_events.py b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
index eecbc101ee1..c88b818de62 100644
--- a/contrib/tools/python3/src/Lib/asyncio/unix_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
@@ -44,6 +44,16 @@ def _sighandler_noop(signum, frame):
pass
+def waitstatus_to_exitcode(status):
+ try:
+ return os.waitstatus_to_exitcode(status)
+ except ValueError:
+ # The child exited, but we don't understand its status.
+ # This shouldn't happen, but if it does, let's just
+ # return that status; perhaps that helps debug it.
+ return status
+
+
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
"""Unix event loop.
@@ -941,7 +951,7 @@ class PidfdChildWatcher(AbstractChildWatcher):
" will report returncode 255",
pid)
else:
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
os.close(pidfd)
callback(pid, returncode, *args)
@@ -956,20 +966,6 @@ class PidfdChildWatcher(AbstractChildWatcher):
return True
-def _compute_returncode(status):
- if os.WIFSIGNALED(status):
- # The child process died because of a signal.
- return -os.WTERMSIG(status)
- elif os.WIFEXITED(status):
- # The child process exited (e.g sys.exit()).
- return os.WEXITSTATUS(status)
- else:
- # The child exited, but we don't understand its status.
- # This shouldn't happen, but if it does, let's just
- # return that status; perhaps that helps debug it.
- return status
-
-
class BaseChildWatcher(AbstractChildWatcher):
def __init__(self):
@@ -1080,7 +1076,7 @@ class SafeChildWatcher(BaseChildWatcher):
# The child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
if self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
@@ -1173,7 +1169,7 @@ class FastChildWatcher(BaseChildWatcher):
# A child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
with self._lock:
try:
@@ -1300,7 +1296,7 @@ class MultiLoopChildWatcher(AbstractChildWatcher):
# The child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
debug_log = True
try:
loop, callback, args = self._callbacks.pop(pid)
@@ -1403,7 +1399,7 @@ class ThreadedChildWatcher(AbstractChildWatcher):
"Unknown child process pid %d, will report returncode 255",
pid)
else:
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
if loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)