diff options
| author | shadchin <[email protected]> | 2022-04-18 12:39:32 +0300 |
|---|---|---|
| committer | shadchin <[email protected]> | 2022-04-18 12:39:32 +0300 |
| commit | d4be68e361f4258cf0848fc70018dfe37a2acc24 (patch) | |
| tree | 153e294cd97ac8b5d7a989612704a0c1f58e8ad4 /contrib/tools/python3/src/Lib/asyncio | |
| parent | 260c02f5ccf242d9d9b8a873afaf6588c00237d6 (diff) | |
IGNIETFERRO-1816 Update Python 3 from 3.9.12 to 3.10.4
ref:9f96be6d02ee8044fdd6f124b799b270c20ce641
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio')
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/__init__.py | 4 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/base_events.py | 21 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/events.py | 25 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/futures.py | 6 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/locks.py | 72 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/mixins.py | 31 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/proactor_events.py | 40 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/queues.py | 21 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/runners.py | 3 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/streams.py | 51 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/subprocess.py | 30 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/tasks.py | 130 | ||||
| -rw-r--r-- | contrib/tools/python3/src/Lib/asyncio/unix_events.py | 34 |
13 files changed, 192 insertions, 276 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/__init__.py b/contrib/tools/python3/src/Lib/asyncio/__init__.py index eb84bfb189c..200b14c2a3f 100644 --- a/contrib/tools/python3/src/Lib/asyncio/__init__.py +++ b/contrib/tools/python3/src/Lib/asyncio/__init__.py @@ -20,10 +20,6 @@ from .tasks import * from .threads import * from .transports import * -# Exposed for _asynciomodule.c to implement now deprecated -# Task.all_tasks() method. This function will be removed in 3.9. -from .tasks import _all_tasks_compat # NoQA - __all__ = (base_events.__all__ + coroutines.__all__ + events.__all__ + diff --git a/contrib/tools/python3/src/Lib/asyncio/base_events.py b/contrib/tools/python3/src/Lib/asyncio/base_events.py index 7a14e5e139f..952da11064f 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_events.py @@ -544,10 +544,9 @@ class BaseEventLoop(events.AbstractEventLoop): closing_agens = list(self._asyncgens) self._asyncgens.clear() - results = await tasks._gather( + results = await tasks.gather( *[ag.aclose() for ag in closing_agens], - return_exceptions=True, - loop=self) + return_exceptions=True) for result, agen in zip(results, closing_agens): if isinstance(result, Exception): @@ -979,7 +978,7 @@ class BaseEventLoop(events.AbstractEventLoop): happy_eyeballs_delay=None, interleave=None): """Connect to a TCP server. - Create a streaming transport connection to a given Internet host and + Create a streaming transport connection to a given internet host and port: socket family AF_INET or socket.AF_INET6 depending on host (or family if specified), socket type SOCK_STREAM. protocol_factory must be a callable returning a protocol instance. @@ -1299,8 +1298,8 @@ class BaseEventLoop(events.AbstractEventLoop): addr_infos = {} # Using order preserving dict for idx, addr in ((0, local_addr), (1, remote_addr)): if addr is not None: - if not (isinstance(addr, tuple) and len(addr) == 2): - raise TypeError('2-tuple is expected') + assert isinstance(addr, tuple) and len(addr) == 2, ( + '2-tuple is expected') infos = await self._ensure_resolved( addr, family=family, type=socket.SOCK_DGRAM, @@ -1469,7 +1468,7 @@ class BaseEventLoop(events.AbstractEventLoop): fs = [self._create_server_getaddrinfo(host, port, family=family, flags=flags) for host in hosts] - infos = await tasks._gather(*fs, loop=self) + infos = await tasks.gather(*fs) infos = set(itertools.chain.from_iterable(infos)) completed = False @@ -1537,14 +1536,6 @@ class BaseEventLoop(events.AbstractEventLoop): self, protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None): - """Handle an accepted connection. - - This is used by servers that accept connections outside of - asyncio but that use asyncio to handle connections. - - This method is a coroutine. When completed, the coroutine - returns a (transport, protocol) pair. - """ if sock.type != socket.SOCK_STREAM: raise ValueError(f'A Stream Socket was expected, got {sock!r}') diff --git a/contrib/tools/python3/src/Lib/asyncio/events.py b/contrib/tools/python3/src/Lib/asyncio/events.py index 413ff2aaa6d..5ab1acc41bf 100644 --- a/contrib/tools/python3/src/Lib/asyncio/events.py +++ b/contrib/tools/python3/src/Lib/asyncio/events.py @@ -418,6 +418,20 @@ class AbstractEventLoop: """ raise NotImplementedError + async def connect_accepted_socket( + self, protocol_factory, sock, + *, ssl=None, + ssl_handshake_timeout=None): + """Handle an accepted connection. + + This is used by servers that accept connections outside of + asyncio, but use asyncio to handle connections. + + This method is a coroutine. When completed, the coroutine + returns a (transport, protocol) pair. + """ + raise NotImplementedError + async def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, @@ -745,9 +759,16 @@ def get_event_loop(): the result of `get_event_loop_policy().get_event_loop()` call. """ # NOTE: this function is implemented in C (see _asynciomodule.c) + return _py__get_event_loop() + + +def _get_event_loop(stacklevel=3): current_loop = _get_running_loop() if current_loop is not None: return current_loop + import warnings + warnings.warn('There is no current event loop', + DeprecationWarning, stacklevel=stacklevel) return get_event_loop_policy().get_event_loop() @@ -777,6 +798,7 @@ _py__get_running_loop = _get_running_loop _py__set_running_loop = _set_running_loop _py_get_running_loop = get_running_loop _py_get_event_loop = get_event_loop +_py__get_event_loop = _get_event_loop try: @@ -784,7 +806,7 @@ try: # functions in asyncio. Pure Python implementation is # about 4 times slower than C-accelerated. from _asyncio import (_get_running_loop, _set_running_loop, - get_running_loop, get_event_loop) + get_running_loop, get_event_loop, _get_event_loop) except ImportError: pass else: @@ -793,3 +815,4 @@ else: _c__set_running_loop = _set_running_loop _c_get_running_loop = get_running_loop _c_get_event_loop = get_event_loop + _c__get_event_loop = _get_event_loop diff --git a/contrib/tools/python3/src/Lib/asyncio/futures.py b/contrib/tools/python3/src/Lib/asyncio/futures.py index aaab09c28e6..8e8cd876125 100644 --- a/contrib/tools/python3/src/Lib/asyncio/futures.py +++ b/contrib/tools/python3/src/Lib/asyncio/futures.py @@ -77,7 +77,7 @@ class Future: the default event loop. """ if loop is None: - self._loop = events.get_event_loop() + self._loop = events._get_event_loop() else: self._loop = loop self._callbacks = [] @@ -115,7 +115,7 @@ class Future: @_log_traceback.setter def _log_traceback(self, val): - if bool(val): + if val: raise ValueError('_log_traceback can only be set to False') self.__log_traceback = False @@ -408,7 +408,7 @@ def wrap_future(future, *, loop=None): assert isinstance(future, concurrent.futures.Future), \ f'concurrent.futures.Future is expected, got {future!r}' if loop is None: - loop = events.get_event_loop() + loop = events._get_event_loop() new_future = loop.create_future() _chain_future(future, new_future) return new_future diff --git a/contrib/tools/python3/src/Lib/asyncio/locks.py b/contrib/tools/python3/src/Lib/asyncio/locks.py index d17d7ccd813..7b81c25b2d9 100644 --- a/contrib/tools/python3/src/Lib/asyncio/locks.py +++ b/contrib/tools/python3/src/Lib/asyncio/locks.py @@ -3,10 +3,10 @@ __all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore') import collections -import warnings -from . import events from . import exceptions +from . import mixins +from . import tasks class _ContextManagerMixin: @@ -20,7 +20,7 @@ class _ContextManagerMixin: self.release() -class Lock(_ContextManagerMixin): +class Lock(_ContextManagerMixin, mixins._LoopBoundMixin): """Primitive lock objects. A primitive lock is a synchronization primitive that is not owned @@ -74,16 +74,10 @@ class Lock(_ContextManagerMixin): """ - def __init__(self, *, loop=None): + def __init__(self, *, loop=mixins._marker): + super().__init__(loop=loop) self._waiters = None self._locked = False - if loop is None: - self._loop = events.get_event_loop() - else: - self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) def __repr__(self): res = super().__repr__() @@ -109,7 +103,7 @@ class Lock(_ContextManagerMixin): if self._waiters is None: self._waiters = collections.deque() - fut = self._loop.create_future() + fut = self._get_loop().create_future() self._waiters.append(fut) # Finally block should be called before the CancelledError @@ -161,7 +155,7 @@ class Lock(_ContextManagerMixin): fut.set_result(True) -class Event: +class Event(mixins._LoopBoundMixin): """Asynchronous equivalent to threading.Event. Class implementing event objects. An event manages a flag that can be set @@ -170,16 +164,10 @@ class Event: false. """ - def __init__(self, *, loop=None): + def __init__(self, *, loop=mixins._marker): + super().__init__(loop=loop) self._waiters = collections.deque() self._value = False - if loop is None: - self._loop = events.get_event_loop() - else: - self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) def __repr__(self): res = super().__repr__() @@ -220,7 +208,7 @@ class Event: if self._value: return True - fut = self._loop.create_future() + fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut @@ -229,7 +217,7 @@ class Event: self._waiters.remove(fut) -class Condition(_ContextManagerMixin): +class Condition(_ContextManagerMixin, mixins._LoopBoundMixin): """Asynchronous equivalent to threading.Condition. This class implements condition variable objects. A condition variable @@ -239,19 +227,10 @@ class Condition(_ContextManagerMixin): A new Lock object is created and used as the underlying lock. """ - def __init__(self, lock=None, *, loop=None): - if loop is None: - self._loop = events.get_event_loop() - else: - self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - + def __init__(self, lock=None, *, loop=mixins._marker): + super().__init__(loop=loop) if lock is None: - lock = Lock(loop=loop) - elif lock._loop is not self._loop: - raise ValueError("loop argument must agree with lock") + lock = Lock() self._lock = lock # Export the lock's locked(), acquire() and release() methods. @@ -284,7 +263,7 @@ class Condition(_ContextManagerMixin): self.release() try: - fut = self._loop.create_future() + fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut @@ -351,7 +330,7 @@ class Condition(_ContextManagerMixin): self.notify(len(self._waiters)) -class Semaphore(_ContextManagerMixin): +class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin): """A Semaphore implementation. A semaphore manages an internal counter which is decremented by each @@ -366,18 +345,12 @@ class Semaphore(_ContextManagerMixin): ValueError is raised. """ - def __init__(self, value=1, *, loop=None): + def __init__(self, value=1, *, loop=mixins._marker): + super().__init__(loop=loop) if value < 0: raise ValueError("Semaphore initial value must be >= 0") self._value = value self._waiters = collections.deque() - if loop is None: - self._loop = events.get_event_loop() - else: - self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) self._wakeup_scheduled = False def __repr__(self): @@ -411,7 +384,7 @@ class Semaphore(_ContextManagerMixin): # _wakeup_scheduled is set if *another* task is scheduled to wakeup # but its acquire() is not resumed yet while self._wakeup_scheduled or self._value <= 0: - fut = self._loop.create_future() + fut = self._get_loop().create_future() self._waiters.append(fut) try: await fut @@ -439,12 +412,7 @@ class BoundedSemaphore(Semaphore): above the initial value. """ - def __init__(self, value=1, *, loop=None): - if loop: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - + def __init__(self, value=1, *, loop=mixins._marker): self._bound_value = value super().__init__(value, loop=loop) diff --git a/contrib/tools/python3/src/Lib/asyncio/mixins.py b/contrib/tools/python3/src/Lib/asyncio/mixins.py new file mode 100644 index 00000000000..650df05ccc9 --- /dev/null +++ b/contrib/tools/python3/src/Lib/asyncio/mixins.py @@ -0,0 +1,31 @@ +"""Event loop mixins.""" + +import threading +from . import events + +_global_lock = threading.Lock() + +# Used as a sentinel for loop parameter +_marker = object() + + +class _LoopBoundMixin: + _loop = None + + def __init__(self, *, loop=_marker): + if loop is not _marker: + raise TypeError( + f'As of 3.10, the *loop* parameter was removed from ' + f'{type(self).__name__}() since it is no longer necessary' + ) + + def _get_loop(self): + loop = events._get_running_loop() + + if self._loop is None: + with _global_lock: + if self._loop is None: + self._loop = loop + if loop is not self._loop: + raise RuntimeError(f'{self!r} is bound to a different event loop') + return loop diff --git a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py index e3f95cf21d6..411685bbda5 100644 --- a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py @@ -179,11 +179,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, """Transport for read pipes.""" def __init__(self, loop, sock, protocol, waiter=None, - extra=None, server=None): - self._pending_data = None + extra=None, server=None, buffer_size=65536): + self._pending_data_length = -1 self._paused = True super().__init__(loop, sock, protocol, waiter, extra, server) + self._data = bytearray(buffer_size) self._loop.call_soon(self._loop_reading) self._paused = False @@ -217,12 +218,12 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if self._read_fut is None: self._loop.call_soon(self._loop_reading, None) - data = self._pending_data - self._pending_data = None - if data is not None: + length = self._pending_data_length + self._pending_data_length = -1 + if length > -1: # Call the protocol methode after calling _loop_reading(), # since the protocol can decide to pause reading again. - self._loop.call_soon(self._data_received, data) + self._loop.call_soon(self._data_received, self._data[:length], length) if self._loop.get_debug(): logger.debug("%r resumes reading", self) @@ -243,15 +244,15 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not keep_open: self.close() - def _data_received(self, data): + def _data_received(self, data, length): if self._paused: # Don't call any protocol method while reading is paused. # The protocol will be called on resume_reading(). - assert self._pending_data is None - self._pending_data = data + assert self._pending_data_length == -1 + self._pending_data_length = length return - if not data: + if length == 0: self._eof_received() return @@ -269,6 +270,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._protocol.data_received(data) def _loop_reading(self, fut=None): + length = -1 data = None try: if fut is not None: @@ -277,18 +279,18 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._read_fut = None if fut.done(): # deliver data later in "finally" clause - data = fut.result() + length = fut.result() + if length == 0: + # we got end-of-file so no need to reschedule a new read + return + + data = self._data[:length] else: # the future will be replaced by next proactor.recv call fut.cancel() if self._closing: # since close() has been called we ignore any read data - data = None - return - - if data == b'': - # we got end-of-file so no need to reschedule a new read return # bpo-33694: buffer_updated() has currently no fast path because of @@ -296,7 +298,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not self._paused: # reschedule a new read - self._read_fut = self._loop._proactor.recv(self._sock, 32768) + self._read_fut = self._loop._proactor.recv_into(self._sock, self._data) except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') @@ -314,8 +316,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if not self._paused: self._read_fut.add_done_callback(self._loop_reading) finally: - if data is not None: - self._data_received(data) + if length > -1: + self._data_received(data, length) class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, diff --git a/contrib/tools/python3/src/Lib/asyncio/queues.py b/contrib/tools/python3/src/Lib/asyncio/queues.py index 14ae87e0a2d..10dd689bbb2 100644 --- a/contrib/tools/python3/src/Lib/asyncio/queues.py +++ b/contrib/tools/python3/src/Lib/asyncio/queues.py @@ -2,11 +2,10 @@ __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') import collections import heapq -import warnings from types import GenericAlias -from . import events from . import locks +from . import mixins class QueueEmpty(Exception): @@ -19,7 +18,7 @@ class QueueFull(Exception): pass -class Queue: +class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. If maxsize is less than or equal to zero, the queue size is infinite. If it @@ -31,14 +30,8 @@ class Queue: interrupted between calling qsize() and doing an operation on the Queue. """ - def __init__(self, maxsize=0, *, loop=None): - if loop is None: - self._loop = events.get_event_loop() - else: - self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + def __init__(self, maxsize=0, *, loop=mixins._marker): + super().__init__(loop=loop) self._maxsize = maxsize # Futures. @@ -46,7 +39,7 @@ class Queue: # Futures. self._putters = collections.deque() self._unfinished_tasks = 0 - self._finished = locks.Event(loop=loop) + self._finished = locks.Event() self._finished.set() self._init(maxsize) @@ -122,7 +115,7 @@ class Queue: slot is available before adding item. """ while self.full(): - putter = self._loop.create_future() + putter = self._get_loop().create_future() self._putters.append(putter) try: await putter @@ -160,7 +153,7 @@ class Queue: If queue is empty, wait until an item is available. """ while self.empty(): - getter = self._loop.create_future() + getter = self._get_loop().create_future() self._getters.append(getter) try: await getter diff --git a/contrib/tools/python3/src/Lib/asyncio/runners.py b/contrib/tools/python3/src/Lib/asyncio/runners.py index 6920acba381..9a5e9a48479 100644 --- a/contrib/tools/python3/src/Lib/asyncio/runners.py +++ b/contrib/tools/python3/src/Lib/asyncio/runners.py @@ -60,8 +60,7 @@ def _cancel_all_tasks(loop): for task in to_cancel: task.cancel() - loop.run_until_complete( - tasks._gather(*to_cancel, loop=loop, return_exceptions=True)) + loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True)) for task in to_cancel: if task.cancelled(): diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py index 3c80bb88925..080d8a62cde 100644 --- a/contrib/tools/python3/src/Lib/asyncio/streams.py +++ b/contrib/tools/python3/src/Lib/asyncio/streams.py @@ -23,7 +23,7 @@ _DEFAULT_LIMIT = 2 ** 16 # 64 KiB async def open_connection(host=None, port=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): + limit=_DEFAULT_LIMIT, **kwds): """A wrapper for create_connection() returning a (reader, writer) pair. The reader returned is a StreamReader instance; the writer is a @@ -41,12 +41,7 @@ async def open_connection(host=None, port=None, *, StreamReaderProtocol classes, just copy the code -- there's really nothing special here except some convenience.) """ - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, loop=loop) transport, _ = await loop.create_connection( @@ -56,7 +51,7 @@ async def open_connection(host=None, port=None, *, async def start_server(client_connected_cb, host=None, port=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): + limit=_DEFAULT_LIMIT, **kwds): """Start a socket server, call back for each client connected. The first parameter, `client_connected_cb`, takes two parameters: @@ -78,12 +73,7 @@ async def start_server(client_connected_cb, host=None, port=None, *, The return value is the same as loop.create_server(), i.e. a Server object which can be used to stop the service. """ - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() def factory(): reader = StreamReader(limit=limit, loop=loop) @@ -98,14 +88,10 @@ if hasattr(socket, 'AF_UNIX'): # UNIX Domain Sockets are supported on this platform async def open_unix_connection(path=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): + limit=_DEFAULT_LIMIT, **kwds): """Similar to `open_connection` but works with UNIX Domain Sockets.""" - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() + reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, loop=loop) transport, _ = await loop.create_unix_connection( @@ -114,14 +100,9 @@ if hasattr(socket, 'AF_UNIX'): return reader, writer async def start_unix_server(client_connected_cb, path=None, *, - loop=None, limit=_DEFAULT_LIMIT, **kwds): + limit=_DEFAULT_LIMIT, **kwds): """Similar to `start_server` but works with UNIX Domain Sockets.""" - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() def factory(): reader = StreamReader(limit=limit, loop=loop) @@ -144,7 +125,7 @@ class FlowControlMixin(protocols.Protocol): def __init__(self, loop=None): if loop is None: - self._loop = events.get_event_loop() + self._loop = events._get_event_loop(stacklevel=4) else: self._loop = loop self._paused = False @@ -302,9 +283,13 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): def __del__(self): # Prevent reports about unhandled exceptions. # Better than self._closed._log_traceback = False hack - closed = self._closed - if closed.done() and not closed.cancelled(): - closed.exception() + try: + closed = self._closed + except AttributeError: + pass # failed constructor + else: + if closed.done() and not closed.cancelled(): + closed.exception() class StreamWriter: @@ -400,7 +385,7 @@ class StreamReader: self._limit = limit if loop is None: - self._loop = events.get_event_loop() + self._loop = events._get_event_loop() else: self._loop = loop self._buffer = bytearray() diff --git a/contrib/tools/python3/src/Lib/asyncio/subprocess.py b/contrib/tools/python3/src/Lib/asyncio/subprocess.py index 820304eccaf..cd10231f710 100644 --- a/contrib/tools/python3/src/Lib/asyncio/subprocess.py +++ b/contrib/tools/python3/src/Lib/asyncio/subprocess.py @@ -1,7 +1,6 @@ __all__ = 'create_subprocess_exec', 'create_subprocess_shell' import subprocess -import warnings from . import events from . import protocols @@ -193,24 +192,14 @@ class Process: stderr = self._read_stream(2) else: stderr = self._noop() - stdin, stdout, stderr = await tasks._gather(stdin, stdout, stderr, - loop=self._loop) + stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr) await self.wait() return (stdout, stderr) async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, - loop=None, limit=streams._DEFAULT_LIMIT, - **kwds): - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8 " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, - stacklevel=2 - ) - + limit=streams._DEFAULT_LIMIT, **kwds): + loop = events.get_running_loop() protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, loop=loop) transport, protocol = await loop.subprocess_shell( @@ -221,16 +210,9 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, async def create_subprocess_exec(program, *args, stdin=None, stdout=None, - stderr=None, loop=None, - limit=streams._DEFAULT_LIMIT, **kwds): - if loop is None: - loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8 " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, - stacklevel=2 - ) + stderr=None, limit=streams._DEFAULT_LIMIT, + **kwds): + loop = events.get_running_loop() protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, loop=loop) transport, protocol = await loop.subprocess_exec( diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py index 53252f2079d..c4bedb5c72b 100644 --- a/contrib/tools/python3/src/Lib/asyncio/tasks.py +++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py @@ -62,30 +62,6 @@ def all_tasks(loop=None): if futures._get_loop(t) is loop and not t.done()} -def _all_tasks_compat(loop=None): - # Different from "all_task()" by returning *all* Tasks, including - # the completed ones. Used to implement deprecated "Tasks.all_task()" - # method. - if loop is None: - loop = events.get_event_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. - i = 0 - while True: - try: - tasks = list(_all_tasks) - except RuntimeError: - i += 1 - if i >= 1000: - raise - else: - break - return {t for t in tasks if futures._get_loop(t) is loop} - - def _set_task_name(task, name): if name is not None: try: @@ -370,7 +346,7 @@ FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION ALL_COMPLETED = concurrent.futures.ALL_COMPLETED -async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): +async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): """Wait for the Futures and coroutines given by fs to complete. The fs iterable must not be empty. @@ -393,12 +369,7 @@ async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): raise ValueError(f'Invalid return_when value: {return_when}') - if loop is None: - loop = events.get_running_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() fs = set(fs) @@ -418,7 +389,7 @@ def _release_waiter(waiter, *args): waiter.set_result(None) -async def wait_for(fut, timeout, *, loop=None): +async def wait_for(fut, timeout): """Wait for the single Future or coroutine to complete, with timeout. Coroutine will be wrapped in Task. @@ -431,12 +402,7 @@ async def wait_for(fut, timeout, *, loop=None): This function is a coroutine. """ - if loop is None: - loop = events.get_running_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() if timeout is None: return await fut @@ -555,7 +521,7 @@ async def _cancel_and_wait(fut, loop): # This is *not* a @coroutine! It is just an iterator (yielding Futures). -def as_completed(fs, *, loop=None, timeout=None): +def as_completed(fs, *, timeout=None): """Return an iterator whose values are coroutines. When waiting for the yielded coroutines you'll get the results (or @@ -576,16 +542,10 @@ def as_completed(fs, *, loop=None, timeout=None): if futures.isfuture(fs) or coroutines.iscoroutine(fs): raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - from .queues import Queue # Import here to avoid circular import problem. - done = Queue(loop=loop) + done = Queue() - if loop is None: - loop = events.get_event_loop() + loop = events._get_event_loop() todo = {ensure_future(f, loop=loop) for f in set(fs)} timeout_handle = None @@ -630,20 +590,13 @@ def __sleep0(): yield -async def sleep(delay, result=None, *, loop=None): +async def sleep(delay, result=None): """Coroutine that completes after a given time (in seconds).""" - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - if delay <= 0: await __sleep0() return result - if loop is None: - loop = events.get_running_loop() - + loop = events.get_running_loop() future = loop.create_future() h = loop.call_later(delay, futures._set_result_unless_cancelled, @@ -659,23 +612,32 @@ def ensure_future(coro_or_future, *, loop=None): If the argument is a Future, it is returned directly. """ - if coroutines.iscoroutine(coro_or_future): - if loop is None: - loop = events.get_event_loop() - task = loop.create_task(coro_or_future) - if task._source_traceback: - del task._source_traceback[-1] - return task - elif futures.isfuture(coro_or_future): + return _ensure_future(coro_or_future, loop=loop) + + +def _ensure_future(coro_or_future, *, loop=None): + if futures.isfuture(coro_or_future): if loop is not None and loop is not futures._get_loop(coro_or_future): raise ValueError('The future belongs to a different loop than ' - 'the one specified as the loop argument') + 'the one specified as the loop argument') return coro_or_future - elif inspect.isawaitable(coro_or_future): - return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) - else: - raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' - 'required') + called_wrap_awaitable = False + if not coroutines.iscoroutine(coro_or_future): + if inspect.isawaitable(coro_or_future): + coro_or_future = _wrap_awaitable(coro_or_future) + called_wrap_awaitable = True + else: + raise TypeError('An asyncio.Future, a coroutine or an awaitable ' + 'is required') + + if loop is None: + loop = events._get_event_loop(stacklevel=4) + try: + return loop.create_task(coro_or_future) + except RuntimeError: + if not called_wrap_awaitable: + coro_or_future.close() + raise @types.coroutine @@ -698,7 +660,8 @@ class _GatheringFuture(futures.Future): cancelled. """ - def __init__(self, children, *, loop=None): + def __init__(self, children, *, loop): + assert loop is not None super().__init__(loop=loop) self._children = children self._cancel_requested = False @@ -718,7 +681,7 @@ class _GatheringFuture(futures.Future): return ret -def gather(*coros_or_futures, loop=None, return_exceptions=False): +def gather(*coros_or_futures, return_exceptions=False): """Return a future aggregating results from the given coroutines/futures. Coroutines will be wrapped in a future and scheduled in the event @@ -748,18 +711,8 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): after catching an exception (raised by one of the awaitables) from gather won't cancel any other awaitables. """ - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - - return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions) - - -def _gather(*coros_or_futures, loop=None, return_exceptions=False): if not coros_or_futures: - if loop is None: - loop = events.get_event_loop() + loop = events._get_event_loop() outer = loop.create_future() outer.set_result([]) return outer @@ -823,10 +776,11 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): children = [] nfuts = 0 nfinished = 0 + loop = None outer = None # bpo-46672 for arg in coros_or_futures: if arg not in arg_to_fut: - fut = ensure_future(arg, loop=loop) + fut = _ensure_future(arg, loop=loop) if loop is None: loop = futures._get_loop(fut) if fut is not arg: @@ -850,7 +804,7 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): return outer -def shield(arg, *, loop=None): +def shield(arg): """Wait for a future, shielding it from cancellation. The statement @@ -876,11 +830,7 @@ def shield(arg, *, loop=None): except CancelledError: res = None """ - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - inner = ensure_future(arg, loop=loop) + inner = _ensure_future(arg) if inner.done(): # Shortcut. return inner diff --git a/contrib/tools/python3/src/Lib/asyncio/unix_events.py b/contrib/tools/python3/src/Lib/asyncio/unix_events.py index eecbc101ee1..c88b818de62 100644 --- a/contrib/tools/python3/src/Lib/asyncio/unix_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/unix_events.py @@ -44,6 +44,16 @@ def _sighandler_noop(signum, frame): pass +def waitstatus_to_exitcode(status): + try: + return os.waitstatus_to_exitcode(status) + except ValueError: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): """Unix event loop. @@ -941,7 +951,7 @@ class PidfdChildWatcher(AbstractChildWatcher): " will report returncode 255", pid) else: - returncode = _compute_returncode(status) + returncode = waitstatus_to_exitcode(status) os.close(pidfd) callback(pid, returncode, *args) @@ -956,20 +966,6 @@ class PidfdChildWatcher(AbstractChildWatcher): return True -def _compute_returncode(status): - if os.WIFSIGNALED(status): - # The child process died because of a signal. - return -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # The child process exited (e.g sys.exit()). - return os.WEXITSTATUS(status) - else: - # The child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - return status - - class BaseChildWatcher(AbstractChildWatcher): def __init__(self): @@ -1080,7 +1076,7 @@ class SafeChildWatcher(BaseChildWatcher): # The child process is still alive. return - returncode = _compute_returncode(status) + returncode = waitstatus_to_exitcode(status) if self._loop.get_debug(): logger.debug('process %s exited with returncode %s', expected_pid, returncode) @@ -1173,7 +1169,7 @@ class FastChildWatcher(BaseChildWatcher): # A child process is still alive. return - returncode = _compute_returncode(status) + returncode = waitstatus_to_exitcode(status) with self._lock: try: @@ -1300,7 +1296,7 @@ class MultiLoopChildWatcher(AbstractChildWatcher): # The child process is still alive. return - returncode = _compute_returncode(status) + returncode = waitstatus_to_exitcode(status) debug_log = True try: loop, callback, args = self._callbacks.pop(pid) @@ -1403,7 +1399,7 @@ class ThreadedChildWatcher(AbstractChildWatcher): "Unknown child process pid %d, will report returncode 255", pid) else: - returncode = _compute_returncode(status) + returncode = waitstatus_to_exitcode(status) if loop.get_debug(): logger.debug('process %s exited with returncode %s', expected_pid, returncode) |
