diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:39 +0300 |
commit | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch) | |
tree | 64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/tools/python3/src/Lib/asyncio | |
parent | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff) | |
download | ydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/asyncio')
27 files changed, 2195 insertions, 2195 deletions
diff --git a/contrib/tools/python3/src/Lib/asyncio/__init__.py b/contrib/tools/python3/src/Lib/asyncio/__init__.py index 8a90986b0b..eb84bfb189 100644 --- a/contrib/tools/python3/src/Lib/asyncio/__init__.py +++ b/contrib/tools/python3/src/Lib/asyncio/__init__.py @@ -8,7 +8,7 @@ import sys from .base_events import * from .coroutines import * from .events import * -from .exceptions import * +from .exceptions import * from .futures import * from .locks import * from .protocols import * @@ -17,7 +17,7 @@ from .queues import * from .streams import * from .subprocess import * from .tasks import * -from .threads import * +from .threads import * from .transports import * # Exposed for _asynciomodule.c to implement now deprecated @@ -27,7 +27,7 @@ from .tasks import _all_tasks_compat # NoQA __all__ = (base_events.__all__ + coroutines.__all__ + events.__all__ + - exceptions.__all__ + + exceptions.__all__ + futures.__all__ + locks.__all__ + protocols.__all__ + @@ -36,7 +36,7 @@ __all__ = (base_events.__all__ + streams.__all__ + subprocess.__all__ + tasks.__all__ + - threads.__all__ + + threads.__all__ + transports.__all__) if sys.platform == 'win32': # pragma: no cover diff --git a/contrib/tools/python3/src/Lib/asyncio/__main__.py b/contrib/tools/python3/src/Lib/asyncio/__main__.py index abe8e722dd..18bb87a5bc 100644 --- a/contrib/tools/python3/src/Lib/asyncio/__main__.py +++ b/contrib/tools/python3/src/Lib/asyncio/__main__.py @@ -1,125 +1,125 @@ -import ast -import asyncio -import code -import concurrent.futures -import inspect -import sys -import threading -import types -import warnings - -from . import futures - - -class AsyncIOInteractiveConsole(code.InteractiveConsole): - - def __init__(self, locals, loop): - super().__init__(locals) - self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT - - self.loop = loop - - def runcode(self, code): - future = concurrent.futures.Future() - - def callback(): - global repl_future - global repl_future_interrupted - - repl_future = None - repl_future_interrupted = False - - func = types.FunctionType(code, self.locals) - try: - coro = func() - except SystemExit: - raise - except KeyboardInterrupt as ex: - repl_future_interrupted = True - future.set_exception(ex) - return - except BaseException as ex: - future.set_exception(ex) - return - - if not inspect.iscoroutine(coro): - future.set_result(coro) - return - - try: - repl_future = self.loop.create_task(coro) - futures._chain_future(repl_future, future) - except BaseException as exc: - future.set_exception(exc) - - loop.call_soon_threadsafe(callback) - - try: - return future.result() - except SystemExit: - raise - except BaseException: - if repl_future_interrupted: - self.write("\nKeyboardInterrupt\n") - else: - self.showtraceback() - - -class REPLThread(threading.Thread): - - def run(self): - try: - banner = ( - f'asyncio REPL {sys.version} on {sys.platform}\n' - f'Use "await" directly instead of "asyncio.run()".\n' - f'Type "help", "copyright", "credits" or "license" ' - f'for more information.\n' - f'{getattr(sys, "ps1", ">>> ")}import asyncio' - ) - - console.interact( - banner=banner, - exitmsg='exiting asyncio REPL...') - finally: - warnings.filterwarnings( - 'ignore', - message=r'^coroutine .* was never awaited$', - category=RuntimeWarning) - - loop.call_soon_threadsafe(loop.stop) - - -if __name__ == '__main__': - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - repl_locals = {'asyncio': asyncio} - for key in {'__name__', '__package__', - '__loader__', '__spec__', - '__builtins__', '__file__'}: - repl_locals[key] = locals()[key] - - console = AsyncIOInteractiveConsole(repl_locals, loop) - - repl_future = None - repl_future_interrupted = False - - try: - import readline # NoQA - except ImportError: - pass - - repl_thread = REPLThread() - repl_thread.daemon = True - repl_thread.start() - - while True: - try: - loop.run_forever() - except KeyboardInterrupt: - if repl_future and not repl_future.done(): - repl_future.cancel() - repl_future_interrupted = True - continue - else: - break +import ast +import asyncio +import code +import concurrent.futures +import inspect +import sys +import threading +import types +import warnings + +from . import futures + + +class AsyncIOInteractiveConsole(code.InteractiveConsole): + + def __init__(self, locals, loop): + super().__init__(locals) + self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT + + self.loop = loop + + def runcode(self, code): + future = concurrent.futures.Future() + + def callback(): + global repl_future + global repl_future_interrupted + + repl_future = None + repl_future_interrupted = False + + func = types.FunctionType(code, self.locals) + try: + coro = func() + except SystemExit: + raise + except KeyboardInterrupt as ex: + repl_future_interrupted = True + future.set_exception(ex) + return + except BaseException as ex: + future.set_exception(ex) + return + + if not inspect.iscoroutine(coro): + future.set_result(coro) + return + + try: + repl_future = self.loop.create_task(coro) + futures._chain_future(repl_future, future) + except BaseException as exc: + future.set_exception(exc) + + loop.call_soon_threadsafe(callback) + + try: + return future.result() + except SystemExit: + raise + except BaseException: + if repl_future_interrupted: + self.write("\nKeyboardInterrupt\n") + else: + self.showtraceback() + + +class REPLThread(threading.Thread): + + def run(self): + try: + banner = ( + f'asyncio REPL {sys.version} on {sys.platform}\n' + f'Use "await" directly instead of "asyncio.run()".\n' + f'Type "help", "copyright", "credits" or "license" ' + f'for more information.\n' + f'{getattr(sys, "ps1", ">>> ")}import asyncio' + ) + + console.interact( + banner=banner, + exitmsg='exiting asyncio REPL...') + finally: + warnings.filterwarnings( + 'ignore', + message=r'^coroutine .* was never awaited$', + category=RuntimeWarning) + + loop.call_soon_threadsafe(loop.stop) + + +if __name__ == '__main__': + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + repl_locals = {'asyncio': asyncio} + for key in {'__name__', '__package__', + '__loader__', '__spec__', + '__builtins__', '__file__'}: + repl_locals[key] = locals()[key] + + console = AsyncIOInteractiveConsole(repl_locals, loop) + + repl_future = None + repl_future_interrupted = False + + try: + import readline # NoQA + except ImportError: + pass + + repl_thread = REPLThread() + repl_thread.daemon = True + repl_thread.start() + + while True: + try: + loop.run_forever() + except KeyboardInterrupt: + if repl_future and not repl_future.done(): + repl_future.cancel() + repl_future_interrupted = True + continue + else: + break diff --git a/contrib/tools/python3/src/Lib/asyncio/base_events.py b/contrib/tools/python3/src/Lib/asyncio/base_events.py index 34fdbf2146..8c1fb49694 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_events.py @@ -16,12 +16,12 @@ to modify the meaning of the API call itself. import collections import collections.abc import concurrent.futures -import functools +import functools import heapq import itertools import os import socket -import stat +import stat import subprocess import threading import time @@ -38,14 +38,14 @@ except ImportError: # pragma: no cover from . import constants from . import coroutines from . import events -from . import exceptions +from . import exceptions from . import futures from . import protocols from . import sslproto -from . import staggered +from . import staggered from . import tasks from . import transports -from . import trsock +from . import trsock from .log import logger @@ -60,17 +60,17 @@ _MIN_SCHEDULED_TIMER_HANDLES = 100 # before cleanup of cancelled handles is performed. _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 - + _HAS_IPv6 = hasattr(socket, 'AF_INET6') # Maximum timeout passed to select to avoid OS limitations MAXIMUM_SELECT_TIMEOUT = 24 * 3600 -# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s -# *reuse_address* parameter -_unset = object() +# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s +# *reuse_address* parameter +_unset = object() + - def _format_handle(handle): cb = handle._callback if isinstance(getattr(cb, '__self__', None), tasks.Task): @@ -100,7 +100,7 @@ def _set_reuseport(sock): 'SO_REUSEPORT defined but not implemented.') -def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): +def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): # Try to skip getaddrinfo if "host" is already an IP. Users might have # handled name resolution in their own code and pass in resolved IPs. if not hasattr(socket, 'inet_pton'): @@ -149,7 +149,7 @@ def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): socket.inet_pton(af, host) # The host has already been resolved. if _HAS_IPv6 and af == socket.AF_INET6: - return af, type, proto, '', (host, port, flowinfo, scopeid) + return af, type, proto, '', (host, port, flowinfo, scopeid) else: return af, type, proto, '', (host, port) except OSError: @@ -159,32 +159,32 @@ def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): return None -def _interleave_addrinfos(addrinfos, first_address_family_count=1): - """Interleave list of addrinfo tuples by family.""" - # Group addresses by family - addrinfos_by_family = collections.OrderedDict() - for addr in addrinfos: - family = addr[0] - if family not in addrinfos_by_family: - addrinfos_by_family[family] = [] - addrinfos_by_family[family].append(addr) - addrinfos_lists = list(addrinfos_by_family.values()) - - reordered = [] - if first_address_family_count > 1: - reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) - del addrinfos_lists[0][:first_address_family_count - 1] - reordered.extend( - a for a in itertools.chain.from_iterable( - itertools.zip_longest(*addrinfos_lists) - ) if a is not None) - return reordered - - +def _interleave_addrinfos(addrinfos, first_address_family_count=1): + """Interleave list of addrinfo tuples by family.""" + # Group addresses by family + addrinfos_by_family = collections.OrderedDict() + for addr in addrinfos: + family = addr[0] + if family not in addrinfos_by_family: + addrinfos_by_family[family] = [] + addrinfos_by_family[family].append(addr) + addrinfos_lists = list(addrinfos_by_family.values()) + + reordered = [] + if first_address_family_count > 1: + reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) + del addrinfos_lists[0][:first_address_family_count - 1] + reordered.extend( + a for a in itertools.chain.from_iterable( + itertools.zip_longest(*addrinfos_lists) + ) if a is not None) + return reordered + + def _run_until_complete_cb(fut): if not fut.cancelled(): exc = fut.exception() - if isinstance(exc, (SystemExit, KeyboardInterrupt)): + if isinstance(exc, (SystemExit, KeyboardInterrupt)): # Issue #22429: run_forever() already finished, no need to # stop it. return @@ -324,8 +324,8 @@ class Server(events.AbstractServer): @property def sockets(self): if self._sockets is None: - return () - return tuple(trsock.TransportSocket(s) for s in self._sockets) + return () + return tuple(trsock.TransportSocket(s) for s in self._sockets) def close(self): sockets = self._sockets @@ -350,7 +350,7 @@ class Server(events.AbstractServer): self._start_serving() # Skip one loop iteration so that all 'loop.add_reader' # go through. - await tasks.sleep(0) + await tasks.sleep(0) async def serve_forever(self): if self._serving_forever_fut is not None: @@ -364,7 +364,7 @@ class Server(events.AbstractServer): try: await self._serving_forever_fut - except exceptions.CancelledError: + except exceptions.CancelledError: try: self.close() await self.wait_closed() @@ -410,8 +410,8 @@ class BaseEventLoop(events.AbstractEventLoop): self._asyncgens = weakref.WeakSet() # Set to True when `loop.shutdown_asyncgens` is called. self._asyncgens_shutdown_called = False - # Set to True when `loop.shutdown_default_executor` is called. - self._executor_shutdown_called = False + # Set to True when `loop.shutdown_default_executor` is called. + self._executor_shutdown_called = False def __repr__(self): return ( @@ -423,20 +423,20 @@ class BaseEventLoop(events.AbstractEventLoop): """Create a Future object attached to the loop.""" return futures.Future(loop=self) - def create_task(self, coro, *, name=None): + def create_task(self, coro, *, name=None): """Schedule a coroutine object. Return a task object. """ self._check_closed() if self._task_factory is None: - task = tasks.Task(coro, loop=self, name=name) + task = tasks.Task(coro, loop=self, name=name) if task._source_traceback: del task._source_traceback[-1] else: task = self._task_factory(self, coro) - tasks._set_task_name(task, name) - + tasks._set_task_name(task, name) + return task def set_task_factory(self, factory): @@ -509,10 +509,10 @@ class BaseEventLoop(events.AbstractEventLoop): if self._closed: raise RuntimeError('Event loop is closed') - def _check_default_executor(self): - if self._executor_shutdown_called: - raise RuntimeError('Executor shutdown has been called') - + def _check_default_executor(self): + if self._executor_shutdown_called: + raise RuntimeError('Executor shutdown has been called') + def _asyncgen_finalizer_hook(self, agen): self._asyncgens.discard(agen) if not self.is_closed(): @@ -539,7 +539,7 @@ class BaseEventLoop(events.AbstractEventLoop): closing_agens = list(self._asyncgens) self._asyncgens.clear() - results = await tasks._gather( + results = await tasks._gather( *[ag.aclose() for ag in closing_agens], return_exceptions=True, loop=self) @@ -553,37 +553,37 @@ class BaseEventLoop(events.AbstractEventLoop): 'asyncgen': agen }) - async def shutdown_default_executor(self): - """Schedule the shutdown of the default executor.""" - self._executor_shutdown_called = True - if self._default_executor is None: - return - future = self.create_future() - thread = threading.Thread(target=self._do_shutdown, args=(future,)) - thread.start() - try: - await future - finally: - thread.join() - - def _do_shutdown(self, future): - try: - self._default_executor.shutdown(wait=True) - self.call_soon_threadsafe(future.set_result, None) - except Exception as ex: - self.call_soon_threadsafe(future.set_exception, ex) - - def _check_running(self): + async def shutdown_default_executor(self): + """Schedule the shutdown of the default executor.""" + self._executor_shutdown_called = True + if self._default_executor is None: + return + future = self.create_future() + thread = threading.Thread(target=self._do_shutdown, args=(future,)) + thread.start() + try: + await future + finally: + thread.join() + + def _do_shutdown(self, future): + try: + self._default_executor.shutdown(wait=True) + self.call_soon_threadsafe(future.set_result, None) + except Exception as ex: + self.call_soon_threadsafe(future.set_exception, ex) + + def _check_running(self): if self.is_running(): raise RuntimeError('This event loop is already running') if events._get_running_loop() is not None: raise RuntimeError( 'Cannot run the event loop while another loop is running') - - def run_forever(self): - """Run until stop() is called.""" - self._check_closed() - self._check_running() + + def run_forever(self): + """Run until stop() is called.""" + self._check_closed() + self._check_running() self._set_coroutine_origin_tracking(self._debug) self._thread_id = threading.get_ident() @@ -615,7 +615,7 @@ class BaseEventLoop(events.AbstractEventLoop): Return the Future's result, or raise its exception. """ self._check_closed() - self._check_running() + self._check_running() new_task = not futures.isfuture(future) future = tasks.ensure_future(future, loop=self) @@ -666,7 +666,7 @@ class BaseEventLoop(events.AbstractEventLoop): self._closed = True self._ready.clear() self._scheduled.clear() - self._executor_shutdown_called = True + self._executor_shutdown_called = True executor = self._default_executor if executor is not None: self._default_executor = None @@ -676,9 +676,9 @@ class BaseEventLoop(events.AbstractEventLoop): """Returns True if the event loop was closed.""" return self._closed - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if not self.is_closed(): - _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) + _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) if not self.is_running(): self.close() @@ -803,23 +803,23 @@ class BaseEventLoop(events.AbstractEventLoop): self._check_callback(func, 'run_in_executor') if executor is None: executor = self._default_executor - # Only check when the default executor is being used - self._check_default_executor() + # Only check when the default executor is being used + self._check_default_executor() if executor is None: - executor = concurrent.futures.ThreadPoolExecutor( - thread_name_prefix='asyncio' - ) + executor = concurrent.futures.ThreadPoolExecutor( + thread_name_prefix='asyncio' + ) self._default_executor = executor return futures.wrap_future( executor.submit(func, *args), loop=self) def set_default_executor(self, executor): - if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): - warnings.warn( - 'Using the default executor that is not an instance of ' - 'ThreadPoolExecutor is deprecated and will be prohibited ' - 'in Python 3.9', - DeprecationWarning, 2) + if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): + warnings.warn( + 'Using the default executor that is not an instance of ' + 'ThreadPoolExecutor is deprecated and will be prohibited ' + 'in Python 3.9', + DeprecationWarning, 2) self._default_executor = executor def _getaddrinfo_debug(self, host, port, family, type, proto, flags): @@ -868,7 +868,7 @@ class BaseEventLoop(events.AbstractEventLoop): try: return await self._sock_sendfile_native(sock, file, offset, count) - except exceptions.SendfileNotAvailableError as exc: + except exceptions.SendfileNotAvailableError as exc: if not fallback: raise return await self._sock_sendfile_fallback(sock, file, @@ -877,7 +877,7 @@ class BaseEventLoop(events.AbstractEventLoop): async def _sock_sendfile_native(self, sock, file, offset, count): # NB: sendfile syscall is not supported for SSL sockets and # non-mmap files even if sendfile is supported by OS - raise exceptions.SendfileNotAvailableError( + raise exceptions.SendfileNotAvailableError( f"syscall sendfile is not available for socket {sock!r} " "and file {file!r} combination") @@ -900,7 +900,7 @@ class BaseEventLoop(events.AbstractEventLoop): read = await self.run_in_executor(None, file.readinto, view) if not read: break # EOF - await self.sock_sendall(sock, view[:read]) + await self.sock_sendall(sock, view[:read]) total_sent += read return total_sent finally: @@ -928,49 +928,49 @@ class BaseEventLoop(events.AbstractEventLoop): "offset must be a non-negative integer (got {!r})".format( offset)) - async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): - """Create, bind and connect one socket.""" - my_exceptions = [] - exceptions.append(my_exceptions) - family, type_, proto, _, address = addr_info - sock = None - try: - sock = socket.socket(family=family, type=type_, proto=proto) - sock.setblocking(False) - if local_addr_infos is not None: - for _, _, _, _, laddr in local_addr_infos: - try: - sock.bind(laddr) - break - except OSError as exc: - msg = ( - f'error while attempting to bind on ' - f'address {laddr!r}: ' - f'{exc.strerror.lower()}' - ) - exc = OSError(exc.errno, msg) - my_exceptions.append(exc) - else: # all bind attempts failed - raise my_exceptions.pop() - await self.sock_connect(sock, address) - return sock - except OSError as exc: - my_exceptions.append(exc) - if sock is not None: - sock.close() - raise - except: - if sock is not None: - sock.close() - raise - + async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): + """Create, bind and connect one socket.""" + my_exceptions = [] + exceptions.append(my_exceptions) + family, type_, proto, _, address = addr_info + sock = None + try: + sock = socket.socket(family=family, type=type_, proto=proto) + sock.setblocking(False) + if local_addr_infos is not None: + for _, _, _, _, laddr in local_addr_infos: + try: + sock.bind(laddr) + break + except OSError as exc: + msg = ( + f'error while attempting to bind on ' + f'address {laddr!r}: ' + f'{exc.strerror.lower()}' + ) + exc = OSError(exc.errno, msg) + my_exceptions.append(exc) + else: # all bind attempts failed + raise my_exceptions.pop() + await self.sock_connect(sock, address) + return sock + except OSError as exc: + my_exceptions.append(exc) + if sock is not None: + sock.close() + raise + except: + if sock is not None: + sock.close() + raise + async def create_connection( self, protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, - ssl_handshake_timeout=None, - happy_eyeballs_delay=None, interleave=None): + ssl_handshake_timeout=None, + happy_eyeballs_delay=None, interleave=None): """Connect to a TCP server. Create a streaming transport connection to a given Internet host and @@ -1005,10 +1005,10 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError( 'ssl_handshake_timeout is only meaningful with ssl') - if happy_eyeballs_delay is not None and interleave is None: - # If using happy eyeballs, default to interleave addresses by family - interleave = 1 - + if happy_eyeballs_delay is not None and interleave is None: + # If using happy eyeballs, default to interleave addresses by family + interleave = 1 + if host is not None or port is not None: if sock is not None: raise ValueError( @@ -1027,31 +1027,31 @@ class BaseEventLoop(events.AbstractEventLoop): flags=flags, loop=self) if not laddr_infos: raise OSError('getaddrinfo() returned empty list') - else: - laddr_infos = None + else: + laddr_infos = None + + if interleave: + infos = _interleave_addrinfos(infos, interleave) - if interleave: - infos = _interleave_addrinfos(infos, interleave) - exceptions = [] - if happy_eyeballs_delay is None: - # not using happy eyeballs - for addrinfo in infos: - try: - sock = await self._connect_sock( - exceptions, addrinfo, laddr_infos) - break - except OSError: - continue - else: # using happy eyeballs - sock, _, _ = await staggered.staggered_race( - (functools.partial(self._connect_sock, - exceptions, addrinfo, laddr_infos) - for addrinfo in infos), - happy_eyeballs_delay, loop=self) - - if sock is None: - exceptions = [exc for sub in exceptions for exc in sub] + if happy_eyeballs_delay is None: + # not using happy eyeballs + for addrinfo in infos: + try: + sock = await self._connect_sock( + exceptions, addrinfo, laddr_infos) + break + except OSError: + continue + else: # using happy eyeballs + sock, _, _ = await staggered.staggered_race( + (functools.partial(self._connect_sock, + exceptions, addrinfo, laddr_infos) + for addrinfo in infos), + happy_eyeballs_delay, loop=self) + + if sock is None: + exceptions = [exc for sub in exceptions for exc in sub] if len(exceptions) == 1: raise exceptions[0] else: @@ -1150,7 +1150,7 @@ class BaseEventLoop(events.AbstractEventLoop): try: return await self._sendfile_native(transport, file, offset, count) - except exceptions.SendfileNotAvailableError as exc: + except exceptions.SendfileNotAvailableError as exc: if not fallback: raise @@ -1163,7 +1163,7 @@ class BaseEventLoop(events.AbstractEventLoop): offset, count) async def _sendfile_native(self, transp, file, offset, count): - raise exceptions.SendfileNotAvailableError( + raise exceptions.SendfileNotAvailableError( "sendfile syscall is not supported") async def _sendfile_fallback(self, transp, file, offset, count): @@ -1180,11 +1180,11 @@ class BaseEventLoop(events.AbstractEventLoop): if blocksize <= 0: return total_sent view = memoryview(buf)[:blocksize] - read = await self.run_in_executor(None, file.readinto, view) + read = await self.run_in_executor(None, file.readinto, view) if not read: return total_sent # EOF await proto.drain() - transp.write(view[:read]) + transp.write(view[:read]) total_sent += read finally: if total_sent > 0 and hasattr(file, 'seek'): @@ -1229,7 +1229,7 @@ class BaseEventLoop(events.AbstractEventLoop): try: await waiter - except BaseException: + except BaseException: transport.close() conmade_cb.cancel() resume_cb.cancel() @@ -1240,7 +1240,7 @@ class BaseEventLoop(events.AbstractEventLoop): async def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, - reuse_address=_unset, reuse_port=None, + reuse_address=_unset, reuse_port=None, allow_broadcast=None, sock=None): """Create datagram connection.""" if sock is not None: @@ -1249,7 +1249,7 @@ class BaseEventLoop(events.AbstractEventLoop): f'A UDP Socket was expected, got {sock!r}') if (local_addr or remote_addr or family or proto or flags or - reuse_port or allow_broadcast): + reuse_port or allow_broadcast): # show the problematic kwargs in exception msg opts = dict(local_addr=local_addr, remote_addr=remote_addr, family=family, proto=proto, flags=flags, @@ -1270,28 +1270,28 @@ class BaseEventLoop(events.AbstractEventLoop): for addr in (local_addr, remote_addr): if addr is not None and not isinstance(addr, str): raise TypeError('string is expected') - - if local_addr and local_addr[0] not in (0, '\x00'): - try: - if stat.S_ISSOCK(os.stat(local_addr).st_mode): - os.remove(local_addr) - except FileNotFoundError: - pass - except OSError as err: - # Directory may have permissions only to create socket. - logger.error('Unable to check or remove stale UNIX ' - 'socket %r: %r', - local_addr, err) - + + if local_addr and local_addr[0] not in (0, '\x00'): + try: + if stat.S_ISSOCK(os.stat(local_addr).st_mode): + os.remove(local_addr) + except FileNotFoundError: + pass + except OSError as err: + # Directory may have permissions only to create socket. + logger.error('Unable to check or remove stale UNIX ' + 'socket %r: %r', + local_addr, err) + addr_pairs_info = (((family, proto), (local_addr, remote_addr)), ) else: # join address by (family, protocol) - addr_infos = {} # Using order preserving dict + addr_infos = {} # Using order preserving dict for idx, addr in ((0, local_addr), (1, remote_addr)): if addr is not None: - if not (isinstance(addr, tuple) and len(addr) == 2): - raise TypeError('2-tuple is expected') + if not (isinstance(addr, tuple) and len(addr) == 2): + raise TypeError('2-tuple is expected') infos = await self._ensure_resolved( addr, family=family, type=socket.SOCK_DGRAM, @@ -1316,18 +1316,18 @@ class BaseEventLoop(events.AbstractEventLoop): exceptions = [] - # bpo-37228 - if reuse_address is not _unset: - if reuse_address: - raise ValueError("Passing `reuse_address=True` is no " - "longer supported, as the usage of " - "SO_REUSEPORT in UDP poses a significant " - "security concern.") - else: - warnings.warn("The *reuse_address* parameter has been " - "deprecated as of 3.5.10 and is scheduled " - "for removal in 3.11.", DeprecationWarning, - stacklevel=2) + # bpo-37228 + if reuse_address is not _unset: + if reuse_address: + raise ValueError("Passing `reuse_address=True` is no " + "longer supported, as the usage of " + "SO_REUSEPORT in UDP poses a significant " + "security concern.") + else: + warnings.warn("The *reuse_address* parameter has been " + "deprecated as of 3.5.10 and is scheduled " + "for removal in 3.11.", DeprecationWarning, + stacklevel=2) for ((family, proto), (local_address, remote_address)) in addr_pairs_info: @@ -1346,8 +1346,8 @@ class BaseEventLoop(events.AbstractEventLoop): if local_addr: sock.bind(local_address) if remote_addr: - if not allow_broadcast: - await self.sock_connect(sock, remote_address) + if not allow_broadcast: + await self.sock_connect(sock, remote_address) r_addr = remote_address except OSError as exc: if sock is not None: @@ -1388,7 +1388,7 @@ class BaseEventLoop(events.AbstractEventLoop): family=0, type=socket.SOCK_STREAM, proto=0, flags=0, loop): host, port = address[:2] - info = _ipaddr_info(host, port, family, type, proto, *address[2:]) + info = _ipaddr_info(host, port, family, type, proto, *address[2:]) if info is not None: # "host" is already a resolved IP. return [info] @@ -1457,7 +1457,7 @@ class BaseEventLoop(events.AbstractEventLoop): fs = [self._create_server_getaddrinfo(host, port, family=family, flags=flags) for host in hosts] - infos = await tasks._gather(*fs, loop=self) + infos = await tasks._gather(*fs, loop=self) infos = set(itertools.chain.from_iterable(infos)) completed = False @@ -1515,7 +1515,7 @@ class BaseEventLoop(events.AbstractEventLoop): server._start_serving() # Skip one loop iteration so that all 'loop.add_reader' # go through. - await tasks.sleep(0) + await tasks.sleep(0) if self._debug: logger.info("%r is serving", server) @@ -1601,7 +1601,7 @@ class BaseEventLoop(events.AbstractEventLoop): stderr=subprocess.PIPE, universal_newlines=False, shell=True, bufsize=0, - encoding=None, errors=None, text=None, + encoding=None, errors=None, text=None, **kwargs): if not isinstance(cmd, (bytes, str)): raise ValueError("cmd must be a string") @@ -1611,13 +1611,13 @@ class BaseEventLoop(events.AbstractEventLoop): raise ValueError("shell must be True") if bufsize != 0: raise ValueError("bufsize must be 0") - if text: - raise ValueError("text must be False") - if encoding is not None: - raise ValueError("encoding must be None") - if errors is not None: - raise ValueError("errors must be None") - + if text: + raise ValueError("text must be False") + if encoding is not None: + raise ValueError("encoding must be None") + if errors is not None: + raise ValueError("errors must be None") + protocol = protocol_factory() debug_log = None if self._debug: @@ -1634,22 +1634,22 @@ class BaseEventLoop(events.AbstractEventLoop): async def subprocess_exec(self, protocol_factory, program, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=False, - shell=False, bufsize=0, - encoding=None, errors=None, text=None, - **kwargs): + shell=False, bufsize=0, + encoding=None, errors=None, text=None, + **kwargs): if universal_newlines: raise ValueError("universal_newlines must be False") if shell: raise ValueError("shell must be False") if bufsize != 0: raise ValueError("bufsize must be 0") - if text: - raise ValueError("text must be False") - if encoding is not None: - raise ValueError("encoding must be None") - if errors is not None: - raise ValueError("errors must be None") - + if text: + raise ValueError("text must be False") + if encoding is not None: + raise ValueError("encoding must be None") + if errors is not None: + raise ValueError("errors must be None") + popen_args = (program,) + args protocol = protocol_factory() debug_log = None @@ -1762,9 +1762,9 @@ class BaseEventLoop(events.AbstractEventLoop): if self._exception_handler is None: try: self.default_exception_handler(context) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: # Second protection layer for unexpected errors # in the default implementation, as well as for subclassed # event loops with overloaded "default_exception_handler". @@ -1773,9 +1773,9 @@ class BaseEventLoop(events.AbstractEventLoop): else: try: self._exception_handler(self, context) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: # Exception in the user set custom exception handler. try: # Let's try default handler. @@ -1784,9 +1784,9 @@ class BaseEventLoop(events.AbstractEventLoop): 'exception': exc, 'context': context, }) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: # Guard 'default_exception_handler' in case it is # overloaded. logger.error('Exception in default exception handler ' @@ -1851,7 +1851,7 @@ class BaseEventLoop(events.AbstractEventLoop): when = self._scheduled[0]._when timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) - event_list = self._selector.select(timeout) + event_list = self._selector.select(timeout) self._process_events(event_list) # Handle 'later' callbacks that are ready. diff --git a/contrib/tools/python3/src/Lib/asyncio/base_futures.py b/contrib/tools/python3/src/Lib/asyncio/base_futures.py index 0962405d8d..2c01ac98e1 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_futures.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_futures.py @@ -1,7 +1,7 @@ __all__ = () import reprlib -from _thread import get_ident +from _thread import get_ident from . import format_helpers @@ -42,16 +42,16 @@ def _format_callbacks(cb): return f'cb=[{cb}]' -# bpo-42183: _repr_running is needed for repr protection -# when a Future or Task result contains itself directly or indirectly. -# The logic is borrowed from @reprlib.recursive_repr decorator. -# Unfortunately, the direct decorator usage is impossible because of -# AttributeError: '_asyncio.Task' object has no attribute '__module__' error. -# -# After fixing this thing we can return to the decorator based approach. -_repr_running = set() - - +# bpo-42183: _repr_running is needed for repr protection +# when a Future or Task result contains itself directly or indirectly. +# The logic is borrowed from @reprlib.recursive_repr decorator. +# Unfortunately, the direct decorator usage is impossible because of +# AttributeError: '_asyncio.Task' object has no attribute '__module__' error. +# +# After fixing this thing we can return to the decorator based approach. +_repr_running = set() + + def _future_repr_info(future): # (Future) -> str """helper function for Future.__repr__""" @@ -60,17 +60,17 @@ def _future_repr_info(future): if future._exception is not None: info.append(f'exception={future._exception!r}') else: - key = id(future), get_ident() - if key in _repr_running: - result = '...' - else: - _repr_running.add(key) - try: - # use reprlib to limit the length of the output, especially - # for very long strings - result = reprlib.repr(future._result) - finally: - _repr_running.discard(key) + key = id(future), get_ident() + if key in _repr_running: + result = '...' + else: + _repr_running.add(key) + try: + # use reprlib to limit the length of the output, especially + # for very long strings + result = reprlib.repr(future._result) + finally: + _repr_running.discard(key) info.append(f'result={result}') if future._callbacks: info.append(_format_callbacks(future._callbacks)) diff --git a/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py b/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py index 05daeedd37..14d5051922 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_subprocess.py @@ -120,9 +120,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport): # Don't clear the _proc reference yet: _post_init() may still run - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if not self._closed: - _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) + _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) self.close() def get_pid(self): @@ -182,9 +182,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport): for callback, data in self._pending_calls: loop.call_soon(callback, *data) self._pending_calls = None - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: if waiter is not None and not waiter.cancelled(): waiter.set_exception(exc) else: diff --git a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py index d16f3f7be2..09bb171a2c 100644 --- a/contrib/tools/python3/src/Lib/asyncio/base_tasks.py +++ b/contrib/tools/python3/src/Lib/asyncio/base_tasks.py @@ -12,30 +12,30 @@ def _task_repr_info(task): # replace status info[0] = 'cancelling' - info.insert(1, 'name=%r' % task.get_name()) - + info.insert(1, 'name=%r' % task.get_name()) + coro = coroutines._format_coroutine(task._coro) - info.insert(2, f'coro=<{coro}>') + info.insert(2, f'coro=<{coro}>') if task._fut_waiter is not None: - info.insert(3, f'wait_for={task._fut_waiter!r}') + info.insert(3, f'wait_for={task._fut_waiter!r}') return info def _task_get_stack(task, limit): frames = [] - if hasattr(task._coro, 'cr_frame'): - # case 1: 'async def' coroutines + if hasattr(task._coro, 'cr_frame'): + # case 1: 'async def' coroutines f = task._coro.cr_frame - elif hasattr(task._coro, 'gi_frame'): - # case 2: legacy coroutines + elif hasattr(task._coro, 'gi_frame'): + # case 2: legacy coroutines f = task._coro.gi_frame - elif hasattr(task._coro, 'ag_frame'): - # case 3: async generators - f = task._coro.ag_frame - else: - # case 4: unknown objects - f = None + elif hasattr(task._coro, 'ag_frame'): + # case 3: async generators + f = task._coro.ag_frame + else: + # case 4: unknown objects + f = None if f is not None: while f is not None: if limit is not None: diff --git a/contrib/tools/python3/src/Lib/asyncio/coroutines.py b/contrib/tools/python3/src/Lib/asyncio/coroutines.py index ea92c8c95b..9664ea74d7 100644 --- a/contrib/tools/python3/src/Lib/asyncio/coroutines.py +++ b/contrib/tools/python3/src/Lib/asyncio/coroutines.py @@ -7,7 +7,7 @@ import os import sys import traceback import types -import warnings +import warnings from . import base_futures from . import constants @@ -108,9 +108,9 @@ def coroutine(func): If the coroutine is not yielded from before it is destroyed, an error message is logged. """ - warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead', - DeprecationWarning, - stacklevel=2) + warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead', + DeprecationWarning, + stacklevel=2) if inspect.iscoroutinefunction(func): # In Python 3.5 that's all we need to do for coroutines # defined with "async def". diff --git a/contrib/tools/python3/src/Lib/asyncio/events.py b/contrib/tools/python3/src/Lib/asyncio/events.py index 2e806c1517..413ff2aaa6 100644 --- a/contrib/tools/python3/src/Lib/asyncio/events.py +++ b/contrib/tools/python3/src/Lib/asyncio/events.py @@ -3,7 +3,7 @@ __all__ = ( 'AbstractEventLoopPolicy', 'AbstractEventLoop', 'AbstractServer', - 'Handle', 'TimerHandle', + 'Handle', 'TimerHandle', 'get_event_loop_policy', 'set_event_loop_policy', 'get_event_loop', 'set_event_loop', 'new_event_loop', 'get_child_watcher', 'set_child_watcher', @@ -78,9 +78,9 @@ class Handle: def _run(self): try: self._context.run(self._callback, *self._args) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: cb = format_helpers._format_callback_source( self._callback, self._args) msg = f'Exception in callback {cb}' @@ -118,24 +118,24 @@ class TimerHandle(Handle): return hash(self._when) def __lt__(self, other): - if isinstance(other, TimerHandle): - return self._when < other._when - return NotImplemented + if isinstance(other, TimerHandle): + return self._when < other._when + return NotImplemented def __le__(self, other): - if isinstance(other, TimerHandle): - return self._when < other._when or self.__eq__(other) - return NotImplemented + if isinstance(other, TimerHandle): + return self._when < other._when or self.__eq__(other) + return NotImplemented def __gt__(self, other): - if isinstance(other, TimerHandle): - return self._when > other._when - return NotImplemented + if isinstance(other, TimerHandle): + return self._when > other._when + return NotImplemented def __ge__(self, other): - if isinstance(other, TimerHandle): - return self._when > other._when or self.__eq__(other) - return NotImplemented + if isinstance(other, TimerHandle): + return self._when > other._when or self.__eq__(other) + return NotImplemented def __eq__(self, other): if isinstance(other, TimerHandle): @@ -248,23 +248,23 @@ class AbstractEventLoop: """Shutdown all active asynchronous generators.""" raise NotImplementedError - async def shutdown_default_executor(self): - """Schedule the shutdown of the default executor.""" - raise NotImplementedError - + async def shutdown_default_executor(self): + """Schedule the shutdown of the default executor.""" + raise NotImplementedError + # Methods scheduling callbacks. All these return Handles. def _timer_handle_cancelled(self, handle): """Notification that a TimerHandle has been cancelled.""" raise NotImplementedError - def call_soon(self, callback, *args, context=None): - return self.call_later(0, callback, *args, context=context) + def call_soon(self, callback, *args, context=None): + return self.call_later(0, callback, *args, context=context) - def call_later(self, delay, callback, *args, context=None): + def call_later(self, delay, callback, *args, context=None): raise NotImplementedError - def call_at(self, when, callback, *args, context=None): + def call_at(self, when, callback, *args, context=None): raise NotImplementedError def time(self): @@ -275,15 +275,15 @@ class AbstractEventLoop: # Method scheduling a coroutine object: create a task. - def create_task(self, coro, *, name=None): + def create_task(self, coro, *, name=None): raise NotImplementedError # Methods for interacting with threads. - def call_soon_threadsafe(self, callback, *args, context=None): + def call_soon_threadsafe(self, callback, *args, context=None): raise NotImplementedError - def run_in_executor(self, executor, func, *args): + def run_in_executor(self, executor, func, *args): raise NotImplementedError def set_default_executor(self, executor): @@ -303,8 +303,8 @@ class AbstractEventLoop: *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, - ssl_handshake_timeout=None, - happy_eyeballs_delay=None, interleave=None): + ssl_handshake_timeout=None, + happy_eyeballs_delay=None, interleave=None): raise NotImplementedError async def create_server( @@ -396,7 +396,7 @@ class AbstractEventLoop: The return value is a Server object, which can be used to stop the service. - path is a str, representing a file system path to bind the + path is a str, representing a file system path to bind the server socket to. sock can optionally be specified in order to use a preexisting @@ -465,7 +465,7 @@ class AbstractEventLoop: # The reason to accept file-like object instead of just file descriptor # is: we need to own pipe and close it at transport finishing # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vice versa. + # close fd in pipe transport then close f and vice versa. raise NotImplementedError async def connect_write_pipe(self, protocol_factory, pipe): @@ -478,7 +478,7 @@ class AbstractEventLoop: # The reason to accept file-like object instead of just file descriptor # is: we need to own pipe and close it at transport finishing # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vice versa. + # close fd in pipe transport then close f and vice versa. raise NotImplementedError async def subprocess_shell(self, protocol_factory, cmd, *, @@ -629,13 +629,13 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): self._local = self._Local() def get_event_loop(self): - """Get the event loop for the current context. + """Get the event loop for the current context. - Returns an instance of EventLoop or raises an exception. + Returns an instance of EventLoop or raises an exception. """ if (self._local._loop is None and not self._local._set_called and - threading.current_thread() is threading.main_thread()): + threading.current_thread() is threading.main_thread()): self.set_event_loop(self.new_event_loop()) if self._local._loop is None: diff --git a/contrib/tools/python3/src/Lib/asyncio/exceptions.py b/contrib/tools/python3/src/Lib/asyncio/exceptions.py index 0957fe6138..f07e448657 100644 --- a/contrib/tools/python3/src/Lib/asyncio/exceptions.py +++ b/contrib/tools/python3/src/Lib/asyncio/exceptions.py @@ -1,58 +1,58 @@ -"""asyncio exceptions.""" - - -__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError', - 'IncompleteReadError', 'LimitOverrunError', - 'SendfileNotAvailableError') - - -class CancelledError(BaseException): - """The Future or Task was cancelled.""" - - -class TimeoutError(Exception): - """The operation exceeded the given deadline.""" - - -class InvalidStateError(Exception): - """The operation is not allowed in this state.""" - - -class SendfileNotAvailableError(RuntimeError): - """Sendfile syscall is not available. - - Raised if OS does not support sendfile syscall for given socket or - file type. - """ - - -class IncompleteReadError(EOFError): - """ - Incomplete read error. Attributes: - - - partial: read bytes string before the end of stream was reached - - expected: total number of expected bytes (or None if unknown) - """ - def __init__(self, partial, expected): - r_expected = 'undefined' if expected is None else repr(expected) - super().__init__(f'{len(partial)} bytes read on a total of ' - f'{r_expected} expected bytes') - self.partial = partial - self.expected = expected - - def __reduce__(self): - return type(self), (self.partial, self.expected) - - -class LimitOverrunError(Exception): - """Reached the buffer limit while looking for a separator. - - Attributes: - - consumed: total number of to be consumed bytes. - """ - def __init__(self, message, consumed): - super().__init__(message) - self.consumed = consumed - - def __reduce__(self): - return type(self), (self.args[0], self.consumed) +"""asyncio exceptions.""" + + +__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError', + 'IncompleteReadError', 'LimitOverrunError', + 'SendfileNotAvailableError') + + +class CancelledError(BaseException): + """The Future or Task was cancelled.""" + + +class TimeoutError(Exception): + """The operation exceeded the given deadline.""" + + +class InvalidStateError(Exception): + """The operation is not allowed in this state.""" + + +class SendfileNotAvailableError(RuntimeError): + """Sendfile syscall is not available. + + Raised if OS does not support sendfile syscall for given socket or + file type. + """ + + +class IncompleteReadError(EOFError): + """ + Incomplete read error. Attributes: + + - partial: read bytes string before the end of stream was reached + - expected: total number of expected bytes (or None if unknown) + """ + def __init__(self, partial, expected): + r_expected = 'undefined' if expected is None else repr(expected) + super().__init__(f'{len(partial)} bytes read on a total of ' + f'{r_expected} expected bytes') + self.partial = partial + self.expected = expected + + def __reduce__(self): + return type(self), (self.partial, self.expected) + + +class LimitOverrunError(Exception): + """Reached the buffer limit while looking for a separator. + + Attributes: + - consumed: total number of to be consumed bytes. + """ + def __init__(self, message, consumed): + super().__init__(message) + self.consumed = consumed + + def __reduce__(self): + return type(self), (self.args[0], self.consumed) diff --git a/contrib/tools/python3/src/Lib/asyncio/futures.py b/contrib/tools/python3/src/Lib/asyncio/futures.py index d66c12152e..bed4da52fd 100644 --- a/contrib/tools/python3/src/Lib/asyncio/futures.py +++ b/contrib/tools/python3/src/Lib/asyncio/futures.py @@ -11,7 +11,7 @@ import sys from . import base_futures from . import events -from . import exceptions +from . import exceptions from . import format_helpers @@ -51,9 +51,9 @@ class Future: _exception = None _loop = None _source_traceback = None - _cancel_message = None - # A saved CancelledError for later chaining as an exception context. - _cancelled_exc = None + _cancel_message = None + # A saved CancelledError for later chaining as an exception context. + _cancelled_exc = None # This field is used for a dual purpose: # - Its presence is a marker to declare that a class implements @@ -106,9 +106,9 @@ class Future: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) - def __class_getitem__(cls, type): - return cls - + def __class_getitem__(cls, type): + return cls + @property def _log_traceback(self): return self.__log_traceback @@ -121,27 +121,27 @@ class Future: def get_loop(self): """Return the event loop the Future is bound to.""" - loop = self._loop - if loop is None: - raise RuntimeError("Future object is not initialized.") - return loop - - def _make_cancelled_error(self): - """Create the CancelledError to raise if the Future is cancelled. - - This should only be called once when handling a cancellation since - it erases the saved context exception value. - """ - if self._cancel_message is None: - exc = exceptions.CancelledError() - else: - exc = exceptions.CancelledError(self._cancel_message) - exc.__context__ = self._cancelled_exc - # Remove the reference since we don't need this anymore. - self._cancelled_exc = None - return exc - - def cancel(self, msg=None): + loop = self._loop + if loop is None: + raise RuntimeError("Future object is not initialized.") + return loop + + def _make_cancelled_error(self): + """Create the CancelledError to raise if the Future is cancelled. + + This should only be called once when handling a cancellation since + it erases the saved context exception value. + """ + if self._cancel_message is None: + exc = exceptions.CancelledError() + else: + exc = exceptions.CancelledError(self._cancel_message) + exc.__context__ = self._cancelled_exc + # Remove the reference since we don't need this anymore. + self._cancelled_exc = None + return exc + + def cancel(self, msg=None): """Cancel the future and schedule callbacks. If the future is already done or cancelled, return False. Otherwise, @@ -152,7 +152,7 @@ class Future: if self._state != _PENDING: return False self._state = _CANCELLED - self._cancel_message = msg + self._cancel_message = msg self.__schedule_callbacks() return True @@ -192,10 +192,10 @@ class Future: the future is done and has an exception set, this exception is raised. """ if self._state == _CANCELLED: - exc = self._make_cancelled_error() - raise exc + exc = self._make_cancelled_error() + raise exc if self._state != _FINISHED: - raise exceptions.InvalidStateError('Result is not ready.') + raise exceptions.InvalidStateError('Result is not ready.') self.__log_traceback = False if self._exception is not None: raise self._exception @@ -210,10 +210,10 @@ class Future: InvalidStateError. """ if self._state == _CANCELLED: - exc = self._make_cancelled_error() - raise exc + exc = self._make_cancelled_error() + raise exc if self._state != _FINISHED: - raise exceptions.InvalidStateError('Exception is not set.') + raise exceptions.InvalidStateError('Exception is not set.') self.__log_traceback = False return self._exception @@ -255,7 +255,7 @@ class Future: InvalidStateError. """ if self._state != _PENDING: - raise exceptions.InvalidStateError(f'{self._state}: {self!r}') + raise exceptions.InvalidStateError(f'{self._state}: {self!r}') self._result = result self._state = _FINISHED self.__schedule_callbacks() @@ -267,7 +267,7 @@ class Future: InvalidStateError. """ if self._state != _PENDING: - raise exceptions.InvalidStateError(f'{self._state}: {self!r}') + raise exceptions.InvalidStateError(f'{self._state}: {self!r}') if isinstance(exception, type): exception = exception() if type(exception) is StopIteration: @@ -312,18 +312,18 @@ def _set_result_unless_cancelled(fut, result): fut.set_result(result) -def _convert_future_exc(exc): - exc_class = type(exc) - if exc_class is concurrent.futures.CancelledError: - return exceptions.CancelledError(*exc.args) - elif exc_class is concurrent.futures.TimeoutError: - return exceptions.TimeoutError(*exc.args) - elif exc_class is concurrent.futures.InvalidStateError: - return exceptions.InvalidStateError(*exc.args) - else: - return exc - - +def _convert_future_exc(exc): + exc_class = type(exc) + if exc_class is concurrent.futures.CancelledError: + return exceptions.CancelledError(*exc.args) + elif exc_class is concurrent.futures.TimeoutError: + return exceptions.TimeoutError(*exc.args) + elif exc_class is concurrent.futures.InvalidStateError: + return exceptions.InvalidStateError(*exc.args) + else: + return exc + + def _set_concurrent_future_state(concurrent, source): """Copy state from a future to a concurrent.futures.Future.""" assert source.done() @@ -333,7 +333,7 @@ def _set_concurrent_future_state(concurrent, source): return exception = source.exception() if exception is not None: - concurrent.set_exception(_convert_future_exc(exception)) + concurrent.set_exception(_convert_future_exc(exception)) else: result = source.result() concurrent.set_result(result) @@ -353,7 +353,7 @@ def _copy_future_state(source, dest): else: exception = source.exception() if exception is not None: - dest.set_exception(_convert_future_exc(exception)) + dest.set_exception(_convert_future_exc(exception)) else: result = source.result() dest.set_result(result) diff --git a/contrib/tools/python3/src/Lib/asyncio/locks.py b/contrib/tools/python3/src/Lib/asyncio/locks.py index 218226dca2..f1ce732478 100644 --- a/contrib/tools/python3/src/Lib/asyncio/locks.py +++ b/contrib/tools/python3/src/Lib/asyncio/locks.py @@ -6,7 +6,7 @@ import collections import warnings from . import events -from . import exceptions +from . import exceptions class _ContextManagerMixin: @@ -75,15 +75,15 @@ class Lock(_ContextManagerMixin): """ def __init__(self, *, loop=None): - self._waiters = None + self._waiters = None self._locked = False - if loop is None: - self._loop = events.get_event_loop() - else: + if loop is None: + self._loop = events.get_event_loop() + else: self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) def __repr__(self): res = super().__repr__() @@ -102,13 +102,13 @@ class Lock(_ContextManagerMixin): This method blocks until the lock is unlocked, then sets it to locked and returns True. """ - if (not self._locked and (self._waiters is None or - all(w.cancelled() for w in self._waiters))): + if (not self._locked and (self._waiters is None or + all(w.cancelled() for w in self._waiters))): self._locked = True return True - if self._waiters is None: - self._waiters = collections.deque() + if self._waiters is None: + self._waiters = collections.deque() fut = self._loop.create_future() self._waiters.append(fut) @@ -120,7 +120,7 @@ class Lock(_ContextManagerMixin): await fut finally: self._waiters.remove(fut) - except exceptions.CancelledError: + except exceptions.CancelledError: if not self._locked: self._wake_up_first() raise @@ -147,8 +147,8 @@ class Lock(_ContextManagerMixin): def _wake_up_first(self): """Wake up the first waiter if it isn't done.""" - if not self._waiters: - return + if not self._waiters: + return try: fut = next(iter(self._waiters)) except StopIteration: @@ -173,13 +173,13 @@ class Event: def __init__(self, *, loop=None): self._waiters = collections.deque() self._value = False - if loop is None: - self._loop = events.get_event_loop() - else: + if loop is None: + self._loop = events.get_event_loop() + else: self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) def __repr__(self): res = super().__repr__() @@ -240,16 +240,16 @@ class Condition(_ContextManagerMixin): """ def __init__(self, lock=None, *, loop=None): - if loop is None: - self._loop = events.get_event_loop() - else: + if loop is None: + self._loop = events.get_event_loop() + else: self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) if lock is None: - lock = Lock(loop=loop) + lock = Lock(loop=loop) elif lock._loop is not self._loop: raise ValueError("loop argument must agree with lock") @@ -299,11 +299,11 @@ class Condition(_ContextManagerMixin): try: await self.acquire() break - except exceptions.CancelledError: + except exceptions.CancelledError: cancelled = True if cancelled: - raise exceptions.CancelledError + raise exceptions.CancelledError async def wait_for(self, predicate): """Wait until a predicate becomes true. @@ -371,13 +371,13 @@ class Semaphore(_ContextManagerMixin): raise ValueError("Semaphore initial value must be >= 0") self._value = value self._waiters = collections.deque() - if loop is None: - self._loop = events.get_event_loop() - else: + if loop is None: + self._loop = events.get_event_loop() + else: self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) def __repr__(self): res = super().__repr__() @@ -437,11 +437,11 @@ class BoundedSemaphore(Semaphore): """ def __init__(self, value=1, *, loop=None): - if loop: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - + if loop: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + self._bound_value = value super().__init__(value, loop=loop) diff --git a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py index 33da82ae58..b4cd414b82 100644 --- a/contrib/tools/python3/src/Lib/asyncio/proactor_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/proactor_events.py @@ -10,39 +10,39 @@ import io import os import socket import warnings -import signal -import threading -import collections +import signal +import threading +import collections from . import base_events from . import constants from . import futures -from . import exceptions +from . import exceptions from . import protocols from . import sslproto from . import transports -from . import trsock +from . import trsock from .log import logger -def _set_socket_extra(transport, sock): - transport._extra['socket'] = trsock.TransportSocket(sock) - - try: - transport._extra['sockname'] = sock.getsockname() - except socket.error: - if transport._loop.get_debug(): - logger.warning( - "getsockname() failed on %r", sock, exc_info=True) - - if 'peername' not in transport._extra: - try: - transport._extra['peername'] = sock.getpeername() - except socket.error: - # UDP sockets may not have a peer name - transport._extra['peername'] = None - - +def _set_socket_extra(transport, sock): + transport._extra['socket'] = trsock.TransportSocket(sock) + + try: + transport._extra['sockname'] = sock.getsockname() + except socket.error: + if transport._loop.get_debug(): + logger.warning( + "getsockname() failed on %r", sock, exc_info=True) + + if 'peername' not in transport._extra: + try: + transport._extra['peername'] = sock.getpeername() + except socket.error: + # UDP sockets may not have a peer name + transport._extra['peername'] = None + + class _ProactorBasePipeTransport(transports._FlowControlMixin, transports.BaseTransport): """Base class for pipe and socket transports.""" @@ -110,14 +110,14 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, self._read_fut.cancel() self._read_fut = None - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if self._sock is not None: - _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) + _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) self.close() def _fatal_error(self, exc, message='Fatal error on pipe transport'): try: - if isinstance(exc, OSError): + if isinstance(exc, OSError): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -233,9 +233,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, try: keep_open = self._protocol.eof_received() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error( exc, 'Fatal error: protocol.eof_received() call failed.') return @@ -258,9 +258,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, if isinstance(self._protocol, protocols.BufferedProtocol): try: protocols._feed_data_to_buffered_proto(self._protocol, data) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error(exc, 'Fatal error: protocol.buffer_updated() ' 'call failed.') @@ -307,7 +307,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, self._force_close(exc) except OSError as exc: self._fatal_error(exc, 'Fatal read error on pipe transport') - except exceptions.CancelledError: + except exceptions.CancelledError: if not self._closing: raise else: @@ -450,134 +450,134 @@ class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport): self.close() -class _ProactorDatagramTransport(_ProactorBasePipeTransport): - max_size = 256 * 1024 - def __init__(self, loop, sock, protocol, address=None, - waiter=None, extra=None): - self._address = address - self._empty_waiter = None - # We don't need to call _protocol.connection_made() since our base - # constructor does it for us. - super().__init__(loop, sock, protocol, waiter=waiter, extra=extra) - - # The base constructor sets _buffer = None, so we set it here - self._buffer = collections.deque() - self._loop.call_soon(self._loop_reading) - - def _set_extra(self, sock): - _set_socket_extra(self, sock) - - def get_write_buffer_size(self): - return sum(len(data) for data, _ in self._buffer) - - def abort(self): - self._force_close(None) - - def sendto(self, data, addr=None): - if not isinstance(data, (bytes, bytearray, memoryview)): - raise TypeError('data argument must be bytes-like object (%r)', - type(data)) - - if not data: - return - - if self._address is not None and addr not in (None, self._address): - raise ValueError( - f'Invalid address: must be None or {self._address}') - - if self._conn_lost and self._address: - if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: - logger.warning('socket.sendto() raised exception.') - self._conn_lost += 1 - return - - # Ensure that what we buffer is immutable. - self._buffer.append((bytes(data), addr)) - - if self._write_fut is None: - # No current write operations are active, kick one off - self._loop_writing() - # else: A write operation is already kicked off - - self._maybe_pause_protocol() - - def _loop_writing(self, fut=None): - try: - if self._conn_lost: - return - - assert fut is self._write_fut - self._write_fut = None - if fut: - # We are in a _loop_writing() done callback, get the result - fut.result() - - if not self._buffer or (self._conn_lost and self._address): - # The connection has been closed - if self._closing: - self._loop.call_soon(self._call_connection_lost, None) - return - - data, addr = self._buffer.popleft() - if self._address is not None: - self._write_fut = self._loop._proactor.send(self._sock, - data) - else: - self._write_fut = self._loop._proactor.sendto(self._sock, - data, - addr=addr) - except OSError as exc: - self._protocol.error_received(exc) - except Exception as exc: - self._fatal_error(exc, 'Fatal write error on datagram transport') - else: - self._write_fut.add_done_callback(self._loop_writing) - self._maybe_resume_protocol() - - def _loop_reading(self, fut=None): - data = None - try: - if self._conn_lost: - return - - assert self._read_fut is fut or (self._read_fut is None and - self._closing) - - self._read_fut = None - if fut is not None: - res = fut.result() - - if self._closing: - # since close() has been called we ignore any read data - data = None - return - - if self._address is not None: - data, addr = res, self._address - else: - data, addr = res - - if self._conn_lost: - return - if self._address is not None: - self._read_fut = self._loop._proactor.recv(self._sock, - self.max_size) - else: - self._read_fut = self._loop._proactor.recvfrom(self._sock, - self.max_size) - except OSError as exc: - self._protocol.error_received(exc) - except exceptions.CancelledError: - if not self._closing: - raise - else: - if self._read_fut is not None: - self._read_fut.add_done_callback(self._loop_reading) - finally: - if data: - self._protocol.datagram_received(data, addr) - - +class _ProactorDatagramTransport(_ProactorBasePipeTransport): + max_size = 256 * 1024 + def __init__(self, loop, sock, protocol, address=None, + waiter=None, extra=None): + self._address = address + self._empty_waiter = None + # We don't need to call _protocol.connection_made() since our base + # constructor does it for us. + super().__init__(loop, sock, protocol, waiter=waiter, extra=extra) + + # The base constructor sets _buffer = None, so we set it here + self._buffer = collections.deque() + self._loop.call_soon(self._loop_reading) + + def _set_extra(self, sock): + _set_socket_extra(self, sock) + + def get_write_buffer_size(self): + return sum(len(data) for data, _ in self._buffer) + + def abort(self): + self._force_close(None) + + def sendto(self, data, addr=None): + if not isinstance(data, (bytes, bytearray, memoryview)): + raise TypeError('data argument must be bytes-like object (%r)', + type(data)) + + if not data: + return + + if self._address is not None and addr not in (None, self._address): + raise ValueError( + f'Invalid address: must be None or {self._address}') + + if self._conn_lost and self._address: + if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: + logger.warning('socket.sendto() raised exception.') + self._conn_lost += 1 + return + + # Ensure that what we buffer is immutable. + self._buffer.append((bytes(data), addr)) + + if self._write_fut is None: + # No current write operations are active, kick one off + self._loop_writing() + # else: A write operation is already kicked off + + self._maybe_pause_protocol() + + def _loop_writing(self, fut=None): + try: + if self._conn_lost: + return + + assert fut is self._write_fut + self._write_fut = None + if fut: + # We are in a _loop_writing() done callback, get the result + fut.result() + + if not self._buffer or (self._conn_lost and self._address): + # The connection has been closed + if self._closing: + self._loop.call_soon(self._call_connection_lost, None) + return + + data, addr = self._buffer.popleft() + if self._address is not None: + self._write_fut = self._loop._proactor.send(self._sock, + data) + else: + self._write_fut = self._loop._proactor.sendto(self._sock, + data, + addr=addr) + except OSError as exc: + self._protocol.error_received(exc) + except Exception as exc: + self._fatal_error(exc, 'Fatal write error on datagram transport') + else: + self._write_fut.add_done_callback(self._loop_writing) + self._maybe_resume_protocol() + + def _loop_reading(self, fut=None): + data = None + try: + if self._conn_lost: + return + + assert self._read_fut is fut or (self._read_fut is None and + self._closing) + + self._read_fut = None + if fut is not None: + res = fut.result() + + if self._closing: + # since close() has been called we ignore any read data + data = None + return + + if self._address is not None: + data, addr = res, self._address + else: + data, addr = res + + if self._conn_lost: + return + if self._address is not None: + self._read_fut = self._loop._proactor.recv(self._sock, + self.max_size) + else: + self._read_fut = self._loop._proactor.recvfrom(self._sock, + self.max_size) + except OSError as exc: + self._protocol.error_received(exc) + except exceptions.CancelledError: + if not self._closing: + raise + else: + if self._read_fut is not None: + self._read_fut.add_done_callback(self._loop_reading) + finally: + if data: + self._protocol.datagram_received(data, addr) + + class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, _ProactorBaseWritePipeTransport, transports.Transport): @@ -603,7 +603,7 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport, base_events._set_nodelay(sock) def _set_extra(self, sock): - _set_socket_extra(self, sock) + _set_socket_extra(self, sock) def can_write_eof(self): return True @@ -627,9 +627,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): self._accept_futures = {} # socket file descriptor => Future proactor.set_loop(self) self._make_self_pipe() - if threading.current_thread() is threading.main_thread(): - # wakeup fd can only be installed to a file descriptor from the main thread - signal.set_wakeup_fd(self._csock.fileno()) + if threading.current_thread() is threading.main_thread(): + # wakeup fd can only be installed to a file descriptor from the main thread + signal.set_wakeup_fd(self._csock.fileno()) def _make_socket_transport(self, sock, protocol, waiter=None, extra=None, server=None): @@ -649,11 +649,11 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): extra=extra, server=server) return ssl_protocol._app_transport - def _make_datagram_transport(self, sock, protocol, - address=None, waiter=None, extra=None): - return _ProactorDatagramTransport(self, sock, protocol, address, - waiter, extra) - + def _make_datagram_transport(self, sock, protocol, + address=None, waiter=None, extra=None): + return _ProactorDatagramTransport(self, sock, protocol, address, + waiter, extra) + def _make_duplex_pipe_transport(self, sock, protocol, waiter=None, extra=None): return _ProactorDuplexPipeTransport(self, @@ -675,8 +675,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): if self.is_closed(): return - if threading.current_thread() is threading.main_thread(): - signal.set_wakeup_fd(-1) + if threading.current_thread() is threading.main_thread(): + signal.set_wakeup_fd(-1) # Call these methods before closing the event loop (before calling # BaseEventLoop.close), because they can schedule callbacks with # call_soon(), which is forbidden when the event loop is closed. @@ -708,11 +708,11 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): try: fileno = file.fileno() except (AttributeError, io.UnsupportedOperation) as err: - raise exceptions.SendfileNotAvailableError("not a regular file") + raise exceptions.SendfileNotAvailableError("not a regular file") try: fsize = os.fstat(fileno).st_size - except OSError: - raise exceptions.SendfileNotAvailableError("not a regular file") + except OSError: + raise exceptions.SendfileNotAvailableError("not a regular file") blocksize = count if count else fsize if not blocksize: return 0 # empty file @@ -766,21 +766,21 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): try: if f is not None: f.result() # may raise - if self._self_reading_future is not f: - # When we scheduled this Future, we assigned it to - # _self_reading_future. If it's not there now, something has - # tried to cancel the loop while this callback was still in the - # queue (see windows_events.ProactorEventLoop.run_forever). In - # that case stop here instead of continuing to schedule a new - # iteration. - return + if self._self_reading_future is not f: + # When we scheduled this Future, we assigned it to + # _self_reading_future. If it's not there now, something has + # tried to cancel the loop while this callback was still in the + # queue (see windows_events.ProactorEventLoop.run_forever). In + # that case stop here instead of continuing to schedule a new + # iteration. + return f = self._proactor.recv(self._ssock, 4096) - except exceptions.CancelledError: + except exceptions.CancelledError: # _close_self_pipe() has been called, stop waiting for data return - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self.call_exception_handler({ 'message': 'Error on reading from the event loop self pipe', 'exception': exc, @@ -791,17 +791,17 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): f.add_done_callback(self._loop_self_reading) def _write_to_self(self): - # This may be called from a different thread, possibly after - # _close_self_pipe() has been called or even while it is - # running. Guard for self._csock being None or closed. When - # a socket is closed, send() raises OSError (with errno set to - # EBADF, but let's not rely on the exact error code). - csock = self._csock - if csock is None: - return - + # This may be called from a different thread, possibly after + # _close_self_pipe() has been called or even while it is + # running. Guard for self._csock being None or closed. When + # a socket is closed, send() raises OSError (with errno set to + # EBADF, but let's not rely on the exact error code). + csock = self._csock + if csock is None: + return + try: - csock.send(b'\0') + csock.send(b'\0') except OSError: if self._debug: logger.debug("Fail to write a null byte into the " @@ -837,13 +837,13 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): self.call_exception_handler({ 'message': 'Accept failed on a socket', 'exception': exc, - 'socket': trsock.TransportSocket(sock), + 'socket': trsock.TransportSocket(sock), }) sock.close() elif self._debug: logger.debug("Accept failed on socket %r", sock, exc_info=True) - except exceptions.CancelledError: + except exceptions.CancelledError: sock.close() else: self._accept_futures[sock.fileno()] = f diff --git a/contrib/tools/python3/src/Lib/asyncio/protocols.py b/contrib/tools/python3/src/Lib/asyncio/protocols.py index d09f51f4f2..69fa43e8b6 100644 --- a/contrib/tools/python3/src/Lib/asyncio/protocols.py +++ b/contrib/tools/python3/src/Lib/asyncio/protocols.py @@ -16,8 +16,8 @@ class BaseProtocol: write-only transport like write pipe """ - __slots__ = () - + __slots__ = () + def connection_made(self, transport): """Called when a connection is made. @@ -89,8 +89,8 @@ class Protocol(BaseProtocol): * CL: connection_lost() """ - __slots__ = () - + __slots__ = () + def data_received(self, data): """Called when some data is received. @@ -134,8 +134,8 @@ class BufferedProtocol(BaseProtocol): * CL: connection_lost() """ - __slots__ = () - + __slots__ = () + def get_buffer(self, sizehint): """Called to allocate a new receive buffer. @@ -166,8 +166,8 @@ class BufferedProtocol(BaseProtocol): class DatagramProtocol(BaseProtocol): """Interface for datagram protocol.""" - __slots__ = () - + __slots__ = () + def datagram_received(self, data, addr): """Called when some datagram is received.""" @@ -181,8 +181,8 @@ class DatagramProtocol(BaseProtocol): class SubprocessProtocol(BaseProtocol): """Interface for protocol for subprocess calls.""" - __slots__ = () - + __slots__ = () + def pipe_data_received(self, fd, data): """Called when the subprocess writes data into stdout/stderr pipe. diff --git a/contrib/tools/python3/src/Lib/asyncio/queues.py b/contrib/tools/python3/src/Lib/asyncio/queues.py index 03ca592290..cd3f7c6a56 100644 --- a/contrib/tools/python3/src/Lib/asyncio/queues.py +++ b/contrib/tools/python3/src/Lib/asyncio/queues.py @@ -2,7 +2,7 @@ __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') import collections import heapq -import warnings +import warnings from . import events from . import locks @@ -35,9 +35,9 @@ class Queue: self._loop = events.get_event_loop() else: self._loop = loop - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) self._maxsize = maxsize # Futures. @@ -45,7 +45,7 @@ class Queue: # Futures. self._putters = collections.deque() self._unfinished_tasks = 0 - self._finished = locks.Event(loop=loop) + self._finished = locks.Event(loop=loop) self._finished.set() self._init(maxsize) @@ -76,9 +76,9 @@ class Queue: def __str__(self): return f'<{type(self).__name__} {self._format()}>' - def __class_getitem__(cls, type): - return cls - + def __class_getitem__(cls, type): + return cls + def _format(self): result = f'maxsize={self._maxsize!r}' if getattr(self, '_queue', None): diff --git a/contrib/tools/python3/src/Lib/asyncio/runners.py b/contrib/tools/python3/src/Lib/asyncio/runners.py index d6cdeef9a6..6920acba38 100644 --- a/contrib/tools/python3/src/Lib/asyncio/runners.py +++ b/contrib/tools/python3/src/Lib/asyncio/runners.py @@ -5,8 +5,8 @@ from . import events from . import tasks -def run(main, *, debug=None): - """Execute the coroutine and return the result. +def run(main, *, debug=None): + """Execute the coroutine and return the result. This function runs the passed coroutine, taking care of managing the asyncio event loop and finalizing asynchronous @@ -39,14 +39,14 @@ def run(main, *, debug=None): loop = events.new_event_loop() try: events.set_event_loop(loop) - if debug is not None: - loop.set_debug(debug) + if debug is not None: + loop.set_debug(debug) return loop.run_until_complete(main) finally: try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) - loop.run_until_complete(loop.shutdown_default_executor()) + loop.run_until_complete(loop.shutdown_default_executor()) finally: events.set_event_loop(None) loop.close() @@ -61,7 +61,7 @@ def _cancel_all_tasks(loop): task.cancel() loop.run_until_complete( - tasks._gather(*to_cancel, loop=loop, return_exceptions=True)) + tasks._gather(*to_cancel, loop=loop, return_exceptions=True)) for task in to_cancel: if task.cancelled(): diff --git a/contrib/tools/python3/src/Lib/asyncio/selector_events.py b/contrib/tools/python3/src/Lib/asyncio/selector_events.py index 2ecf392b8b..59cb6b1bab 100644 --- a/contrib/tools/python3/src/Lib/asyncio/selector_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/selector_events.py @@ -25,7 +25,7 @@ from . import futures from . import protocols from . import sslproto from . import transports -from . import trsock +from . import trsock from .log import logger @@ -40,11 +40,11 @@ def _test_selector_event(selector, fd, event): return bool(key.events & event) -def _check_ssl_socket(sock): - if ssl is not None and isinstance(sock, ssl.SSLSocket): - raise TypeError("Socket cannot be of type SSLSocket") - - +def _check_ssl_socket(sock): + if ssl is not None and isinstance(sock, ssl.SSLSocket): + raise TypeError("Socket cannot be of type SSLSocket") + + class BaseSelectorEventLoop(base_events.BaseEventLoop): """Selector event loop. @@ -133,17 +133,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): # a socket is closed, send() raises OSError (with errno set to # EBADF, but let's not rely on the exact error code). csock = self._csock - if csock is None: - return - - try: - csock.send(b'\0') - except OSError: - if self._debug: - logger.debug("Fail to write a null byte into the " - "self-pipe socket", - exc_info=True) - + if csock is None: + return + + try: + csock.send(b'\0') + except OSError: + if self._debug: + logger.debug("Fail to write a null byte into the " + "self-pipe socket", + exc_info=True) + def _start_serving(self, protocol_factory, sock, sslcontext=None, server=None, backlog=100, ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): @@ -179,7 +179,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): self.call_exception_handler({ 'message': 'socket.accept() out of system resource', 'exception': exc, - 'socket': trsock.TransportSocket(sock), + 'socket': trsock.TransportSocket(sock), }) self._remove_reader(sock.fileno()) self.call_later(constants.ACCEPT_RETRY_DELAY, @@ -216,14 +216,14 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): try: await waiter - except BaseException: + except BaseException: transport.close() raise - # It's now up to the protocol to handle the connection. + # It's now up to the protocol to handle the connection. - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: if self._debug: context = { 'message': @@ -268,7 +268,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): (handle, writer)) if reader is not None: reader.cancel() - return handle + return handle def _remove_reader(self, fd): if self.is_closed(): @@ -305,7 +305,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): (reader, handle)) if writer is not None: writer.cancel() - return handle + return handle def _remove_writer(self, fd): """Remove a writer callback.""" @@ -333,7 +333,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def add_reader(self, fd, callback, *args): """Add a reader callback.""" self._ensure_fd_no_transport(fd) - self._add_reader(fd, callback, *args) + self._add_reader(fd, callback, *args) def remove_reader(self, fd): """Remove a reader callback.""" @@ -343,7 +343,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): def add_writer(self, fd, callback, *args): """Add a writer callback..""" self._ensure_fd_no_transport(fd) - self._add_writer(fd, callback, *args) + self._add_writer(fd, callback, *args) def remove_writer(self, fd): """Remove a writer callback.""" @@ -357,37 +357,37 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): The maximum amount of data to be received at once is specified by nbytes. """ - _check_ssl_socket(sock) + _check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - try: - return sock.recv(n) - except (BlockingIOError, InterruptedError): - pass + try: + return sock.recv(n) + except (BlockingIOError, InterruptedError): + pass fut = self.create_future() - fd = sock.fileno() - self._ensure_fd_no_transport(fd) - handle = self._add_reader(fd, self._sock_recv, fut, sock, n) - fut.add_done_callback( - functools.partial(self._sock_read_done, fd, handle=handle)) + fd = sock.fileno() + self._ensure_fd_no_transport(fd) + handle = self._add_reader(fd, self._sock_recv, fut, sock, n) + fut.add_done_callback( + functools.partial(self._sock_read_done, fd, handle=handle)) return await fut - def _sock_read_done(self, fd, fut, handle=None): - if handle is None or not handle.cancelled(): - self.remove_reader(fd) - - def _sock_recv(self, fut, sock, n): + def _sock_read_done(self, fd, fut, handle=None): + if handle is None or not handle.cancelled(): + self.remove_reader(fd) + + def _sock_recv(self, fut, sock, n): # _sock_recv() can add itself as an I/O callback if the operation can't # be done immediately. Don't use it directly, call sock_recv(). - if fut.done(): + if fut.done(): return try: data = sock.recv(n) except (BlockingIOError, InterruptedError): - return # try again next time - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + return # try again next time + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: fut.set_exception(exc) else: fut.set_result(data) @@ -398,34 +398,34 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): The received data is written into *buf* (a writable buffer). The return value is the number of bytes written. """ - _check_ssl_socket(sock) + _check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - try: - return sock.recv_into(buf) - except (BlockingIOError, InterruptedError): - pass + try: + return sock.recv_into(buf) + except (BlockingIOError, InterruptedError): + pass fut = self.create_future() - fd = sock.fileno() - self._ensure_fd_no_transport(fd) - handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) - fut.add_done_callback( - functools.partial(self._sock_read_done, fd, handle=handle)) + fd = sock.fileno() + self._ensure_fd_no_transport(fd) + handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) + fut.add_done_callback( + functools.partial(self._sock_read_done, fd, handle=handle)) return await fut - def _sock_recv_into(self, fut, sock, buf): + def _sock_recv_into(self, fut, sock, buf): # _sock_recv_into() can add itself as an I/O callback if the operation # can't be done immediately. Don't use it directly, call # sock_recv_into(). - if fut.done(): + if fut.done(): return try: nbytes = sock.recv_into(buf) except (BlockingIOError, InterruptedError): - return # try again next time - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + return # try again next time + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: fut.set_exception(exc) else: fut.set_result(nbytes) @@ -439,56 +439,56 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. """ - _check_ssl_socket(sock) + _check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") - try: - n = sock.send(data) - except (BlockingIOError, InterruptedError): - n = 0 - - if n == len(data): - # all data sent - return - + try: + n = sock.send(data) + except (BlockingIOError, InterruptedError): + n = 0 + + if n == len(data): + # all data sent + return + fut = self.create_future() - fd = sock.fileno() - self._ensure_fd_no_transport(fd) - # use a trick with a list in closure to store a mutable state - handle = self._add_writer(fd, self._sock_sendall, fut, sock, - memoryview(data), [n]) - fut.add_done_callback( - functools.partial(self._sock_write_done, fd, handle=handle)) + fd = sock.fileno() + self._ensure_fd_no_transport(fd) + # use a trick with a list in closure to store a mutable state + handle = self._add_writer(fd, self._sock_sendall, fut, sock, + memoryview(data), [n]) + fut.add_done_callback( + functools.partial(self._sock_write_done, fd, handle=handle)) return await fut - def _sock_sendall(self, fut, sock, view, pos): - if fut.done(): - # Future cancellation can be scheduled on previous loop iteration + def _sock_sendall(self, fut, sock, view, pos): + if fut.done(): + # Future cancellation can be scheduled on previous loop iteration return - start = pos[0] + start = pos[0] try: - n = sock.send(view[start:]) + n = sock.send(view[start:]) except (BlockingIOError, InterruptedError): - return - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + return + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: fut.set_exception(exc) return - start += n - - if start == len(view): + start += n + + if start == len(view): fut.set_result(None) else: - pos[0] = start + pos[0] = start async def sock_connect(self, sock, address): """Connect to a remote socket at address. This method is a coroutine. """ - _check_ssl_socket(sock) + _check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") @@ -510,24 +510,24 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): # connection runs in background. We have to wait until the socket # becomes writable to be notified when the connection succeed or # fails. - self._ensure_fd_no_transport(fd) - handle = self._add_writer( - fd, self._sock_connect_cb, fut, sock, address) + self._ensure_fd_no_transport(fd) + handle = self._add_writer( + fd, self._sock_connect_cb, fut, sock, address) fut.add_done_callback( - functools.partial(self._sock_write_done, fd, handle=handle)) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + functools.partial(self._sock_write_done, fd, handle=handle)) + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: fut.set_exception(exc) else: fut.set_result(None) - def _sock_write_done(self, fd, fut, handle=None): - if handle is None or not handle.cancelled(): - self.remove_writer(fd) + def _sock_write_done(self, fd, fut, handle=None): + if handle is None or not handle.cancelled(): + self.remove_writer(fd) def _sock_connect_cb(self, fut, sock, address): - if fut.done(): + if fut.done(): return try: @@ -538,9 +538,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): except (BlockingIOError, InterruptedError): # socket is still registered, the callback will be retried later pass - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: fut.set_exception(exc) else: fut.set_result(None) @@ -553,26 +553,26 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. """ - _check_ssl_socket(sock) + _check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") fut = self.create_future() - self._sock_accept(fut, sock) + self._sock_accept(fut, sock) return await fut - def _sock_accept(self, fut, sock): + def _sock_accept(self, fut, sock): fd = sock.fileno() try: conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): - self._ensure_fd_no_transport(fd) - handle = self._add_reader(fd, self._sock_accept, fut, sock) - fut.add_done_callback( - functools.partial(self._sock_read_done, fd, handle=handle)) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + self._ensure_fd_no_transport(fd) + handle = self._add_reader(fd, self._sock_accept, fut, sock) + fut.add_done_callback( + functools.partial(self._sock_read_done, fd, handle=handle)) + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: fut.set_exception(exc) else: fut.set_result((conn, address)) @@ -624,11 +624,11 @@ class _SelectorTransport(transports._FlowControlMixin, def __init__(self, loop, sock, protocol, extra=None, server=None): super().__init__(extra, loop) - self._extra['socket'] = trsock.TransportSocket(sock) - try: - self._extra['sockname'] = sock.getsockname() - except OSError: - self._extra['sockname'] = None + self._extra['socket'] = trsock.TransportSocket(sock) + try: + self._extra['sockname'] = sock.getsockname() + except OSError: + self._extra['sockname'] = None if 'peername' not in self._extra: try: self._extra['peername'] = sock.getpeername() @@ -699,14 +699,14 @@ class _SelectorTransport(transports._FlowControlMixin, self._loop._remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if self._sock is not None: - _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) + _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) self._sock.close() def _fatal_error(self, exc, message='Fatal error on transport'): # Should be called from exception handler only. - if isinstance(exc, OSError): + if isinstance(exc, OSError): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -820,9 +820,9 @@ class _SelectorSocketTransport(_SelectorTransport): buf = self._protocol.get_buffer(-1) if not len(buf): raise RuntimeError('get_buffer() returned an empty buffer') - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error( exc, 'Fatal error: protocol.get_buffer() call failed.') return @@ -831,9 +831,9 @@ class _SelectorSocketTransport(_SelectorTransport): nbytes = self._sock.recv_into(buf) except (BlockingIOError, InterruptedError): return - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error(exc, 'Fatal read error on socket transport') return @@ -843,9 +843,9 @@ class _SelectorSocketTransport(_SelectorTransport): try: self._protocol.buffer_updated(nbytes) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error( exc, 'Fatal error: protocol.buffer_updated() call failed.') @@ -856,9 +856,9 @@ class _SelectorSocketTransport(_SelectorTransport): data = self._sock.recv(self.max_size) except (BlockingIOError, InterruptedError): return - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error(exc, 'Fatal read error on socket transport') return @@ -868,9 +868,9 @@ class _SelectorSocketTransport(_SelectorTransport): try: self._protocol.data_received(data) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error( exc, 'Fatal error: protocol.data_received() call failed.') @@ -880,9 +880,9 @@ class _SelectorSocketTransport(_SelectorTransport): try: keep_open = self._protocol.eof_received() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error( exc, 'Fatal error: protocol.eof_received() call failed.') return @@ -918,9 +918,9 @@ class _SelectorSocketTransport(_SelectorTransport): n = self._sock.send(data) except (BlockingIOError, InterruptedError): pass - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error(exc, 'Fatal write error on socket transport') return else: @@ -943,9 +943,9 @@ class _SelectorSocketTransport(_SelectorTransport): n = self._sock.send(self._buffer) except (BlockingIOError, InterruptedError): pass - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._loop._remove_writer(self._sock_fd) self._buffer.clear() self._fatal_error(exc, 'Fatal write error on socket transport') @@ -1021,9 +1021,9 @@ class _SelectorDatagramTransport(_SelectorTransport): pass except OSError as exc: self._protocol.error_received(exc) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error(exc, 'Fatal read error on datagram transport') else: self._protocol.datagram_received(data, addr) @@ -1035,11 +1035,11 @@ class _SelectorDatagramTransport(_SelectorTransport): if not data: return - if self._address: - if addr not in (None, self._address): - raise ValueError( - f'Invalid address: must be None or {self._address}') - addr = self._address + if self._address: + if addr not in (None, self._address): + raise ValueError( + f'Invalid address: must be None or {self._address}') + addr = self._address if self._conn_lost and self._address: if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: @@ -1050,7 +1050,7 @@ class _SelectorDatagramTransport(_SelectorTransport): if not self._buffer: # Attempt to send it right away first. try: - if self._extra['peername']: + if self._extra['peername']: self._sock.send(data) else: self._sock.sendto(data, addr) @@ -1060,9 +1060,9 @@ class _SelectorDatagramTransport(_SelectorTransport): except OSError as exc: self._protocol.error_received(exc) return - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error( exc, 'Fatal write error on datagram transport') return @@ -1075,7 +1075,7 @@ class _SelectorDatagramTransport(_SelectorTransport): while self._buffer: data, addr = self._buffer.popleft() try: - if self._extra['peername']: + if self._extra['peername']: self._sock.send(data) else: self._sock.sendto(data, addr) @@ -1085,9 +1085,9 @@ class _SelectorDatagramTransport(_SelectorTransport): except OSError as exc: self._protocol.error_received(exc) return - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._fatal_error( exc, 'Fatal write error on datagram transport') return diff --git a/contrib/tools/python3/src/Lib/asyncio/sslproto.py b/contrib/tools/python3/src/Lib/asyncio/sslproto.py index 6731363a7a..cad25b2653 100644 --- a/contrib/tools/python3/src/Lib/asyncio/sslproto.py +++ b/contrib/tools/python3/src/Lib/asyncio/sslproto.py @@ -315,9 +315,9 @@ class _SSLProtocolTransport(transports._FlowControlMixin, self._closed = True self._ssl_protocol._start_shutdown() - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if not self._closed: - _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) + _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) self.close() def is_reading(self): @@ -497,11 +497,11 @@ class SSLProtocol(protocols.Protocol): self._app_transport._closed = True self._transport = None self._app_transport = None - if getattr(self, '_handshake_timeout_handle', None): - self._handshake_timeout_handle.cancel() + if getattr(self, '_handshake_timeout_handle', None): + self._handshake_timeout_handle.cancel() self._wakeup_waiter(exc) - self._app_protocol = None - self._sslpipe = None + self._app_protocol = None + self._sslpipe = None def pause_writing(self): """Called when the low-level transport's buffer goes over @@ -526,9 +526,9 @@ class SSLProtocol(protocols.Protocol): try: ssldata, appdata = self._sslpipe.feed_ssldata(data) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as e: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as e: self._fatal_error(e, 'SSL error in data received') return @@ -543,9 +543,9 @@ class SSLProtocol(protocols.Protocol): self._app_protocol, chunk) else: self._app_protocol.data_received(chunk) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as ex: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as ex: self._fatal_error( ex, 'application protocol failed to receive SSL data') return @@ -631,9 +631,9 @@ class SSLProtocol(protocols.Protocol): raise handshake_exc peercert = sslobj.getpeercert() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: if isinstance(exc, ssl.CertificateError): msg = 'SSL handshake failed on verifying the certificate' else: @@ -696,9 +696,9 @@ class SSLProtocol(protocols.Protocol): # delete it and reduce the outstanding buffer size. del self._write_backlog[0] self._write_buffer_size -= len(data) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: if self._in_handshake: # Exceptions will be re-raised in _on_handshake_complete. self._on_handshake_complete(exc) @@ -706,7 +706,7 @@ class SSLProtocol(protocols.Protocol): self._fatal_error(exc, 'Fatal error on SSL transport') def _fatal_error(self, exc, message='Fatal error on transport'): - if isinstance(exc, OSError): + if isinstance(exc, OSError): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: diff --git a/contrib/tools/python3/src/Lib/asyncio/staggered.py b/contrib/tools/python3/src/Lib/asyncio/staggered.py index 9627efdf11..451a53a16f 100644 --- a/contrib/tools/python3/src/Lib/asyncio/staggered.py +++ b/contrib/tools/python3/src/Lib/asyncio/staggered.py @@ -1,149 +1,149 @@ -"""Support for running coroutines in parallel with staggered start times.""" - -__all__ = 'staggered_race', - -import contextlib -import typing - -from . import events -from . import exceptions as exceptions_mod -from . import locks -from . import tasks - - -async def staggered_race( - coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]], - delay: typing.Optional[float], - *, - loop: events.AbstractEventLoop = None, -) -> typing.Tuple[ - typing.Any, - typing.Optional[int], - typing.List[typing.Optional[Exception]] -]: - """Run coroutines with staggered start times and take the first to finish. - - This method takes an iterable of coroutine functions. The first one is - started immediately. From then on, whenever the immediately preceding one - fails (raises an exception), or when *delay* seconds has passed, the next - coroutine is started. This continues until one of the coroutines complete - successfully, in which case all others are cancelled, or until all - coroutines fail. - - The coroutines provided should be well-behaved in the following way: - - * They should only ``return`` if completed successfully. - - * They should always raise an exception if they did not complete - successfully. In particular, if they handle cancellation, they should - probably reraise, like this:: - - try: - # do work - except asyncio.CancelledError: - # undo partially completed work - raise - - Args: - coro_fns: an iterable of coroutine functions, i.e. callables that - return a coroutine object when called. Use ``functools.partial`` or - lambdas to pass arguments. - - delay: amount of time, in seconds, between starting coroutines. If - ``None``, the coroutines will run sequentially. - - loop: the event loop to use. - - Returns: - tuple *(winner_result, winner_index, exceptions)* where - - - *winner_result*: the result of the winning coroutine, or ``None`` - if no coroutines won. - - - *winner_index*: the index of the winning coroutine in - ``coro_fns``, or ``None`` if no coroutines won. If the winning - coroutine may return None on success, *winner_index* can be used - to definitively determine whether any coroutine won. - - - *exceptions*: list of exceptions returned by the coroutines. - ``len(exceptions)`` is equal to the number of coroutines actually - started, and the order is the same as in ``coro_fns``. The winning - coroutine's entry is ``None``. - - """ - # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. - loop = loop or events.get_running_loop() - enum_coro_fns = enumerate(coro_fns) - winner_result = None - winner_index = None - exceptions = [] - running_tasks = [] - - async def run_one_coro( - previous_failed: typing.Optional[locks.Event]) -> None: - # Wait for the previous task to finish, or for delay seconds - if previous_failed is not None: - with contextlib.suppress(exceptions_mod.TimeoutError): - # Use asyncio.wait_for() instead of asyncio.wait() here, so - # that if we get cancelled at this point, Event.wait() is also - # cancelled, otherwise there will be a "Task destroyed but it is - # pending" later. - await tasks.wait_for(previous_failed.wait(), delay) - # Get the next coroutine to run - try: - this_index, coro_fn = next(enum_coro_fns) - except StopIteration: - return - # Start task that will run the next coroutine - this_failed = locks.Event() - next_task = loop.create_task(run_one_coro(this_failed)) - running_tasks.append(next_task) - assert len(running_tasks) == this_index + 2 - # Prepare place to put this coroutine's exceptions if not won - exceptions.append(None) - assert len(exceptions) == this_index + 1 - - try: - result = await coro_fn() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as e: - exceptions[this_index] = e - this_failed.set() # Kickstart the next coroutine - else: - # Store winner's results - nonlocal winner_index, winner_result - assert winner_index is None - winner_index = this_index - winner_result = result - # Cancel all other tasks. We take care to not cancel the current - # task as well. If we do so, then since there is no `await` after - # here and CancelledError are usually thrown at one, we will - # encounter a curious corner case where the current task will end - # up as done() == True, cancelled() == False, exception() == - # asyncio.CancelledError. This behavior is specified in - # https://bugs.python.org/issue30048 - for i, t in enumerate(running_tasks): - if i != this_index: - t.cancel() - - first_task = loop.create_task(run_one_coro(None)) - running_tasks.append(first_task) - try: - # Wait for a growing list of tasks to all finish: poor man's version of - # curio's TaskGroup or trio's nursery - done_count = 0 - while done_count != len(running_tasks): - done, _ = await tasks.wait(running_tasks) - done_count = len(done) - # If run_one_coro raises an unhandled exception, it's probably a - # programming error, and I want to see it. - if __debug__: - for d in done: - if d.done() and not d.cancelled() and d.exception(): - raise d.exception() - return winner_result, winner_index, exceptions - finally: - # Make sure no tasks are left running if we leave this function - for t in running_tasks: - t.cancel() +"""Support for running coroutines in parallel with staggered start times.""" + +__all__ = 'staggered_race', + +import contextlib +import typing + +from . import events +from . import exceptions as exceptions_mod +from . import locks +from . import tasks + + +async def staggered_race( + coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]], + delay: typing.Optional[float], + *, + loop: events.AbstractEventLoop = None, +) -> typing.Tuple[ + typing.Any, + typing.Optional[int], + typing.List[typing.Optional[Exception]] +]: + """Run coroutines with staggered start times and take the first to finish. + + This method takes an iterable of coroutine functions. The first one is + started immediately. From then on, whenever the immediately preceding one + fails (raises an exception), or when *delay* seconds has passed, the next + coroutine is started. This continues until one of the coroutines complete + successfully, in which case all others are cancelled, or until all + coroutines fail. + + The coroutines provided should be well-behaved in the following way: + + * They should only ``return`` if completed successfully. + + * They should always raise an exception if they did not complete + successfully. In particular, if they handle cancellation, they should + probably reraise, like this:: + + try: + # do work + except asyncio.CancelledError: + # undo partially completed work + raise + + Args: + coro_fns: an iterable of coroutine functions, i.e. callables that + return a coroutine object when called. Use ``functools.partial`` or + lambdas to pass arguments. + + delay: amount of time, in seconds, between starting coroutines. If + ``None``, the coroutines will run sequentially. + + loop: the event loop to use. + + Returns: + tuple *(winner_result, winner_index, exceptions)* where + + - *winner_result*: the result of the winning coroutine, or ``None`` + if no coroutines won. + + - *winner_index*: the index of the winning coroutine in + ``coro_fns``, or ``None`` if no coroutines won. If the winning + coroutine may return None on success, *winner_index* can be used + to definitively determine whether any coroutine won. + + - *exceptions*: list of exceptions returned by the coroutines. + ``len(exceptions)`` is equal to the number of coroutines actually + started, and the order is the same as in ``coro_fns``. The winning + coroutine's entry is ``None``. + + """ + # TODO: when we have aiter() and anext(), allow async iterables in coro_fns. + loop = loop or events.get_running_loop() + enum_coro_fns = enumerate(coro_fns) + winner_result = None + winner_index = None + exceptions = [] + running_tasks = [] + + async def run_one_coro( + previous_failed: typing.Optional[locks.Event]) -> None: + # Wait for the previous task to finish, or for delay seconds + if previous_failed is not None: + with contextlib.suppress(exceptions_mod.TimeoutError): + # Use asyncio.wait_for() instead of asyncio.wait() here, so + # that if we get cancelled at this point, Event.wait() is also + # cancelled, otherwise there will be a "Task destroyed but it is + # pending" later. + await tasks.wait_for(previous_failed.wait(), delay) + # Get the next coroutine to run + try: + this_index, coro_fn = next(enum_coro_fns) + except StopIteration: + return + # Start task that will run the next coroutine + this_failed = locks.Event() + next_task = loop.create_task(run_one_coro(this_failed)) + running_tasks.append(next_task) + assert len(running_tasks) == this_index + 2 + # Prepare place to put this coroutine's exceptions if not won + exceptions.append(None) + assert len(exceptions) == this_index + 1 + + try: + result = await coro_fn() + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as e: + exceptions[this_index] = e + this_failed.set() # Kickstart the next coroutine + else: + # Store winner's results + nonlocal winner_index, winner_result + assert winner_index is None + winner_index = this_index + winner_result = result + # Cancel all other tasks. We take care to not cancel the current + # task as well. If we do so, then since there is no `await` after + # here and CancelledError are usually thrown at one, we will + # encounter a curious corner case where the current task will end + # up as done() == True, cancelled() == False, exception() == + # asyncio.CancelledError. This behavior is specified in + # https://bugs.python.org/issue30048 + for i, t in enumerate(running_tasks): + if i != this_index: + t.cancel() + + first_task = loop.create_task(run_one_coro(None)) + running_tasks.append(first_task) + try: + # Wait for a growing list of tasks to all finish: poor man's version of + # curio's TaskGroup or trio's nursery + done_count = 0 + while done_count != len(running_tasks): + done, _ = await tasks.wait(running_tasks) + done_count = len(done) + # If run_one_coro raises an unhandled exception, it's probably a + # programming error, and I want to see it. + if __debug__: + for d in done: + if d.done() and not d.cancelled() and d.exception(): + raise d.exception() + return winner_result, winner_index, exceptions + finally: + # Make sure no tasks are left running if we leave this function + for t in running_tasks: + t.cancel() diff --git a/contrib/tools/python3/src/Lib/asyncio/streams.py b/contrib/tools/python3/src/Lib/asyncio/streams.py index 9c2cd4ad6b..3c80bb8892 100644 --- a/contrib/tools/python3/src/Lib/asyncio/streams.py +++ b/contrib/tools/python3/src/Lib/asyncio/streams.py @@ -1,19 +1,19 @@ __all__ = ( 'StreamReader', 'StreamWriter', 'StreamReaderProtocol', - 'open_connection', 'start_server') + 'open_connection', 'start_server') import socket -import sys -import warnings -import weakref +import sys +import warnings +import weakref if hasattr(socket, 'AF_UNIX'): __all__ += ('open_unix_connection', 'start_unix_server') from . import coroutines from . import events -from . import exceptions -from . import format_helpers +from . import exceptions +from . import format_helpers from . import protocols from .log import logger from .tasks import sleep @@ -43,10 +43,10 @@ async def open_connection(host=None, port=None, *, """ if loop is None: loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, loop=loop) transport, _ = await loop.create_connection( @@ -80,10 +80,10 @@ async def start_server(client_connected_cb, host=None, port=None, *, """ if loop is None: loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) def factory(): reader = StreamReader(limit=limit, loop=loop) @@ -102,10 +102,10 @@ if hasattr(socket, 'AF_UNIX'): """Similar to `open_connection` but works with UNIX Domain Sockets.""" if loop is None: loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) reader = StreamReader(limit=limit, loop=loop) protocol = StreamReaderProtocol(reader, loop=loop) transport, _ = await loop.create_unix_connection( @@ -118,10 +118,10 @@ if hasattr(socket, 'AF_UNIX'): """Similar to `start_server` but works with UNIX Domain Sockets.""" if loop is None: loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) def factory(): reader = StreamReader(limit=limit, loop=loop) @@ -196,10 +196,10 @@ class FlowControlMixin(protocols.Protocol): self._drain_waiter = waiter await waiter - def _get_close_waiter(self, stream): - raise NotImplementedError + def _get_close_waiter(self, stream): + raise NotImplementedError + - class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): """Helper class to adapt between Protocol and StreamReader. @@ -209,86 +209,86 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): call inappropriate methods of the protocol.) """ - _source_traceback = None - + _source_traceback = None + def __init__(self, stream_reader, client_connected_cb=None, loop=None): super().__init__(loop=loop) - if stream_reader is not None: - self._stream_reader_wr = weakref.ref(stream_reader) - self._source_traceback = stream_reader._source_traceback - else: - self._stream_reader_wr = None - if client_connected_cb is not None: - # This is a stream created by the `create_server()` function. - # Keep a strong reference to the reader until a connection - # is established. - self._strong_reader = stream_reader - self._reject_connection = False + if stream_reader is not None: + self._stream_reader_wr = weakref.ref(stream_reader) + self._source_traceback = stream_reader._source_traceback + else: + self._stream_reader_wr = None + if client_connected_cb is not None: + # This is a stream created by the `create_server()` function. + # Keep a strong reference to the reader until a connection + # is established. + self._strong_reader = stream_reader + self._reject_connection = False self._stream_writer = None - self._transport = None + self._transport = None self._client_connected_cb = client_connected_cb self._over_ssl = False self._closed = self._loop.create_future() - @property - def _stream_reader(self): - if self._stream_reader_wr is None: - return None - return self._stream_reader_wr() - + @property + def _stream_reader(self): + if self._stream_reader_wr is None: + return None + return self._stream_reader_wr() + def connection_made(self, transport): - if self._reject_connection: - context = { - 'message': ('An open stream was garbage collected prior to ' - 'establishing network connection; ' - 'call "stream.close()" explicitly.') - } - if self._source_traceback: - context['source_traceback'] = self._source_traceback - self._loop.call_exception_handler(context) - transport.abort() - return - self._transport = transport - reader = self._stream_reader - if reader is not None: - reader.set_transport(transport) + if self._reject_connection: + context = { + 'message': ('An open stream was garbage collected prior to ' + 'establishing network connection; ' + 'call "stream.close()" explicitly.') + } + if self._source_traceback: + context['source_traceback'] = self._source_traceback + self._loop.call_exception_handler(context) + transport.abort() + return + self._transport = transport + reader = self._stream_reader + if reader is not None: + reader.set_transport(transport) self._over_ssl = transport.get_extra_info('sslcontext') is not None if self._client_connected_cb is not None: self._stream_writer = StreamWriter(transport, self, - reader, + reader, self._loop) - res = self._client_connected_cb(reader, + res = self._client_connected_cb(reader, self._stream_writer) if coroutines.iscoroutine(res): self._loop.create_task(res) - self._strong_reader = None + self._strong_reader = None def connection_lost(self, exc): - reader = self._stream_reader - if reader is not None: + reader = self._stream_reader + if reader is not None: if exc is None: - reader.feed_eof() + reader.feed_eof() else: - reader.set_exception(exc) + reader.set_exception(exc) if not self._closed.done(): if exc is None: self._closed.set_result(None) else: self._closed.set_exception(exc) super().connection_lost(exc) - self._stream_reader_wr = None + self._stream_reader_wr = None self._stream_writer = None - self._transport = None + self._transport = None def data_received(self, data): - reader = self._stream_reader - if reader is not None: - reader.feed_data(data) + reader = self._stream_reader + if reader is not None: + reader.feed_data(data) def eof_received(self): - reader = self._stream_reader - if reader is not None: - reader.feed_eof() + reader = self._stream_reader + if reader is not None: + reader.feed_eof() if self._over_ssl: # Prevent a warning in SSLProtocol.eof_received: # "returning true from eof_received() @@ -296,9 +296,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): return False return True - def _get_close_waiter(self, stream): - return self._closed - + def _get_close_waiter(self, stream): + return self._closed + def __del__(self): # Prevent reports about unhandled exceptions. # Better than self._closed._log_traceback = False hack @@ -324,8 +324,8 @@ class StreamWriter: assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop - self._complete_fut = self._loop.create_future() - self._complete_fut.set_result(None) + self._complete_fut = self._loop.create_future() + self._complete_fut.set_result(None) def __repr__(self): info = [self.__class__.__name__, f'transport={self._transport!r}'] @@ -356,7 +356,7 @@ class StreamWriter: return self._transport.is_closing() async def wait_closed(self): - await self._protocol._get_close_waiter(self) + await self._protocol._get_close_waiter(self) def get_extra_info(self, name, default=None): return self._transport.get_extra_info(name, default) @@ -374,23 +374,23 @@ class StreamWriter: if exc is not None: raise exc if self._transport.is_closing(): - # Wait for protocol.connection_lost() call - # Raise connection closing error if any, - # ConnectionResetError otherwise + # Wait for protocol.connection_lost() call + # Raise connection closing error if any, + # ConnectionResetError otherwise # Yield to the event loop so connection_lost() may be # called. Without this, _drain_helper() would return # immediately, and code that calls # write(...); await drain() # in a loop would never call connection_lost(), so it # would not see an error when the socket is closed. - await sleep(0) + await sleep(0) await self._protocol._drain_helper() class StreamReader: - _source_traceback = None - + _source_traceback = None + def __init__(self, limit=_DEFAULT_LIMIT, loop=None): # The line length limit is a security feature; # it also doubles as half the buffer limit. @@ -409,9 +409,9 @@ class StreamReader: self._exception = None self._transport = None self._paused = False - if self._loop.get_debug(): - self._source_traceback = format_helpers.extract_stack( - sys._getframe(1)) + if self._loop.get_debug(): + self._source_traceback = format_helpers.extract_stack( + sys._getframe(1)) def __repr__(self): info = ['StreamReader'] @@ -538,9 +538,9 @@ class StreamReader: seplen = len(sep) try: line = await self.readuntil(sep) - except exceptions.IncompleteReadError as e: + except exceptions.IncompleteReadError as e: return e.partial - except exceptions.LimitOverrunError as e: + except exceptions.LimitOverrunError as e: if self._buffer.startswith(sep, e.consumed): del self._buffer[:e.consumed + seplen] else: @@ -615,7 +615,7 @@ class StreamReader: # see upper comment for explanation. offset = buflen + 1 - seplen if offset > self._limit: - raise exceptions.LimitOverrunError( + raise exceptions.LimitOverrunError( 'Separator is not found, and chunk exceed the limit', offset) @@ -626,13 +626,13 @@ class StreamReader: if self._eof: chunk = bytes(self._buffer) self._buffer.clear() - raise exceptions.IncompleteReadError(chunk, None) + raise exceptions.IncompleteReadError(chunk, None) # _wait_for_data() will resume reading if stream was paused. await self._wait_for_data('readuntil') if isep > self._limit: - raise exceptions.LimitOverrunError( + raise exceptions.LimitOverrunError( 'Separator is found, but chunk is longer than limit', isep) chunk = self._buffer[:isep + seplen] @@ -718,7 +718,7 @@ class StreamReader: if self._eof: incomplete = bytes(self._buffer) self._buffer.clear() - raise exceptions.IncompleteReadError(incomplete, n) + raise exceptions.IncompleteReadError(incomplete, n) await self._wait_for_data('readexactly') diff --git a/contrib/tools/python3/src/Lib/asyncio/subprocess.py b/contrib/tools/python3/src/Lib/asyncio/subprocess.py index 73b0713687..820304ecca 100644 --- a/contrib/tools/python3/src/Lib/asyncio/subprocess.py +++ b/contrib/tools/python3/src/Lib/asyncio/subprocess.py @@ -1,7 +1,7 @@ __all__ = 'create_subprocess_exec', 'create_subprocess_shell' import subprocess -import warnings +import warnings from . import events from . import protocols @@ -26,7 +26,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, self._transport = None self._process_exited = False self._pipe_fds = [] - self._stdin_closed = self._loop.create_future() + self._stdin_closed = self._loop.create_future() def __repr__(self): info = [self.__class__.__name__] @@ -78,10 +78,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, if pipe is not None: pipe.close() self.connection_lost(exc) - if exc is None: - self._stdin_closed.set_result(None) - else: - self._stdin_closed.set_exception(exc) + if exc is None: + self._stdin_closed.set_result(None) + else: + self._stdin_closed.set_exception(exc) return if fd == 1: reader = self.stdout @@ -108,11 +108,11 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, self._transport.close() self._transport = None - def _get_close_waiter(self, stream): - if stream is self.stdin: - return self._stdin_closed + def _get_close_waiter(self, stream): + if stream is self.stdin: + return self._stdin_closed + - class Process: def __init__(self, transport, protocol, loop): self._transport = transport @@ -193,8 +193,8 @@ class Process: stderr = self._read_stream(2) else: stderr = self._noop() - stdin, stdout, stderr = await tasks._gather(stdin, stdout, stderr, - loop=self._loop) + stdin, stdout, stderr = await tasks._gather(stdin, stdout, stderr, + loop=self._loop) await self.wait() return (stdout, stderr) @@ -204,13 +204,13 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, **kwds): if loop is None: loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8 " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, - stacklevel=2 - ) - + else: + warnings.warn("The loop argument is deprecated since Python 3.8 " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, + stacklevel=2 + ) + protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, loop=loop) transport, protocol = await loop.subprocess_shell( @@ -225,12 +225,12 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None, limit=streams._DEFAULT_LIMIT, **kwds): if loop is None: loop = events.get_event_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8 " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, - stacklevel=2 - ) + else: + warnings.warn("The loop argument is deprecated since Python 3.8 " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, + stacklevel=2 + ) protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, loop=loop) transport, protocol = await loop.subprocess_exec( diff --git a/contrib/tools/python3/src/Lib/asyncio/tasks.py b/contrib/tools/python3/src/Lib/asyncio/tasks.py index d378a369ba..27a3c8c5a8 100644 --- a/contrib/tools/python3/src/Lib/asyncio/tasks.py +++ b/contrib/tools/python3/src/Lib/asyncio/tasks.py @@ -13,7 +13,7 @@ import concurrent.futures import contextvars import functools import inspect -import itertools +import itertools import types import warnings import weakref @@ -21,16 +21,16 @@ import weakref from . import base_tasks from . import coroutines from . import events -from . import exceptions +from . import exceptions from . import futures -from .coroutines import _is_coroutine +from .coroutines import _is_coroutine + +# Helper to generate new task names +# This uses itertools.count() instead of a "+= 1" operation because the latter +# is not thread safe. See bpo-11866 for a longer explanation. +_task_name_counter = itertools.count(1).__next__ -# Helper to generate new task names -# This uses itertools.count() instead of a "+= 1" operation because the latter -# is not thread safe. See bpo-11866 for a longer explanation. -_task_name_counter = itertools.count(1).__next__ - def current_task(loop=None): """Return a currently executed task.""" if loop is None: @@ -42,22 +42,22 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. - i = 0 - while True: - try: - tasks = list(_all_tasks) - except RuntimeError: - i += 1 - if i >= 1000: - raise - else: - break - return {t for t in tasks + # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another + # thread while we do so. Therefore we cast it to list prior to filtering. The list + # cast itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for + # details. + i = 0 + while True: + try: + tasks = list(_all_tasks) + except RuntimeError: + i += 1 + if i >= 1000: + raise + else: + break + return {t for t in tasks if futures._get_loop(t) is loop and not t.done()} @@ -67,34 +67,34 @@ def _all_tasks_compat(loop=None): # method. if loop is None: loop = events.get_event_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. - i = 0 - while True: - try: - tasks = list(_all_tasks) - except RuntimeError: - i += 1 - if i >= 1000: - raise - else: - break - return {t for t in tasks if futures._get_loop(t) is loop} - - -def _set_task_name(task, name): - if name is not None: - try: - set_name = task.set_name - except AttributeError: - pass - else: - set_name(name) - - + # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another + # thread while we do so. Therefore we cast it to list prior to filtering. The list + # cast itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for + # details. + i = 0 + while True: + try: + tasks = list(_all_tasks) + except RuntimeError: + i += 1 + if i >= 1000: + raise + else: + break + return {t for t in tasks if futures._get_loop(t) is loop} + + +def _set_task_name(task, name): + if name is not None: + try: + set_name = task.set_name + except AttributeError: + pass + else: + set_name(name) + + class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. @@ -113,7 +113,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation # status is still pending _log_destroy_pending = True - def __init__(self, coro, *, loop=None, name=None): + def __init__(self, coro, *, loop=None, name=None): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -123,11 +123,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._log_destroy_pending = False raise TypeError(f"a coroutine was expected, got {coro!r}") - if name is None: - self._name = f'Task-{_task_name_counter()}' - else: - self._name = str(name) - + if name is None: + self._name = f'Task-{_task_name_counter()}' + else: + self._name = str(name) + self._must_cancel = False self._fut_waiter = None self._coro = coro @@ -147,21 +147,21 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self._loop.call_exception_handler(context) super().__del__() - def __class_getitem__(cls, type): - return cls - + def __class_getitem__(cls, type): + return cls + def _repr_info(self): return base_tasks._task_repr_info(self) - def get_coro(self): - return self._coro - - def get_name(self): - return self._name - - def set_name(self, value): - self._name = str(value) - + def get_coro(self): + return self._coro + + def get_name(self): + return self._name + + def set_name(self, value): + self._name = str(value) + def set_result(self, result): raise RuntimeError('Task does not support set_result operation') @@ -202,7 +202,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation """ return base_tasks._task_print_stack(self, limit, file) - def cancel(self, msg=None): + def cancel(self, msg=None): """Request that this task cancel itself. This arranges for a CancelledError to be thrown into the @@ -226,23 +226,23 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if self.done(): return False if self._fut_waiter is not None: - if self._fut_waiter.cancel(msg=msg): + if self._fut_waiter.cancel(msg=msg): # Leave self._fut_waiter; it may be a Task that # catches and ignores the cancellation so we may have # to cancel it again later. return True # It must be the case that self.__step is already scheduled. self._must_cancel = True - self._cancel_message = msg + self._cancel_message = msg return True def __step(self, exc=None): if self.done(): - raise exceptions.InvalidStateError( + raise exceptions.InvalidStateError( f'_step(): already done: {self!r}, {exc!r}') if self._must_cancel: - if not isinstance(exc, exceptions.CancelledError): - exc = self._make_cancelled_error() + if not isinstance(exc, exceptions.CancelledError): + exc = self._make_cancelled_error() self._must_cancel = False coro = self._coro self._fut_waiter = None @@ -260,16 +260,16 @@ class Task(futures._PyFuture): # Inherit Python Task implementation if self._must_cancel: # Task is cancelled right before coro stops. self._must_cancel = False - super().cancel(msg=self._cancel_message) + super().cancel(msg=self._cancel_message) else: super().set_result(exc.value) - except exceptions.CancelledError as exc: - # Save the original exception so we can chain it later. - self._cancelled_exc = exc + except exceptions.CancelledError as exc: + # Save the original exception so we can chain it later. + self._cancelled_exc = exc super().cancel() # I.e., Future.cancel(self). - except (KeyboardInterrupt, SystemExit) as exc: + except (KeyboardInterrupt, SystemExit) as exc: super().set_exception(exc) - raise + raise except BaseException as exc: super().set_exception(exc) else: @@ -294,8 +294,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation self.__wakeup, context=self._context) self._fut_waiter = result if self._must_cancel: - if self._fut_waiter.cancel( - msg=self._cancel_message): + if self._fut_waiter.cancel( + msg=self._cancel_message): self._must_cancel = False else: new_exc = RuntimeError( @@ -326,7 +326,7 @@ class Task(futures._PyFuture): # Inherit Python Task implementation def __wakeup(self, future): try: future.result() - except BaseException as exc: + except BaseException as exc: # This may also be a cancellation. self.__step(exc) else: @@ -352,15 +352,15 @@ else: Task = _CTask = _asyncio.Task -def create_task(coro, *, name=None): +def create_task(coro, *, name=None): """Schedule the execution of a coroutine object in a spawn task. Return a Task object. """ loop = events.get_running_loop() - task = loop.create_task(coro) - _set_task_name(task, name) - return task + task = loop.create_task(coro) + _set_task_name(task, name) + return task # wait() and as_completed() similar to those in PEP 3148. @@ -373,7 +373,7 @@ ALL_COMPLETED = concurrent.futures.ALL_COMPLETED async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): """Wait for the Futures and coroutines given by fs to complete. - The fs iterable must not be empty. + The fs iterable must not be empty. Coroutines will be wrapped in Tasks. @@ -394,22 +394,22 @@ async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): raise ValueError(f'Invalid return_when value: {return_when}') if loop is None: - loop = events.get_running_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - - fs = set(fs) - - if any(coroutines.iscoroutine(f) for f in fs): - warnings.warn("The explicit passing of coroutine objects to " - "asyncio.wait() is deprecated since Python 3.8, and " - "scheduled for removal in Python 3.11.", - DeprecationWarning, stacklevel=2) - - fs = {ensure_future(f, loop=loop) for f in fs} - + loop = events.get_running_loop() + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + + fs = set(fs) + + if any(coroutines.iscoroutine(f) for f in fs): + warnings.warn("The explicit passing of coroutine objects to " + "asyncio.wait() is deprecated since Python 3.8, and " + "scheduled for removal in Python 3.11.", + DeprecationWarning, stacklevel=2) + + fs = {ensure_future(f, loop=loop) for f in fs} + return await _wait(fs, timeout, return_when, loop) @@ -432,11 +432,11 @@ async def wait_for(fut, timeout, *, loop=None): This function is a coroutine. """ if loop is None: - loop = events.get_running_loop() - else: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + loop = events.get_running_loop() + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) if timeout is None: return await fut @@ -447,11 +447,11 @@ async def wait_for(fut, timeout, *, loop=None): if fut.done(): return fut.result() - await _cancel_and_wait(fut, loop=loop) - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc + await _cancel_and_wait(fut, loop=loop) + try: + return fut.result() + except exceptions.CancelledError as exc: + raise exceptions.TimeoutError() from exc waiter = loop.create_future() timeout_handle = loop.call_later(timeout, _release_waiter, waiter) @@ -464,16 +464,16 @@ async def wait_for(fut, timeout, *, loop=None): # wait until the future completes or the timeout try: await waiter - except exceptions.CancelledError: - if fut.done(): - return fut.result() - else: - fut.remove_done_callback(cb) - # We must ensure that the task is not running - # after wait_for() returns. - # See https://bugs.python.org/issue32751 - await _cancel_and_wait(fut, loop=loop) - raise + except exceptions.CancelledError: + if fut.done(): + return fut.result() + else: + fut.remove_done_callback(cb) + # We must ensure that the task is not running + # after wait_for() returns. + # See https://bugs.python.org/issue32751 + await _cancel_and_wait(fut, loop=loop) + raise if fut.done(): return fut.result() @@ -483,13 +483,13 @@ async def wait_for(fut, timeout, *, loop=None): # after wait_for() returns. # See https://bugs.python.org/issue32751 await _cancel_and_wait(fut, loop=loop) - # In case task cancellation failed with some - # exception, we should re-raise it - # See https://bugs.python.org/issue40607 - try: - return fut.result() - except exceptions.CancelledError as exc: - raise exceptions.TimeoutError() from exc + # In case task cancellation failed with some + # exception, we should re-raise it + # See https://bugs.python.org/issue40607 + try: + return fut.result() + except exceptions.CancelledError as exc: + raise exceptions.TimeoutError() from exc finally: timeout_handle.cancel() @@ -526,8 +526,8 @@ async def _wait(fs, timeout, return_when, loop): finally: if timeout_handle is not None: timeout_handle.cancel() - for f in fs: - f.remove_done_callback(_on_completion) + for f in fs: + f.remove_done_callback(_on_completion) done, pending = set(), set() for f in fs: @@ -574,19 +574,19 @@ def as_completed(fs, *, loop=None, timeout=None): Note: The futures 'f' are not necessarily members of fs. """ if futures.isfuture(fs) or coroutines.iscoroutine(fs): - raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") - - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - + raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") + + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + from .queues import Queue # Import here to avoid circular import problem. done = Queue(loop=loop) - - if loop is None: - loop = events.get_event_loop() - todo = {ensure_future(f, loop=loop) for f in set(fs)} + + if loop is None: + loop = events.get_event_loop() + todo = {ensure_future(f, loop=loop) for f in set(fs)} timeout_handle = None def _on_timeout(): @@ -607,7 +607,7 @@ def as_completed(fs, *, loop=None, timeout=None): f = await done.get() if f is None: # Dummy value from _on_timeout(). - raise exceptions.TimeoutError + raise exceptions.TimeoutError return f.result() # May raise f.exception(). for f in todo: @@ -632,18 +632,18 @@ def __sleep0(): async def sleep(delay, result=None, *, loop=None): """Coroutine that completes after a given time (in seconds).""" - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + if delay <= 0: await __sleep0() return result if loop is None: - loop = events.get_running_loop() - + loop = events.get_running_loop() + future = loop.create_future() h = loop.call_later(delay, futures._set_result_unless_cancelled, @@ -668,8 +668,8 @@ def ensure_future(coro_or_future, *, loop=None): return task elif futures.isfuture(coro_or_future): if loop is not None and loop is not futures._get_loop(coro_or_future): - raise ValueError('The future belongs to a different loop than ' - 'the one specified as the loop argument') + raise ValueError('The future belongs to a different loop than ' + 'the one specified as the loop argument') return coro_or_future elif inspect.isawaitable(coro_or_future): return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) @@ -678,7 +678,7 @@ def ensure_future(coro_or_future, *, loop=None): 'required') -@types.coroutine +@types.coroutine def _wrap_awaitable(awaitable): """Helper for asyncio.ensure_future(). @@ -687,9 +687,9 @@ def _wrap_awaitable(awaitable): """ return (yield from awaitable.__await__()) -_wrap_awaitable._is_coroutine = _is_coroutine +_wrap_awaitable._is_coroutine = _is_coroutine + - class _GatheringFuture(futures.Future): """Helper for gather(). @@ -703,12 +703,12 @@ class _GatheringFuture(futures.Future): self._children = children self._cancel_requested = False - def cancel(self, msg=None): + def cancel(self, msg=None): if self.done(): return False ret = False for child in self._children: - if child.cancel(msg=msg): + if child.cancel(msg=msg): ret = True if ret: # If any child tasks were actually cancelled, we should @@ -740,23 +740,23 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): the outer Future is *not* cancelled in this case. (This is to prevent the cancellation of one child to cause other children to be cancelled.) - - If *return_exceptions* is False, cancelling gather() after it - has been marked done won't cancel any submitted awaitables. - For instance, gather can be marked done after propagating an - exception to the caller, therefore, calling ``gather.cancel()`` - after catching an exception (raised by one of the awaitables) from - gather won't cancel any other awaitables. + + If *return_exceptions* is False, cancelling gather() after it + has been marked done won't cancel any submitted awaitables. + For instance, gather can be marked done after propagating an + exception to the caller, therefore, calling ``gather.cancel()`` + after catching an exception (raised by one of the awaitables) from + gather won't cancel any other awaitables. """ - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) - - return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions) - - -def _gather(*coros_or_futures, loop=None, return_exceptions=False): + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) + + return _gather(*coros_or_futures, loop=loop, return_exceptions=return_exceptions) + + +def _gather(*coros_or_futures, loop=None, return_exceptions=False): if not coros_or_futures: if loop is None: loop = events.get_event_loop() @@ -779,7 +779,7 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): # Check if 'fut' is cancelled first, as # 'fut.exception()' will *raise* a CancelledError # instead of returning it. - exc = fut._make_cancelled_error() + exc = fut._make_cancelled_error() outer.set_exception(exc) return else: @@ -795,15 +795,15 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): for fut in children: if fut.cancelled(): - # Check if 'fut' is cancelled first, as 'fut.exception()' - # will *raise* a CancelledError instead of returning it. - # Also, since we're adding the exception return value - # to 'results' instead of raising it, don't bother - # setting __context__. This also lets us preserve - # calling '_make_cancelled_error()' at most once. - res = exceptions.CancelledError( - '' if fut._cancel_message is None else - fut._cancel_message) + # Check if 'fut' is cancelled first, as 'fut.exception()' + # will *raise* a CancelledError instead of returning it. + # Also, since we're adding the exception return value + # to 'results' instead of raising it, don't bother + # setting __context__. This also lets us preserve + # calling '_make_cancelled_error()' at most once. + res = exceptions.CancelledError( + '' if fut._cancel_message is None else + fut._cancel_message) else: res = fut.exception() if res is None: @@ -814,8 +814,8 @@ def _gather(*coros_or_futures, loop=None, return_exceptions=False): # If gather is being cancelled we must propagate the # cancellation regardless of *return_exceptions* argument. # See issue 32684. - exc = fut._make_cancelled_error() - outer.set_exception(exc) + exc = fut._make_cancelled_error() + outer.set_exception(exc) else: outer.set_result(results) @@ -875,10 +875,10 @@ def shield(arg, *, loop=None): except CancelledError: res = None """ - if loop is not None: - warnings.warn("The loop argument is deprecated since Python 3.8, " - "and scheduled for removal in Python 3.10.", - DeprecationWarning, stacklevel=2) + if loop is not None: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) inner = ensure_future(arg, loop=loop) if inner.done(): # Shortcut. @@ -886,7 +886,7 @@ def shield(arg, *, loop=None): loop = futures._get_loop(inner) outer = loop.create_future() - def _inner_done_callback(inner): + def _inner_done_callback(inner): if outer.cancelled(): if not inner.cancelled(): # Mark inner's result as retrieved. @@ -902,13 +902,13 @@ def shield(arg, *, loop=None): else: outer.set_result(inner.result()) - - def _outer_done_callback(outer): - if not inner.done(): - inner.remove_done_callback(_inner_done_callback) - - inner.add_done_callback(_inner_done_callback) - outer.add_done_callback(_outer_done_callback) + + def _outer_done_callback(outer): + if not inner.done(): + inner.remove_done_callback(_inner_done_callback) + + inner.add_done_callback(_inner_done_callback) + outer.add_done_callback(_outer_done_callback) return outer @@ -924,9 +924,9 @@ def run_coroutine_threadsafe(coro, loop): def callback(): try: futures._chain_future(ensure_future(coro, loop=loop), future) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: if future.set_running_or_notify_cancel(): future.set_exception(exc) raise diff --git a/contrib/tools/python3/src/Lib/asyncio/threads.py b/contrib/tools/python3/src/Lib/asyncio/threads.py index cce2f05e10..db048a8231 100644 --- a/contrib/tools/python3/src/Lib/asyncio/threads.py +++ b/contrib/tools/python3/src/Lib/asyncio/threads.py @@ -1,25 +1,25 @@ -"""High-level support for working with threads in asyncio""" - -import functools -import contextvars - -from . import events - - -__all__ = "to_thread", - - -async def to_thread(func, /, *args, **kwargs): - """Asynchronously run function *func* in a separate thread. - - Any *args and **kwargs supplied for this function are directly passed - to *func*. Also, the current :class:`contextvars.Context` is propagated, - allowing context variables from the main thread to be accessed in the - separate thread. - - Return a coroutine that can be awaited to get the eventual result of *func*. - """ - loop = events.get_running_loop() - ctx = contextvars.copy_context() - func_call = functools.partial(ctx.run, func, *args, **kwargs) - return await loop.run_in_executor(None, func_call) +"""High-level support for working with threads in asyncio""" + +import functools +import contextvars + +from . import events + + +__all__ = "to_thread", + + +async def to_thread(func, /, *args, **kwargs): + """Asynchronously run function *func* in a separate thread. + + Any *args and **kwargs supplied for this function are directly passed + to *func*. Also, the current :class:`contextvars.Context` is propagated, + allowing context variables from the main thread to be accessed in the + separate thread. + + Return a coroutine that can be awaited to get the eventual result of *func*. + """ + loop = events.get_running_loop() + ctx = contextvars.copy_context() + func_call = functools.partial(ctx.run, func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) diff --git a/contrib/tools/python3/src/Lib/asyncio/transports.py b/contrib/tools/python3/src/Lib/asyncio/transports.py index 06143ed829..45e155c94c 100644 --- a/contrib/tools/python3/src/Lib/asyncio/transports.py +++ b/contrib/tools/python3/src/Lib/asyncio/transports.py @@ -9,8 +9,8 @@ __all__ = ( class BaseTransport: """Base class for transports.""" - __slots__ = ('_extra',) - + __slots__ = ('_extra',) + def __init__(self, extra=None): if extra is None: extra = {} @@ -29,8 +29,8 @@ class BaseTransport: Buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the - protocol's connection_lost() method will (eventually) be - called with None as its argument. + protocol's connection_lost() method will (eventually) be + called with None as its argument. """ raise NotImplementedError @@ -46,8 +46,8 @@ class BaseTransport: class ReadTransport(BaseTransport): """Interface for read-only transports.""" - __slots__ = () - + __slots__ = () + def is_reading(self): """Return True if the transport is receiving.""" raise NotImplementedError @@ -72,8 +72,8 @@ class ReadTransport(BaseTransport): class WriteTransport(BaseTransport): """Interface for write-only transports.""" - __slots__ = () - + __slots__ = () + def set_write_buffer_limits(self, high=None, low=None): """Set the high- and low-water limits for write flow control. @@ -160,14 +160,14 @@ class Transport(ReadTransport, WriteTransport): except writelines(), which calls write() in a loop. """ - __slots__ = () + __slots__ = () + - class DatagramTransport(BaseTransport): """Interface for datagram (UDP) transports.""" - __slots__ = () - + __slots__ = () + def sendto(self, data, addr=None): """Send data to the transport. @@ -190,8 +190,8 @@ class DatagramTransport(BaseTransport): class SubprocessTransport(BaseTransport): - __slots__ = () - + __slots__ = () + def get_pid(self): """Get subprocess id.""" raise NotImplementedError @@ -259,8 +259,8 @@ class _FlowControlMixin(Transport): resume_writing() may be called. """ - __slots__ = ('_loop', '_protocol_paused', '_high_water', '_low_water') - + __slots__ = ('_loop', '_protocol_paused', '_high_water', '_low_water') + def __init__(self, extra=None, loop=None): super().__init__(extra) assert loop is not None @@ -276,9 +276,9 @@ class _FlowControlMixin(Transport): self._protocol_paused = True try: self._protocol.pause_writing() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._loop.call_exception_handler({ 'message': 'protocol.pause_writing() failed', 'exception': exc, @@ -292,9 +292,9 @@ class _FlowControlMixin(Transport): self._protocol_paused = False try: self._protocol.resume_writing() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._loop.call_exception_handler({ 'message': 'protocol.resume_writing() failed', 'exception': exc, diff --git a/contrib/tools/python3/src/Lib/asyncio/trsock.py b/contrib/tools/python3/src/Lib/asyncio/trsock.py index 80ac049e85..e9ebcc3261 100644 --- a/contrib/tools/python3/src/Lib/asyncio/trsock.py +++ b/contrib/tools/python3/src/Lib/asyncio/trsock.py @@ -1,206 +1,206 @@ -import socket -import warnings - - -class TransportSocket: - - """A socket-like wrapper for exposing real transport sockets. - - These objects can be safely returned by APIs like - `transport.get_extra_info('socket')`. All potentially disruptive - operations (like "socket.close()") are banned. - """ - - __slots__ = ('_sock',) - - def __init__(self, sock: socket.socket): - self._sock = sock - - def _na(self, what): - warnings.warn( - f"Using {what} on sockets returned from get_extra_info('socket') " - f"will be prohibited in asyncio 3.9. Please report your use case " - f"to bugs.python.org.", - DeprecationWarning, source=self) - - @property - def family(self): - return self._sock.family - - @property - def type(self): - return self._sock.type - - @property - def proto(self): - return self._sock.proto - - def __repr__(self): - s = ( - f"<asyncio.TransportSocket fd={self.fileno()}, " - f"family={self.family!s}, type={self.type!s}, " - f"proto={self.proto}" - ) - - if self.fileno() != -1: - try: - laddr = self.getsockname() - if laddr: - s = f"{s}, laddr={laddr}" - except socket.error: - pass - try: - raddr = self.getpeername() - if raddr: - s = f"{s}, raddr={raddr}" - except socket.error: - pass - - return f"{s}>" - - def __getstate__(self): - raise TypeError("Cannot serialize asyncio.TransportSocket object") - - def fileno(self): - return self._sock.fileno() - - def dup(self): - return self._sock.dup() - - def get_inheritable(self): - return self._sock.get_inheritable() - - def shutdown(self, how): - # asyncio doesn't currently provide a high-level transport API - # to shutdown the connection. - self._sock.shutdown(how) - - def getsockopt(self, *args, **kwargs): - return self._sock.getsockopt(*args, **kwargs) - - def setsockopt(self, *args, **kwargs): - self._sock.setsockopt(*args, **kwargs) - - def getpeername(self): - return self._sock.getpeername() - - def getsockname(self): - return self._sock.getsockname() - - def getsockbyname(self): - return self._sock.getsockbyname() - - def accept(self): - self._na('accept() method') - return self._sock.accept() - - def connect(self, *args, **kwargs): - self._na('connect() method') - return self._sock.connect(*args, **kwargs) - - def connect_ex(self, *args, **kwargs): - self._na('connect_ex() method') - return self._sock.connect_ex(*args, **kwargs) - - def bind(self, *args, **kwargs): - self._na('bind() method') - return self._sock.bind(*args, **kwargs) - - def ioctl(self, *args, **kwargs): - self._na('ioctl() method') - return self._sock.ioctl(*args, **kwargs) - - def listen(self, *args, **kwargs): - self._na('listen() method') - return self._sock.listen(*args, **kwargs) - - def makefile(self): - self._na('makefile() method') - return self._sock.makefile() - - def sendfile(self, *args, **kwargs): - self._na('sendfile() method') - return self._sock.sendfile(*args, **kwargs) - - def close(self): - self._na('close() method') - return self._sock.close() - - def detach(self): - self._na('detach() method') - return self._sock.detach() - - def sendmsg_afalg(self, *args, **kwargs): - self._na('sendmsg_afalg() method') - return self._sock.sendmsg_afalg(*args, **kwargs) - - def sendmsg(self, *args, **kwargs): - self._na('sendmsg() method') - return self._sock.sendmsg(*args, **kwargs) - - def sendto(self, *args, **kwargs): - self._na('sendto() method') - return self._sock.sendto(*args, **kwargs) - - def send(self, *args, **kwargs): - self._na('send() method') - return self._sock.send(*args, **kwargs) - - def sendall(self, *args, **kwargs): - self._na('sendall() method') - return self._sock.sendall(*args, **kwargs) - - def set_inheritable(self, *args, **kwargs): - self._na('set_inheritable() method') - return self._sock.set_inheritable(*args, **kwargs) - - def share(self, process_id): - self._na('share() method') - return self._sock.share(process_id) - - def recv_into(self, *args, **kwargs): - self._na('recv_into() method') - return self._sock.recv_into(*args, **kwargs) - - def recvfrom_into(self, *args, **kwargs): - self._na('recvfrom_into() method') - return self._sock.recvfrom_into(*args, **kwargs) - - def recvmsg_into(self, *args, **kwargs): - self._na('recvmsg_into() method') - return self._sock.recvmsg_into(*args, **kwargs) - - def recvmsg(self, *args, **kwargs): - self._na('recvmsg() method') - return self._sock.recvmsg(*args, **kwargs) - - def recvfrom(self, *args, **kwargs): - self._na('recvfrom() method') - return self._sock.recvfrom(*args, **kwargs) - - def recv(self, *args, **kwargs): - self._na('recv() method') - return self._sock.recv(*args, **kwargs) - - def settimeout(self, value): - if value == 0: - return - raise ValueError( - 'settimeout(): only 0 timeout is allowed on transport sockets') - - def gettimeout(self): - return 0 - - def setblocking(self, flag): - if not flag: - return - raise ValueError( - 'setblocking(): transport sockets cannot be blocking') - - def __enter__(self): - self._na('context manager protocol') - return self._sock.__enter__() - - def __exit__(self, *err): - self._na('context manager protocol') - return self._sock.__exit__(*err) +import socket +import warnings + + +class TransportSocket: + + """A socket-like wrapper for exposing real transport sockets. + + These objects can be safely returned by APIs like + `transport.get_extra_info('socket')`. All potentially disruptive + operations (like "socket.close()") are banned. + """ + + __slots__ = ('_sock',) + + def __init__(self, sock: socket.socket): + self._sock = sock + + def _na(self, what): + warnings.warn( + f"Using {what} on sockets returned from get_extra_info('socket') " + f"will be prohibited in asyncio 3.9. Please report your use case " + f"to bugs.python.org.", + DeprecationWarning, source=self) + + @property + def family(self): + return self._sock.family + + @property + def type(self): + return self._sock.type + + @property + def proto(self): + return self._sock.proto + + def __repr__(self): + s = ( + f"<asyncio.TransportSocket fd={self.fileno()}, " + f"family={self.family!s}, type={self.type!s}, " + f"proto={self.proto}" + ) + + if self.fileno() != -1: + try: + laddr = self.getsockname() + if laddr: + s = f"{s}, laddr={laddr}" + except socket.error: + pass + try: + raddr = self.getpeername() + if raddr: + s = f"{s}, raddr={raddr}" + except socket.error: + pass + + return f"{s}>" + + def __getstate__(self): + raise TypeError("Cannot serialize asyncio.TransportSocket object") + + def fileno(self): + return self._sock.fileno() + + def dup(self): + return self._sock.dup() + + def get_inheritable(self): + return self._sock.get_inheritable() + + def shutdown(self, how): + # asyncio doesn't currently provide a high-level transport API + # to shutdown the connection. + self._sock.shutdown(how) + + def getsockopt(self, *args, **kwargs): + return self._sock.getsockopt(*args, **kwargs) + + def setsockopt(self, *args, **kwargs): + self._sock.setsockopt(*args, **kwargs) + + def getpeername(self): + return self._sock.getpeername() + + def getsockname(self): + return self._sock.getsockname() + + def getsockbyname(self): + return self._sock.getsockbyname() + + def accept(self): + self._na('accept() method') + return self._sock.accept() + + def connect(self, *args, **kwargs): + self._na('connect() method') + return self._sock.connect(*args, **kwargs) + + def connect_ex(self, *args, **kwargs): + self._na('connect_ex() method') + return self._sock.connect_ex(*args, **kwargs) + + def bind(self, *args, **kwargs): + self._na('bind() method') + return self._sock.bind(*args, **kwargs) + + def ioctl(self, *args, **kwargs): + self._na('ioctl() method') + return self._sock.ioctl(*args, **kwargs) + + def listen(self, *args, **kwargs): + self._na('listen() method') + return self._sock.listen(*args, **kwargs) + + def makefile(self): + self._na('makefile() method') + return self._sock.makefile() + + def sendfile(self, *args, **kwargs): + self._na('sendfile() method') + return self._sock.sendfile(*args, **kwargs) + + def close(self): + self._na('close() method') + return self._sock.close() + + def detach(self): + self._na('detach() method') + return self._sock.detach() + + def sendmsg_afalg(self, *args, **kwargs): + self._na('sendmsg_afalg() method') + return self._sock.sendmsg_afalg(*args, **kwargs) + + def sendmsg(self, *args, **kwargs): + self._na('sendmsg() method') + return self._sock.sendmsg(*args, **kwargs) + + def sendto(self, *args, **kwargs): + self._na('sendto() method') + return self._sock.sendto(*args, **kwargs) + + def send(self, *args, **kwargs): + self._na('send() method') + return self._sock.send(*args, **kwargs) + + def sendall(self, *args, **kwargs): + self._na('sendall() method') + return self._sock.sendall(*args, **kwargs) + + def set_inheritable(self, *args, **kwargs): + self._na('set_inheritable() method') + return self._sock.set_inheritable(*args, **kwargs) + + def share(self, process_id): + self._na('share() method') + return self._sock.share(process_id) + + def recv_into(self, *args, **kwargs): + self._na('recv_into() method') + return self._sock.recv_into(*args, **kwargs) + + def recvfrom_into(self, *args, **kwargs): + self._na('recvfrom_into() method') + return self._sock.recvfrom_into(*args, **kwargs) + + def recvmsg_into(self, *args, **kwargs): + self._na('recvmsg_into() method') + return self._sock.recvmsg_into(*args, **kwargs) + + def recvmsg(self, *args, **kwargs): + self._na('recvmsg() method') + return self._sock.recvmsg(*args, **kwargs) + + def recvfrom(self, *args, **kwargs): + self._na('recvfrom() method') + return self._sock.recvfrom(*args, **kwargs) + + def recv(self, *args, **kwargs): + self._na('recv() method') + return self._sock.recv(*args, **kwargs) + + def settimeout(self, value): + if value == 0: + return + raise ValueError( + 'settimeout(): only 0 timeout is allowed on transport sockets') + + def gettimeout(self): + return 0 + + def setblocking(self, flag): + if not flag: + return + raise ValueError( + 'setblocking(): transport sockets cannot be blocking') + + def __enter__(self): + self._na('context manager protocol') + return self._sock.__enter__() + + def __exit__(self, *err): + self._na('context manager protocol') + return self._sock.__exit__(*err) diff --git a/contrib/tools/python3/src/Lib/asyncio/unix_events.py b/contrib/tools/python3/src/Lib/asyncio/unix_events.py index b553e2010b..eecbc101ee 100644 --- a/contrib/tools/python3/src/Lib/asyncio/unix_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/unix_events.py @@ -2,7 +2,7 @@ import errno import io -import itertools +import itertools import os import selectors import signal @@ -18,7 +18,7 @@ from . import base_subprocess from . import constants from . import coroutines from . import events -from . import exceptions +from . import exceptions from . import futures from . import selector_events from . import tasks @@ -29,9 +29,9 @@ from .log import logger __all__ = ( 'SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', - 'FastChildWatcher', 'PidfdChildWatcher', - 'MultiLoopChildWatcher', 'ThreadedChildWatcher', - 'DefaultEventLoopPolicy', + 'FastChildWatcher', 'PidfdChildWatcher', + 'MultiLoopChildWatcher', 'ThreadedChildWatcher', + 'DefaultEventLoopPolicy', ) @@ -101,7 +101,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): try: # Register a dummy signal handler to ask Python to write the signal - # number in the wakeup file descriptor. _process_self_data() will + # number in the wakeup file descriptor. _process_self_data() will # read signal numbers from this file descriptor to handle signals. signal.signal(sig, _sighandler_noop) @@ -171,8 +171,8 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): if not isinstance(sig, int): raise TypeError(f'sig must be an int, not {sig!r}') - if sig not in signal.valid_signals(): - raise ValueError(f'invalid signal number {sig}') + if sig not in signal.valid_signals(): + raise ValueError(f'invalid signal number {sig}') def _make_read_pipe_transport(self, pipe, protocol, waiter=None, extra=None): @@ -186,13 +186,13 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: - if not watcher.is_active(): - # Check early. - # Raising exception before process creation - # prevents subprocess execution if the watcher - # is not ready to handle it. - raise RuntimeError("asyncio.get_child_watcher() is not activated, " - "subprocess support is not installed.") + if not watcher.is_active(): + # Check early. + # Raising exception before process creation + # prevents subprocess execution if the watcher + # is not ready to handle it. + raise RuntimeError("asyncio.get_child_watcher() is not activated, " + "subprocess support is not installed.") waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, @@ -203,9 +203,9 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): self._child_watcher_callback, transp) try: await waiter - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: transp.close() await transp._wait() raise @@ -323,24 +323,24 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): server._start_serving() # Skip one loop iteration so that all 'loop.add_reader' # go through. - await tasks.sleep(0) + await tasks.sleep(0) return server async def _sock_sendfile_native(self, sock, file, offset, count): try: os.sendfile - except AttributeError: - raise exceptions.SendfileNotAvailableError( + except AttributeError: + raise exceptions.SendfileNotAvailableError( "os.sendfile() is not available") try: fileno = file.fileno() except (AttributeError, io.UnsupportedOperation) as err: - raise exceptions.SendfileNotAvailableError("not a regular file") + raise exceptions.SendfileNotAvailableError("not a regular file") try: fsize = os.fstat(fileno).st_size - except OSError: - raise exceptions.SendfileNotAvailableError("not a regular file") + except OSError: + raise exceptions.SendfileNotAvailableError("not a regular file") blocksize = count if count else fsize if not blocksize: return 0 # empty file @@ -394,16 +394,16 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): # one being 'file' is not a regular mmap(2)-like # file, in which case we'll fall back on using # plain send(). - err = exceptions.SendfileNotAvailableError( + err = exceptions.SendfileNotAvailableError( "os.sendfile call failed") self._sock_sendfile_update_filepos(fileno, offset, total_sent) fut.set_exception(err) else: self._sock_sendfile_update_filepos(fileno, offset, total_sent) fut.set_exception(exc) - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._sock_sendfile_update_filepos(fileno, offset, total_sent) fut.set_exception(exc) else: @@ -445,7 +445,7 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._fileno = pipe.fileno() self._protocol = protocol self._closing = False - self._paused = False + self._paused = False mode = os.fstat(self._fileno).st_mode if not (stat.S_ISFIFO(mode) or @@ -507,20 +507,20 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._loop.call_soon(self._call_connection_lost, None) def pause_reading(self): - if self._closing or self._paused: - return - self._paused = True + if self._closing or self._paused: + return + self._paused = True self._loop._remove_reader(self._fileno) - if self._loop.get_debug(): - logger.debug("%r pauses reading", self) + if self._loop.get_debug(): + logger.debug("%r pauses reading", self) def resume_reading(self): - if self._closing or not self._paused: - return - self._paused = False + if self._closing or not self._paused: + return + self._paused = False self._loop._add_reader(self._fileno, self._read_ready) - if self._loop.get_debug(): - logger.debug("%r resumes reading", self) + if self._loop.get_debug(): + logger.debug("%r resumes reading", self) def set_protocol(self, protocol): self._protocol = protocol @@ -535,9 +535,9 @@ class _UnixReadPipeTransport(transports.ReadTransport): if not self._closing: self._close(None) - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if self._pipe is not None: - _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) + _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) self._pipe.close() def _fatal_error(self, exc, message='Fatal error on pipe transport'): @@ -665,9 +665,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, n = os.write(self._fileno, data) except (BlockingIOError, InterruptedError): n = 0 - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._conn_lost += 1 self._fatal_error(exc, 'Fatal write error on pipe transport') return @@ -687,9 +687,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, n = os.write(self._fileno, self._buffer) except (BlockingIOError, InterruptedError): pass - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: self._buffer.clear() self._conn_lost += 1 # Remove writer here, _fatal_error() doesn't it @@ -734,9 +734,9 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, # write_eof is all what we needed to close the write pipe self.write_eof() - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if self._pipe is not None: - _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) + _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) self._pipe.close() def abort(self): @@ -744,7 +744,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, def _fatal_error(self, exc, message='Fatal error on pipe transport'): # should be called by exception handler only - if isinstance(exc, OSError): + if isinstance(exc, OSError): if self._loop.get_debug(): logger.debug("%r: %s", self, message, exc_info=True) else: @@ -785,18 +785,18 @@ class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): # other end). Notably this is needed on AIX, and works # just fine on other platforms. stdin, stdin_w = socket.socketpair() - try: - self._proc = subprocess.Popen( - args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, - universal_newlines=False, bufsize=bufsize, **kwargs) - if stdin_w is not None: - stdin.close() - self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) - stdin_w = None - finally: - if stdin_w is not None: - stdin.close() - stdin_w.close() + try: + self._proc = subprocess.Popen( + args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, + universal_newlines=False, bufsize=bufsize, **kwargs) + if stdin_w is not None: + stdin.close() + self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) + stdin_w = None + finally: + if stdin_w is not None: + stdin.close() + stdin_w.close() class AbstractChildWatcher: @@ -858,15 +858,15 @@ class AbstractChildWatcher: """ raise NotImplementedError() - def is_active(self): - """Return ``True`` if the watcher is active and is used by the event loop. - - Return True if the watcher is installed and ready to handle process exit - notifications. - - """ - raise NotImplementedError() - + def is_active(self): + """Return ``True`` if the watcher is active and is used by the event loop. + + Return True if the watcher is installed and ready to handle process exit + notifications. + + """ + raise NotImplementedError() + def __enter__(self): """Enter the watcher's context and allow starting new processes @@ -878,98 +878,98 @@ class AbstractChildWatcher: raise NotImplementedError() -class PidfdChildWatcher(AbstractChildWatcher): - """Child watcher implementation using Linux's pid file descriptors. - - This child watcher polls process file descriptors (pidfds) to await child - process termination. In some respects, PidfdChildWatcher is a "Goldilocks" - child watcher implementation. It doesn't require signals or threads, doesn't - interfere with any processes launched outside the event loop, and scales - linearly with the number of subprocesses launched by the event loop. The - main disadvantage is that pidfds are specific to Linux, and only work on - recent (5.3+) kernels. - """ - - def __init__(self): - self._loop = None - self._callbacks = {} - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - pass - - def is_active(self): - return self._loop is not None and self._loop.is_running() - - def close(self): - self.attach_loop(None) - - def attach_loop(self, loop): - if self._loop is not None and loop is None and self._callbacks: - warnings.warn( - 'A loop is being detached ' - 'from a child watcher with pending handlers', - RuntimeWarning) - for pidfd, _, _ in self._callbacks.values(): - self._loop._remove_reader(pidfd) - os.close(pidfd) - self._callbacks.clear() - self._loop = loop - - def add_child_handler(self, pid, callback, *args): - existing = self._callbacks.get(pid) - if existing is not None: - self._callbacks[pid] = existing[0], callback, args - else: - pidfd = os.pidfd_open(pid) - self._loop._add_reader(pidfd, self._do_wait, pid) - self._callbacks[pid] = pidfd, callback, args - - def _do_wait(self, pid): - pidfd, callback, args = self._callbacks.pop(pid) - self._loop._remove_reader(pidfd) - try: - _, status = os.waitpid(pid, 0) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - returncode = 255 - logger.warning( - "child process pid %d exit status already read: " - " will report returncode 255", - pid) - else: - returncode = _compute_returncode(status) - - os.close(pidfd) - callback(pid, returncode, *args) - - def remove_child_handler(self, pid): - try: - pidfd, _, _ = self._callbacks.pop(pid) - except KeyError: - return False - self._loop._remove_reader(pidfd) - os.close(pidfd) - return True - - -def _compute_returncode(status): - if os.WIFSIGNALED(status): - # The child process died because of a signal. - return -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # The child process exited (e.g sys.exit()). - return os.WEXITSTATUS(status) - else: - # The child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - return status - - +class PidfdChildWatcher(AbstractChildWatcher): + """Child watcher implementation using Linux's pid file descriptors. + + This child watcher polls process file descriptors (pidfds) to await child + process termination. In some respects, PidfdChildWatcher is a "Goldilocks" + child watcher implementation. It doesn't require signals or threads, doesn't + interfere with any processes launched outside the event loop, and scales + linearly with the number of subprocesses launched by the event loop. The + main disadvantage is that pidfds are specific to Linux, and only work on + recent (5.3+) kernels. + """ + + def __init__(self): + self._loop = None + self._callbacks = {} + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + pass + + def is_active(self): + return self._loop is not None and self._loop.is_running() + + def close(self): + self.attach_loop(None) + + def attach_loop(self, loop): + if self._loop is not None and loop is None and self._callbacks: + warnings.warn( + 'A loop is being detached ' + 'from a child watcher with pending handlers', + RuntimeWarning) + for pidfd, _, _ in self._callbacks.values(): + self._loop._remove_reader(pidfd) + os.close(pidfd) + self._callbacks.clear() + self._loop = loop + + def add_child_handler(self, pid, callback, *args): + existing = self._callbacks.get(pid) + if existing is not None: + self._callbacks[pid] = existing[0], callback, args + else: + pidfd = os.pidfd_open(pid) + self._loop._add_reader(pidfd, self._do_wait, pid) + self._callbacks[pid] = pidfd, callback, args + + def _do_wait(self, pid): + pidfd, callback, args = self._callbacks.pop(pid) + self._loop._remove_reader(pidfd) + try: + _, status = os.waitpid(pid, 0) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + returncode = 255 + logger.warning( + "child process pid %d exit status already read: " + " will report returncode 255", + pid) + else: + returncode = _compute_returncode(status) + + os.close(pidfd) + callback(pid, returncode, *args) + + def remove_child_handler(self, pid): + try: + pidfd, _, _ = self._callbacks.pop(pid) + except KeyError: + return False + self._loop._remove_reader(pidfd) + os.close(pidfd) + return True + + +def _compute_returncode(status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + class BaseChildWatcher(AbstractChildWatcher): def __init__(self): @@ -979,9 +979,9 @@ class BaseChildWatcher(AbstractChildWatcher): def close(self): self.attach_loop(None) - def is_active(self): - return self._loop is not None and self._loop.is_running() - + def is_active(self): + return self._loop is not None and self._loop.is_running() + def _do_waitpid(self, expected_pid): raise NotImplementedError() @@ -1011,9 +1011,9 @@ class BaseChildWatcher(AbstractChildWatcher): def _sig_chld(self): try: self._do_waitpid_all() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException as exc: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: # self._loop should always be available here # as '_sig_chld' is added as a signal handler # in 'attach_loop' @@ -1080,7 +1080,7 @@ class SafeChildWatcher(BaseChildWatcher): # The child process is still alive. return - returncode = _compute_returncode(status) + returncode = _compute_returncode(status) if self._loop.get_debug(): logger.debug('process %s exited with returncode %s', expected_pid, returncode) @@ -1173,7 +1173,7 @@ class FastChildWatcher(BaseChildWatcher): # A child process is still alive. return - returncode = _compute_returncode(status) + returncode = _compute_returncode(status) with self._lock: try: @@ -1202,220 +1202,220 @@ class FastChildWatcher(BaseChildWatcher): callback(pid, returncode, *args) -class MultiLoopChildWatcher(AbstractChildWatcher): - """A watcher that doesn't require running loop in the main thread. - - This implementation registers a SIGCHLD signal handler on - instantiation (which may conflict with other code that - install own handler for this signal). - - The solution is safe but it has a significant overhead when - handling a big number of processes (*O(n)* each time a - SIGCHLD is received). - """ - - # Implementation note: - # The class keeps compatibility with AbstractChildWatcher ABC - # To achieve this it has empty attach_loop() method - # and doesn't accept explicit loop argument - # for add_child_handler()/remove_child_handler() - # but retrieves the current loop by get_running_loop() - - def __init__(self): - self._callbacks = {} - self._saved_sighandler = None - - def is_active(self): - return self._saved_sighandler is not None - - def close(self): - self._callbacks.clear() - if self._saved_sighandler is None: - return - - handler = signal.getsignal(signal.SIGCHLD) - if handler != self._sig_chld: - logger.warning("SIGCHLD handler was changed by outside code") - else: - signal.signal(signal.SIGCHLD, self._saved_sighandler) - self._saved_sighandler = None - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def add_child_handler(self, pid, callback, *args): - loop = events.get_running_loop() - self._callbacks[pid] = (loop, callback, args) - - # Prevent a race condition in case the child is already terminated. - self._do_waitpid(pid) - - def remove_child_handler(self, pid): - try: - del self._callbacks[pid] - return True - except KeyError: - return False - - def attach_loop(self, loop): - # Don't save the loop but initialize itself if called first time - # The reason to do it here is that attach_loop() is called from - # unix policy only for the main thread. - # Main thread is required for subscription on SIGCHLD signal - if self._saved_sighandler is not None: - return - - self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) - if self._saved_sighandler is None: - logger.warning("Previous SIGCHLD handler was set by non-Python code, " - "restore to default handler on watcher close.") - self._saved_sighandler = signal.SIG_DFL - - # Set SA_RESTART to limit EINTR occurrences. - signal.siginterrupt(signal.SIGCHLD, False) - - def _do_waitpid_all(self): - for pid in list(self._callbacks): - self._do_waitpid(pid) - - def _do_waitpid(self, expected_pid): - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, os.WNOHANG) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - "Unknown child process pid %d, will report returncode 255", - pid) - debug_log = False - else: - if pid == 0: - # The child process is still alive. - return - - returncode = _compute_returncode(status) - debug_log = True - try: - loop, callback, args = self._callbacks.pop(pid) - except KeyError: # pragma: no cover - # May happen if .remove_child_handler() is called - # after os.waitpid() returns. - logger.warning("Child watcher got an unexpected pid: %r", - pid, exc_info=True) - else: - if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is closed", loop, pid) - else: - if debug_log and loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - def _sig_chld(self, signum, frame): - try: - self._do_waitpid_all() - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: - logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) - - -class ThreadedChildWatcher(AbstractChildWatcher): - """Threaded child watcher implementation. - - The watcher uses a thread per process - for waiting for the process finish. - - It doesn't require subscription on POSIX signal - but a thread creation is not free. - - The watcher has O(1) complexity, its performance doesn't depend - on amount of spawn processes. - """ - - def __init__(self): - self._pid_counter = itertools.count(0) - self._threads = {} - - def is_active(self): - return True - - def close(self): - self._join_threads() - - def _join_threads(self): - """Internal: Join all non-daemon threads""" - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] - for thread in threads: - thread.join() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - def __del__(self, _warn=warnings.warn): - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive()] - if threads: - _warn(f"{self.__class__} has registered but not finished child processes", - ResourceWarning, - source=self) - - def add_child_handler(self, pid, callback, *args): - loop = events.get_running_loop() - thread = threading.Thread(target=self._do_waitpid, - name=f"waitpid-{next(self._pid_counter)}", - args=(loop, pid, callback, args), - daemon=True) - self._threads[pid] = thread - thread.start() - - def remove_child_handler(self, pid): - # asyncio never calls remove_child_handler() !!! - # The method is no-op but is implemented because - # abstract base classes require it. - return True - - def attach_loop(self, loop): - pass - - def _do_waitpid(self, loop, expected_pid, callback, args): - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, 0) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - "Unknown child process pid %d, will report returncode 255", - pid) - else: - returncode = _compute_returncode(status) - if loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - - if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is closed", loop, pid) - else: - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - self._threads.pop(expected_pid) - - +class MultiLoopChildWatcher(AbstractChildWatcher): + """A watcher that doesn't require running loop in the main thread. + + This implementation registers a SIGCHLD signal handler on + instantiation (which may conflict with other code that + install own handler for this signal). + + The solution is safe but it has a significant overhead when + handling a big number of processes (*O(n)* each time a + SIGCHLD is received). + """ + + # Implementation note: + # The class keeps compatibility with AbstractChildWatcher ABC + # To achieve this it has empty attach_loop() method + # and doesn't accept explicit loop argument + # for add_child_handler()/remove_child_handler() + # but retrieves the current loop by get_running_loop() + + def __init__(self): + self._callbacks = {} + self._saved_sighandler = None + + def is_active(self): + return self._saved_sighandler is not None + + def close(self): + self._callbacks.clear() + if self._saved_sighandler is None: + return + + handler = signal.getsignal(signal.SIGCHLD) + if handler != self._sig_chld: + logger.warning("SIGCHLD handler was changed by outside code") + else: + signal.signal(signal.SIGCHLD, self._saved_sighandler) + self._saved_sighandler = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def add_child_handler(self, pid, callback, *args): + loop = events.get_running_loop() + self._callbacks[pid] = (loop, callback, args) + + # Prevent a race condition in case the child is already terminated. + self._do_waitpid(pid) + + def remove_child_handler(self, pid): + try: + del self._callbacks[pid] + return True + except KeyError: + return False + + def attach_loop(self, loop): + # Don't save the loop but initialize itself if called first time + # The reason to do it here is that attach_loop() is called from + # unix policy only for the main thread. + # Main thread is required for subscription on SIGCHLD signal + if self._saved_sighandler is not None: + return + + self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) + if self._saved_sighandler is None: + logger.warning("Previous SIGCHLD handler was set by non-Python code, " + "restore to default handler on watcher close.") + self._saved_sighandler = signal.SIG_DFL + + # Set SA_RESTART to limit EINTR occurrences. + signal.siginterrupt(signal.SIGCHLD, False) + + def _do_waitpid_all(self): + for pid in list(self._callbacks): + self._do_waitpid(pid) + + def _do_waitpid(self, expected_pid): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, os.WNOHANG) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + debug_log = False + else: + if pid == 0: + # The child process is still alive. + return + + returncode = _compute_returncode(status) + debug_log = True + try: + loop, callback, args = self._callbacks.pop(pid) + except KeyError: # pragma: no cover + # May happen if .remove_child_handler() is called + # after os.waitpid() returns. + logger.warning("Child watcher got an unexpected pid: %r", + pid, exc_info=True) + else: + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is closed", loop, pid) + else: + if debug_log and loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + def _sig_chld(self, signum, frame): + try: + self._do_waitpid_all() + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: + logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) + + +class ThreadedChildWatcher(AbstractChildWatcher): + """Threaded child watcher implementation. + + The watcher uses a thread per process + for waiting for the process finish. + + It doesn't require subscription on POSIX signal + but a thread creation is not free. + + The watcher has O(1) complexity, its performance doesn't depend + on amount of spawn processes. + """ + + def __init__(self): + self._pid_counter = itertools.count(0) + self._threads = {} + + def is_active(self): + return True + + def close(self): + self._join_threads() + + def _join_threads(self): + """Internal: Join all non-daemon threads""" + threads = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] + for thread in threads: + thread.join() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __del__(self, _warn=warnings.warn): + threads = [thread for thread in list(self._threads.values()) + if thread.is_alive()] + if threads: + _warn(f"{self.__class__} has registered but not finished child processes", + ResourceWarning, + source=self) + + def add_child_handler(self, pid, callback, *args): + loop = events.get_running_loop() + thread = threading.Thread(target=self._do_waitpid, + name=f"waitpid-{next(self._pid_counter)}", + args=(loop, pid, callback, args), + daemon=True) + self._threads[pid] = thread + thread.start() + + def remove_child_handler(self, pid): + # asyncio never calls remove_child_handler() !!! + # The method is no-op but is implemented because + # abstract base classes require it. + return True + + def attach_loop(self, loop): + pass + + def _do_waitpid(self, loop, expected_pid, callback, args): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, 0) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + else: + returncode = _compute_returncode(status) + if loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is closed", loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + self._threads.pop(expected_pid) + + class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): """UNIX event loop policy with a watcher for child processes.""" _loop_factory = _UnixSelectorEventLoop @@ -1427,8 +1427,8 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def _init_watcher(self): with events._lock: if self._watcher is None: # pragma: no branch - self._watcher = ThreadedChildWatcher() - if threading.current_thread() is threading.main_thread(): + self._watcher = ThreadedChildWatcher() + if threading.current_thread() is threading.main_thread(): self._watcher.attach_loop(self._local._loop) def set_event_loop(self, loop): @@ -1442,13 +1442,13 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): super().set_event_loop(loop) if (self._watcher is not None and - threading.current_thread() is threading.main_thread()): + threading.current_thread() is threading.main_thread()): self._watcher.attach_loop(loop) def get_child_watcher(self): """Get the watcher for child processes. - If not yet set, a ThreadedChildWatcher object is automatically created. + If not yet set, a ThreadedChildWatcher object is automatically created. """ if self._watcher is None: self._init_watcher() diff --git a/contrib/tools/python3/src/Lib/asyncio/windows_events.py b/contrib/tools/python3/src/Lib/asyncio/windows_events.py index bf9e43b7a0..da81ab435b 100644 --- a/contrib/tools/python3/src/Lib/asyncio/windows_events.py +++ b/contrib/tools/python3/src/Lib/asyncio/windows_events.py @@ -1,10 +1,10 @@ """Selector and proactor event loops for Windows.""" -import sys - -if sys.platform != 'win32': # pragma: no cover - raise ImportError('win32 only') - +import sys + +if sys.platform != 'win32': # pragma: no cover + raise ImportError('win32 only') + import _overlapped import _winapi import errno @@ -18,7 +18,7 @@ import weakref from . import events from . import base_subprocess from . import futures -from . import exceptions +from . import exceptions from . import proactor_events from . import selector_events from . import tasks @@ -80,9 +80,9 @@ class _OverlappedFuture(futures.Future): self._loop.call_exception_handler(context) self._ov = None - def cancel(self, msg=None): + def cancel(self, msg=None): self._cancel_overlapped() - return super().cancel(msg=msg) + return super().cancel(msg=msg) def set_exception(self, exception): super().set_exception(exception) @@ -154,9 +154,9 @@ class _BaseWaitHandleFuture(futures.Future): self._unregister_wait_cb(None) - def cancel(self, msg=None): + def cancel(self, msg=None): self._unregister_wait() - return super().cancel(msg=msg) + return super().cancel(msg=msg) def set_exception(self, exception): self._unregister_wait() @@ -314,25 +314,25 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): proactor = IocpProactor() super().__init__(proactor) - def run_forever(self): - try: - assert self._self_reading_future is None - self.call_soon(self._loop_self_reading) - super().run_forever() - finally: - if self._self_reading_future is not None: - ov = self._self_reading_future._ov - self._self_reading_future.cancel() - # self_reading_future was just cancelled so if it hasn't been - # finished yet, it never will be (it's possible that it has - # already finished and its callback is waiting in the queue, - # where it could still happen if the event loop is restarted). - # Unregister it otherwise IocpProactor.close will wait for it - # forever - if ov is not None: - self._proactor._unregister(ov) - self._self_reading_future = None - + def run_forever(self): + try: + assert self._self_reading_future is None + self.call_soon(self._loop_self_reading) + super().run_forever() + finally: + if self._self_reading_future is not None: + ov = self._self_reading_future._ov + self._self_reading_future.cancel() + # self_reading_future was just cancelled so if it hasn't been + # finished yet, it never will be (it's possible that it has + # already finished and its callback is waiting in the queue, + # where it could still happen if the event loop is restarted). + # Unregister it otherwise IocpProactor.close will wait for it + # forever + if ov is not None: + self._proactor._unregister(ov) + self._self_reading_future = None + async def create_pipe_connection(self, protocol_factory, address): f = self._proactor.connect_pipe(address) pipe = await f @@ -377,7 +377,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): elif self._debug: logger.warning("Accept pipe failed on pipe %r", pipe, exc_info=True) - except exceptions.CancelledError: + except exceptions.CancelledError: if pipe: pipe.close() else: @@ -397,9 +397,9 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): **kwargs) try: await waiter - except (SystemExit, KeyboardInterrupt): - raise - except BaseException: + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: transp.close() await transp._wait() raise @@ -478,7 +478,7 @@ class IocpProactor: else: ov.ReadFileInto(conn.fileno(), buf) except BrokenPipeError: - return self._result(0) + return self._result(0) def finish_recv(trans, key, ov): try: @@ -492,44 +492,44 @@ class IocpProactor: return self._register(ov, conn, finish_recv) - def recvfrom(self, conn, nbytes, flags=0): - self._register_with_iocp(conn) - ov = _overlapped.Overlapped(NULL) - try: - ov.WSARecvFrom(conn.fileno(), nbytes, flags) - except BrokenPipeError: - return self._result((b'', None)) - - def finish_recv(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_recv) - - def sendto(self, conn, buf, flags=0, addr=None): - self._register_with_iocp(conn) - ov = _overlapped.Overlapped(NULL) - - ov.WSASendTo(conn.fileno(), buf, flags, addr) - - def finish_send(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_send) - + def recvfrom(self, conn, nbytes, flags=0): + self._register_with_iocp(conn) + ov = _overlapped.Overlapped(NULL) + try: + ov.WSARecvFrom(conn.fileno(), nbytes, flags) + except BrokenPipeError: + return self._result((b'', None)) + + def finish_recv(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): + raise ConnectionResetError(*exc.args) + else: + raise + + return self._register(ov, conn, finish_recv) + + def sendto(self, conn, buf, flags=0, addr=None): + self._register_with_iocp(conn) + ov = _overlapped.Overlapped(NULL) + + ov.WSASendTo(conn.fileno(), buf, flags, addr) + + def finish_send(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): + raise ConnectionResetError(*exc.args) + else: + raise + + return self._register(ov, conn, finish_send) + def send(self, conn, buf, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) @@ -569,7 +569,7 @@ class IocpProactor: # Coroutine closing the accept socket if the future is cancelled try: await future - except exceptions.CancelledError: + except exceptions.CancelledError: conn.close() raise @@ -579,14 +579,14 @@ class IocpProactor: return future def connect(self, conn, address): - if conn.type == socket.SOCK_DGRAM: - # WSAConnect will complete immediately for UDP sockets so we don't - # need to register any IOCP operation - _overlapped.WSAConnect(conn.fileno(), address) - fut = self._loop.create_future() - fut.set_result(None) - return fut - + if conn.type == socket.SOCK_DGRAM: + # WSAConnect will complete immediately for UDP sockets so we don't + # need to register any IOCP operation + _overlapped.WSAConnect(conn.fileno(), address) + fut = self._loop.create_future() + fut.set_result(None) + return fut + self._register_with_iocp(conn) # The socket needs to be locally bound before we call ConnectEx(). try: @@ -662,7 +662,7 @@ class IocpProactor: # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) - await tasks.sleep(delay) + await tasks.sleep(delay) return windows_utils.PipeHandle(handle) @@ -910,4 +910,4 @@ class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy): _loop_factory = ProactorEventLoop -DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy +DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy diff --git a/contrib/tools/python3/src/Lib/asyncio/windows_utils.py b/contrib/tools/python3/src/Lib/asyncio/windows_utils.py index 1b1ce0e9cd..ef277fac3e 100644 --- a/contrib/tools/python3/src/Lib/asyncio/windows_utils.py +++ b/contrib/tools/python3/src/Lib/asyncio/windows_utils.py @@ -107,9 +107,9 @@ class PipeHandle: CloseHandle(self._handle) self._handle = None - def __del__(self, _warn=warnings.warn): + def __del__(self, _warn=warnings.warn): if self._handle is not None: - _warn(f"unclosed {self!r}", ResourceWarning, source=self) + _warn(f"unclosed {self!r}", ResourceWarning, source=self) self.close() def __enter__(self): |