aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/src/Lib/asyncio
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:39 +0300
commite9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch)
tree64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/tools/python3/src/Lib/asyncio
parent2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff)
downloadydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio')
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/__init__.py8
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/__main__.py250
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_events.py454
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_futures.py44
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_subprocess.py10
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/base_tasks.py28
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/coroutines.py8
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/events.py70
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/exceptions.py116
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/futures.py102
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/locks.py82
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/proactor_events.py398
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/protocols.py20
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/queues.py16
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/runners.py12
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/selector_events.py332
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/sslproto.py38
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/staggered.py298
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/streams.py192
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/subprocess.py50
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/tasks.py408
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/threads.py50
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/transports.py44
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/trsock.py412
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/unix_events.py778
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/windows_events.py166
-rw-r--r--contrib/tools/python3/src/Lib/asyncio/windows_utils.py4
27 files changed, 2195 insertions, 2195 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/__init__.py b/contrib/tools/python3/src/Lib/asyncio/__init__.py
index 8a90986b0b..eb84bfb189 100644
--- a/contrib/tools/python3/src/Lib/asyncio/__init__.py
+++ b/contrib/tools/python3/src/Lib/asyncio/__init__.py
@@ -8,7 +8,7 @@ import sys
from .base_events import *
from .coroutines import *
from .events import *
-from .exceptions import *
+from .exceptions import *
from .futures import *
from .locks import *
from .protocols import *
@@ -17,7 +17,7 @@ from .queues import *
from .streams import *
from .subprocess import *
from .tasks import *
-from .threads import *
+from .threads import *
from .transports import *
# Exposed for _asynciomodule.c to implement now deprecated
@@ -27,7 +27,7 @@ from .tasks import _all_tasks_compat # NoQA
__all__ = (base_events.__all__ +
coroutines.__all__ +
events.__all__ +
- exceptions.__all__ +
+ exceptions.__all__ +
futures.__all__ +
locks.__all__ +
protocols.__all__ +
@@ -36,7 +36,7 @@ __all__ = (base_events.__all__ +
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
- threads.__all__ +
+ threads.__all__ +
transports.__all__)
if sys.platform == 'win32': # pragma: no cover
diff --git a/contrib/tools/python3/src/Lib/asyncio/__main__.py b/contrib/tools/python3/src/Lib/asyncio/__main__.py
index abe8e722dd..18bb87a5bc 100644
--- a/contrib/tools/python3/src/Lib/asyncio/__main__.py
+++ b/contrib/tools/python3/src/Lib/asyncio/__main__.py
@@ -1,125 +1,125 @@
-import ast
-import asyncio
-import code
-import concurrent.futures
-import inspect
-import sys
-import threading
-import types
-import warnings
-
-from . import futures
-
-
-class AsyncIOInteractiveConsole(code.InteractiveConsole):
-
- def __init__(self, locals, loop):
- super().__init__(locals)
- self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
-
- self.loop = loop
-
- def runcode(self, code):
- future = concurrent.futures.Future()
-
- def callback():
- global repl_future
- global repl_future_interrupted
-
- repl_future = None
- repl_future_interrupted = False
-
- func = types.FunctionType(code, self.locals)
- try:
- coro = func()
- except SystemExit:
- raise
- except KeyboardInterrupt as ex:
- repl_future_interrupted = True
- future.set_exception(ex)
- return
- except BaseException as ex:
- future.set_exception(ex)
- return
-
- if not inspect.iscoroutine(coro):
- future.set_result(coro)
- return
-
- try:
- repl_future = self.loop.create_task(coro)
- futures._chain_future(repl_future, future)
- except BaseException as exc:
- future.set_exception(exc)
-
- loop.call_soon_threadsafe(callback)
-
- try:
- return future.result()
- except SystemExit:
- raise
- except BaseException:
- if repl_future_interrupted:
- self.write("\nKeyboardInterrupt\n")
- else:
- self.showtraceback()
-
-
-class REPLThread(threading.Thread):
-
- def run(self):
- try:
- banner = (
- f'asyncio REPL {sys.version} on {sys.platform}\n'
- f'Use "await" directly instead of "asyncio.run()".\n'
- f'Type "help", "copyright", "credits" or "license" '
- f'for more information.\n'
- f'{getattr(sys, "ps1", ">>> ")}import asyncio'
- )
-
- console.interact(
- banner=banner,
- exitmsg='exiting asyncio REPL...')
- finally:
- warnings.filterwarnings(
- 'ignore',
- message=r'^coroutine .* was never awaited$',
- category=RuntimeWarning)
-
- loop.call_soon_threadsafe(loop.stop)
-
-
-if __name__ == '__main__':
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
-
- repl_locals = {'asyncio': asyncio}
- for key in {'__name__', '__package__',
- '__loader__', '__spec__',
- '__builtins__', '__file__'}:
- repl_locals[key] = locals()[key]
-
- console = AsyncIOInteractiveConsole(repl_locals, loop)
-
- repl_future = None
- repl_future_interrupted = False
-
- try:
- import readline # NoQA
- except ImportError:
- pass
-
- repl_thread = REPLThread()
- repl_thread.daemon = True
- repl_thread.start()
-
- while True:
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- if repl_future and not repl_future.done():
- repl_future.cancel()
- repl_future_interrupted = True
- continue
- else:
- break
+import ast
+import asyncio
+import code
+import concurrent.futures
+import inspect
+import sys
+import threading
+import types
+import warnings
+
+from . import futures
+
+
+class AsyncIOInteractiveConsole(code.InteractiveConsole):
+
+ def __init__(self, locals, loop):
+ super().__init__(locals)
+ self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
+
+ self.loop = loop
+
+ def runcode(self, code):
+ future = concurrent.futures.Future()
+
+ def callback():
+ global repl_future
+ global repl_future_interrupted
+
+ repl_future = None
+ repl_future_interrupted = False
+
+ func = types.FunctionType(code, self.locals)
+ try:
+ coro = func()
+ except SystemExit:
+ raise
+ except KeyboardInterrupt as ex:
+ repl_future_interrupted = True
+ future.set_exception(ex)
+ return
+ except BaseException as ex:
+ future.set_exception(ex)
+ return
+
+ if not inspect.iscoroutine(coro):
+ future.set_result(coro)
+ return
+
+ try:
+ repl_future = self.loop.create_task(coro)
+ futures._chain_future(repl_future, future)
+ except BaseException as exc:
+ future.set_exception(exc)
+
+ loop.call_soon_threadsafe(callback)
+
+ try:
+ return future.result()
+ except SystemExit:
+ raise
+ except BaseException:
+ if repl_future_interrupted:
+ self.write("\nKeyboardInterrupt\n")
+ else:
+ self.showtraceback()
+
+
+class REPLThread(threading.Thread):
+
+ def run(self):
+ try:
+ banner = (
+ f'asyncio REPL {sys.version} on {sys.platform}\n'
+ f'Use "await" directly instead of "asyncio.run()".\n'
+ f'Type "help", "copyright", "credits" or "license" '
+ f'for more information.\n'
+ f'{getattr(sys, "ps1", ">>> ")}import asyncio'
+ )
+
+ console.interact(
+ banner=banner,
+ exitmsg='exiting asyncio REPL...')
+ finally:
+ warnings.filterwarnings(
+ 'ignore',
+ message=r'^coroutine .* was never awaited$',
+ category=RuntimeWarning)
+
+ loop.call_soon_threadsafe(loop.stop)
+
+
+if __name__ == '__main__':
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+
+ repl_locals = {'asyncio': asyncio}
+ for key in {'__name__', '__package__',
+ '__loader__', '__spec__',
+ '__builtins__', '__file__'}:
+ repl_locals[key] = locals()[key]
+
+ console = AsyncIOInteractiveConsole(repl_locals, loop)
+
+ repl_future = None
+ repl_future_interrupted = False
+
+ try:
+ import readline # NoQA
+ except ImportError:
+ pass
+
+ repl_thread = REPLThread()
+ repl_thread.daemon = True
+ repl_thread.start()
+
+ while True:
+ try:
+ loop.run_forever()
+ except KeyboardInterrupt:
+ if repl_future and not repl_future.done():
+ repl_future.cancel()
+ repl_future_interrupted = True
+ continue
+ else:
+ break
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_events.py b/contrib/tools/python3/src/Lib/asyncio/base_events.py
index 34fdbf2146..8c1fb49694 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_events.py
@@ -16,12 +16,12 @@ to modify the meaning of the API call itself.
import collections
import collections.abc
import concurrent.futures
-import functools
+import functools
import heapq
import itertools
import os
import socket
-import stat
+import stat
import subprocess
import threading
import time
@@ -38,14 +38,14 @@ except ImportError: # pragma: no cover
from . import constants
from . import coroutines
from . import events
-from . import exceptions
+from . import exceptions
from . import futures
from . import protocols
from . import sslproto
-from . import staggered
+from . import staggered
from . import tasks
from . import transports
-from . import trsock
+from . import trsock
from .log import logger
@@ -60,17 +60,17 @@ _MIN_SCHEDULED_TIMER_HANDLES = 100
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
-
+
_HAS_IPv6 = hasattr(socket, 'AF_INET6')
# Maximum timeout passed to select to avoid OS limitations
MAXIMUM_SELECT_TIMEOUT = 24 * 3600
-# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
-# *reuse_address* parameter
-_unset = object()
+# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
+# *reuse_address* parameter
+_unset = object()
+
-
def _format_handle(handle):
cb = handle._callback
if isinstance(getattr(cb, '__self__', None), tasks.Task):
@@ -100,7 +100,7 @@ def _set_reuseport(sock):
'SO_REUSEPORT defined but not implemented.')
-def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
+def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
# Try to skip getaddrinfo if "host" is already an IP. Users might have
# handled name resolution in their own code and pass in resolved IPs.
if not hasattr(socket, 'inet_pton'):
@@ -149,7 +149,7 @@ def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
socket.inet_pton(af, host)
# The host has already been resolved.
if _HAS_IPv6 and af == socket.AF_INET6:
- return af, type, proto, '', (host, port, flowinfo, scopeid)
+ return af, type, proto, '', (host, port, flowinfo, scopeid)
else:
return af, type, proto, '', (host, port)
except OSError:
@@ -159,32 +159,32 @@ def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
return None
-def _interleave_addrinfos(addrinfos, first_address_family_count=1):
- """Interleave list of addrinfo tuples by family."""
- # Group addresses by family
- addrinfos_by_family = collections.OrderedDict()
- for addr in addrinfos:
- family = addr[0]
- if family not in addrinfos_by_family:
- addrinfos_by_family[family] = []
- addrinfos_by_family[family].append(addr)
- addrinfos_lists = list(addrinfos_by_family.values())
-
- reordered = []
- if first_address_family_count > 1:
- reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
- del addrinfos_lists[0][:first_address_family_count - 1]
- reordered.extend(
- a for a in itertools.chain.from_iterable(
- itertools.zip_longest(*addrinfos_lists)
- ) if a is not None)
- return reordered
-
-
+def _interleave_addrinfos(addrinfos, first_address_family_count=1):
+ """Interleave list of addrinfo tuples by family."""
+ # Group addresses by family
+ addrinfos_by_family = collections.OrderedDict()
+ for addr in addrinfos:
+ family = addr[0]
+ if family not in addrinfos_by_family:
+ addrinfos_by_family[family] = []
+ addrinfos_by_family[family].append(addr)
+ addrinfos_lists = list(addrinfos_by_family.values())
+
+ reordered = []
+ if first_address_family_count > 1:
+ reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
+ del addrinfos_lists[0][:first_address_family_count - 1]
+ reordered.extend(
+ a for a in itertools.chain.from_iterable(
+ itertools.zip_longest(*addrinfos_lists)
+ ) if a is not None)
+ return reordered
+
+
def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
- if isinstance(exc, (SystemExit, KeyboardInterrupt)):
+ if isinstance(exc, (SystemExit, KeyboardInterrupt)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
@@ -324,8 +324,8 @@ class Server(events.AbstractServer):
@property
def sockets(self):
if self._sockets is None:
- return ()
- return tuple(trsock.TransportSocket(s) for s in self._sockets)
+ return ()
+ return tuple(trsock.TransportSocket(s) for s in self._sockets)
def close(self):
sockets = self._sockets
@@ -350,7 +350,7 @@ class Server(events.AbstractServer):
self._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
- await tasks.sleep(0)
+ await tasks.sleep(0)
async def serve_forever(self):
if self._serving_forever_fut is not None:
@@ -364,7 +364,7 @@ class Server(events.AbstractServer):
try:
await self._serving_forever_fut
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
try:
self.close()
await self.wait_closed()
@@ -410,8 +410,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._asyncgens = weakref.WeakSet()
# Set to True when `loop.shutdown_asyncgens` is called.
self._asyncgens_shutdown_called = False
- # Set to True when `loop.shutdown_default_executor` is called.
- self._executor_shutdown_called = False
+ # Set to True when `loop.shutdown_default_executor` is called.
+ self._executor_shutdown_called = False
def __repr__(self):
return (
@@ -423,20 +423,20 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Create a Future object attached to the loop."""
return futures.Future(loop=self)
- def create_task(self, coro, *, name=None):
+ def create_task(self, coro, *, name=None):
"""Schedule a coroutine object.
Return a task object.
"""
self._check_closed()
if self._task_factory is None:
- task = tasks.Task(coro, loop=self, name=name)
+ task = tasks.Task(coro, loop=self, name=name)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
- tasks._set_task_name(task, name)
-
+ tasks._set_task_name(task, name)
+
return task
def set_task_factory(self, factory):
@@ -509,10 +509,10 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._closed:
raise RuntimeError('Event loop is closed')
- def _check_default_executor(self):
- if self._executor_shutdown_called:
- raise RuntimeError('Executor shutdown has been called')
-
+ def _check_default_executor(self):
+ if self._executor_shutdown_called:
+ raise RuntimeError('Executor shutdown has been called')
+
def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen)
if not self.is_closed():
@@ -539,7 +539,7 @@ class BaseEventLoop(events.AbstractEventLoop):
closing_agens = list(self._asyncgens)
self._asyncgens.clear()
- results = await tasks._gather(
+ results = await tasks._gather(
*[ag.aclose() for ag in closing_agens],
return_exceptions=True,
loop=self)
@@ -553,37 +553,37 @@ class BaseEventLoop(events.AbstractEventLoop):
'asyncgen': agen
})
- async def shutdown_default_executor(self):
- """Schedule the shutdown of the default executor."""
- self._executor_shutdown_called = True
- if self._default_executor is None:
- return
- future = self.create_future()
- thread = threading.Thread(target=self._do_shutdown, args=(future,))
- thread.start()
- try:
- await future
- finally:
- thread.join()
-
- def _do_shutdown(self, future):
- try:
- self._default_executor.shutdown(wait=True)
- self.call_soon_threadsafe(future.set_result, None)
- except Exception as ex:
- self.call_soon_threadsafe(future.set_exception, ex)
-
- def _check_running(self):
+ async def shutdown_default_executor(self):
+ """Schedule the shutdown of the default executor."""
+ self._executor_shutdown_called = True
+ if self._default_executor is None:
+ return
+ future = self.create_future()
+ thread = threading.Thread(target=self._do_shutdown, args=(future,))
+ thread.start()
+ try:
+ await future
+ finally:
+ thread.join()
+
+ def _do_shutdown(self, future):
+ try:
+ self._default_executor.shutdown(wait=True)
+ self.call_soon_threadsafe(future.set_result, None)
+ except Exception as ex:
+ self.call_soon_threadsafe(future.set_exception, ex)
+
+ def _check_running(self):
if self.is_running():
raise RuntimeError('This event loop is already running')
if events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')
-
- def run_forever(self):
- """Run until stop() is called."""
- self._check_closed()
- self._check_running()
+
+ def run_forever(self):
+ """Run until stop() is called."""
+ self._check_closed()
+ self._check_running()
self._set_coroutine_origin_tracking(self._debug)
self._thread_id = threading.get_ident()
@@ -615,7 +615,7 @@ class BaseEventLoop(events.AbstractEventLoop):
Return the Future's result, or raise its exception.
"""
self._check_closed()
- self._check_running()
+ self._check_running()
new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
@@ -666,7 +666,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._closed = True
self._ready.clear()
self._scheduled.clear()
- self._executor_shutdown_called = True
+ self._executor_shutdown_called = True
executor = self._default_executor
if executor is not None:
self._default_executor = None
@@ -676,9 +676,9 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Returns True if the event loop was closed."""
return self._closed
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if not self.is_closed():
- _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
if not self.is_running():
self.close()
@@ -803,23 +803,23 @@ class BaseEventLoop(events.AbstractEventLoop):
self._check_callback(func, 'run_in_executor')
if executor is None:
executor = self._default_executor
- # Only check when the default executor is being used
- self._check_default_executor()
+ # Only check when the default executor is being used
+ self._check_default_executor()
if executor is None:
- executor = concurrent.futures.ThreadPoolExecutor(
- thread_name_prefix='asyncio'
- )
+ executor = concurrent.futures.ThreadPoolExecutor(
+ thread_name_prefix='asyncio'
+ )
self._default_executor = executor
return futures.wrap_future(
executor.submit(func, *args), loop=self)
def set_default_executor(self, executor):
- if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
- warnings.warn(
- 'Using the default executor that is not an instance of '
- 'ThreadPoolExecutor is deprecated and will be prohibited '
- 'in Python 3.9',
- DeprecationWarning, 2)
+ if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
+ warnings.warn(
+ 'Using the default executor that is not an instance of '
+ 'ThreadPoolExecutor is deprecated and will be prohibited '
+ 'in Python 3.9',
+ DeprecationWarning, 2)
self._default_executor = executor
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
@@ -868,7 +868,7 @@ class BaseEventLoop(events.AbstractEventLoop):
try:
return await self._sock_sendfile_native(sock, file,
offset, count)
- except exceptions.SendfileNotAvailableError as exc:
+ except exceptions.SendfileNotAvailableError as exc:
if not fallback:
raise
return await self._sock_sendfile_fallback(sock, file,
@@ -877,7 +877,7 @@ class BaseEventLoop(events.AbstractEventLoop):
async def _sock_sendfile_native(self, sock, file, offset, count):
# NB: sendfile syscall is not supported for SSL sockets and
# non-mmap files even if sendfile is supported by OS
- raise exceptions.SendfileNotAvailableError(
+ raise exceptions.SendfileNotAvailableError(
f"syscall sendfile is not available for socket {sock!r} "
"and file {file!r} combination")
@@ -900,7 +900,7 @@ class BaseEventLoop(events.AbstractEventLoop):
read = await self.run_in_executor(None, file.readinto, view)
if not read:
break # EOF
- await self.sock_sendall(sock, view[:read])
+ await self.sock_sendall(sock, view[:read])
total_sent += read
return total_sent
finally:
@@ -928,49 +928,49 @@ class BaseEventLoop(events.AbstractEventLoop):
"offset must be a non-negative integer (got {!r})".format(
offset))
- async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
- """Create, bind and connect one socket."""
- my_exceptions = []
- exceptions.append(my_exceptions)
- family, type_, proto, _, address = addr_info
- sock = None
- try:
- sock = socket.socket(family=family, type=type_, proto=proto)
- sock.setblocking(False)
- if local_addr_infos is not None:
- for _, _, _, _, laddr in local_addr_infos:
- try:
- sock.bind(laddr)
- break
- except OSError as exc:
- msg = (
- f'error while attempting to bind on '
- f'address {laddr!r}: '
- f'{exc.strerror.lower()}'
- )
- exc = OSError(exc.errno, msg)
- my_exceptions.append(exc)
- else: # all bind attempts failed
- raise my_exceptions.pop()
- await self.sock_connect(sock, address)
- return sock
- except OSError as exc:
- my_exceptions.append(exc)
- if sock is not None:
- sock.close()
- raise
- except:
- if sock is not None:
- sock.close()
- raise
-
+ async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
+ """Create, bind and connect one socket."""
+ my_exceptions = []
+ exceptions.append(my_exceptions)
+ family, type_, proto, _, address = addr_info
+ sock = None
+ try:
+ sock = socket.socket(family=family, type=type_, proto=proto)
+ sock.setblocking(False)
+ if local_addr_infos is not None:
+ for _, _, _, _, laddr in local_addr_infos:
+ try:
+ sock.bind(laddr)
+ break
+ except OSError as exc:
+ msg = (
+ f'error while attempting to bind on '
+ f'address {laddr!r}: '
+ f'{exc.strerror.lower()}'
+ )
+ exc = OSError(exc.errno, msg)
+ my_exceptions.append(exc)
+ else: # all bind attempts failed
+ raise my_exceptions.pop()
+ await self.sock_connect(sock, address)
+ return sock
+ except OSError as exc:
+ my_exceptions.append(exc)
+ if sock is not None:
+ sock.close()
+ raise
+ except:
+ if sock is not None:
+ sock.close()
+ raise
+
async def create_connection(
self, protocol_factory, host=None, port=None,
*, ssl=None, family=0,
proto=0, flags=0, sock=None,
local_addr=None, server_hostname=None,
- ssl_handshake_timeout=None,
- happy_eyeballs_delay=None, interleave=None):
+ ssl_handshake_timeout=None,
+ happy_eyeballs_delay=None, interleave=None):
"""Connect to a TCP server.
Create a streaming transport connection to a given Internet host and
@@ -1005,10 +1005,10 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')
- if happy_eyeballs_delay is not None and interleave is None:
- # If using happy eyeballs, default to interleave addresses by family
- interleave = 1
-
+ if happy_eyeballs_delay is not None and interleave is None:
+ # If using happy eyeballs, default to interleave addresses by family
+ interleave = 1
+
if host is not None or port is not None:
if sock is not None:
raise ValueError(
@@ -1027,31 +1027,31 @@ class BaseEventLoop(events.AbstractEventLoop):
flags=flags, loop=self)
if not laddr_infos:
raise OSError('getaddrinfo() returned empty list')
- else:
- laddr_infos = None
+ else:
+ laddr_infos = None
+
+ if interleave:
+ infos = _interleave_addrinfos(infos, interleave)
- if interleave:
- infos = _interleave_addrinfos(infos, interleave)
-
exceptions = []
- if happy_eyeballs_delay is None:
- # not using happy eyeballs
- for addrinfo in infos:
- try:
- sock = await self._connect_sock(
- exceptions, addrinfo, laddr_infos)
- break
- except OSError:
- continue
- else: # using happy eyeballs
- sock, _, _ = await staggered.staggered_race(
- (functools.partial(self._connect_sock,
- exceptions, addrinfo, laddr_infos)
- for addrinfo in infos),
- happy_eyeballs_delay, loop=self)
-
- if sock is None:
- exceptions = [exc for sub in exceptions for exc in sub]
+ if happy_eyeballs_delay is None:
+ # not using happy eyeballs
+ for addrinfo in infos:
+ try:
+ sock = await self._connect_sock(
+ exceptions, addrinfo, laddr_infos)
+ break
+ except OSError:
+ continue
+ else: # using happy eyeballs
+ sock, _, _ = await staggered.staggered_race(
+ (functools.partial(self._connect_sock,
+ exceptions, addrinfo, laddr_infos)
+ for addrinfo in infos),
+ happy_eyeballs_delay, loop=self)
+
+ if sock is None:
+ exceptions = [exc for sub in exceptions for exc in sub]
if len(exceptions) == 1:
raise exceptions[0]
else:
@@ -1150,7 +1150,7 @@ class BaseEventLoop(events.AbstractEventLoop):
try:
return await self._sendfile_native(transport, file,
offset, count)
- except exceptions.SendfileNotAvailableError as exc:
+ except exceptions.SendfileNotAvailableError as exc:
if not fallback:
raise
@@ -1163,7 +1163,7 @@ class BaseEventLoop(events.AbstractEventLoop):
offset, count)
async def _sendfile_native(self, transp, file, offset, count):
- raise exceptions.SendfileNotAvailableError(
+ raise exceptions.SendfileNotAvailableError(
"sendfile syscall is not supported")
async def _sendfile_fallback(self, transp, file, offset, count):
@@ -1180,11 +1180,11 @@ class BaseEventLoop(events.AbstractEventLoop):
if blocksize <= 0:
return total_sent
view = memoryview(buf)[:blocksize]
- read = await self.run_in_executor(None, file.readinto, view)
+ read = await self.run_in_executor(None, file.readinto, view)
if not read:
return total_sent # EOF
await proto.drain()
- transp.write(view[:read])
+ transp.write(view[:read])
total_sent += read
finally:
if total_sent > 0 and hasattr(file, 'seek'):
@@ -1229,7 +1229,7 @@ class BaseEventLoop(events.AbstractEventLoop):
try:
await waiter
- except BaseException:
+ except BaseException:
transport.close()
conmade_cb.cancel()
resume_cb.cancel()
@@ -1240,7 +1240,7 @@ class BaseEventLoop(events.AbstractEventLoop):
async def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0,
- reuse_address=_unset, reuse_port=None,
+ reuse_address=_unset, reuse_port=None,
allow_broadcast=None, sock=None):
"""Create datagram connection."""
if sock is not None:
@@ -1249,7 +1249,7 @@ class BaseEventLoop(events.AbstractEventLoop):
f'A UDP Socket was expected, got {sock!r}')
if (local_addr or remote_addr or
family or proto or flags or
- reuse_port or allow_broadcast):
+ reuse_port or allow_broadcast):
# show the problematic kwargs in exception msg
opts = dict(local_addr=local_addr, remote_addr=remote_addr,
family=family, proto=proto, flags=flags,
@@ -1270,28 +1270,28 @@ class BaseEventLoop(events.AbstractEventLoop):
for addr in (local_addr, remote_addr):
if addr is not None and not isinstance(addr, str):
raise TypeError('string is expected')
-
- if local_addr and local_addr[0] not in (0, '\x00'):
- try:
- if stat.S_ISSOCK(os.stat(local_addr).st_mode):
- os.remove(local_addr)
- except FileNotFoundError:
- pass
- except OSError as err:
- # Directory may have permissions only to create socket.
- logger.error('Unable to check or remove stale UNIX '
- 'socket %r: %r',
- local_addr, err)
-
+
+ if local_addr and local_addr[0] not in (0, '\x00'):
+ try:
+ if stat.S_ISSOCK(os.stat(local_addr).st_mode):
+ os.remove(local_addr)
+ except FileNotFoundError:
+ pass
+ except OSError as err:
+ # Directory may have permissions only to create socket.
+ logger.error('Unable to check or remove stale UNIX '
+ 'socket %r: %r',
+ local_addr, err)
+
addr_pairs_info = (((family, proto),
(local_addr, remote_addr)), )
else:
# join address by (family, protocol)
- addr_infos = {} # Using order preserving dict
+ addr_infos = {} # Using order preserving dict
for idx, addr in ((0, local_addr), (1, remote_addr)):
if addr is not None:
- if not (isinstance(addr, tuple) and len(addr) == 2):
- raise TypeError('2-tuple is expected')
+ if not (isinstance(addr, tuple) and len(addr) == 2):
+ raise TypeError('2-tuple is expected')
infos = await self._ensure_resolved(
addr, family=family, type=socket.SOCK_DGRAM,
@@ -1316,18 +1316,18 @@ class BaseEventLoop(events.AbstractEventLoop):
exceptions = []
- # bpo-37228
- if reuse_address is not _unset:
- if reuse_address:
- raise ValueError("Passing `reuse_address=True` is no "
- "longer supported, as the usage of "
- "SO_REUSEPORT in UDP poses a significant "
- "security concern.")
- else:
- warnings.warn("The *reuse_address* parameter has been "
- "deprecated as of 3.5.10 and is scheduled "
- "for removal in 3.11.", DeprecationWarning,
- stacklevel=2)
+ # bpo-37228
+ if reuse_address is not _unset:
+ if reuse_address:
+ raise ValueError("Passing `reuse_address=True` is no "
+ "longer supported, as the usage of "
+ "SO_REUSEPORT in UDP poses a significant "
+ "security concern.")
+ else:
+ warnings.warn("The *reuse_address* parameter has been "
+ "deprecated as of 3.5.10 and is scheduled "
+ "for removal in 3.11.", DeprecationWarning,
+ stacklevel=2)
for ((family, proto),
(local_address, remote_address)) in addr_pairs_info:
@@ -1346,8 +1346,8 @@ class BaseEventLoop(events.AbstractEventLoop):
if local_addr:
sock.bind(local_address)
if remote_addr:
- if not allow_broadcast:
- await self.sock_connect(sock, remote_address)
+ if not allow_broadcast:
+ await self.sock_connect(sock, remote_address)
r_addr = remote_address
except OSError as exc:
if sock is not None:
@@ -1388,7 +1388,7 @@ class BaseEventLoop(events.AbstractEventLoop):
family=0, type=socket.SOCK_STREAM,
proto=0, flags=0, loop):
host, port = address[:2]
- info = _ipaddr_info(host, port, family, type, proto, *address[2:])
+ info = _ipaddr_info(host, port, family, type, proto, *address[2:])
if info is not None:
# "host" is already a resolved IP.
return [info]
@@ -1457,7 +1457,7 @@ class BaseEventLoop(events.AbstractEventLoop):
fs = [self._create_server_getaddrinfo(host, port, family=family,
flags=flags)
for host in hosts]
- infos = await tasks._gather(*fs, loop=self)
+ infos = await tasks._gather(*fs, loop=self)
infos = set(itertools.chain.from_iterable(infos))
completed = False
@@ -1515,7 +1515,7 @@ class BaseEventLoop(events.AbstractEventLoop):
server._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
- await tasks.sleep(0)
+ await tasks.sleep(0)
if self._debug:
logger.info("%r is serving", server)
@@ -1601,7 +1601,7 @@ class BaseEventLoop(events.AbstractEventLoop):
stderr=subprocess.PIPE,
universal_newlines=False,
shell=True, bufsize=0,
- encoding=None, errors=None, text=None,
+ encoding=None, errors=None, text=None,
**kwargs):
if not isinstance(cmd, (bytes, str)):
raise ValueError("cmd must be a string")
@@ -1611,13 +1611,13 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError("shell must be True")
if bufsize != 0:
raise ValueError("bufsize must be 0")
- if text:
- raise ValueError("text must be False")
- if encoding is not None:
- raise ValueError("encoding must be None")
- if errors is not None:
- raise ValueError("errors must be None")
-
+ if text:
+ raise ValueError("text must be False")
+ if encoding is not None:
+ raise ValueError("encoding must be None")
+ if errors is not None:
+ raise ValueError("errors must be None")
+
protocol = protocol_factory()
debug_log = None
if self._debug:
@@ -1634,22 +1634,22 @@ class BaseEventLoop(events.AbstractEventLoop):
async def subprocess_exec(self, protocol_factory, program, *args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=False,
- shell=False, bufsize=0,
- encoding=None, errors=None, text=None,
- **kwargs):
+ shell=False, bufsize=0,
+ encoding=None, errors=None, text=None,
+ **kwargs):
if universal_newlines:
raise ValueError("universal_newlines must be False")
if shell:
raise ValueError("shell must be False")
if bufsize != 0:
raise ValueError("bufsize must be 0")
- if text:
- raise ValueError("text must be False")
- if encoding is not None:
- raise ValueError("encoding must be None")
- if errors is not None:
- raise ValueError("errors must be None")
-
+ if text:
+ raise ValueError("text must be False")
+ if encoding is not None:
+ raise ValueError("encoding must be None")
+ if errors is not None:
+ raise ValueError("errors must be None")
+
popen_args = (program,) + args
protocol = protocol_factory()
debug_log = None
@@ -1762,9 +1762,9 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._exception_handler is None:
try:
self.default_exception_handler(context)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException:
# Second protection layer for unexpected errors
# in the default implementation, as well as for subclassed
# event loops with overloaded "default_exception_handler".
@@ -1773,9 +1773,9 @@ class BaseEventLoop(events.AbstractEventLoop):
else:
try:
self._exception_handler(self, context)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
# Exception in the user set custom exception handler.
try:
# Let's try default handler.
@@ -1784,9 +1784,9 @@ class BaseEventLoop(events.AbstractEventLoop):
'exception': exc,
'context': context,
})
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException:
# Guard 'default_exception_handler' in case it is
# overloaded.
logger.error('Exception in default exception handler '
@@ -1851,7 +1851,7 @@ class BaseEventLoop(events.AbstractEventLoop):
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
- event_list = self._selector.select(timeout)
+ event_list = self._selector.select(timeout)
self._process_events(event_list)
# Handle 'later' callbacks that are ready.
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_futures.py b/contrib/tools/python3/src/Lib/asyncio/base_futures.py
index 0962405d8d..2c01ac98e1 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_futures.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_futures.py
@@ -1,7 +1,7 @@
__all__ = ()
import reprlib
-from _thread import get_ident
+from _thread import get_ident
from . import format_helpers
@@ -42,16 +42,16 @@ def _format_callbacks(cb):
return f'cb=[{cb}]'
-# bpo-42183: _repr_running is needed for repr protection
-# when a Future or Task result contains itself directly or indirectly.
-# The logic is borrowed from @reprlib.recursive_repr decorator.
-# Unfortunately, the direct decorator usage is impossible because of
-# AttributeError: '_asyncio.Task' object has no attribute '__module__' error.
-#
-# After fixing this thing we can return to the decorator based approach.
-_repr_running = set()
-
-
+# bpo-42183: _repr_running is needed for repr protection
+# when a Future or Task result contains itself directly or indirectly.
+# The logic is borrowed from @reprlib.recursive_repr decorator.
+# Unfortunately, the direct decorator usage is impossible because of
+# AttributeError: '_asyncio.Task' object has no attribute '__module__' error.
+#
+# After fixing this thing we can return to the decorator based approach.
+_repr_running = set()
+
+
def _future_repr_info(future):
# (Future) -> str
"""helper function for Future.__repr__"""
@@ -60,17 +60,17 @@ def _future_repr_info(future):
if future._exception is not None:
info.append(f'exception={future._exception!r}')
else:
- key = id(future), get_ident()
- if key in _repr_running:
- result = '...'
- else:
- _repr_running.add(key)
- try:
- # use reprlib to limit the length of the output, especially
- # for very long strings
- result = reprlib.repr(future._result)
- finally:
- _repr_running.discard(key)
+ key = id(future), get_ident()
+ if key in _repr_running:
+ result = '...'
+ else:
+ _repr_running.add(key)
+ try:
+ # use reprlib to limit the length of the output, especially
+ # for very long strings
+ result = reprlib.repr(future._result)
+ finally:
+ _repr_running.discard(key)
info.append(f'result={result}')
if future._callbacks:
info.append(_format_callbacks(future._callbacks))
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py b/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py
index 05daeedd37..14d5051922 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py
@@ -120,9 +120,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
# Don't clear the _proc reference yet: _post_init() may still run
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if not self._closed:
- _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self.close()
def get_pid(self):
@@ -182,9 +182,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
for callback, data in self._pending_calls:
loop.call_soon(callback, *data)
self._pending_calls = None
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(exc)
else:
diff --git a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py
index d16f3f7be2..09bb171a2c 100644
--- a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py
@@ -12,30 +12,30 @@ def _task_repr_info(task):
# replace status
info[0] = 'cancelling'
- info.insert(1, 'name=%r' % task.get_name())
-
+ info.insert(1, 'name=%r' % task.get_name())
+
coro = coroutines._format_coroutine(task._coro)
- info.insert(2, f'coro=<{coro}>')
+ info.insert(2, f'coro=<{coro}>')
if task._fut_waiter is not None:
- info.insert(3, f'wait_for={task._fut_waiter!r}')
+ info.insert(3, f'wait_for={task._fut_waiter!r}')
return info
def _task_get_stack(task, limit):
frames = []
- if hasattr(task._coro, 'cr_frame'):
- # case 1: 'async def' coroutines
+ if hasattr(task._coro, 'cr_frame'):
+ # case 1: 'async def' coroutines
f = task._coro.cr_frame
- elif hasattr(task._coro, 'gi_frame'):
- # case 2: legacy coroutines
+ elif hasattr(task._coro, 'gi_frame'):
+ # case 2: legacy coroutines
f = task._coro.gi_frame
- elif hasattr(task._coro, 'ag_frame'):
- # case 3: async generators
- f = task._coro.ag_frame
- else:
- # case 4: unknown objects
- f = None
+ elif hasattr(task._coro, 'ag_frame'):
+ # case 3: async generators
+ f = task._coro.ag_frame
+ else:
+ # case 4: unknown objects
+ f = None
if f is not None:
while f is not None:
if limit is not None:
diff --git a/contrib/tools/python3/src/Lib/asyncio/coroutines.py b/contrib/tools/python3/src/Lib/asyncio/coroutines.py
index ea92c8c95b..9664ea74d7 100644
--- a/contrib/tools/python3/src/Lib/asyncio/coroutines.py
+++ b/contrib/tools/python3/src/Lib/asyncio/coroutines.py
@@ -7,7 +7,7 @@ import os
import sys
import traceback
import types
-import warnings
+import warnings
from . import base_futures
from . import constants
@@ -108,9 +108,9 @@ def coroutine(func):
If the coroutine is not yielded from before it is destroyed,
an error message is logged.
"""
- warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
- DeprecationWarning,
- stacklevel=2)
+ warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
+ DeprecationWarning,
+ stacklevel=2)
if inspect.iscoroutinefunction(func):
# In Python 3.5 that's all we need to do for coroutines
# defined with "async def".
diff --git a/contrib/tools/python3/src/Lib/asyncio/events.py b/contrib/tools/python3/src/Lib/asyncio/events.py
index 2e806c1517..413ff2aaa6 100644
--- a/contrib/tools/python3/src/Lib/asyncio/events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/events.py
@@ -3,7 +3,7 @@
__all__ = (
'AbstractEventLoopPolicy',
'AbstractEventLoop', 'AbstractServer',
- 'Handle', 'TimerHandle',
+ 'Handle', 'TimerHandle',
'get_event_loop_policy', 'set_event_loop_policy',
'get_event_loop', 'set_event_loop', 'new_event_loop',
'get_child_watcher', 'set_child_watcher',
@@ -78,9 +78,9 @@ class Handle:
def _run(self):
try:
self._context.run(self._callback, *self._args)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
cb = format_helpers._format_callback_source(
self._callback, self._args)
msg = f'Exception in callback {cb}'
@@ -118,24 +118,24 @@ class TimerHandle(Handle):
return hash(self._when)
def __lt__(self, other):
- if isinstance(other, TimerHandle):
- return self._when < other._when
- return NotImplemented
+ if isinstance(other, TimerHandle):
+ return self._when < other._when
+ return NotImplemented
def __le__(self, other):
- if isinstance(other, TimerHandle):
- return self._when < other._when or self.__eq__(other)
- return NotImplemented
+ if isinstance(other, TimerHandle):
+ return self._when < other._when or self.__eq__(other)
+ return NotImplemented
def __gt__(self, other):
- if isinstance(other, TimerHandle):
- return self._when > other._when
- return NotImplemented
+ if isinstance(other, TimerHandle):
+ return self._when > other._when
+ return NotImplemented
def __ge__(self, other):
- if isinstance(other, TimerHandle):
- return self._when > other._when or self.__eq__(other)
- return NotImplemented
+ if isinstance(other, TimerHandle):
+ return self._when > other._when or self.__eq__(other)
+ return NotImplemented
def __eq__(self, other):
if isinstance(other, TimerHandle):
@@ -248,23 +248,23 @@ class AbstractEventLoop:
"""Shutdown all active asynchronous generators."""
raise NotImplementedError
- async def shutdown_default_executor(self):
- """Schedule the shutdown of the default executor."""
- raise NotImplementedError
-
+ async def shutdown_default_executor(self):
+ """Schedule the shutdown of the default executor."""
+ raise NotImplementedError
+
# Methods scheduling callbacks. All these return Handles.
def _timer_handle_cancelled(self, handle):
"""Notification that a TimerHandle has been cancelled."""
raise NotImplementedError
- def call_soon(self, callback, *args, context=None):
- return self.call_later(0, callback, *args, context=context)
+ def call_soon(self, callback, *args, context=None):
+ return self.call_later(0, callback, *args, context=context)
- def call_later(self, delay, callback, *args, context=None):
+ def call_later(self, delay, callback, *args, context=None):
raise NotImplementedError
- def call_at(self, when, callback, *args, context=None):
+ def call_at(self, when, callback, *args, context=None):
raise NotImplementedError
def time(self):
@@ -275,15 +275,15 @@ class AbstractEventLoop:
# Method scheduling a coroutine object: create a task.
- def create_task(self, coro, *, name=None):
+ def create_task(self, coro, *, name=None):
raise NotImplementedError
# Methods for interacting with threads.
- def call_soon_threadsafe(self, callback, *args, context=None):
+ def call_soon_threadsafe(self, callback, *args, context=None):
raise NotImplementedError
- def run_in_executor(self, executor, func, *args):
+ def run_in_executor(self, executor, func, *args):
raise NotImplementedError
def set_default_executor(self, executor):
@@ -303,8 +303,8 @@ class AbstractEventLoop:
*, ssl=None, family=0, proto=0,
flags=0, sock=None, local_addr=None,
server_hostname=None,
- ssl_handshake_timeout=None,
- happy_eyeballs_delay=None, interleave=None):
+ ssl_handshake_timeout=None,
+ happy_eyeballs_delay=None, interleave=None):
raise NotImplementedError
async def create_server(
@@ -396,7 +396,7 @@ class AbstractEventLoop:
The return value is a Server object, which can be used to stop
the service.
- path is a str, representing a file system path to bind the
+ path is a str, representing a file system path to bind the
server socket to.
sock can optionally be specified in order to use a preexisting
@@ -465,7 +465,7 @@ class AbstractEventLoop:
# The reason to accept file-like object instead of just file descriptor
# is: we need to own pipe and close it at transport finishing
# Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vice versa.
+ # close fd in pipe transport then close f and vice versa.
raise NotImplementedError
async def connect_write_pipe(self, protocol_factory, pipe):
@@ -478,7 +478,7 @@ class AbstractEventLoop:
# The reason to accept file-like object instead of just file descriptor
# is: we need to own pipe and close it at transport finishing
# Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vice versa.
+ # close fd in pipe transport then close f and vice versa.
raise NotImplementedError
async def subprocess_shell(self, protocol_factory, cmd, *,
@@ -629,13 +629,13 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
self._local = self._Local()
def get_event_loop(self):
- """Get the event loop for the current context.
+ """Get the event loop for the current context.
- Returns an instance of EventLoop or raises an exception.
+ Returns an instance of EventLoop or raises an exception.
"""
if (self._local._loop is None and
not self._local._set_called and
- threading.current_thread() is threading.main_thread()):
+ threading.current_thread() is threading.main_thread()):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
diff --git a/contrib/tools/python3/src/Lib/asyncio/exceptions.py b/contrib/tools/python3/src/Lib/asyncio/exceptions.py
index 0957fe6138..f07e448657 100644
--- a/contrib/tools/python3/src/Lib/asyncio/exceptions.py
+++ b/contrib/tools/python3/src/Lib/asyncio/exceptions.py
@@ -1,58 +1,58 @@
-"""asyncio exceptions."""
-
-
-__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError',
- 'IncompleteReadError', 'LimitOverrunError',
- 'SendfileNotAvailableError')
-
-
-class CancelledError(BaseException):
- """The Future or Task was cancelled."""
-
-
-class TimeoutError(Exception):
- """The operation exceeded the given deadline."""
-
-
-class InvalidStateError(Exception):
- """The operation is not allowed in this state."""
-
-
-class SendfileNotAvailableError(RuntimeError):
- """Sendfile syscall is not available.
-
- Raised if OS does not support sendfile syscall for given socket or
- file type.
- """
-
-
-class IncompleteReadError(EOFError):
- """
- Incomplete read error. Attributes:
-
- - partial: read bytes string before the end of stream was reached
- - expected: total number of expected bytes (or None if unknown)
- """
- def __init__(self, partial, expected):
- r_expected = 'undefined' if expected is None else repr(expected)
- super().__init__(f'{len(partial)} bytes read on a total of '
- f'{r_expected} expected bytes')
- self.partial = partial
- self.expected = expected
-
- def __reduce__(self):
- return type(self), (self.partial, self.expected)
-
-
-class LimitOverrunError(Exception):
- """Reached the buffer limit while looking for a separator.
-
- Attributes:
- - consumed: total number of to be consumed bytes.
- """
- def __init__(self, message, consumed):
- super().__init__(message)
- self.consumed = consumed
-
- def __reduce__(self):
- return type(self), (self.args[0], self.consumed)
+"""asyncio exceptions."""
+
+
+__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError',
+ 'IncompleteReadError', 'LimitOverrunError',
+ 'SendfileNotAvailableError')
+
+
+class CancelledError(BaseException):
+ """The Future or Task was cancelled."""
+
+
+class TimeoutError(Exception):
+ """The operation exceeded the given deadline."""
+
+
+class InvalidStateError(Exception):
+ """The operation is not allowed in this state."""
+
+
+class SendfileNotAvailableError(RuntimeError):
+ """Sendfile syscall is not available.
+
+ Raised if OS does not support sendfile syscall for given socket or
+ file type.
+ """
+
+
+class IncompleteReadError(EOFError):
+ """
+ Incomplete read error. Attributes:
+
+ - partial: read bytes string before the end of stream was reached
+ - expected: total number of expected bytes (or None if unknown)
+ """
+ def __init__(self, partial, expected):
+ r_expected = 'undefined' if expected is None else repr(expected)
+ super().__init__(f'{len(partial)} bytes read on a total of '
+ f'{r_expected} expected bytes')
+ self.partial = partial
+ self.expected = expected
+
+ def __reduce__(self):
+ return type(self), (self.partial, self.expected)
+
+
+class LimitOverrunError(Exception):
+ """Reached the buffer limit while looking for a separator.
+
+ Attributes:
+ - consumed: total number of to be consumed bytes.
+ """
+ def __init__(self, message, consumed):
+ super().__init__(message)
+ self.consumed = consumed
+
+ def __reduce__(self):
+ return type(self), (self.args[0], self.consumed)
diff --git a/contrib/tools/python3/src/Lib/asyncio/futures.py b/contrib/tools/python3/src/Lib/asyncio/futures.py
index d66c12152e..bed4da52fd 100644
--- a/contrib/tools/python3/src/Lib/asyncio/futures.py
+++ b/contrib/tools/python3/src/Lib/asyncio/futures.py
@@ -11,7 +11,7 @@ import sys
from . import base_futures
from . import events
-from . import exceptions
+from . import exceptions
from . import format_helpers
@@ -51,9 +51,9 @@ class Future:
_exception = None
_loop = None
_source_traceback = None
- _cancel_message = None
- # A saved CancelledError for later chaining as an exception context.
- _cancelled_exc = None
+ _cancel_message = None
+ # A saved CancelledError for later chaining as an exception context.
+ _cancelled_exc = None
# This field is used for a dual purpose:
# - Its presence is a marker to declare that a class implements
@@ -106,9 +106,9 @@ class Future:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
- def __class_getitem__(cls, type):
- return cls
-
+ def __class_getitem__(cls, type):
+ return cls
+
@property
def _log_traceback(self):
return self.__log_traceback
@@ -121,27 +121,27 @@ class Future:
def get_loop(self):
"""Return the event loop the Future is bound to."""
- loop = self._loop
- if loop is None:
- raise RuntimeError("Future object is not initialized.")
- return loop
-
- def _make_cancelled_error(self):
- """Create the CancelledError to raise if the Future is cancelled.
-
- This should only be called once when handling a cancellation since
- it erases the saved context exception value.
- """
- if self._cancel_message is None:
- exc = exceptions.CancelledError()
- else:
- exc = exceptions.CancelledError(self._cancel_message)
- exc.__context__ = self._cancelled_exc
- # Remove the reference since we don't need this anymore.
- self._cancelled_exc = None
- return exc
-
- def cancel(self, msg=None):
+ loop = self._loop
+ if loop is None:
+ raise RuntimeError("Future object is not initialized.")
+ return loop
+
+ def _make_cancelled_error(self):
+ """Create the CancelledError to raise if the Future is cancelled.
+
+ This should only be called once when handling a cancellation since
+ it erases the saved context exception value.
+ """
+ if self._cancel_message is None:
+ exc = exceptions.CancelledError()
+ else:
+ exc = exceptions.CancelledError(self._cancel_message)
+ exc.__context__ = self._cancelled_exc
+ # Remove the reference since we don't need this anymore.
+ self._cancelled_exc = None
+ return exc
+
+ def cancel(self, msg=None):
"""Cancel the future and schedule callbacks.
If the future is already done or cancelled, return False. Otherwise,
@@ -152,7 +152,7 @@ class Future:
if self._state != _PENDING:
return False
self._state = _CANCELLED
- self._cancel_message = msg
+ self._cancel_message = msg
self.__schedule_callbacks()
return True
@@ -192,10 +192,10 @@ class Future:
the future is done and has an exception set, this exception is raised.
"""
if self._state == _CANCELLED:
- exc = self._make_cancelled_error()
- raise exc
+ exc = self._make_cancelled_error()
+ raise exc
if self._state != _FINISHED:
- raise exceptions.InvalidStateError('Result is not ready.')
+ raise exceptions.InvalidStateError('Result is not ready.')
self.__log_traceback = False
if self._exception is not None:
raise self._exception
@@ -210,10 +210,10 @@ class Future:
InvalidStateError.
"""
if self._state == _CANCELLED:
- exc = self._make_cancelled_error()
- raise exc
+ exc = self._make_cancelled_error()
+ raise exc
if self._state != _FINISHED:
- raise exceptions.InvalidStateError('Exception is not set.')
+ raise exceptions.InvalidStateError('Exception is not set.')
self.__log_traceback = False
return self._exception
@@ -255,7 +255,7 @@ class Future:
InvalidStateError.
"""
if self._state != _PENDING:
- raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
+ raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
@@ -267,7 +267,7 @@ class Future:
InvalidStateError.
"""
if self._state != _PENDING:
- raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
+ raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
if isinstance(exception, type):
exception = exception()
if type(exception) is StopIteration:
@@ -312,18 +312,18 @@ def _set_result_unless_cancelled(fut, result):
fut.set_result(result)
-def _convert_future_exc(exc):
- exc_class = type(exc)
- if exc_class is concurrent.futures.CancelledError:
- return exceptions.CancelledError(*exc.args)
- elif exc_class is concurrent.futures.TimeoutError:
- return exceptions.TimeoutError(*exc.args)
- elif exc_class is concurrent.futures.InvalidStateError:
- return exceptions.InvalidStateError(*exc.args)
- else:
- return exc
-
-
+def _convert_future_exc(exc):
+ exc_class = type(exc)
+ if exc_class is concurrent.futures.CancelledError:
+ return exceptions.CancelledError(*exc.args)
+ elif exc_class is concurrent.futures.TimeoutError:
+ return exceptions.TimeoutError(*exc.args)
+ elif exc_class is concurrent.futures.InvalidStateError:
+ return exceptions.InvalidStateError(*exc.args)
+ else:
+ return exc
+
+
def _set_concurrent_future_state(concurrent, source):
"""Copy state from a future to a concurrent.futures.Future."""
assert source.done()
@@ -333,7 +333,7 @@ def _set_concurrent_future_state(concurrent, source):
return
exception = source.exception()
if exception is not None:
- concurrent.set_exception(_convert_future_exc(exception))
+ concurrent.set_exception(_convert_future_exc(exception))
else:
result = source.result()
concurrent.set_result(result)
@@ -353,7 +353,7 @@ def _copy_future_state(source, dest):
else:
exception = source.exception()
if exception is not None:
- dest.set_exception(_convert_future_exc(exception))
+ dest.set_exception(_convert_future_exc(exception))
else:
result = source.result()
dest.set_result(result)
diff --git a/contrib/tools/python3/src/Lib/asyncio/locks.py b/contrib/tools/python3/src/Lib/asyncio/locks.py
index 218226dca2..f1ce732478 100644
--- a/contrib/tools/python3/src/Lib/asyncio/locks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/locks.py
@@ -6,7 +6,7 @@ import collections
import warnings
from . import events
-from . import exceptions
+from . import exceptions
class _ContextManagerMixin:
@@ -75,15 +75,15 @@ class Lock(_ContextManagerMixin):
"""
def __init__(self, *, loop=None):
- self._waiters = None
+ self._waiters = None
self._locked = False
- if loop is None:
- self._loop = events.get_event_loop()
- else:
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -102,13 +102,13 @@ class Lock(_ContextManagerMixin):
This method blocks until the lock is unlocked, then sets it to
locked and returns True.
"""
- if (not self._locked and (self._waiters is None or
- all(w.cancelled() for w in self._waiters))):
+ if (not self._locked and (self._waiters is None or
+ all(w.cancelled() for w in self._waiters))):
self._locked = True
return True
- if self._waiters is None:
- self._waiters = collections.deque()
+ if self._waiters is None:
+ self._waiters = collections.deque()
fut = self._loop.create_future()
self._waiters.append(fut)
@@ -120,7 +120,7 @@ class Lock(_ContextManagerMixin):
await fut
finally:
self._waiters.remove(fut)
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
if not self._locked:
self._wake_up_first()
raise
@@ -147,8 +147,8 @@ class Lock(_ContextManagerMixin):
def _wake_up_first(self):
"""Wake up the first waiter if it isn't done."""
- if not self._waiters:
- return
+ if not self._waiters:
+ return
try:
fut = next(iter(self._waiters))
except StopIteration:
@@ -173,13 +173,13 @@ class Event:
def __init__(self, *, loop=None):
self._waiters = collections.deque()
self._value = False
- if loop is None:
- self._loop = events.get_event_loop()
- else:
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -240,16 +240,16 @@ class Condition(_ContextManagerMixin):
"""
def __init__(self, lock=None, *, loop=None):
- if loop is None:
- self._loop = events.get_event_loop()
- else:
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
if lock is None:
- lock = Lock(loop=loop)
+ lock = Lock(loop=loop)
elif lock._loop is not self._loop:
raise ValueError("loop argument must agree with lock")
@@ -299,11 +299,11 @@ class Condition(_ContextManagerMixin):
try:
await self.acquire()
break
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
cancelled = True
if cancelled:
- raise exceptions.CancelledError
+ raise exceptions.CancelledError
async def wait_for(self, predicate):
"""Wait until a predicate becomes true.
@@ -371,13 +371,13 @@ class Semaphore(_ContextManagerMixin):
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
- if loop is None:
- self._loop = events.get_event_loop()
- else:
+ if loop is None:
+ self._loop = events.get_event_loop()
+ else:
self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -437,11 +437,11 @@ class BoundedSemaphore(Semaphore):
"""
def __init__(self, value=1, *, loop=None):
- if loop:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ if loop:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
self._bound_value = value
super().__init__(value, loop=loop)
diff --git a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
index 33da82ae58..b4cd414b82 100644
--- a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py
@@ -10,39 +10,39 @@ import io
import os
import socket
import warnings
-import signal
-import threading
-import collections
+import signal
+import threading
+import collections
from . import base_events
from . import constants
from . import futures
-from . import exceptions
+from . import exceptions
from . import protocols
from . import sslproto
from . import transports
-from . import trsock
+from . import trsock
from .log import logger
-def _set_socket_extra(transport, sock):
- transport._extra['socket'] = trsock.TransportSocket(sock)
-
- try:
- transport._extra['sockname'] = sock.getsockname()
- except socket.error:
- if transport._loop.get_debug():
- logger.warning(
- "getsockname() failed on %r", sock, exc_info=True)
-
- if 'peername' not in transport._extra:
- try:
- transport._extra['peername'] = sock.getpeername()
- except socket.error:
- # UDP sockets may not have a peer name
- transport._extra['peername'] = None
-
-
+def _set_socket_extra(transport, sock):
+ transport._extra['socket'] = trsock.TransportSocket(sock)
+
+ try:
+ transport._extra['sockname'] = sock.getsockname()
+ except socket.error:
+ if transport._loop.get_debug():
+ logger.warning(
+ "getsockname() failed on %r", sock, exc_info=True)
+
+ if 'peername' not in transport._extra:
+ try:
+ transport._extra['peername'] = sock.getpeername()
+ except socket.error:
+ # UDP sockets may not have a peer name
+ transport._extra['peername'] = None
+
+
class _ProactorBasePipeTransport(transports._FlowControlMixin,
transports.BaseTransport):
"""Base class for pipe and socket transports."""
@@ -110,14 +110,14 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._read_fut.cancel()
self._read_fut = None
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if self._sock is not None:
- _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
try:
- if isinstance(exc, OSError):
+ if isinstance(exc, OSError):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
@@ -233,9 +233,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
try:
keep_open = self._protocol.eof_received()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.eof_received() call failed.')
return
@@ -258,9 +258,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if isinstance(self._protocol, protocols.BufferedProtocol):
try:
protocols._feed_data_to_buffered_proto(self._protocol, data)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(exc,
'Fatal error: protocol.buffer_updated() '
'call failed.')
@@ -307,7 +307,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
if not self._closing:
raise
else:
@@ -450,134 +450,134 @@ class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
self.close()
-class _ProactorDatagramTransport(_ProactorBasePipeTransport):
- max_size = 256 * 1024
- def __init__(self, loop, sock, protocol, address=None,
- waiter=None, extra=None):
- self._address = address
- self._empty_waiter = None
- # We don't need to call _protocol.connection_made() since our base
- # constructor does it for us.
- super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)
-
- # The base constructor sets _buffer = None, so we set it here
- self._buffer = collections.deque()
- self._loop.call_soon(self._loop_reading)
-
- def _set_extra(self, sock):
- _set_socket_extra(self, sock)
-
- def get_write_buffer_size(self):
- return sum(len(data) for data, _ in self._buffer)
-
- def abort(self):
- self._force_close(None)
-
- def sendto(self, data, addr=None):
- if not isinstance(data, (bytes, bytearray, memoryview)):
- raise TypeError('data argument must be bytes-like object (%r)',
- type(data))
-
- if not data:
- return
-
- if self._address is not None and addr not in (None, self._address):
- raise ValueError(
- f'Invalid address: must be None or {self._address}')
-
- if self._conn_lost and self._address:
- if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
- logger.warning('socket.sendto() raised exception.')
- self._conn_lost += 1
- return
-
- # Ensure that what we buffer is immutable.
- self._buffer.append((bytes(data), addr))
-
- if self._write_fut is None:
- # No current write operations are active, kick one off
- self._loop_writing()
- # else: A write operation is already kicked off
-
- self._maybe_pause_protocol()
-
- def _loop_writing(self, fut=None):
- try:
- if self._conn_lost:
- return
-
- assert fut is self._write_fut
- self._write_fut = None
- if fut:
- # We are in a _loop_writing() done callback, get the result
- fut.result()
-
- if not self._buffer or (self._conn_lost and self._address):
- # The connection has been closed
- if self._closing:
- self._loop.call_soon(self._call_connection_lost, None)
- return
-
- data, addr = self._buffer.popleft()
- if self._address is not None:
- self._write_fut = self._loop._proactor.send(self._sock,
- data)
- else:
- self._write_fut = self._loop._proactor.sendto(self._sock,
- data,
- addr=addr)
- except OSError as exc:
- self._protocol.error_received(exc)
- except Exception as exc:
- self._fatal_error(exc, 'Fatal write error on datagram transport')
- else:
- self._write_fut.add_done_callback(self._loop_writing)
- self._maybe_resume_protocol()
-
- def _loop_reading(self, fut=None):
- data = None
- try:
- if self._conn_lost:
- return
-
- assert self._read_fut is fut or (self._read_fut is None and
- self._closing)
-
- self._read_fut = None
- if fut is not None:
- res = fut.result()
-
- if self._closing:
- # since close() has been called we ignore any read data
- data = None
- return
-
- if self._address is not None:
- data, addr = res, self._address
- else:
- data, addr = res
-
- if self._conn_lost:
- return
- if self._address is not None:
- self._read_fut = self._loop._proactor.recv(self._sock,
- self.max_size)
- else:
- self._read_fut = self._loop._proactor.recvfrom(self._sock,
- self.max_size)
- except OSError as exc:
- self._protocol.error_received(exc)
- except exceptions.CancelledError:
- if not self._closing:
- raise
- else:
- if self._read_fut is not None:
- self._read_fut.add_done_callback(self._loop_reading)
- finally:
- if data:
- self._protocol.datagram_received(data, addr)
-
-
+class _ProactorDatagramTransport(_ProactorBasePipeTransport):
+ max_size = 256 * 1024
+ def __init__(self, loop, sock, protocol, address=None,
+ waiter=None, extra=None):
+ self._address = address
+ self._empty_waiter = None
+ # We don't need to call _protocol.connection_made() since our base
+ # constructor does it for us.
+ super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)
+
+ # The base constructor sets _buffer = None, so we set it here
+ self._buffer = collections.deque()
+ self._loop.call_soon(self._loop_reading)
+
+ def _set_extra(self, sock):
+ _set_socket_extra(self, sock)
+
+ def get_write_buffer_size(self):
+ return sum(len(data) for data, _ in self._buffer)
+
+ def abort(self):
+ self._force_close(None)
+
+ def sendto(self, data, addr=None):
+ if not isinstance(data, (bytes, bytearray, memoryview)):
+ raise TypeError('data argument must be bytes-like object (%r)',
+ type(data))
+
+ if not data:
+ return
+
+ if self._address is not None and addr not in (None, self._address):
+ raise ValueError(
+ f'Invalid address: must be None or {self._address}')
+
+ if self._conn_lost and self._address:
+ if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ logger.warning('socket.sendto() raised exception.')
+ self._conn_lost += 1
+ return
+
+ # Ensure that what we buffer is immutable.
+ self._buffer.append((bytes(data), addr))
+
+ if self._write_fut is None:
+ # No current write operations are active, kick one off
+ self._loop_writing()
+ # else: A write operation is already kicked off
+
+ self._maybe_pause_protocol()
+
+ def _loop_writing(self, fut=None):
+ try:
+ if self._conn_lost:
+ return
+
+ assert fut is self._write_fut
+ self._write_fut = None
+ if fut:
+ # We are in a _loop_writing() done callback, get the result
+ fut.result()
+
+ if not self._buffer or (self._conn_lost and self._address):
+ # The connection has been closed
+ if self._closing:
+ self._loop.call_soon(self._call_connection_lost, None)
+ return
+
+ data, addr = self._buffer.popleft()
+ if self._address is not None:
+ self._write_fut = self._loop._proactor.send(self._sock,
+ data)
+ else:
+ self._write_fut = self._loop._proactor.sendto(self._sock,
+ data,
+ addr=addr)
+ except OSError as exc:
+ self._protocol.error_received(exc)
+ except Exception as exc:
+ self._fatal_error(exc, 'Fatal write error on datagram transport')
+ else:
+ self._write_fut.add_done_callback(self._loop_writing)
+ self._maybe_resume_protocol()
+
+ def _loop_reading(self, fut=None):
+ data = None
+ try:
+ if self._conn_lost:
+ return
+
+ assert self._read_fut is fut or (self._read_fut is None and
+ self._closing)
+
+ self._read_fut = None
+ if fut is not None:
+ res = fut.result()
+
+ if self._closing:
+ # since close() has been called we ignore any read data
+ data = None
+ return
+
+ if self._address is not None:
+ data, addr = res, self._address
+ else:
+ data, addr = res
+
+ if self._conn_lost:
+ return
+ if self._address is not None:
+ self._read_fut = self._loop._proactor.recv(self._sock,
+ self.max_size)
+ else:
+ self._read_fut = self._loop._proactor.recvfrom(self._sock,
+ self.max_size)
+ except OSError as exc:
+ self._protocol.error_received(exc)
+ except exceptions.CancelledError:
+ if not self._closing:
+ raise
+ else:
+ if self._read_fut is not None:
+ self._read_fut.add_done_callback(self._loop_reading)
+ finally:
+ if data:
+ self._protocol.datagram_received(data, addr)
+
+
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
_ProactorBaseWritePipeTransport,
transports.Transport):
@@ -603,7 +603,7 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport,
base_events._set_nodelay(sock)
def _set_extra(self, sock):
- _set_socket_extra(self, sock)
+ _set_socket_extra(self, sock)
def can_write_eof(self):
return True
@@ -627,9 +627,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._accept_futures = {} # socket file descriptor => Future
proactor.set_loop(self)
self._make_self_pipe()
- if threading.current_thread() is threading.main_thread():
- # wakeup fd can only be installed to a file descriptor from the main thread
- signal.set_wakeup_fd(self._csock.fileno())
+ if threading.current_thread() is threading.main_thread():
+ # wakeup fd can only be installed to a file descriptor from the main thread
+ signal.set_wakeup_fd(self._csock.fileno())
def _make_socket_transport(self, sock, protocol, waiter=None,
extra=None, server=None):
@@ -649,11 +649,11 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
extra=extra, server=server)
return ssl_protocol._app_transport
- def _make_datagram_transport(self, sock, protocol,
- address=None, waiter=None, extra=None):
- return _ProactorDatagramTransport(self, sock, protocol, address,
- waiter, extra)
-
+ def _make_datagram_transport(self, sock, protocol,
+ address=None, waiter=None, extra=None):
+ return _ProactorDatagramTransport(self, sock, protocol, address,
+ waiter, extra)
+
def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
extra=None):
return _ProactorDuplexPipeTransport(self,
@@ -675,8 +675,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
if self.is_closed():
return
- if threading.current_thread() is threading.main_thread():
- signal.set_wakeup_fd(-1)
+ if threading.current_thread() is threading.main_thread():
+ signal.set_wakeup_fd(-1)
# Call these methods before closing the event loop (before calling
# BaseEventLoop.close), because they can schedule callbacks with
# call_soon(), which is forbidden when the event loop is closed.
@@ -708,11 +708,11 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
try:
fileno = file.fileno()
except (AttributeError, io.UnsupportedOperation) as err:
- raise exceptions.SendfileNotAvailableError("not a regular file")
+ raise exceptions.SendfileNotAvailableError("not a regular file")
try:
fsize = os.fstat(fileno).st_size
- except OSError:
- raise exceptions.SendfileNotAvailableError("not a regular file")
+ except OSError:
+ raise exceptions.SendfileNotAvailableError("not a regular file")
blocksize = count if count else fsize
if not blocksize:
return 0 # empty file
@@ -766,21 +766,21 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
try:
if f is not None:
f.result() # may raise
- if self._self_reading_future is not f:
- # When we scheduled this Future, we assigned it to
- # _self_reading_future. If it's not there now, something has
- # tried to cancel the loop while this callback was still in the
- # queue (see windows_events.ProactorEventLoop.run_forever). In
- # that case stop here instead of continuing to schedule a new
- # iteration.
- return
+ if self._self_reading_future is not f:
+ # When we scheduled this Future, we assigned it to
+ # _self_reading_future. If it's not there now, something has
+ # tried to cancel the loop while this callback was still in the
+ # queue (see windows_events.ProactorEventLoop.run_forever). In
+ # that case stop here instead of continuing to schedule a new
+ # iteration.
+ return
f = self._proactor.recv(self._ssock, 4096)
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
# _close_self_pipe() has been called, stop waiting for data
return
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self.call_exception_handler({
'message': 'Error on reading from the event loop self pipe',
'exception': exc,
@@ -791,17 +791,17 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
f.add_done_callback(self._loop_self_reading)
def _write_to_self(self):
- # This may be called from a different thread, possibly after
- # _close_self_pipe() has been called or even while it is
- # running. Guard for self._csock being None or closed. When
- # a socket is closed, send() raises OSError (with errno set to
- # EBADF, but let's not rely on the exact error code).
- csock = self._csock
- if csock is None:
- return
-
+ # This may be called from a different thread, possibly after
+ # _close_self_pipe() has been called or even while it is
+ # running. Guard for self._csock being None or closed. When
+ # a socket is closed, send() raises OSError (with errno set to
+ # EBADF, but let's not rely on the exact error code).
+ csock = self._csock
+ if csock is None:
+ return
+
try:
- csock.send(b'\0')
+ csock.send(b'\0')
except OSError:
if self._debug:
logger.debug("Fail to write a null byte into the "
@@ -837,13 +837,13 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self.call_exception_handler({
'message': 'Accept failed on a socket',
'exception': exc,
- 'socket': trsock.TransportSocket(sock),
+ 'socket': trsock.TransportSocket(sock),
})
sock.close()
elif self._debug:
logger.debug("Accept failed on socket %r",
sock, exc_info=True)
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
sock.close()
else:
self._accept_futures[sock.fileno()] = f
diff --git a/contrib/tools/python3/src/Lib/asyncio/protocols.py b/contrib/tools/python3/src/Lib/asyncio/protocols.py
index d09f51f4f2..69fa43e8b6 100644
--- a/contrib/tools/python3/src/Lib/asyncio/protocols.py
+++ b/contrib/tools/python3/src/Lib/asyncio/protocols.py
@@ -16,8 +16,8 @@ class BaseProtocol:
write-only transport like write pipe
"""
- __slots__ = ()
-
+ __slots__ = ()
+
def connection_made(self, transport):
"""Called when a connection is made.
@@ -89,8 +89,8 @@ class Protocol(BaseProtocol):
* CL: connection_lost()
"""
- __slots__ = ()
-
+ __slots__ = ()
+
def data_received(self, data):
"""Called when some data is received.
@@ -134,8 +134,8 @@ class BufferedProtocol(BaseProtocol):
* CL: connection_lost()
"""
- __slots__ = ()
-
+ __slots__ = ()
+
def get_buffer(self, sizehint):
"""Called to allocate a new receive buffer.
@@ -166,8 +166,8 @@ class BufferedProtocol(BaseProtocol):
class DatagramProtocol(BaseProtocol):
"""Interface for datagram protocol."""
- __slots__ = ()
-
+ __slots__ = ()
+
def datagram_received(self, data, addr):
"""Called when some datagram is received."""
@@ -181,8 +181,8 @@ class DatagramProtocol(BaseProtocol):
class SubprocessProtocol(BaseProtocol):
"""Interface for protocol for subprocess calls."""
- __slots__ = ()
-
+ __slots__ = ()
+
def pipe_data_received(self, fd, data):
"""Called when the subprocess writes data into stdout/stderr pipe.
diff --git a/contrib/tools/python3/src/Lib/asyncio/queues.py b/contrib/tools/python3/src/Lib/asyncio/queues.py
index 03ca592290..cd3f7c6a56 100644
--- a/contrib/tools/python3/src/Lib/asyncio/queues.py
+++ b/contrib/tools/python3/src/Lib/asyncio/queues.py
@@ -2,7 +2,7 @@ __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
import collections
import heapq
-import warnings
+import warnings
from . import events
from . import locks
@@ -35,9 +35,9 @@ class Queue:
self._loop = events.get_event_loop()
else:
self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
self._maxsize = maxsize
# Futures.
@@ -45,7 +45,7 @@ class Queue:
# Futures.
self._putters = collections.deque()
self._unfinished_tasks = 0
- self._finished = locks.Event(loop=loop)
+ self._finished = locks.Event(loop=loop)
self._finished.set()
self._init(maxsize)
@@ -76,9 +76,9 @@ class Queue:
def __str__(self):
return f'<{type(self).__name__} {self._format()}>'
- def __class_getitem__(cls, type):
- return cls
-
+ def __class_getitem__(cls, type):
+ return cls
+
def _format(self):
result = f'maxsize={self._maxsize!r}'
if getattr(self, '_queue', None):
diff --git a/contrib/tools/python3/src/Lib/asyncio/runners.py b/contrib/tools/python3/src/Lib/asyncio/runners.py
index d6cdeef9a6..6920acba38 100644
--- a/contrib/tools/python3/src/Lib/asyncio/runners.py
+++ b/contrib/tools/python3/src/Lib/asyncio/runners.py
@@ -5,8 +5,8 @@ from . import events
from . import tasks
-def run(main, *, debug=None):
- """Execute the coroutine and return the result.
+def run(main, *, debug=None):
+ """Execute the coroutine and return the result.
This function runs the passed coroutine, taking care of
managing the asyncio event loop and finalizing asynchronous
@@ -39,14 +39,14 @@ def run(main, *, debug=None):
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
- if debug is not None:
- loop.set_debug(debug)
+ if debug is not None:
+ loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
- loop.run_until_complete(loop.shutdown_default_executor())
+ loop.run_until_complete(loop.shutdown_default_executor())
finally:
events.set_event_loop(None)
loop.close()
@@ -61,7 +61,7 @@ def _cancel_all_tasks(loop):
task.cancel()
loop.run_until_complete(
- tasks._gather(*to_cancel, loop=loop, return_exceptions=True))
+ tasks._gather(*to_cancel, loop=loop, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
diff --git a/contrib/tools/python3/src/Lib/asyncio/selector_events.py b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
index 2ecf392b8b..59cb6b1bab 100644
--- a/contrib/tools/python3/src/Lib/asyncio/selector_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/selector_events.py
@@ -25,7 +25,7 @@ from . import futures
from . import protocols
from . import sslproto
from . import transports
-from . import trsock
+from . import trsock
from .log import logger
@@ -40,11 +40,11 @@ def _test_selector_event(selector, fd, event):
return bool(key.events & event)
-def _check_ssl_socket(sock):
- if ssl is not None and isinstance(sock, ssl.SSLSocket):
- raise TypeError("Socket cannot be of type SSLSocket")
-
-
+def _check_ssl_socket(sock):
+ if ssl is not None and isinstance(sock, ssl.SSLSocket):
+ raise TypeError("Socket cannot be of type SSLSocket")
+
+
class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""Selector event loop.
@@ -133,17 +133,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
# a socket is closed, send() raises OSError (with errno set to
# EBADF, but let's not rely on the exact error code).
csock = self._csock
- if csock is None:
- return
-
- try:
- csock.send(b'\0')
- except OSError:
- if self._debug:
- logger.debug("Fail to write a null byte into the "
- "self-pipe socket",
- exc_info=True)
-
+ if csock is None:
+ return
+
+ try:
+ csock.send(b'\0')
+ except OSError:
+ if self._debug:
+ logger.debug("Fail to write a null byte into the "
+ "self-pipe socket",
+ exc_info=True)
+
def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None, backlog=100,
ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
@@ -179,7 +179,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
self.call_exception_handler({
'message': 'socket.accept() out of system resource',
'exception': exc,
- 'socket': trsock.TransportSocket(sock),
+ 'socket': trsock.TransportSocket(sock),
})
self._remove_reader(sock.fileno())
self.call_later(constants.ACCEPT_RETRY_DELAY,
@@ -216,14 +216,14 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
try:
await waiter
- except BaseException:
+ except BaseException:
transport.close()
raise
- # It's now up to the protocol to handle the connection.
+ # It's now up to the protocol to handle the connection.
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
if self._debug:
context = {
'message':
@@ -268,7 +268,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
(handle, writer))
if reader is not None:
reader.cancel()
- return handle
+ return handle
def _remove_reader(self, fd):
if self.is_closed():
@@ -305,7 +305,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
(reader, handle))
if writer is not None:
writer.cancel()
- return handle
+ return handle
def _remove_writer(self, fd):
"""Remove a writer callback."""
@@ -333,7 +333,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
self._ensure_fd_no_transport(fd)
- self._add_reader(fd, callback, *args)
+ self._add_reader(fd, callback, *args)
def remove_reader(self, fd):
"""Remove a reader callback."""
@@ -343,7 +343,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def add_writer(self, fd, callback, *args):
"""Add a writer callback.."""
self._ensure_fd_no_transport(fd)
- self._add_writer(fd, callback, *args)
+ self._add_writer(fd, callback, *args)
def remove_writer(self, fd):
"""Remove a writer callback."""
@@ -357,37 +357,37 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
The maximum amount of data to be received at once is specified by
nbytes.
"""
- _check_ssl_socket(sock)
+ _check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- try:
- return sock.recv(n)
- except (BlockingIOError, InterruptedError):
- pass
+ try:
+ return sock.recv(n)
+ except (BlockingIOError, InterruptedError):
+ pass
fut = self.create_future()
- fd = sock.fileno()
- self._ensure_fd_no_transport(fd)
- handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
- fut.add_done_callback(
- functools.partial(self._sock_read_done, fd, handle=handle))
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd, handle=handle))
return await fut
- def _sock_read_done(self, fd, fut, handle=None):
- if handle is None or not handle.cancelled():
- self.remove_reader(fd)
-
- def _sock_recv(self, fut, sock, n):
+ def _sock_read_done(self, fd, fut, handle=None):
+ if handle is None or not handle.cancelled():
+ self.remove_reader(fd)
+
+ def _sock_recv(self, fut, sock, n):
# _sock_recv() can add itself as an I/O callback if the operation can't
# be done immediately. Don't use it directly, call sock_recv().
- if fut.done():
+ if fut.done():
return
try:
data = sock.recv(n)
except (BlockingIOError, InterruptedError):
- return # try again next time
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ return # try again next time
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(data)
@@ -398,34 +398,34 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
The received data is written into *buf* (a writable buffer).
The return value is the number of bytes written.
"""
- _check_ssl_socket(sock)
+ _check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- try:
- return sock.recv_into(buf)
- except (BlockingIOError, InterruptedError):
- pass
+ try:
+ return sock.recv_into(buf)
+ except (BlockingIOError, InterruptedError):
+ pass
fut = self.create_future()
- fd = sock.fileno()
- self._ensure_fd_no_transport(fd)
- handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
- fut.add_done_callback(
- functools.partial(self._sock_read_done, fd, handle=handle))
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd, handle=handle))
return await fut
- def _sock_recv_into(self, fut, sock, buf):
+ def _sock_recv_into(self, fut, sock, buf):
# _sock_recv_into() can add itself as an I/O callback if the operation
# can't be done immediately. Don't use it directly, call
# sock_recv_into().
- if fut.done():
+ if fut.done():
return
try:
nbytes = sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
- return # try again next time
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ return # try again next time
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(nbytes)
@@ -439,56 +439,56 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
raised, and there is no way to determine how much data, if any, was
successfully processed by the receiving end of the connection.
"""
- _check_ssl_socket(sock)
+ _check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
- try:
- n = sock.send(data)
- except (BlockingIOError, InterruptedError):
- n = 0
-
- if n == len(data):
- # all data sent
- return
-
+ try:
+ n = sock.send(data)
+ except (BlockingIOError, InterruptedError):
+ n = 0
+
+ if n == len(data):
+ # all data sent
+ return
+
fut = self.create_future()
- fd = sock.fileno()
- self._ensure_fd_no_transport(fd)
- # use a trick with a list in closure to store a mutable state
- handle = self._add_writer(fd, self._sock_sendall, fut, sock,
- memoryview(data), [n])
- fut.add_done_callback(
- functools.partial(self._sock_write_done, fd, handle=handle))
+ fd = sock.fileno()
+ self._ensure_fd_no_transport(fd)
+ # use a trick with a list in closure to store a mutable state
+ handle = self._add_writer(fd, self._sock_sendall, fut, sock,
+ memoryview(data), [n])
+ fut.add_done_callback(
+ functools.partial(self._sock_write_done, fd, handle=handle))
return await fut
- def _sock_sendall(self, fut, sock, view, pos):
- if fut.done():
- # Future cancellation can be scheduled on previous loop iteration
+ def _sock_sendall(self, fut, sock, view, pos):
+ if fut.done():
+ # Future cancellation can be scheduled on previous loop iteration
return
- start = pos[0]
+ start = pos[0]
try:
- n = sock.send(view[start:])
+ n = sock.send(view[start:])
except (BlockingIOError, InterruptedError):
- return
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ return
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
fut.set_exception(exc)
return
- start += n
-
- if start == len(view):
+ start += n
+
+ if start == len(view):
fut.set_result(None)
else:
- pos[0] = start
+ pos[0] = start
async def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
This method is a coroutine.
"""
- _check_ssl_socket(sock)
+ _check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
@@ -510,24 +510,24 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
# connection runs in background. We have to wait until the socket
# becomes writable to be notified when the connection succeed or
# fails.
- self._ensure_fd_no_transport(fd)
- handle = self._add_writer(
- fd, self._sock_connect_cb, fut, sock, address)
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_writer(
+ fd, self._sock_connect_cb, fut, sock, address)
fut.add_done_callback(
- functools.partial(self._sock_write_done, fd, handle=handle))
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ functools.partial(self._sock_write_done, fd, handle=handle))
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
- def _sock_write_done(self, fd, fut, handle=None):
- if handle is None or not handle.cancelled():
- self.remove_writer(fd)
+ def _sock_write_done(self, fd, fut, handle=None):
+ if handle is None or not handle.cancelled():
+ self.remove_writer(fd)
def _sock_connect_cb(self, fut, sock, address):
- if fut.done():
+ if fut.done():
return
try:
@@ -538,9 +538,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
except (BlockingIOError, InterruptedError):
# socket is still registered, the callback will be retried later
pass
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
@@ -553,26 +553,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
object usable to send and receive data on the connection, and address
is the address bound to the socket on the other end of the connection.
"""
- _check_ssl_socket(sock)
+ _check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
- self._sock_accept(fut, sock)
+ self._sock_accept(fut, sock)
return await fut
- def _sock_accept(self, fut, sock):
+ def _sock_accept(self, fut, sock):
fd = sock.fileno()
try:
conn, address = sock.accept()
conn.setblocking(False)
except (BlockingIOError, InterruptedError):
- self._ensure_fd_no_transport(fd)
- handle = self._add_reader(fd, self._sock_accept, fut, sock)
- fut.add_done_callback(
- functools.partial(self._sock_read_done, fd, handle=handle))
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ self._ensure_fd_no_transport(fd)
+ handle = self._add_reader(fd, self._sock_accept, fut, sock)
+ fut.add_done_callback(
+ functools.partial(self._sock_read_done, fd, handle=handle))
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result((conn, address))
@@ -624,11 +624,11 @@ class _SelectorTransport(transports._FlowControlMixin,
def __init__(self, loop, sock, protocol, extra=None, server=None):
super().__init__(extra, loop)
- self._extra['socket'] = trsock.TransportSocket(sock)
- try:
- self._extra['sockname'] = sock.getsockname()
- except OSError:
- self._extra['sockname'] = None
+ self._extra['socket'] = trsock.TransportSocket(sock)
+ try:
+ self._extra['sockname'] = sock.getsockname()
+ except OSError:
+ self._extra['sockname'] = None
if 'peername' not in self._extra:
try:
self._extra['peername'] = sock.getpeername()
@@ -699,14 +699,14 @@ class _SelectorTransport(transports._FlowControlMixin,
self._loop._remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if self._sock is not None:
- _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self._sock.close()
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
- if isinstance(exc, OSError):
+ if isinstance(exc, OSError):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
@@ -820,9 +820,9 @@ class _SelectorSocketTransport(_SelectorTransport):
buf = self._protocol.get_buffer(-1)
if not len(buf):
raise RuntimeError('get_buffer() returned an empty buffer')
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return
@@ -831,9 +831,9 @@ class _SelectorSocketTransport(_SelectorTransport):
nbytes = self._sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
return
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
return
@@ -843,9 +843,9 @@ class _SelectorSocketTransport(_SelectorTransport):
try:
self._protocol.buffer_updated(nbytes)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.buffer_updated() call failed.')
@@ -856,9 +856,9 @@ class _SelectorSocketTransport(_SelectorTransport):
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
return
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
return
@@ -868,9 +868,9 @@ class _SelectorSocketTransport(_SelectorTransport):
try:
self._protocol.data_received(data)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.data_received() call failed.')
@@ -880,9 +880,9 @@ class _SelectorSocketTransport(_SelectorTransport):
try:
keep_open = self._protocol.eof_received()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.eof_received() call failed.')
return
@@ -918,9 +918,9 @@ class _SelectorSocketTransport(_SelectorTransport):
n = self._sock.send(data)
except (BlockingIOError, InterruptedError):
pass
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(exc, 'Fatal write error on socket transport')
return
else:
@@ -943,9 +943,9 @@ class _SelectorSocketTransport(_SelectorTransport):
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
pass
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
@@ -1021,9 +1021,9 @@ class _SelectorDatagramTransport(_SelectorTransport):
pass
except OSError as exc:
self._protocol.error_received(exc)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on datagram transport')
else:
self._protocol.datagram_received(data, addr)
@@ -1035,11 +1035,11 @@ class _SelectorDatagramTransport(_SelectorTransport):
if not data:
return
- if self._address:
- if addr not in (None, self._address):
- raise ValueError(
- f'Invalid address: must be None or {self._address}')
- addr = self._address
+ if self._address:
+ if addr not in (None, self._address):
+ raise ValueError(
+ f'Invalid address: must be None or {self._address}')
+ addr = self._address
if self._conn_lost and self._address:
if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
@@ -1050,7 +1050,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
if not self._buffer:
# Attempt to send it right away first.
try:
- if self._extra['peername']:
+ if self._extra['peername']:
self._sock.send(data)
else:
self._sock.sendto(data, addr)
@@ -1060,9 +1060,9 @@ class _SelectorDatagramTransport(_SelectorTransport):
except OSError as exc:
self._protocol.error_received(exc)
return
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(
exc, 'Fatal write error on datagram transport')
return
@@ -1075,7 +1075,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
while self._buffer:
data, addr = self._buffer.popleft()
try:
- if self._extra['peername']:
+ if self._extra['peername']:
self._sock.send(data)
else:
self._sock.sendto(data, addr)
@@ -1085,9 +1085,9 @@ class _SelectorDatagramTransport(_SelectorTransport):
except OSError as exc:
self._protocol.error_received(exc)
return
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._fatal_error(
exc, 'Fatal write error on datagram transport')
return
diff --git a/contrib/tools/python3/src/Lib/asyncio/sslproto.py b/contrib/tools/python3/src/Lib/asyncio/sslproto.py
index 6731363a7a..cad25b2653 100644
--- a/contrib/tools/python3/src/Lib/asyncio/sslproto.py
+++ b/contrib/tools/python3/src/Lib/asyncio/sslproto.py
@@ -315,9 +315,9 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
self._closed = True
self._ssl_protocol._start_shutdown()
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if not self._closed:
- _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self.close()
def is_reading(self):
@@ -497,11 +497,11 @@ class SSLProtocol(protocols.Protocol):
self._app_transport._closed = True
self._transport = None
self._app_transport = None
- if getattr(self, '_handshake_timeout_handle', None):
- self._handshake_timeout_handle.cancel()
+ if getattr(self, '_handshake_timeout_handle', None):
+ self._handshake_timeout_handle.cancel()
self._wakeup_waiter(exc)
- self._app_protocol = None
- self._sslpipe = None
+ self._app_protocol = None
+ self._sslpipe = None
def pause_writing(self):
"""Called when the low-level transport's buffer goes over
@@ -526,9 +526,9 @@ class SSLProtocol(protocols.Protocol):
try:
ssldata, appdata = self._sslpipe.feed_ssldata(data)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as e:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as e:
self._fatal_error(e, 'SSL error in data received')
return
@@ -543,9 +543,9 @@ class SSLProtocol(protocols.Protocol):
self._app_protocol, chunk)
else:
self._app_protocol.data_received(chunk)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as ex:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as ex:
self._fatal_error(
ex, 'application protocol failed to receive SSL data')
return
@@ -631,9 +631,9 @@ class SSLProtocol(protocols.Protocol):
raise handshake_exc
peercert = sslobj.getpeercert()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
if isinstance(exc, ssl.CertificateError):
msg = 'SSL handshake failed on verifying the certificate'
else:
@@ -696,9 +696,9 @@ class SSLProtocol(protocols.Protocol):
# delete it and reduce the outstanding buffer size.
del self._write_backlog[0]
self._write_buffer_size -= len(data)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
if self._in_handshake:
# Exceptions will be re-raised in _on_handshake_complete.
self._on_handshake_complete(exc)
@@ -706,7 +706,7 @@ class SSLProtocol(protocols.Protocol):
self._fatal_error(exc, 'Fatal error on SSL transport')
def _fatal_error(self, exc, message='Fatal error on transport'):
- if isinstance(exc, OSError):
+ if isinstance(exc, OSError):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
diff --git a/contrib/tools/python3/src/Lib/asyncio/staggered.py b/contrib/tools/python3/src/Lib/asyncio/staggered.py
index 9627efdf11..451a53a16f 100644
--- a/contrib/tools/python3/src/Lib/asyncio/staggered.py
+++ b/contrib/tools/python3/src/Lib/asyncio/staggered.py
@@ -1,149 +1,149 @@
-"""Support for running coroutines in parallel with staggered start times."""
-
-__all__ = 'staggered_race',
-
-import contextlib
-import typing
-
-from . import events
-from . import exceptions as exceptions_mod
-from . import locks
-from . import tasks
-
-
-async def staggered_race(
- coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]],
- delay: typing.Optional[float],
- *,
- loop: events.AbstractEventLoop = None,
-) -> typing.Tuple[
- typing.Any,
- typing.Optional[int],
- typing.List[typing.Optional[Exception]]
-]:
- """Run coroutines with staggered start times and take the first to finish.
-
- This method takes an iterable of coroutine functions. The first one is
- started immediately. From then on, whenever the immediately preceding one
- fails (raises an exception), or when *delay* seconds has passed, the next
- coroutine is started. This continues until one of the coroutines complete
- successfully, in which case all others are cancelled, or until all
- coroutines fail.
-
- The coroutines provided should be well-behaved in the following way:
-
- * They should only ``return`` if completed successfully.
-
- * They should always raise an exception if they did not complete
- successfully. In particular, if they handle cancellation, they should
- probably reraise, like this::
-
- try:
- # do work
- except asyncio.CancelledError:
- # undo partially completed work
- raise
-
- Args:
- coro_fns: an iterable of coroutine functions, i.e. callables that
- return a coroutine object when called. Use ``functools.partial`` or
- lambdas to pass arguments.
-
- delay: amount of time, in seconds, between starting coroutines. If
- ``None``, the coroutines will run sequentially.
-
- loop: the event loop to use.
-
- Returns:
- tuple *(winner_result, winner_index, exceptions)* where
-
- - *winner_result*: the result of the winning coroutine, or ``None``
- if no coroutines won.
-
- - *winner_index*: the index of the winning coroutine in
- ``coro_fns``, or ``None`` if no coroutines won. If the winning
- coroutine may return None on success, *winner_index* can be used
- to definitively determine whether any coroutine won.
-
- - *exceptions*: list of exceptions returned by the coroutines.
- ``len(exceptions)`` is equal to the number of coroutines actually
- started, and the order is the same as in ``coro_fns``. The winning
- coroutine's entry is ``None``.
-
- """
- # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
- loop = loop or events.get_running_loop()
- enum_coro_fns = enumerate(coro_fns)
- winner_result = None
- winner_index = None
- exceptions = []
- running_tasks = []
-
- async def run_one_coro(
- previous_failed: typing.Optional[locks.Event]) -> None:
- # Wait for the previous task to finish, or for delay seconds
- if previous_failed is not None:
- with contextlib.suppress(exceptions_mod.TimeoutError):
- # Use asyncio.wait_for() instead of asyncio.wait() here, so
- # that if we get cancelled at this point, Event.wait() is also
- # cancelled, otherwise there will be a "Task destroyed but it is
- # pending" later.
- await tasks.wait_for(previous_failed.wait(), delay)
- # Get the next coroutine to run
- try:
- this_index, coro_fn = next(enum_coro_fns)
- except StopIteration:
- return
- # Start task that will run the next coroutine
- this_failed = locks.Event()
- next_task = loop.create_task(run_one_coro(this_failed))
- running_tasks.append(next_task)
- assert len(running_tasks) == this_index + 2
- # Prepare place to put this coroutine's exceptions if not won
- exceptions.append(None)
- assert len(exceptions) == this_index + 1
-
- try:
- result = await coro_fn()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as e:
- exceptions[this_index] = e
- this_failed.set() # Kickstart the next coroutine
- else:
- # Store winner's results
- nonlocal winner_index, winner_result
- assert winner_index is None
- winner_index = this_index
- winner_result = result
- # Cancel all other tasks. We take care to not cancel the current
- # task as well. If we do so, then since there is no `await` after
- # here and CancelledError are usually thrown at one, we will
- # encounter a curious corner case where the current task will end
- # up as done() == True, cancelled() == False, exception() ==
- # asyncio.CancelledError. This behavior is specified in
- # https://bugs.python.org/issue30048
- for i, t in enumerate(running_tasks):
- if i != this_index:
- t.cancel()
-
- first_task = loop.create_task(run_one_coro(None))
- running_tasks.append(first_task)
- try:
- # Wait for a growing list of tasks to all finish: poor man's version of
- # curio's TaskGroup or trio's nursery
- done_count = 0
- while done_count != len(running_tasks):
- done, _ = await tasks.wait(running_tasks)
- done_count = len(done)
- # If run_one_coro raises an unhandled exception, it's probably a
- # programming error, and I want to see it.
- if __debug__:
- for d in done:
- if d.done() and not d.cancelled() and d.exception():
- raise d.exception()
- return winner_result, winner_index, exceptions
- finally:
- # Make sure no tasks are left running if we leave this function
- for t in running_tasks:
- t.cancel()
+"""Support for running coroutines in parallel with staggered start times."""
+
+__all__ = 'staggered_race',
+
+import contextlib
+import typing
+
+from . import events
+from . import exceptions as exceptions_mod
+from . import locks
+from . import tasks
+
+
+async def staggered_race(
+ coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]],
+ delay: typing.Optional[float],
+ *,
+ loop: events.AbstractEventLoop = None,
+) -> typing.Tuple[
+ typing.Any,
+ typing.Optional[int],
+ typing.List[typing.Optional[Exception]]
+]:
+ """Run coroutines with staggered start times and take the first to finish.
+
+ This method takes an iterable of coroutine functions. The first one is
+ started immediately. From then on, whenever the immediately preceding one
+ fails (raises an exception), or when *delay* seconds has passed, the next
+ coroutine is started. This continues until one of the coroutines complete
+ successfully, in which case all others are cancelled, or until all
+ coroutines fail.
+
+ The coroutines provided should be well-behaved in the following way:
+
+ * They should only ``return`` if completed successfully.
+
+ * They should always raise an exception if they did not complete
+ successfully. In particular, if they handle cancellation, they should
+ probably reraise, like this::
+
+ try:
+ # do work
+ except asyncio.CancelledError:
+ # undo partially completed work
+ raise
+
+ Args:
+ coro_fns: an iterable of coroutine functions, i.e. callables that
+ return a coroutine object when called. Use ``functools.partial`` or
+ lambdas to pass arguments.
+
+ delay: amount of time, in seconds, between starting coroutines. If
+ ``None``, the coroutines will run sequentially.
+
+ loop: the event loop to use.
+
+ Returns:
+ tuple *(winner_result, winner_index, exceptions)* where
+
+ - *winner_result*: the result of the winning coroutine, or ``None``
+ if no coroutines won.
+
+ - *winner_index*: the index of the winning coroutine in
+ ``coro_fns``, or ``None`` if no coroutines won. If the winning
+ coroutine may return None on success, *winner_index* can be used
+ to definitively determine whether any coroutine won.
+
+ - *exceptions*: list of exceptions returned by the coroutines.
+ ``len(exceptions)`` is equal to the number of coroutines actually
+ started, and the order is the same as in ``coro_fns``. The winning
+ coroutine's entry is ``None``.
+
+ """
+ # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
+ loop = loop or events.get_running_loop()
+ enum_coro_fns = enumerate(coro_fns)
+ winner_result = None
+ winner_index = None
+ exceptions = []
+ running_tasks = []
+
+ async def run_one_coro(
+ previous_failed: typing.Optional[locks.Event]) -> None:
+ # Wait for the previous task to finish, or for delay seconds
+ if previous_failed is not None:
+ with contextlib.suppress(exceptions_mod.TimeoutError):
+ # Use asyncio.wait_for() instead of asyncio.wait() here, so
+ # that if we get cancelled at this point, Event.wait() is also
+ # cancelled, otherwise there will be a "Task destroyed but it is
+ # pending" later.
+ await tasks.wait_for(previous_failed.wait(), delay)
+ # Get the next coroutine to run
+ try:
+ this_index, coro_fn = next(enum_coro_fns)
+ except StopIteration:
+ return
+ # Start task that will run the next coroutine
+ this_failed = locks.Event()
+ next_task = loop.create_task(run_one_coro(this_failed))
+ running_tasks.append(next_task)
+ assert len(running_tasks) == this_index + 2
+ # Prepare place to put this coroutine's exceptions if not won
+ exceptions.append(None)
+ assert len(exceptions) == this_index + 1
+
+ try:
+ result = await coro_fn()
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as e:
+ exceptions[this_index] = e
+ this_failed.set() # Kickstart the next coroutine
+ else:
+ # Store winner's results
+ nonlocal winner_index, winner_result
+ assert winner_index is None
+ winner_index = this_index
+ winner_result = result
+ # Cancel all other tasks. We take care to not cancel the current
+ # task as well. If we do so, then since there is no `await` after
+ # here and CancelledError are usually thrown at one, we will
+ # encounter a curious corner case where the current task will end
+ # up as done() == True, cancelled() == False, exception() ==
+ # asyncio.CancelledError. This behavior is specified in
+ # https://bugs.python.org/issue30048
+ for i, t in enumerate(running_tasks):
+ if i != this_index:
+ t.cancel()
+
+ first_task = loop.create_task(run_one_coro(None))
+ running_tasks.append(first_task)
+ try:
+ # Wait for a growing list of tasks to all finish: poor man's version of
+ # curio's TaskGroup or trio's nursery
+ done_count = 0
+ while done_count != len(running_tasks):
+ done, _ = await tasks.wait(running_tasks)
+ done_count = len(done)
+ # If run_one_coro raises an unhandled exception, it's probably a
+ # programming error, and I want to see it.
+ if __debug__:
+ for d in done:
+ if d.done() and not d.cancelled() and d.exception():
+ raise d.exception()
+ return winner_result, winner_index, exceptions
+ finally:
+ # Make sure no tasks are left running if we leave this function
+ for t in running_tasks:
+ t.cancel()
diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py
index 9c2cd4ad6b..3c80bb8892 100644
--- a/contrib/tools/python3/src/Lib/asyncio/streams.py
+++ b/contrib/tools/python3/src/Lib/asyncio/streams.py
@@ -1,19 +1,19 @@
__all__ = (
'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
- 'open_connection', 'start_server')
+ 'open_connection', 'start_server')
import socket
-import sys
-import warnings
-import weakref
+import sys
+import warnings
+import weakref
if hasattr(socket, 'AF_UNIX'):
__all__ += ('open_unix_connection', 'start_unix_server')
from . import coroutines
from . import events
-from . import exceptions
-from . import format_helpers
+from . import exceptions
+from . import format_helpers
from . import protocols
from .log import logger
from .tasks import sleep
@@ -43,10 +43,10 @@ async def open_connection(host=None, port=None, *,
"""
if loop is None:
loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
@@ -80,10 +80,10 @@ async def start_server(client_connected_cb, host=None, port=None, *,
"""
if loop is None:
loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
def factory():
reader = StreamReader(limit=limit, loop=loop)
@@ -102,10 +102,10 @@ if hasattr(socket, 'AF_UNIX'):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
if loop is None:
loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_unix_connection(
@@ -118,10 +118,10 @@ if hasattr(socket, 'AF_UNIX'):
"""Similar to `start_server` but works with UNIX Domain Sockets."""
if loop is None:
loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
def factory():
reader = StreamReader(limit=limit, loop=loop)
@@ -196,10 +196,10 @@ class FlowControlMixin(protocols.Protocol):
self._drain_waiter = waiter
await waiter
- def _get_close_waiter(self, stream):
- raise NotImplementedError
+ def _get_close_waiter(self, stream):
+ raise NotImplementedError
+
-
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
"""Helper class to adapt between Protocol and StreamReader.
@@ -209,86 +209,86 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
call inappropriate methods of the protocol.)
"""
- _source_traceback = None
-
+ _source_traceback = None
+
def __init__(self, stream_reader, client_connected_cb=None, loop=None):
super().__init__(loop=loop)
- if stream_reader is not None:
- self._stream_reader_wr = weakref.ref(stream_reader)
- self._source_traceback = stream_reader._source_traceback
- else:
- self._stream_reader_wr = None
- if client_connected_cb is not None:
- # This is a stream created by the `create_server()` function.
- # Keep a strong reference to the reader until a connection
- # is established.
- self._strong_reader = stream_reader
- self._reject_connection = False
+ if stream_reader is not None:
+ self._stream_reader_wr = weakref.ref(stream_reader)
+ self._source_traceback = stream_reader._source_traceback
+ else:
+ self._stream_reader_wr = None
+ if client_connected_cb is not None:
+ # This is a stream created by the `create_server()` function.
+ # Keep a strong reference to the reader until a connection
+ # is established.
+ self._strong_reader = stream_reader
+ self._reject_connection = False
self._stream_writer = None
- self._transport = None
+ self._transport = None
self._client_connected_cb = client_connected_cb
self._over_ssl = False
self._closed = self._loop.create_future()
- @property
- def _stream_reader(self):
- if self._stream_reader_wr is None:
- return None
- return self._stream_reader_wr()
-
+ @property
+ def _stream_reader(self):
+ if self._stream_reader_wr is None:
+ return None
+ return self._stream_reader_wr()
+
def connection_made(self, transport):
- if self._reject_connection:
- context = {
- 'message': ('An open stream was garbage collected prior to '
- 'establishing network connection; '
- 'call "stream.close()" explicitly.')
- }
- if self._source_traceback:
- context['source_traceback'] = self._source_traceback
- self._loop.call_exception_handler(context)
- transport.abort()
- return
- self._transport = transport
- reader = self._stream_reader
- if reader is not None:
- reader.set_transport(transport)
+ if self._reject_connection:
+ context = {
+ 'message': ('An open stream was garbage collected prior to '
+ 'establishing network connection; '
+ 'call "stream.close()" explicitly.')
+ }
+ if self._source_traceback:
+ context['source_traceback'] = self._source_traceback
+ self._loop.call_exception_handler(context)
+ transport.abort()
+ return
+ self._transport = transport
+ reader = self._stream_reader
+ if reader is not None:
+ reader.set_transport(transport)
self._over_ssl = transport.get_extra_info('sslcontext') is not None
if self._client_connected_cb is not None:
self._stream_writer = StreamWriter(transport, self,
- reader,
+ reader,
self._loop)
- res = self._client_connected_cb(reader,
+ res = self._client_connected_cb(reader,
self._stream_writer)
if coroutines.iscoroutine(res):
self._loop.create_task(res)
- self._strong_reader = None
+ self._strong_reader = None
def connection_lost(self, exc):
- reader = self._stream_reader
- if reader is not None:
+ reader = self._stream_reader
+ if reader is not None:
if exc is None:
- reader.feed_eof()
+ reader.feed_eof()
else:
- reader.set_exception(exc)
+ reader.set_exception(exc)
if not self._closed.done():
if exc is None:
self._closed.set_result(None)
else:
self._closed.set_exception(exc)
super().connection_lost(exc)
- self._stream_reader_wr = None
+ self._stream_reader_wr = None
self._stream_writer = None
- self._transport = None
+ self._transport = None
def data_received(self, data):
- reader = self._stream_reader
- if reader is not None:
- reader.feed_data(data)
+ reader = self._stream_reader
+ if reader is not None:
+ reader.feed_data(data)
def eof_received(self):
- reader = self._stream_reader
- if reader is not None:
- reader.feed_eof()
+ reader = self._stream_reader
+ if reader is not None:
+ reader.feed_eof()
if self._over_ssl:
# Prevent a warning in SSLProtocol.eof_received:
# "returning true from eof_received()
@@ -296,9 +296,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
return False
return True
- def _get_close_waiter(self, stream):
- return self._closed
-
+ def _get_close_waiter(self, stream):
+ return self._closed
+
def __del__(self):
# Prevent reports about unhandled exceptions.
# Better than self._closed._log_traceback = False hack
@@ -324,8 +324,8 @@ class StreamWriter:
assert reader is None or isinstance(reader, StreamReader)
self._reader = reader
self._loop = loop
- self._complete_fut = self._loop.create_future()
- self._complete_fut.set_result(None)
+ self._complete_fut = self._loop.create_future()
+ self._complete_fut.set_result(None)
def __repr__(self):
info = [self.__class__.__name__, f'transport={self._transport!r}']
@@ -356,7 +356,7 @@ class StreamWriter:
return self._transport.is_closing()
async def wait_closed(self):
- await self._protocol._get_close_waiter(self)
+ await self._protocol._get_close_waiter(self)
def get_extra_info(self, name, default=None):
return self._transport.get_extra_info(name, default)
@@ -374,23 +374,23 @@ class StreamWriter:
if exc is not None:
raise exc
if self._transport.is_closing():
- # Wait for protocol.connection_lost() call
- # Raise connection closing error if any,
- # ConnectionResetError otherwise
+ # Wait for protocol.connection_lost() call
+ # Raise connection closing error if any,
+ # ConnectionResetError otherwise
# Yield to the event loop so connection_lost() may be
# called. Without this, _drain_helper() would return
# immediately, and code that calls
# write(...); await drain()
# in a loop would never call connection_lost(), so it
# would not see an error when the socket is closed.
- await sleep(0)
+ await sleep(0)
await self._protocol._drain_helper()
class StreamReader:
- _source_traceback = None
-
+ _source_traceback = None
+
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
# The line length limit is a security feature;
# it also doubles as half the buffer limit.
@@ -409,9 +409,9 @@ class StreamReader:
self._exception = None
self._transport = None
self._paused = False
- if self._loop.get_debug():
- self._source_traceback = format_helpers.extract_stack(
- sys._getframe(1))
+ if self._loop.get_debug():
+ self._source_traceback = format_helpers.extract_stack(
+ sys._getframe(1))
def __repr__(self):
info = ['StreamReader']
@@ -538,9 +538,9 @@ class StreamReader:
seplen = len(sep)
try:
line = await self.readuntil(sep)
- except exceptions.IncompleteReadError as e:
+ except exceptions.IncompleteReadError as e:
return e.partial
- except exceptions.LimitOverrunError as e:
+ except exceptions.LimitOverrunError as e:
if self._buffer.startswith(sep, e.consumed):
del self._buffer[:e.consumed + seplen]
else:
@@ -615,7 +615,7 @@ class StreamReader:
# see upper comment for explanation.
offset = buflen + 1 - seplen
if offset > self._limit:
- raise exceptions.LimitOverrunError(
+ raise exceptions.LimitOverrunError(
'Separator is not found, and chunk exceed the limit',
offset)
@@ -626,13 +626,13 @@ class StreamReader:
if self._eof:
chunk = bytes(self._buffer)
self._buffer.clear()
- raise exceptions.IncompleteReadError(chunk, None)
+ raise exceptions.IncompleteReadError(chunk, None)
# _wait_for_data() will resume reading if stream was paused.
await self._wait_for_data('readuntil')
if isep > self._limit:
- raise exceptions.LimitOverrunError(
+ raise exceptions.LimitOverrunError(
'Separator is found, but chunk is longer than limit', isep)
chunk = self._buffer[:isep + seplen]
@@ -718,7 +718,7 @@ class StreamReader:
if self._eof:
incomplete = bytes(self._buffer)
self._buffer.clear()
- raise exceptions.IncompleteReadError(incomplete, n)
+ raise exceptions.IncompleteReadError(incomplete, n)
await self._wait_for_data('readexactly')
diff --git a/contrib/tools/python3/src/Lib/asyncio/subprocess.py b/contrib/tools/python3/src/Lib/asyncio/subprocess.py
index 73b0713687..820304ecca 100644
--- a/contrib/tools/python3/src/Lib/asyncio/subprocess.py
+++ b/contrib/tools/python3/src/Lib/asyncio/subprocess.py
@@ -1,7 +1,7 @@
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
import subprocess
-import warnings
+import warnings
from . import events
from . import protocols
@@ -26,7 +26,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self._transport = None
self._process_exited = False
self._pipe_fds = []
- self._stdin_closed = self._loop.create_future()
+ self._stdin_closed = self._loop.create_future()
def __repr__(self):
info = [self.__class__.__name__]
@@ -78,10 +78,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
if pipe is not None:
pipe.close()
self.connection_lost(exc)
- if exc is None:
- self._stdin_closed.set_result(None)
- else:
- self._stdin_closed.set_exception(exc)
+ if exc is None:
+ self._stdin_closed.set_result(None)
+ else:
+ self._stdin_closed.set_exception(exc)
return
if fd == 1:
reader = self.stdout
@@ -108,11 +108,11 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self._transport.close()
self._transport = None
- def _get_close_waiter(self, stream):
- if stream is self.stdin:
- return self._stdin_closed
+ def _get_close_waiter(self, stream):
+ if stream is self.stdin:
+ return self._stdin_closed
+
-
class Process:
def __init__(self, transport, protocol, loop):
self._transport = transport
@@ -193,8 +193,8 @@ class Process:
stderr = self._read_stream(2)
else:
stderr = self._noop()
- stdin, stdout, stderr = await tasks._gather(stdin, stdout, stderr,
- loop=self._loop)
+ stdin, stdout, stderr = await tasks._gather(stdin, stdout, stderr,
+ loop=self._loop)
await self.wait()
return (stdout, stderr)
@@ -204,13 +204,13 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
**kwds):
if loop is None:
loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8 "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning,
- stacklevel=2
- )
-
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8 "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning,
+ stacklevel=2
+ )
+
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_shell(
@@ -225,12 +225,12 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
limit=streams._DEFAULT_LIMIT, **kwds):
if loop is None:
loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8 "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning,
- stacklevel=2
- )
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8 "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning,
+ stacklevel=2
+ )
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_exec(
diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py
index d378a369ba..27a3c8c5a8 100644
--- a/contrib/tools/python3/src/Lib/asyncio/tasks.py
+++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py
@@ -13,7 +13,7 @@ import concurrent.futures
import contextvars
import functools
import inspect
-import itertools
+import itertools
import types
import warnings
import weakref
@@ -21,16 +21,16 @@ import weakref
from . import base_tasks
from . import coroutines
from . import events
-from . import exceptions
+from . import exceptions
from . import futures
-from .coroutines import _is_coroutine
+from .coroutines import _is_coroutine
+
+# Helper to generate new task names
+# This uses itertools.count() instead of a "+= 1" operation because the latter
+# is not thread safe. See bpo-11866 for a longer explanation.
+_task_name_counter = itertools.count(1).__next__
-# Helper to generate new task names
-# This uses itertools.count() instead of a "+= 1" operation because the latter
-# is not thread safe. See bpo-11866 for a longer explanation.
-_task_name_counter = itertools.count(1).__next__
-
def current_task(loop=None):
"""Return a currently executed task."""
if loop is None:
@@ -42,22 +42,22 @@ def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_running_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
- i = 0
- while True:
- try:
- tasks = list(_all_tasks)
- except RuntimeError:
- i += 1
- if i >= 1000:
- raise
- else:
- break
- return {t for t in tasks
+ # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
+ # thread while we do so. Therefore we cast it to list prior to filtering. The list
+ # cast itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
+ # details.
+ i = 0
+ while True:
+ try:
+ tasks = list(_all_tasks)
+ except RuntimeError:
+ i += 1
+ if i >= 1000:
+ raise
+ else:
+ break
+ return {t for t in tasks
if futures._get_loop(t) is loop and not t.done()}
@@ -67,34 +67,34 @@ def _all_tasks_compat(loop=None):
# method.
if loop is None:
loop = events.get_event_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
- i = 0
- while True:
- try:
- tasks = list(_all_tasks)
- except RuntimeError:
- i += 1
- if i >= 1000:
- raise
- else:
- break
- return {t for t in tasks if futures._get_loop(t) is loop}
-
-
-def _set_task_name(task, name):
- if name is not None:
- try:
- set_name = task.set_name
- except AttributeError:
- pass
- else:
- set_name(name)
-
-
+ # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
+ # thread while we do so. Therefore we cast it to list prior to filtering. The list
+ # cast itself requires iteration, so we repeat it several times ignoring
+ # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
+ # details.
+ i = 0
+ while True:
+ try:
+ tasks = list(_all_tasks)
+ except RuntimeError:
+ i += 1
+ if i >= 1000:
+ raise
+ else:
+ break
+ return {t for t in tasks if futures._get_loop(t) is loop}
+
+
+def _set_task_name(task, name):
+ if name is not None:
+ try:
+ set_name = task.set_name
+ except AttributeError:
+ pass
+ else:
+ set_name(name)
+
+
class Task(futures._PyFuture): # Inherit Python Task implementation
# from a Python Future implementation.
@@ -113,7 +113,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
# status is still pending
_log_destroy_pending = True
- def __init__(self, coro, *, loop=None, name=None):
+ def __init__(self, coro, *, loop=None, name=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -123,11 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")
- if name is None:
- self._name = f'Task-{_task_name_counter()}'
- else:
- self._name = str(name)
-
+ if name is None:
+ self._name = f'Task-{_task_name_counter()}'
+ else:
+ self._name = str(name)
+
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
@@ -147,21 +147,21 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self._loop.call_exception_handler(context)
super().__del__()
- def __class_getitem__(cls, type):
- return cls
-
+ def __class_getitem__(cls, type):
+ return cls
+
def _repr_info(self):
return base_tasks._task_repr_info(self)
- def get_coro(self):
- return self._coro
-
- def get_name(self):
- return self._name
-
- def set_name(self, value):
- self._name = str(value)
-
+ def get_coro(self):
+ return self._coro
+
+ def get_name(self):
+ return self._name
+
+ def set_name(self, value):
+ self._name = str(value)
+
def set_result(self, result):
raise RuntimeError('Task does not support set_result operation')
@@ -202,7 +202,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
"""
return base_tasks._task_print_stack(self, limit, file)
- def cancel(self, msg=None):
+ def cancel(self, msg=None):
"""Request that this task cancel itself.
This arranges for a CancelledError to be thrown into the
@@ -226,23 +226,23 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if self.done():
return False
if self._fut_waiter is not None:
- if self._fut_waiter.cancel(msg=msg):
+ if self._fut_waiter.cancel(msg=msg):
# Leave self._fut_waiter; it may be a Task that
# catches and ignores the cancellation so we may have
# to cancel it again later.
return True
# It must be the case that self.__step is already scheduled.
self._must_cancel = True
- self._cancel_message = msg
+ self._cancel_message = msg
return True
def __step(self, exc=None):
if self.done():
- raise exceptions.InvalidStateError(
+ raise exceptions.InvalidStateError(
f'_step(): already done: {self!r}, {exc!r}')
if self._must_cancel:
- if not isinstance(exc, exceptions.CancelledError):
- exc = self._make_cancelled_error()
+ if not isinstance(exc, exceptions.CancelledError):
+ exc = self._make_cancelled_error()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None
@@ -260,16 +260,16 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
- super().cancel(msg=self._cancel_message)
+ super().cancel(msg=self._cancel_message)
else:
super().set_result(exc.value)
- except exceptions.CancelledError as exc:
- # Save the original exception so we can chain it later.
- self._cancelled_exc = exc
+ except exceptions.CancelledError as exc:
+ # Save the original exception so we can chain it later.
+ self._cancelled_exc = exc
super().cancel() # I.e., Future.cancel(self).
- except (KeyboardInterrupt, SystemExit) as exc:
+ except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
- raise
+ raise
except BaseException as exc:
super().set_exception(exc)
else:
@@ -294,8 +294,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
self.__wakeup, context=self._context)
self._fut_waiter = result
if self._must_cancel:
- if self._fut_waiter.cancel(
- msg=self._cancel_message):
+ if self._fut_waiter.cancel(
+ msg=self._cancel_message):
self._must_cancel = False
else:
new_exc = RuntimeError(
@@ -326,7 +326,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
def __wakeup(self, future):
try:
future.result()
- except BaseException as exc:
+ except BaseException as exc:
# This may also be a cancellation.
self.__step(exc)
else:
@@ -352,15 +352,15 @@ else:
Task = _CTask = _asyncio.Task
-def create_task(coro, *, name=None):
+def create_task(coro, *, name=None):
"""Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
"""
loop = events.get_running_loop()
- task = loop.create_task(coro)
- _set_task_name(task, name)
- return task
+ task = loop.create_task(coro)
+ _set_task_name(task, name)
+ return task
# wait() and as_completed() similar to those in PEP 3148.
@@ -373,7 +373,7 @@ ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
- The fs iterable must not be empty.
+ The fs iterable must not be empty.
Coroutines will be wrapped in Tasks.
@@ -394,22 +394,22 @@ async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
- fs = set(fs)
-
- if any(coroutines.iscoroutine(f) for f in fs):
- warnings.warn("The explicit passing of coroutine objects to "
- "asyncio.wait() is deprecated since Python 3.8, and "
- "scheduled for removal in Python 3.11.",
- DeprecationWarning, stacklevel=2)
-
- fs = {ensure_future(f, loop=loop) for f in fs}
-
+ loop = events.get_running_loop()
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
+ fs = set(fs)
+
+ if any(coroutines.iscoroutine(f) for f in fs):
+ warnings.warn("The explicit passing of coroutine objects to "
+ "asyncio.wait() is deprecated since Python 3.8, and "
+ "scheduled for removal in Python 3.11.",
+ DeprecationWarning, stacklevel=2)
+
+ fs = {ensure_future(f, loop=loop) for f in fs}
+
return await _wait(fs, timeout, return_when, loop)
@@ -432,11 +432,11 @@ async def wait_for(fut, timeout, *, loop=None):
This function is a coroutine.
"""
if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
+ else:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
@@ -447,11 +447,11 @@ async def wait_for(fut, timeout, *, loop=None):
if fut.done():
return fut.result()
- await _cancel_and_wait(fut, loop=loop)
- try:
- return fut.result()
- except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
+ await _cancel_and_wait(fut, loop=loop)
+ try:
+ return fut.result()
+ except exceptions.CancelledError as exc:
+ raise exceptions.TimeoutError() from exc
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
@@ -464,16 +464,16 @@ async def wait_for(fut, timeout, *, loop=None):
# wait until the future completes or the timeout
try:
await waiter
- except exceptions.CancelledError:
- if fut.done():
- return fut.result()
- else:
- fut.remove_done_callback(cb)
- # We must ensure that the task is not running
- # after wait_for() returns.
- # See https://bugs.python.org/issue32751
- await _cancel_and_wait(fut, loop=loop)
- raise
+ except exceptions.CancelledError:
+ if fut.done():
+ return fut.result()
+ else:
+ fut.remove_done_callback(cb)
+ # We must ensure that the task is not running
+ # after wait_for() returns.
+ # See https://bugs.python.org/issue32751
+ await _cancel_and_wait(fut, loop=loop)
+ raise
if fut.done():
return fut.result()
@@ -483,13 +483,13 @@ async def wait_for(fut, timeout, *, loop=None):
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
- # In case task cancellation failed with some
- # exception, we should re-raise it
- # See https://bugs.python.org/issue40607
- try:
- return fut.result()
- except exceptions.CancelledError as exc:
- raise exceptions.TimeoutError() from exc
+ # In case task cancellation failed with some
+ # exception, we should re-raise it
+ # See https://bugs.python.org/issue40607
+ try:
+ return fut.result()
+ except exceptions.CancelledError as exc:
+ raise exceptions.TimeoutError() from exc
finally:
timeout_handle.cancel()
@@ -526,8 +526,8 @@ async def _wait(fs, timeout, return_when, loop):
finally:
if timeout_handle is not None:
timeout_handle.cancel()
- for f in fs:
- f.remove_done_callback(_on_completion)
+ for f in fs:
+ f.remove_done_callback(_on_completion)
done, pending = set(), set()
for f in fs:
@@ -574,19 +574,19 @@ def as_completed(fs, *, loop=None, timeout=None):
Note: The futures 'f' are not necessarily members of fs.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
- raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
-
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
+
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
from .queues import Queue # Import here to avoid circular import problem.
done = Queue(loop=loop)
-
- if loop is None:
- loop = events.get_event_loop()
- todo = {ensure_future(f, loop=loop) for f in set(fs)}
+
+ if loop is None:
+ loop = events.get_event_loop()
+ todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
def _on_timeout():
@@ -607,7 +607,7 @@ def as_completed(fs, *, loop=None, timeout=None):
f = await done.get()
if f is None:
# Dummy value from _on_timeout().
- raise exceptions.TimeoutError
+ raise exceptions.TimeoutError
return f.result() # May raise f.exception().
for f in todo:
@@ -632,18 +632,18 @@ def __sleep0():
async def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
if delay <= 0:
await __sleep0()
return result
if loop is None:
- loop = events.get_running_loop()
-
+ loop = events.get_running_loop()
+
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
@@ -668,8 +668,8 @@ def ensure_future(coro_or_future, *, loop=None):
return task
elif futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
- raise ValueError('The future belongs to a different loop than '
- 'the one specified as the loop argument')
+ raise ValueError('The future belongs to a different loop than '
+ 'the one specified as the loop argument')
return coro_or_future
elif inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
@@ -678,7 +678,7 @@ def ensure_future(coro_or_future, *, loop=None):
'required')
-@types.coroutine
+@types.coroutine
def _wrap_awaitable(awaitable):
"""Helper for asyncio.ensure_future().
@@ -687,9 +687,9 @@ def _wrap_awaitable(awaitable):
"""
return (yield from awaitable.__await__())
-_wrap_awaitable._is_coroutine = _is_coroutine
+_wrap_awaitable._is_coroutine = _is_coroutine
+
-
class _GatheringFuture(futures.Future):
"""Helper for gather().
@@ -703,12 +703,12 @@ class _GatheringFuture(futures.Future):
self._children = children
self._cancel_requested = False
- def cancel(self, msg=None):
+ def cancel(self, msg=None):
if self.done():
return False
ret = False
for child in self._children:
- if child.cancel(msg=msg):
+ if child.cancel(msg=msg):
ret = True
if ret:
# If any child tasks were actually cancelled, we should
@@ -740,23 +740,23 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
the outer Future is *not* cancelled in this case. (This is to
prevent the cancellation of one child to cause other children to
be cancelled.)
-
- If *return_exceptions* is False, cancelling gather() after it
- has been marked done won't cancel any submitted awaitables.
- For instance, gather can be marked done after propagating an
- exception to the caller, therefore, calling ``gather.cancel()``
- after catching an exception (raised by one of the awaitables) from
- gather won't cancel any other awaitables.
+
+ If *return_exceptions* is False, cancelling gather() after it
+ has been marked done won't cancel any submitted awaitables.
+ For instance, gather can be marked done after propagating an
+ exception to the caller, therefore, calling ``gather.cancel()``
+ after catching an exception (raised by one of the awaitables) from
+ gather won't cancel any other awaitables.
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
- return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions)
-
-
-def _gather(*coros_or_futures, loop=None, return_exceptions=False):
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
+
+ return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions)
+
+
+def _gather(*coros_or_futures, loop=None, return_exceptions=False):
if not coros_or_futures:
if loop is None:
loop = events.get_event_loop()
@@ -779,7 +779,7 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
# Check if 'fut' is cancelled first, as
# 'fut.exception()' will *raise* a CancelledError
# instead of returning it.
- exc = fut._make_cancelled_error()
+ exc = fut._make_cancelled_error()
outer.set_exception(exc)
return
else:
@@ -795,15 +795,15 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
for fut in children:
if fut.cancelled():
- # Check if 'fut' is cancelled first, as 'fut.exception()'
- # will *raise* a CancelledError instead of returning it.
- # Also, since we're adding the exception return value
- # to 'results' instead of raising it, don't bother
- # setting __context__. This also lets us preserve
- # calling '_make_cancelled_error()' at most once.
- res = exceptions.CancelledError(
- '' if fut._cancel_message is None else
- fut._cancel_message)
+ # Check if 'fut' is cancelled first, as 'fut.exception()'
+ # will *raise* a CancelledError instead of returning it.
+ # Also, since we're adding the exception return value
+ # to 'results' instead of raising it, don't bother
+ # setting __context__. This also lets us preserve
+ # calling '_make_cancelled_error()' at most once.
+ res = exceptions.CancelledError(
+ '' if fut._cancel_message is None else
+ fut._cancel_message)
else:
res = fut.exception()
if res is None:
@@ -814,8 +814,8 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False):
# If gather is being cancelled we must propagate the
# cancellation regardless of *return_exceptions* argument.
# See issue 32684.
- exc = fut._make_cancelled_error()
- outer.set_exception(exc)
+ exc = fut._make_cancelled_error()
+ outer.set_exception(exc)
else:
outer.set_result(results)
@@ -875,10 +875,10 @@ def shield(arg, *, loop=None):
except CancelledError:
res = None
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ if loop is not None:
+ warnings.warn("The loop argument is deprecated since Python 3.8, "
+ "and scheduled for removal in Python 3.10.",
+ DeprecationWarning, stacklevel=2)
inner = ensure_future(arg, loop=loop)
if inner.done():
# Shortcut.
@@ -886,7 +886,7 @@ def shield(arg, *, loop=None):
loop = futures._get_loop(inner)
outer = loop.create_future()
- def _inner_done_callback(inner):
+ def _inner_done_callback(inner):
if outer.cancelled():
if not inner.cancelled():
# Mark inner's result as retrieved.
@@ -902,13 +902,13 @@ def shield(arg, *, loop=None):
else:
outer.set_result(inner.result())
-
- def _outer_done_callback(outer):
- if not inner.done():
- inner.remove_done_callback(_inner_done_callback)
-
- inner.add_done_callback(_inner_done_callback)
- outer.add_done_callback(_outer_done_callback)
+
+ def _outer_done_callback(outer):
+ if not inner.done():
+ inner.remove_done_callback(_inner_done_callback)
+
+ inner.add_done_callback(_inner_done_callback)
+ outer.add_done_callback(_outer_done_callback)
return outer
@@ -924,9 +924,9 @@ def run_coroutine_threadsafe(coro, loop):
def callback():
try:
futures._chain_future(ensure_future(coro, loop=loop), future)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
if future.set_running_or_notify_cancel():
future.set_exception(exc)
raise
diff --git a/contrib/tools/python3/src/Lib/asyncio/threads.py b/contrib/tools/python3/src/Lib/asyncio/threads.py
index cce2f05e10..db048a8231 100644
--- a/contrib/tools/python3/src/Lib/asyncio/threads.py
+++ b/contrib/tools/python3/src/Lib/asyncio/threads.py
@@ -1,25 +1,25 @@
-"""High-level support for working with threads in asyncio"""
-
-import functools
-import contextvars
-
-from . import events
-
-
-__all__ = "to_thread",
-
-
-async def to_thread(func, /, *args, **kwargs):
- """Asynchronously run function *func* in a separate thread.
-
- Any *args and **kwargs supplied for this function are directly passed
- to *func*. Also, the current :class:`contextvars.Context` is propagated,
- allowing context variables from the main thread to be accessed in the
- separate thread.
-
- Return a coroutine that can be awaited to get the eventual result of *func*.
- """
- loop = events.get_running_loop()
- ctx = contextvars.copy_context()
- func_call = functools.partial(ctx.run, func, *args, **kwargs)
- return await loop.run_in_executor(None, func_call)
+"""High-level support for working with threads in asyncio"""
+
+import functools
+import contextvars
+
+from . import events
+
+
+__all__ = "to_thread",
+
+
+async def to_thread(func, /, *args, **kwargs):
+ """Asynchronously run function *func* in a separate thread.
+
+ Any *args and **kwargs supplied for this function are directly passed
+ to *func*. Also, the current :class:`contextvars.Context` is propagated,
+ allowing context variables from the main thread to be accessed in the
+ separate thread.
+
+ Return a coroutine that can be awaited to get the eventual result of *func*.
+ """
+ loop = events.get_running_loop()
+ ctx = contextvars.copy_context()
+ func_call = functools.partial(ctx.run, func, *args, **kwargs)
+ return await loop.run_in_executor(None, func_call)
diff --git a/contrib/tools/python3/src/Lib/asyncio/transports.py b/contrib/tools/python3/src/Lib/asyncio/transports.py
index 06143ed829..45e155c94c 100644
--- a/contrib/tools/python3/src/Lib/asyncio/transports.py
+++ b/contrib/tools/python3/src/Lib/asyncio/transports.py
@@ -9,8 +9,8 @@ __all__ = (
class BaseTransport:
"""Base class for transports."""
- __slots__ = ('_extra',)
-
+ __slots__ = ('_extra',)
+
def __init__(self, extra=None):
if extra is None:
extra = {}
@@ -29,8 +29,8 @@ class BaseTransport:
Buffered data will be flushed asynchronously. No more data
will be received. After all buffered data is flushed, the
- protocol's connection_lost() method will (eventually) be
- called with None as its argument.
+ protocol's connection_lost() method will (eventually) be
+ called with None as its argument.
"""
raise NotImplementedError
@@ -46,8 +46,8 @@ class BaseTransport:
class ReadTransport(BaseTransport):
"""Interface for read-only transports."""
- __slots__ = ()
-
+ __slots__ = ()
+
def is_reading(self):
"""Return True if the transport is receiving."""
raise NotImplementedError
@@ -72,8 +72,8 @@ class ReadTransport(BaseTransport):
class WriteTransport(BaseTransport):
"""Interface for write-only transports."""
- __slots__ = ()
-
+ __slots__ = ()
+
def set_write_buffer_limits(self, high=None, low=None):
"""Set the high- and low-water limits for write flow control.
@@ -160,14 +160,14 @@ class Transport(ReadTransport, WriteTransport):
except writelines(), which calls write() in a loop.
"""
- __slots__ = ()
+ __slots__ = ()
+
-
class DatagramTransport(BaseTransport):
"""Interface for datagram (UDP) transports."""
- __slots__ = ()
-
+ __slots__ = ()
+
def sendto(self, data, addr=None):
"""Send data to the transport.
@@ -190,8 +190,8 @@ class DatagramTransport(BaseTransport):
class SubprocessTransport(BaseTransport):
- __slots__ = ()
-
+ __slots__ = ()
+
def get_pid(self):
"""Get subprocess id."""
raise NotImplementedError
@@ -259,8 +259,8 @@ class _FlowControlMixin(Transport):
resume_writing() may be called.
"""
- __slots__ = ('_loop', '_protocol_paused', '_high_water', '_low_water')
-
+ __slots__ = ('_loop', '_protocol_paused', '_high_water', '_low_water')
+
def __init__(self, extra=None, loop=None):
super().__init__(extra)
assert loop is not None
@@ -276,9 +276,9 @@ class _FlowControlMixin(Transport):
self._protocol_paused = True
try:
self._protocol.pause_writing()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._loop.call_exception_handler({
'message': 'protocol.pause_writing() failed',
'exception': exc,
@@ -292,9 +292,9 @@ class _FlowControlMixin(Transport):
self._protocol_paused = False
try:
self._protocol.resume_writing()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._loop.call_exception_handler({
'message': 'protocol.resume_writing() failed',
'exception': exc,
diff --git a/contrib/tools/python3/src/Lib/asyncio/trsock.py b/contrib/tools/python3/src/Lib/asyncio/trsock.py
index 80ac049e85..e9ebcc3261 100644
--- a/contrib/tools/python3/src/Lib/asyncio/trsock.py
+++ b/contrib/tools/python3/src/Lib/asyncio/trsock.py
@@ -1,206 +1,206 @@
-import socket
-import warnings
-
-
-class TransportSocket:
-
- """A socket-like wrapper for exposing real transport sockets.
-
- These objects can be safely returned by APIs like
- `transport.get_extra_info('socket')`. All potentially disruptive
- operations (like "socket.close()") are banned.
- """
-
- __slots__ = ('_sock',)
-
- def __init__(self, sock: socket.socket):
- self._sock = sock
-
- def _na(self, what):
- warnings.warn(
- f"Using {what} on sockets returned from get_extra_info('socket') "
- f"will be prohibited in asyncio 3.9. Please report your use case "
- f"to bugs.python.org.",
- DeprecationWarning, source=self)
-
- @property
- def family(self):
- return self._sock.family
-
- @property
- def type(self):
- return self._sock.type
-
- @property
- def proto(self):
- return self._sock.proto
-
- def __repr__(self):
- s = (
- f"<asyncio.TransportSocket fd={self.fileno()}, "
- f"family={self.family!s}, type={self.type!s}, "
- f"proto={self.proto}"
- )
-
- if self.fileno() != -1:
- try:
- laddr = self.getsockname()
- if laddr:
- s = f"{s}, laddr={laddr}"
- except socket.error:
- pass
- try:
- raddr = self.getpeername()
- if raddr:
- s = f"{s}, raddr={raddr}"
- except socket.error:
- pass
-
- return f"{s}>"
-
- def __getstate__(self):
- raise TypeError("Cannot serialize asyncio.TransportSocket object")
-
- def fileno(self):
- return self._sock.fileno()
-
- def dup(self):
- return self._sock.dup()
-
- def get_inheritable(self):
- return self._sock.get_inheritable()
-
- def shutdown(self, how):
- # asyncio doesn't currently provide a high-level transport API
- # to shutdown the connection.
- self._sock.shutdown(how)
-
- def getsockopt(self, *args, **kwargs):
- return self._sock.getsockopt(*args, **kwargs)
-
- def setsockopt(self, *args, **kwargs):
- self._sock.setsockopt(*args, **kwargs)
-
- def getpeername(self):
- return self._sock.getpeername()
-
- def getsockname(self):
- return self._sock.getsockname()
-
- def getsockbyname(self):
- return self._sock.getsockbyname()
-
- def accept(self):
- self._na('accept() method')
- return self._sock.accept()
-
- def connect(self, *args, **kwargs):
- self._na('connect() method')
- return self._sock.connect(*args, **kwargs)
-
- def connect_ex(self, *args, **kwargs):
- self._na('connect_ex() method')
- return self._sock.connect_ex(*args, **kwargs)
-
- def bind(self, *args, **kwargs):
- self._na('bind() method')
- return self._sock.bind(*args, **kwargs)
-
- def ioctl(self, *args, **kwargs):
- self._na('ioctl() method')
- return self._sock.ioctl(*args, **kwargs)
-
- def listen(self, *args, **kwargs):
- self._na('listen() method')
- return self._sock.listen(*args, **kwargs)
-
- def makefile(self):
- self._na('makefile() method')
- return self._sock.makefile()
-
- def sendfile(self, *args, **kwargs):
- self._na('sendfile() method')
- return self._sock.sendfile(*args, **kwargs)
-
- def close(self):
- self._na('close() method')
- return self._sock.close()
-
- def detach(self):
- self._na('detach() method')
- return self._sock.detach()
-
- def sendmsg_afalg(self, *args, **kwargs):
- self._na('sendmsg_afalg() method')
- return self._sock.sendmsg_afalg(*args, **kwargs)
-
- def sendmsg(self, *args, **kwargs):
- self._na('sendmsg() method')
- return self._sock.sendmsg(*args, **kwargs)
-
- def sendto(self, *args, **kwargs):
- self._na('sendto() method')
- return self._sock.sendto(*args, **kwargs)
-
- def send(self, *args, **kwargs):
- self._na('send() method')
- return self._sock.send(*args, **kwargs)
-
- def sendall(self, *args, **kwargs):
- self._na('sendall() method')
- return self._sock.sendall(*args, **kwargs)
-
- def set_inheritable(self, *args, **kwargs):
- self._na('set_inheritable() method')
- return self._sock.set_inheritable(*args, **kwargs)
-
- def share(self, process_id):
- self._na('share() method')
- return self._sock.share(process_id)
-
- def recv_into(self, *args, **kwargs):
- self._na('recv_into() method')
- return self._sock.recv_into(*args, **kwargs)
-
- def recvfrom_into(self, *args, **kwargs):
- self._na('recvfrom_into() method')
- return self._sock.recvfrom_into(*args, **kwargs)
-
- def recvmsg_into(self, *args, **kwargs):
- self._na('recvmsg_into() method')
- return self._sock.recvmsg_into(*args, **kwargs)
-
- def recvmsg(self, *args, **kwargs):
- self._na('recvmsg() method')
- return self._sock.recvmsg(*args, **kwargs)
-
- def recvfrom(self, *args, **kwargs):
- self._na('recvfrom() method')
- return self._sock.recvfrom(*args, **kwargs)
-
- def recv(self, *args, **kwargs):
- self._na('recv() method')
- return self._sock.recv(*args, **kwargs)
-
- def settimeout(self, value):
- if value == 0:
- return
- raise ValueError(
- 'settimeout(): only 0 timeout is allowed on transport sockets')
-
- def gettimeout(self):
- return 0
-
- def setblocking(self, flag):
- if not flag:
- return
- raise ValueError(
- 'setblocking(): transport sockets cannot be blocking')
-
- def __enter__(self):
- self._na('context manager protocol')
- return self._sock.__enter__()
-
- def __exit__(self, *err):
- self._na('context manager protocol')
- return self._sock.__exit__(*err)
+import socket
+import warnings
+
+
+class TransportSocket:
+
+ """A socket-like wrapper for exposing real transport sockets.
+
+ These objects can be safely returned by APIs like
+ `transport.get_extra_info('socket')`. All potentially disruptive
+ operations (like "socket.close()") are banned.
+ """
+
+ __slots__ = ('_sock',)
+
+ def __init__(self, sock: socket.socket):
+ self._sock = sock
+
+ def _na(self, what):
+ warnings.warn(
+ f"Using {what} on sockets returned from get_extra_info('socket') "
+ f"will be prohibited in asyncio 3.9. Please report your use case "
+ f"to bugs.python.org.",
+ DeprecationWarning, source=self)
+
+ @property
+ def family(self):
+ return self._sock.family
+
+ @property
+ def type(self):
+ return self._sock.type
+
+ @property
+ def proto(self):
+ return self._sock.proto
+
+ def __repr__(self):
+ s = (
+ f"<asyncio.TransportSocket fd={self.fileno()}, "
+ f"family={self.family!s}, type={self.type!s}, "
+ f"proto={self.proto}"
+ )
+
+ if self.fileno() != -1:
+ try:
+ laddr = self.getsockname()
+ if laddr:
+ s = f"{s}, laddr={laddr}"
+ except socket.error:
+ pass
+ try:
+ raddr = self.getpeername()
+ if raddr:
+ s = f"{s}, raddr={raddr}"
+ except socket.error:
+ pass
+
+ return f"{s}>"
+
+ def __getstate__(self):
+ raise TypeError("Cannot serialize asyncio.TransportSocket object")
+
+ def fileno(self):
+ return self._sock.fileno()
+
+ def dup(self):
+ return self._sock.dup()
+
+ def get_inheritable(self):
+ return self._sock.get_inheritable()
+
+ def shutdown(self, how):
+ # asyncio doesn't currently provide a high-level transport API
+ # to shutdown the connection.
+ self._sock.shutdown(how)
+
+ def getsockopt(self, *args, **kwargs):
+ return self._sock.getsockopt(*args, **kwargs)
+
+ def setsockopt(self, *args, **kwargs):
+ self._sock.setsockopt(*args, **kwargs)
+
+ def getpeername(self):
+ return self._sock.getpeername()
+
+ def getsockname(self):
+ return self._sock.getsockname()
+
+ def getsockbyname(self):
+ return self._sock.getsockbyname()
+
+ def accept(self):
+ self._na('accept() method')
+ return self._sock.accept()
+
+ def connect(self, *args, **kwargs):
+ self._na('connect() method')
+ return self._sock.connect(*args, **kwargs)
+
+ def connect_ex(self, *args, **kwargs):
+ self._na('connect_ex() method')
+ return self._sock.connect_ex(*args, **kwargs)
+
+ def bind(self, *args, **kwargs):
+ self._na('bind() method')
+ return self._sock.bind(*args, **kwargs)
+
+ def ioctl(self, *args, **kwargs):
+ self._na('ioctl() method')
+ return self._sock.ioctl(*args, **kwargs)
+
+ def listen(self, *args, **kwargs):
+ self._na('listen() method')
+ return self._sock.listen(*args, **kwargs)
+
+ def makefile(self):
+ self._na('makefile() method')
+ return self._sock.makefile()
+
+ def sendfile(self, *args, **kwargs):
+ self._na('sendfile() method')
+ return self._sock.sendfile(*args, **kwargs)
+
+ def close(self):
+ self._na('close() method')
+ return self._sock.close()
+
+ def detach(self):
+ self._na('detach() method')
+ return self._sock.detach()
+
+ def sendmsg_afalg(self, *args, **kwargs):
+ self._na('sendmsg_afalg() method')
+ return self._sock.sendmsg_afalg(*args, **kwargs)
+
+ def sendmsg(self, *args, **kwargs):
+ self._na('sendmsg() method')
+ return self._sock.sendmsg(*args, **kwargs)
+
+ def sendto(self, *args, **kwargs):
+ self._na('sendto() method')
+ return self._sock.sendto(*args, **kwargs)
+
+ def send(self, *args, **kwargs):
+ self._na('send() method')
+ return self._sock.send(*args, **kwargs)
+
+ def sendall(self, *args, **kwargs):
+ self._na('sendall() method')
+ return self._sock.sendall(*args, **kwargs)
+
+ def set_inheritable(self, *args, **kwargs):
+ self._na('set_inheritable() method')
+ return self._sock.set_inheritable(*args, **kwargs)
+
+ def share(self, process_id):
+ self._na('share() method')
+ return self._sock.share(process_id)
+
+ def recv_into(self, *args, **kwargs):
+ self._na('recv_into() method')
+ return self._sock.recv_into(*args, **kwargs)
+
+ def recvfrom_into(self, *args, **kwargs):
+ self._na('recvfrom_into() method')
+ return self._sock.recvfrom_into(*args, **kwargs)
+
+ def recvmsg_into(self, *args, **kwargs):
+ self._na('recvmsg_into() method')
+ return self._sock.recvmsg_into(*args, **kwargs)
+
+ def recvmsg(self, *args, **kwargs):
+ self._na('recvmsg() method')
+ return self._sock.recvmsg(*args, **kwargs)
+
+ def recvfrom(self, *args, **kwargs):
+ self._na('recvfrom() method')
+ return self._sock.recvfrom(*args, **kwargs)
+
+ def recv(self, *args, **kwargs):
+ self._na('recv() method')
+ return self._sock.recv(*args, **kwargs)
+
+ def settimeout(self, value):
+ if value == 0:
+ return
+ raise ValueError(
+ 'settimeout(): only 0 timeout is allowed on transport sockets')
+
+ def gettimeout(self):
+ return 0
+
+ def setblocking(self, flag):
+ if not flag:
+ return
+ raise ValueError(
+ 'setblocking(): transport sockets cannot be blocking')
+
+ def __enter__(self):
+ self._na('context manager protocol')
+ return self._sock.__enter__()
+
+ def __exit__(self, *err):
+ self._na('context manager protocol')
+ return self._sock.__exit__(*err)
diff --git a/contrib/tools/python3/src/Lib/asyncio/unix_events.py b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
index b553e2010b..eecbc101ee 100644
--- a/contrib/tools/python3/src/Lib/asyncio/unix_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/unix_events.py
@@ -2,7 +2,7 @@
import errno
import io
-import itertools
+import itertools
import os
import selectors
import signal
@@ -18,7 +18,7 @@ from . import base_subprocess
from . import constants
from . import coroutines
from . import events
-from . import exceptions
+from . import exceptions
from . import futures
from . import selector_events
from . import tasks
@@ -29,9 +29,9 @@ from .log import logger
__all__ = (
'SelectorEventLoop',
'AbstractChildWatcher', 'SafeChildWatcher',
- 'FastChildWatcher', 'PidfdChildWatcher',
- 'MultiLoopChildWatcher', 'ThreadedChildWatcher',
- 'DefaultEventLoopPolicy',
+ 'FastChildWatcher', 'PidfdChildWatcher',
+ 'MultiLoopChildWatcher', 'ThreadedChildWatcher',
+ 'DefaultEventLoopPolicy',
)
@@ -101,7 +101,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
try:
# Register a dummy signal handler to ask Python to write the signal
- # number in the wakeup file descriptor. _process_self_data() will
+ # number in the wakeup file descriptor. _process_self_data() will
# read signal numbers from this file descriptor to handle signals.
signal.signal(sig, _sighandler_noop)
@@ -171,8 +171,8 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
if not isinstance(sig, int):
raise TypeError(f'sig must be an int, not {sig!r}')
- if sig not in signal.valid_signals():
- raise ValueError(f'invalid signal number {sig}')
+ if sig not in signal.valid_signals():
+ raise ValueError(f'invalid signal number {sig}')
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
extra=None):
@@ -186,13 +186,13 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
- if not watcher.is_active():
- # Check early.
- # Raising exception before process creation
- # prevents subprocess execution if the watcher
- # is not ready to handle it.
- raise RuntimeError("asyncio.get_child_watcher() is not activated, "
- "subprocess support is not installed.")
+ if not watcher.is_active():
+ # Check early.
+ # Raising exception before process creation
+ # prevents subprocess execution if the watcher
+ # is not ready to handle it.
+ raise RuntimeError("asyncio.get_child_watcher() is not activated, "
+ "subprocess support is not installed.")
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
@@ -203,9 +203,9 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
self._child_watcher_callback, transp)
try:
await waiter
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException:
transp.close()
await transp._wait()
raise
@@ -323,24 +323,24 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
server._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
- await tasks.sleep(0)
+ await tasks.sleep(0)
return server
async def _sock_sendfile_native(self, sock, file, offset, count):
try:
os.sendfile
- except AttributeError:
- raise exceptions.SendfileNotAvailableError(
+ except AttributeError:
+ raise exceptions.SendfileNotAvailableError(
"os.sendfile() is not available")
try:
fileno = file.fileno()
except (AttributeError, io.UnsupportedOperation) as err:
- raise exceptions.SendfileNotAvailableError("not a regular file")
+ raise exceptions.SendfileNotAvailableError("not a regular file")
try:
fsize = os.fstat(fileno).st_size
- except OSError:
- raise exceptions.SendfileNotAvailableError("not a regular file")
+ except OSError:
+ raise exceptions.SendfileNotAvailableError("not a regular file")
blocksize = count if count else fsize
if not blocksize:
return 0 # empty file
@@ -394,16 +394,16 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
# one being 'file' is not a regular mmap(2)-like
# file, in which case we'll fall back on using
# plain send().
- err = exceptions.SendfileNotAvailableError(
+ err = exceptions.SendfileNotAvailableError(
"os.sendfile call failed")
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(err)
else:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(exc)
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(exc)
else:
@@ -445,7 +445,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
self._fileno = pipe.fileno()
self._protocol = protocol
self._closing = False
- self._paused = False
+ self._paused = False
mode = os.fstat(self._fileno).st_mode
if not (stat.S_ISFIFO(mode) or
@@ -507,20 +507,20 @@ class _UnixReadPipeTransport(transports.ReadTransport):
self._loop.call_soon(self._call_connection_lost, None)
def pause_reading(self):
- if self._closing or self._paused:
- return
- self._paused = True
+ if self._closing or self._paused:
+ return
+ self._paused = True
self._loop._remove_reader(self._fileno)
- if self._loop.get_debug():
- logger.debug("%r pauses reading", self)
+ if self._loop.get_debug():
+ logger.debug("%r pauses reading", self)
def resume_reading(self):
- if self._closing or not self._paused:
- return
- self._paused = False
+ if self._closing or not self._paused:
+ return
+ self._paused = False
self._loop._add_reader(self._fileno, self._read_ready)
- if self._loop.get_debug():
- logger.debug("%r resumes reading", self)
+ if self._loop.get_debug():
+ logger.debug("%r resumes reading", self)
def set_protocol(self, protocol):
self._protocol = protocol
@@ -535,9 +535,9 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if not self._closing:
self._close(None)
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if self._pipe is not None:
- _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self._pipe.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
@@ -665,9 +665,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
n = os.write(self._fileno, data)
except (BlockingIOError, InterruptedError):
n = 0
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._conn_lost += 1
self._fatal_error(exc, 'Fatal write error on pipe transport')
return
@@ -687,9 +687,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
n = os.write(self._fileno, self._buffer)
except (BlockingIOError, InterruptedError):
pass
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
self._buffer.clear()
self._conn_lost += 1
# Remove writer here, _fatal_error() doesn't it
@@ -734,9 +734,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# write_eof is all what we needed to close the write pipe
self.write_eof()
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if self._pipe is not None:
- _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self._pipe.close()
def abort(self):
@@ -744,7 +744,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
# should be called by exception handler only
- if isinstance(exc, OSError):
+ if isinstance(exc, OSError):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
@@ -785,18 +785,18 @@ class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
# other end). Notably this is needed on AIX, and works
# just fine on other platforms.
stdin, stdin_w = socket.socketpair()
- try:
- self._proc = subprocess.Popen(
- args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
- universal_newlines=False, bufsize=bufsize, **kwargs)
- if stdin_w is not None:
- stdin.close()
- self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
- stdin_w = None
- finally:
- if stdin_w is not None:
- stdin.close()
- stdin_w.close()
+ try:
+ self._proc = subprocess.Popen(
+ args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+ universal_newlines=False, bufsize=bufsize, **kwargs)
+ if stdin_w is not None:
+ stdin.close()
+ self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
+ stdin_w = None
+ finally:
+ if stdin_w is not None:
+ stdin.close()
+ stdin_w.close()
class AbstractChildWatcher:
@@ -858,15 +858,15 @@ class AbstractChildWatcher:
"""
raise NotImplementedError()
- def is_active(self):
- """Return ``True`` if the watcher is active and is used by the event loop.
-
- Return True if the watcher is installed and ready to handle process exit
- notifications.
-
- """
- raise NotImplementedError()
-
+ def is_active(self):
+ """Return ``True`` if the watcher is active and is used by the event loop.
+
+ Return True if the watcher is installed and ready to handle process exit
+ notifications.
+
+ """
+ raise NotImplementedError()
+
def __enter__(self):
"""Enter the watcher's context and allow starting new processes
@@ -878,98 +878,98 @@ class AbstractChildWatcher:
raise NotImplementedError()
-class PidfdChildWatcher(AbstractChildWatcher):
- """Child watcher implementation using Linux's pid file descriptors.
-
- This child watcher polls process file descriptors (pidfds) to await child
- process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
- child watcher implementation. It doesn't require signals or threads, doesn't
- interfere with any processes launched outside the event loop, and scales
- linearly with the number of subprocesses launched by the event loop. The
- main disadvantage is that pidfds are specific to Linux, and only work on
- recent (5.3+) kernels.
- """
-
- def __init__(self):
- self._loop = None
- self._callbacks = {}
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, exc_traceback):
- pass
-
- def is_active(self):
- return self._loop is not None and self._loop.is_running()
-
- def close(self):
- self.attach_loop(None)
-
- def attach_loop(self, loop):
- if self._loop is not None and loop is None and self._callbacks:
- warnings.warn(
- 'A loop is being detached '
- 'from a child watcher with pending handlers',
- RuntimeWarning)
- for pidfd, _, _ in self._callbacks.values():
- self._loop._remove_reader(pidfd)
- os.close(pidfd)
- self._callbacks.clear()
- self._loop = loop
-
- def add_child_handler(self, pid, callback, *args):
- existing = self._callbacks.get(pid)
- if existing is not None:
- self._callbacks[pid] = existing[0], callback, args
- else:
- pidfd = os.pidfd_open(pid)
- self._loop._add_reader(pidfd, self._do_wait, pid)
- self._callbacks[pid] = pidfd, callback, args
-
- def _do_wait(self, pid):
- pidfd, callback, args = self._callbacks.pop(pid)
- self._loop._remove_reader(pidfd)
- try:
- _, status = os.waitpid(pid, 0)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- returncode = 255
- logger.warning(
- "child process pid %d exit status already read: "
- " will report returncode 255",
- pid)
- else:
- returncode = _compute_returncode(status)
-
- os.close(pidfd)
- callback(pid, returncode, *args)
-
- def remove_child_handler(self, pid):
- try:
- pidfd, _, _ = self._callbacks.pop(pid)
- except KeyError:
- return False
- self._loop._remove_reader(pidfd)
- os.close(pidfd)
- return True
-
-
-def _compute_returncode(status):
- if os.WIFSIGNALED(status):
- # The child process died because of a signal.
- return -os.WTERMSIG(status)
- elif os.WIFEXITED(status):
- # The child process exited (e.g sys.exit()).
- return os.WEXITSTATUS(status)
- else:
- # The child exited, but we don't understand its status.
- # This shouldn't happen, but if it does, let's just
- # return that status; perhaps that helps debug it.
- return status
-
-
+class PidfdChildWatcher(AbstractChildWatcher):
+ """Child watcher implementation using Linux's pid file descriptors.
+
+ This child watcher polls process file descriptors (pidfds) to await child
+ process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
+ child watcher implementation. It doesn't require signals or threads, doesn't
+ interfere with any processes launched outside the event loop, and scales
+ linearly with the number of subprocesses launched by the event loop. The
+ main disadvantage is that pidfds are specific to Linux, and only work on
+ recent (5.3+) kernels.
+ """
+
+ def __init__(self):
+ self._loop = None
+ self._callbacks = {}
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_traceback):
+ pass
+
+ def is_active(self):
+ return self._loop is not None and self._loop.is_running()
+
+ def close(self):
+ self.attach_loop(None)
+
+ def attach_loop(self, loop):
+ if self._loop is not None and loop is None and self._callbacks:
+ warnings.warn(
+ 'A loop is being detached '
+ 'from a child watcher with pending handlers',
+ RuntimeWarning)
+ for pidfd, _, _ in self._callbacks.values():
+ self._loop._remove_reader(pidfd)
+ os.close(pidfd)
+ self._callbacks.clear()
+ self._loop = loop
+
+ def add_child_handler(self, pid, callback, *args):
+ existing = self._callbacks.get(pid)
+ if existing is not None:
+ self._callbacks[pid] = existing[0], callback, args
+ else:
+ pidfd = os.pidfd_open(pid)
+ self._loop._add_reader(pidfd, self._do_wait, pid)
+ self._callbacks[pid] = pidfd, callback, args
+
+ def _do_wait(self, pid):
+ pidfd, callback, args = self._callbacks.pop(pid)
+ self._loop._remove_reader(pidfd)
+ try:
+ _, status = os.waitpid(pid, 0)
+ except ChildProcessError:
+ # The child process is already reaped
+ # (may happen if waitpid() is called elsewhere).
+ returncode = 255
+ logger.warning(
+ "child process pid %d exit status already read: "
+ " will report returncode 255",
+ pid)
+ else:
+ returncode = _compute_returncode(status)
+
+ os.close(pidfd)
+ callback(pid, returncode, *args)
+
+ def remove_child_handler(self, pid):
+ try:
+ pidfd, _, _ = self._callbacks.pop(pid)
+ except KeyError:
+ return False
+ self._loop._remove_reader(pidfd)
+ os.close(pidfd)
+ return True
+
+
+def _compute_returncode(status):
+ if os.WIFSIGNALED(status):
+ # The child process died because of a signal.
+ return -os.WTERMSIG(status)
+ elif os.WIFEXITED(status):
+ # The child process exited (e.g sys.exit()).
+ return os.WEXITSTATUS(status)
+ else:
+ # The child exited, but we don't understand its status.
+ # This shouldn't happen, but if it does, let's just
+ # return that status; perhaps that helps debug it.
+ return status
+
+
class BaseChildWatcher(AbstractChildWatcher):
def __init__(self):
@@ -979,9 +979,9 @@ class BaseChildWatcher(AbstractChildWatcher):
def close(self):
self.attach_loop(None)
- def is_active(self):
- return self._loop is not None and self._loop.is_running()
-
+ def is_active(self):
+ return self._loop is not None and self._loop.is_running()
+
def _do_waitpid(self, expected_pid):
raise NotImplementedError()
@@ -1011,9 +1011,9 @@ class BaseChildWatcher(AbstractChildWatcher):
def _sig_chld(self):
try:
self._do_waitpid_all()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException as exc:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException as exc:
# self._loop should always be available here
# as '_sig_chld' is added as a signal handler
# in 'attach_loop'
@@ -1080,7 +1080,7 @@ class SafeChildWatcher(BaseChildWatcher):
# The child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = _compute_returncode(status)
if self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
@@ -1173,7 +1173,7 @@ class FastChildWatcher(BaseChildWatcher):
# A child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = _compute_returncode(status)
with self._lock:
try:
@@ -1202,220 +1202,220 @@ class FastChildWatcher(BaseChildWatcher):
callback(pid, returncode, *args)
-class MultiLoopChildWatcher(AbstractChildWatcher):
- """A watcher that doesn't require running loop in the main thread.
-
- This implementation registers a SIGCHLD signal handler on
- instantiation (which may conflict with other code that
- install own handler for this signal).
-
- The solution is safe but it has a significant overhead when
- handling a big number of processes (*O(n)* each time a
- SIGCHLD is received).
- """
-
- # Implementation note:
- # The class keeps compatibility with AbstractChildWatcher ABC
- # To achieve this it has empty attach_loop() method
- # and doesn't accept explicit loop argument
- # for add_child_handler()/remove_child_handler()
- # but retrieves the current loop by get_running_loop()
-
- def __init__(self):
- self._callbacks = {}
- self._saved_sighandler = None
-
- def is_active(self):
- return self._saved_sighandler is not None
-
- def close(self):
- self._callbacks.clear()
- if self._saved_sighandler is None:
- return
-
- handler = signal.getsignal(signal.SIGCHLD)
- if handler != self._sig_chld:
- logger.warning("SIGCHLD handler was changed by outside code")
- else:
- signal.signal(signal.SIGCHLD, self._saved_sighandler)
- self._saved_sighandler = None
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
- def add_child_handler(self, pid, callback, *args):
- loop = events.get_running_loop()
- self._callbacks[pid] = (loop, callback, args)
-
- # Prevent a race condition in case the child is already terminated.
- self._do_waitpid(pid)
-
- def remove_child_handler(self, pid):
- try:
- del self._callbacks[pid]
- return True
- except KeyError:
- return False
-
- def attach_loop(self, loop):
- # Don't save the loop but initialize itself if called first time
- # The reason to do it here is that attach_loop() is called from
- # unix policy only for the main thread.
- # Main thread is required for subscription on SIGCHLD signal
- if self._saved_sighandler is not None:
- return
-
- self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
- if self._saved_sighandler is None:
- logger.warning("Previous SIGCHLD handler was set by non-Python code, "
- "restore to default handler on watcher close.")
- self._saved_sighandler = signal.SIG_DFL
-
- # Set SA_RESTART to limit EINTR occurrences.
- signal.siginterrupt(signal.SIGCHLD, False)
-
- def _do_waitpid_all(self):
- for pid in list(self._callbacks):
- self._do_waitpid(pid)
-
- def _do_waitpid(self, expected_pid):
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, os.WNOHANG)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- "Unknown child process pid %d, will report returncode 255",
- pid)
- debug_log = False
- else:
- if pid == 0:
- # The child process is still alive.
- return
-
- returncode = _compute_returncode(status)
- debug_log = True
- try:
- loop, callback, args = self._callbacks.pop(pid)
- except KeyError: # pragma: no cover
- # May happen if .remove_child_handler() is called
- # after os.waitpid() returns.
- logger.warning("Child watcher got an unexpected pid: %r",
- pid, exc_info=True)
- else:
- if loop.is_closed():
- logger.warning("Loop %r that handles pid %r is closed", loop, pid)
- else:
- if debug_log and loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
- loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
- def _sig_chld(self, signum, frame):
- try:
- self._do_waitpid_all()
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
- logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
-
-
-class ThreadedChildWatcher(AbstractChildWatcher):
- """Threaded child watcher implementation.
-
- The watcher uses a thread per process
- for waiting for the process finish.
-
- It doesn't require subscription on POSIX signal
- but a thread creation is not free.
-
- The watcher has O(1) complexity, its performance doesn't depend
- on amount of spawn processes.
- """
-
- def __init__(self):
- self._pid_counter = itertools.count(0)
- self._threads = {}
-
- def is_active(self):
- return True
-
- def close(self):
- self._join_threads()
-
- def _join_threads(self):
- """Internal: Join all non-daemon threads"""
- threads = [thread for thread in list(self._threads.values())
- if thread.is_alive() and not thread.daemon]
- for thread in threads:
- thread.join()
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
- def __del__(self, _warn=warnings.warn):
- threads = [thread for thread in list(self._threads.values())
- if thread.is_alive()]
- if threads:
- _warn(f"{self.__class__} has registered but not finished child processes",
- ResourceWarning,
- source=self)
-
- def add_child_handler(self, pid, callback, *args):
- loop = events.get_running_loop()
- thread = threading.Thread(target=self._do_waitpid,
- name=f"waitpid-{next(self._pid_counter)}",
- args=(loop, pid, callback, args),
- daemon=True)
- self._threads[pid] = thread
- thread.start()
-
- def remove_child_handler(self, pid):
- # asyncio never calls remove_child_handler() !!!
- # The method is no-op but is implemented because
- # abstract base classes require it.
- return True
-
- def attach_loop(self, loop):
- pass
-
- def _do_waitpid(self, loop, expected_pid, callback, args):
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, 0)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- "Unknown child process pid %d, will report returncode 255",
- pid)
- else:
- returncode = _compute_returncode(status)
- if loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
-
- if loop.is_closed():
- logger.warning("Loop %r that handles pid %r is closed", loop, pid)
- else:
- loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
- self._threads.pop(expected_pid)
-
-
+class MultiLoopChildWatcher(AbstractChildWatcher):
+ """A watcher that doesn't require running loop in the main thread.
+
+ This implementation registers a SIGCHLD signal handler on
+ instantiation (which may conflict with other code that
+ install own handler for this signal).
+
+ The solution is safe but it has a significant overhead when
+ handling a big number of processes (*O(n)* each time a
+ SIGCHLD is received).
+ """
+
+ # Implementation note:
+ # The class keeps compatibility with AbstractChildWatcher ABC
+ # To achieve this it has empty attach_loop() method
+ # and doesn't accept explicit loop argument
+ # for add_child_handler()/remove_child_handler()
+ # but retrieves the current loop by get_running_loop()
+
+ def __init__(self):
+ self._callbacks = {}
+ self._saved_sighandler = None
+
+ def is_active(self):
+ return self._saved_sighandler is not None
+
+ def close(self):
+ self._callbacks.clear()
+ if self._saved_sighandler is None:
+ return
+
+ handler = signal.getsignal(signal.SIGCHLD)
+ if handler != self._sig_chld:
+ logger.warning("SIGCHLD handler was changed by outside code")
+ else:
+ signal.signal(signal.SIGCHLD, self._saved_sighandler)
+ self._saved_sighandler = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ def add_child_handler(self, pid, callback, *args):
+ loop = events.get_running_loop()
+ self._callbacks[pid] = (loop, callback, args)
+
+ # Prevent a race condition in case the child is already terminated.
+ self._do_waitpid(pid)
+
+ def remove_child_handler(self, pid):
+ try:
+ del self._callbacks[pid]
+ return True
+ except KeyError:
+ return False
+
+ def attach_loop(self, loop):
+ # Don't save the loop but initialize itself if called first time
+ # The reason to do it here is that attach_loop() is called from
+ # unix policy only for the main thread.
+ # Main thread is required for subscription on SIGCHLD signal
+ if self._saved_sighandler is not None:
+ return
+
+ self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
+ if self._saved_sighandler is None:
+ logger.warning("Previous SIGCHLD handler was set by non-Python code, "
+ "restore to default handler on watcher close.")
+ self._saved_sighandler = signal.SIG_DFL
+
+ # Set SA_RESTART to limit EINTR occurrences.
+ signal.siginterrupt(signal.SIGCHLD, False)
+
+ def _do_waitpid_all(self):
+ for pid in list(self._callbacks):
+ self._do_waitpid(pid)
+
+ def _do_waitpid(self, expected_pid):
+ assert expected_pid > 0
+
+ try:
+ pid, status = os.waitpid(expected_pid, os.WNOHANG)
+ except ChildProcessError:
+ # The child process is already reaped
+ # (may happen if waitpid() is called elsewhere).
+ pid = expected_pid
+ returncode = 255
+ logger.warning(
+ "Unknown child process pid %d, will report returncode 255",
+ pid)
+ debug_log = False
+ else:
+ if pid == 0:
+ # The child process is still alive.
+ return
+
+ returncode = _compute_returncode(status)
+ debug_log = True
+ try:
+ loop, callback, args = self._callbacks.pop(pid)
+ except KeyError: # pragma: no cover
+ # May happen if .remove_child_handler() is called
+ # after os.waitpid() returns.
+ logger.warning("Child watcher got an unexpected pid: %r",
+ pid, exc_info=True)
+ else:
+ if loop.is_closed():
+ logger.warning("Loop %r that handles pid %r is closed", loop, pid)
+ else:
+ if debug_log and loop.get_debug():
+ logger.debug('process %s exited with returncode %s',
+ expected_pid, returncode)
+ loop.call_soon_threadsafe(callback, pid, returncode, *args)
+
+ def _sig_chld(self, signum, frame):
+ try:
+ self._do_waitpid_all()
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException:
+ logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
+
+
+class ThreadedChildWatcher(AbstractChildWatcher):
+ """Threaded child watcher implementation.
+
+ The watcher uses a thread per process
+ for waiting for the process finish.
+
+ It doesn't require subscription on POSIX signal
+ but a thread creation is not free.
+
+ The watcher has O(1) complexity, its performance doesn't depend
+ on amount of spawn processes.
+ """
+
+ def __init__(self):
+ self._pid_counter = itertools.count(0)
+ self._threads = {}
+
+ def is_active(self):
+ return True
+
+ def close(self):
+ self._join_threads()
+
+ def _join_threads(self):
+ """Internal: Join all non-daemon threads"""
+ threads = [thread for thread in list(self._threads.values())
+ if thread.is_alive() and not thread.daemon]
+ for thread in threads:
+ thread.join()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ def __del__(self, _warn=warnings.warn):
+ threads = [thread for thread in list(self._threads.values())
+ if thread.is_alive()]
+ if threads:
+ _warn(f"{self.__class__} has registered but not finished child processes",
+ ResourceWarning,
+ source=self)
+
+ def add_child_handler(self, pid, callback, *args):
+ loop = events.get_running_loop()
+ thread = threading.Thread(target=self._do_waitpid,
+ name=f"waitpid-{next(self._pid_counter)}",
+ args=(loop, pid, callback, args),
+ daemon=True)
+ self._threads[pid] = thread
+ thread.start()
+
+ def remove_child_handler(self, pid):
+ # asyncio never calls remove_child_handler() !!!
+ # The method is no-op but is implemented because
+ # abstract base classes require it.
+ return True
+
+ def attach_loop(self, loop):
+ pass
+
+ def _do_waitpid(self, loop, expected_pid, callback, args):
+ assert expected_pid > 0
+
+ try:
+ pid, status = os.waitpid(expected_pid, 0)
+ except ChildProcessError:
+ # The child process is already reaped
+ # (may happen if waitpid() is called elsewhere).
+ pid = expected_pid
+ returncode = 255
+ logger.warning(
+ "Unknown child process pid %d, will report returncode 255",
+ pid)
+ else:
+ returncode = _compute_returncode(status)
+ if loop.get_debug():
+ logger.debug('process %s exited with returncode %s',
+ expected_pid, returncode)
+
+ if loop.is_closed():
+ logger.warning("Loop %r that handles pid %r is closed", loop, pid)
+ else:
+ loop.call_soon_threadsafe(callback, pid, returncode, *args)
+
+ self._threads.pop(expected_pid)
+
+
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes."""
_loop_factory = _UnixSelectorEventLoop
@@ -1427,8 +1427,8 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
def _init_watcher(self):
with events._lock:
if self._watcher is None: # pragma: no branch
- self._watcher = ThreadedChildWatcher()
- if threading.current_thread() is threading.main_thread():
+ self._watcher = ThreadedChildWatcher()
+ if threading.current_thread() is threading.main_thread():
self._watcher.attach_loop(self._local._loop)
def set_event_loop(self, loop):
@@ -1442,13 +1442,13 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
super().set_event_loop(loop)
if (self._watcher is not None and
- threading.current_thread() is threading.main_thread()):
+ threading.current_thread() is threading.main_thread()):
self._watcher.attach_loop(loop)
def get_child_watcher(self):
"""Get the watcher for child processes.
- If not yet set, a ThreadedChildWatcher object is automatically created.
+ If not yet set, a ThreadedChildWatcher object is automatically created.
"""
if self._watcher is None:
self._init_watcher()
diff --git a/contrib/tools/python3/src/Lib/asyncio/windows_events.py b/contrib/tools/python3/src/Lib/asyncio/windows_events.py
index bf9e43b7a0..da81ab435b 100644
--- a/contrib/tools/python3/src/Lib/asyncio/windows_events.py
+++ b/contrib/tools/python3/src/Lib/asyncio/windows_events.py
@@ -1,10 +1,10 @@
"""Selector and proactor event loops for Windows."""
-import sys
-
-if sys.platform != 'win32': # pragma: no cover
- raise ImportError('win32 only')
-
+import sys
+
+if sys.platform != 'win32': # pragma: no cover
+ raise ImportError('win32 only')
+
import _overlapped
import _winapi
import errno
@@ -18,7 +18,7 @@ import weakref
from . import events
from . import base_subprocess
from . import futures
-from . import exceptions
+from . import exceptions
from . import proactor_events
from . import selector_events
from . import tasks
@@ -80,9 +80,9 @@ class _OverlappedFuture(futures.Future):
self._loop.call_exception_handler(context)
self._ov = None
- def cancel(self, msg=None):
+ def cancel(self, msg=None):
self._cancel_overlapped()
- return super().cancel(msg=msg)
+ return super().cancel(msg=msg)
def set_exception(self, exception):
super().set_exception(exception)
@@ -154,9 +154,9 @@ class _BaseWaitHandleFuture(futures.Future):
self._unregister_wait_cb(None)
- def cancel(self, msg=None):
+ def cancel(self, msg=None):
self._unregister_wait()
- return super().cancel(msg=msg)
+ return super().cancel(msg=msg)
def set_exception(self, exception):
self._unregister_wait()
@@ -314,25 +314,25 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
proactor = IocpProactor()
super().__init__(proactor)
- def run_forever(self):
- try:
- assert self._self_reading_future is None
- self.call_soon(self._loop_self_reading)
- super().run_forever()
- finally:
- if self._self_reading_future is not None:
- ov = self._self_reading_future._ov
- self._self_reading_future.cancel()
- # self_reading_future was just cancelled so if it hasn't been
- # finished yet, it never will be (it's possible that it has
- # already finished and its callback is waiting in the queue,
- # where it could still happen if the event loop is restarted).
- # Unregister it otherwise IocpProactor.close will wait for it
- # forever
- if ov is not None:
- self._proactor._unregister(ov)
- self._self_reading_future = None
-
+ def run_forever(self):
+ try:
+ assert self._self_reading_future is None
+ self.call_soon(self._loop_self_reading)
+ super().run_forever()
+ finally:
+ if self._self_reading_future is not None:
+ ov = self._self_reading_future._ov
+ self._self_reading_future.cancel()
+ # self_reading_future was just cancelled so if it hasn't been
+ # finished yet, it never will be (it's possible that it has
+ # already finished and its callback is waiting in the queue,
+ # where it could still happen if the event loop is restarted).
+ # Unregister it otherwise IocpProactor.close will wait for it
+ # forever
+ if ov is not None:
+ self._proactor._unregister(ov)
+ self._self_reading_future = None
+
async def create_pipe_connection(self, protocol_factory, address):
f = self._proactor.connect_pipe(address)
pipe = await f
@@ -377,7 +377,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
elif self._debug:
logger.warning("Accept pipe failed on pipe %r",
pipe, exc_info=True)
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
if pipe:
pipe.close()
else:
@@ -397,9 +397,9 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
**kwargs)
try:
await waiter
- except (SystemExit, KeyboardInterrupt):
- raise
- except BaseException:
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except BaseException:
transp.close()
await transp._wait()
raise
@@ -478,7 +478,7 @@ class IocpProactor:
else:
ov.ReadFileInto(conn.fileno(), buf)
except BrokenPipeError:
- return self._result(0)
+ return self._result(0)
def finish_recv(trans, key, ov):
try:
@@ -492,44 +492,44 @@ class IocpProactor:
return self._register(ov, conn, finish_recv)
- def recvfrom(self, conn, nbytes, flags=0):
- self._register_with_iocp(conn)
- ov = _overlapped.Overlapped(NULL)
- try:
- ov.WSARecvFrom(conn.fileno(), nbytes, flags)
- except BrokenPipeError:
- return self._result((b'', None))
-
- def finish_recv(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_recv)
-
- def sendto(self, conn, buf, flags=0, addr=None):
- self._register_with_iocp(conn)
- ov = _overlapped.Overlapped(NULL)
-
- ov.WSASendTo(conn.fileno(), buf, flags, addr)
-
- def finish_send(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_send)
-
+ def recvfrom(self, conn, nbytes, flags=0):
+ self._register_with_iocp(conn)
+ ov = _overlapped.Overlapped(NULL)
+ try:
+ ov.WSARecvFrom(conn.fileno(), nbytes, flags)
+ except BrokenPipeError:
+ return self._result((b'', None))
+
+ def finish_recv(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
+ _overlapped.ERROR_OPERATION_ABORTED):
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+
+ return self._register(ov, conn, finish_recv)
+
+ def sendto(self, conn, buf, flags=0, addr=None):
+ self._register_with_iocp(conn)
+ ov = _overlapped.Overlapped(NULL)
+
+ ov.WSASendTo(conn.fileno(), buf, flags, addr)
+
+ def finish_send(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
+ _overlapped.ERROR_OPERATION_ABORTED):
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+
+ return self._register(ov, conn, finish_send)
+
def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
@@ -569,7 +569,7 @@ class IocpProactor:
# Coroutine closing the accept socket if the future is cancelled
try:
await future
- except exceptions.CancelledError:
+ except exceptions.CancelledError:
conn.close()
raise
@@ -579,14 +579,14 @@ class IocpProactor:
return future
def connect(self, conn, address):
- if conn.type == socket.SOCK_DGRAM:
- # WSAConnect will complete immediately for UDP sockets so we don't
- # need to register any IOCP operation
- _overlapped.WSAConnect(conn.fileno(), address)
- fut = self._loop.create_future()
- fut.set_result(None)
- return fut
-
+ if conn.type == socket.SOCK_DGRAM:
+ # WSAConnect will complete immediately for UDP sockets so we don't
+ # need to register any IOCP operation
+ _overlapped.WSAConnect(conn.fileno(), address)
+ fut = self._loop.create_future()
+ fut.set_result(None)
+ return fut
+
self._register_with_iocp(conn)
# The socket needs to be locally bound before we call ConnectEx().
try:
@@ -662,7 +662,7 @@ class IocpProactor:
# ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
- await tasks.sleep(delay)
+ await tasks.sleep(delay)
return windows_utils.PipeHandle(handle)
@@ -910,4 +910,4 @@ class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory = ProactorEventLoop
-DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
+DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
diff --git a/contrib/tools/python3/src/Lib/asyncio/windows_utils.py b/contrib/tools/python3/src/Lib/asyncio/windows_utils.py
index 1b1ce0e9cd..ef277fac3e 100644
--- a/contrib/tools/python3/src/Lib/asyncio/windows_utils.py
+++ b/contrib/tools/python3/src/Lib/asyncio/windows_utils.py
@@ -107,9 +107,9 @@ class PipeHandle:
CloseHandle(self._handle)
self._handle = None
- def __del__(self, _warn=warnings.warn):
+ def __del__(self, _warn=warnings.warn):
if self._handle is not None:
- _warn(f"unclosed {self!r}", ResourceWarning, source=self)
+ _warn(f"unclosed {self!r}", ResourceWarning, source=self)
self.close()
def __enter__(self):