diff options
author | AlexSm <alex@ydb.tech> | 2024-05-06 18:27:11 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-06 18:27:11 +0200 |
commit | 068e8291de67631f063304b76dda3c1fd6601c12 (patch) | |
tree | f9058c69ef88f04c55ff9c92949dffa8cd6b83a5 /contrib/tools/python3/Lib/asyncio | |
parent | 653a427438ab0fa69068180c34233b015af0d405 (diff) | |
parent | 41f0129e44731de1ba129fbae27008f8a4048fdc (diff) | |
download | ydb-068e8291de67631f063304b76dda3c1fd6601c12.tar.gz |
Merge pull request #4325 from ydb-platform/mergelibs-240506-1255
Library import 240506-1255
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio')
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/base_events.py | 20 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/tasks.py | 2 | ||||
-rw-r--r-- | contrib/tools/python3/Lib/asyncio/windows_events.py | 43 |
3 files changed, 36 insertions, 29 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/base_events.py b/contrib/tools/python3/Lib/asyncio/base_events.py index c16c445bde..29eff0499c 100644 --- a/contrib/tools/python3/Lib/asyncio/base_events.py +++ b/contrib/tools/python3/Lib/asyncio/base_events.py @@ -45,6 +45,7 @@ from . import protocols from . import sslproto from . import staggered from . import tasks +from . import timeouts from . import transports from . import trsock from .log import logger @@ -596,23 +597,24 @@ class BaseEventLoop(events.AbstractEventLoop): thread = threading.Thread(target=self._do_shutdown, args=(future,)) thread.start() try: - await future - finally: - thread.join(timeout) - - if thread.is_alive(): + async with timeouts.timeout(timeout): + await future + except TimeoutError: warnings.warn("The executor did not finishing joining " - f"its threads within {timeout} seconds.", - RuntimeWarning, stacklevel=2) + f"its threads within {timeout} seconds.", + RuntimeWarning, stacklevel=2) self._default_executor.shutdown(wait=False) + else: + thread.join() def _do_shutdown(self, future): try: self._default_executor.shutdown(wait=True) if not self.is_closed(): - self.call_soon_threadsafe(future.set_result, None) + self.call_soon_threadsafe(futures._set_result_unless_cancelled, + future, None) except Exception as ex: - if not self.is_closed(): + if not self.is_closed() and not future.cancelled(): self.call_soon_threadsafe(future.set_exception, ex) def _check_running(self): diff --git a/contrib/tools/python3/Lib/asyncio/tasks.py b/contrib/tools/python3/Lib/asyncio/tasks.py index 65f2a6ef80..0b22e28d8e 100644 --- a/contrib/tools/python3/Lib/asyncio/tasks.py +++ b/contrib/tools/python3/Lib/asyncio/tasks.py @@ -480,7 +480,7 @@ async def wait_for(fut, timeout): If the wait is cancelled, the task is also cancelled. - If the task supresses the cancellation and returns a value instead, + If the task suppresses the cancellation and returns a value instead, that value is returned. This function is a coroutine. diff --git a/contrib/tools/python3/Lib/asyncio/windows_events.py b/contrib/tools/python3/Lib/asyncio/windows_events.py index c9a5fb841c..cb613451a5 100644 --- a/contrib/tools/python3/Lib/asyncio/windows_events.py +++ b/contrib/tools/python3/Lib/asyncio/windows_events.py @@ -8,6 +8,7 @@ if sys.platform != 'win32': # pragma: no cover import _overlapped import _winapi import errno +from functools import partial import math import msvcrt import socket @@ -323,13 +324,13 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop): if self._self_reading_future is not None: ov = self._self_reading_future._ov self._self_reading_future.cancel() - # self_reading_future was just cancelled so if it hasn't been - # finished yet, it never will be (it's possible that it has - # already finished and its callback is waiting in the queue, - # where it could still happen if the event loop is restarted). - # Unregister it otherwise IocpProactor.close will wait for it - # forever - if ov is not None: + # self_reading_future always uses IOCP, so even though it's + # been cancelled, we need to make sure that the IOCP message + # is received so that the kernel is not holding on to the + # memory, possibly causing memory corruption later. Only + # unregister it if IO is complete in all respects. Otherwise + # we need another _poll() later to complete the IO. + if ov is not None and not ov.pending: self._proactor._unregister(ov) self._self_reading_future = None @@ -466,6 +467,18 @@ class IocpProactor: else: raise + @classmethod + def _finish_recvfrom(cls, trans, key, ov, *, empty_result): + try: + return cls.finish_socket_func(trans, key, ov) + except OSError as exc: + # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same + # socket is used to send to an address that is not listening. + if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: + return empty_result, None + else: + raise + def recv(self, conn, nbytes, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) @@ -500,7 +513,8 @@ class IocpProactor: except BrokenPipeError: return self._result((b'', None)) - return self._register(ov, conn, self.finish_socket_func) + return self._register(ov, conn, partial(self._finish_recvfrom, + empty_result=b'')) def recvfrom_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -510,17 +524,8 @@ class IocpProactor: except BrokenPipeError: return self._result((0, None)) - def finish_recv(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_recv) + return self._register(ov, conn, partial(self._finish_recvfrom, + empty_result=0)) def sendto(self, conn, buf, flags=0, addr=None): self._register_with_iocp(conn) |