aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/Lib/asyncio
diff options
context:
space:
mode:
authorAlexSm <alex@ydb.tech>2024-05-06 18:27:11 +0200
committerGitHub <noreply@github.com>2024-05-06 18:27:11 +0200
commit068e8291de67631f063304b76dda3c1fd6601c12 (patch)
treef9058c69ef88f04c55ff9c92949dffa8cd6b83a5 /contrib/tools/python3/Lib/asyncio
parent653a427438ab0fa69068180c34233b015af0d405 (diff)
parent41f0129e44731de1ba129fbae27008f8a4048fdc (diff)
downloadydb-068e8291de67631f063304b76dda3c1fd6601c12.tar.gz
Merge pull request #4325 from ydb-platform/mergelibs-240506-1255
Library import 240506-1255
Diffstat (limited to 'contrib/tools/python3/Lib/asyncio')
-rw-r--r--contrib/tools/python3/Lib/asyncio/base_events.py20
-rw-r--r--contrib/tools/python3/Lib/asyncio/tasks.py2
-rw-r--r--contrib/tools/python3/Lib/asyncio/windows_events.py43
3 files changed, 36 insertions, 29 deletions
diff --git a/contrib/tools/python3/Lib/asyncio/base_events.py b/contrib/tools/python3/Lib/asyncio/base_events.py
index c16c445bde..29eff0499c 100644
--- a/contrib/tools/python3/Lib/asyncio/base_events.py
+++ b/contrib/tools/python3/Lib/asyncio/base_events.py
@@ -45,6 +45,7 @@ from . import protocols
from . import sslproto
from . import staggered
from . import tasks
+from . import timeouts
from . import transports
from . import trsock
from .log import logger
@@ -596,23 +597,24 @@ class BaseEventLoop(events.AbstractEventLoop):
thread = threading.Thread(target=self._do_shutdown, args=(future,))
thread.start()
try:
- await future
- finally:
- thread.join(timeout)
-
- if thread.is_alive():
+ async with timeouts.timeout(timeout):
+ await future
+ except TimeoutError:
warnings.warn("The executor did not finishing joining "
- f"its threads within {timeout} seconds.",
- RuntimeWarning, stacklevel=2)
+ f"its threads within {timeout} seconds.",
+ RuntimeWarning, stacklevel=2)
self._default_executor.shutdown(wait=False)
+ else:
+ thread.join()
def _do_shutdown(self, future):
try:
self._default_executor.shutdown(wait=True)
if not self.is_closed():
- self.call_soon_threadsafe(future.set_result, None)
+ self.call_soon_threadsafe(futures._set_result_unless_cancelled,
+ future, None)
except Exception as ex:
- if not self.is_closed():
+ if not self.is_closed() and not future.cancelled():
self.call_soon_threadsafe(future.set_exception, ex)
def _check_running(self):
diff --git a/contrib/tools/python3/Lib/asyncio/tasks.py b/contrib/tools/python3/Lib/asyncio/tasks.py
index 65f2a6ef80..0b22e28d8e 100644
--- a/contrib/tools/python3/Lib/asyncio/tasks.py
+++ b/contrib/tools/python3/Lib/asyncio/tasks.py
@@ -480,7 +480,7 @@ async def wait_for(fut, timeout):
If the wait is cancelled, the task is also cancelled.
- If the task supresses the cancellation and returns a value instead,
+ If the task suppresses the cancellation and returns a value instead,
that value is returned.
This function is a coroutine.
diff --git a/contrib/tools/python3/Lib/asyncio/windows_events.py b/contrib/tools/python3/Lib/asyncio/windows_events.py
index c9a5fb841c..cb613451a5 100644
--- a/contrib/tools/python3/Lib/asyncio/windows_events.py
+++ b/contrib/tools/python3/Lib/asyncio/windows_events.py
@@ -8,6 +8,7 @@ if sys.platform != 'win32': # pragma: no cover
import _overlapped
import _winapi
import errno
+from functools import partial
import math
import msvcrt
import socket
@@ -323,13 +324,13 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
if self._self_reading_future is not None:
ov = self._self_reading_future._ov
self._self_reading_future.cancel()
- # self_reading_future was just cancelled so if it hasn't been
- # finished yet, it never will be (it's possible that it has
- # already finished and its callback is waiting in the queue,
- # where it could still happen if the event loop is restarted).
- # Unregister it otherwise IocpProactor.close will wait for it
- # forever
- if ov is not None:
+ # self_reading_future always uses IOCP, so even though it's
+ # been cancelled, we need to make sure that the IOCP message
+ # is received so that the kernel is not holding on to the
+ # memory, possibly causing memory corruption later. Only
+ # unregister it if IO is complete in all respects. Otherwise
+ # we need another _poll() later to complete the IO.
+ if ov is not None and not ov.pending:
self._proactor._unregister(ov)
self._self_reading_future = None
@@ -466,6 +467,18 @@ class IocpProactor:
else:
raise
+ @classmethod
+ def _finish_recvfrom(cls, trans, key, ov, *, empty_result):
+ try:
+ return cls.finish_socket_func(trans, key, ov)
+ except OSError as exc:
+ # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
+ # socket is used to send to an address that is not listening.
+ if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
+ return empty_result, None
+ else:
+ raise
+
def recv(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
@@ -500,7 +513,8 @@ class IocpProactor:
except BrokenPipeError:
return self._result((b'', None))
- return self._register(ov, conn, self.finish_socket_func)
+ return self._register(ov, conn, partial(self._finish_recvfrom,
+ empty_result=b''))
def recvfrom_into(self, conn, buf, flags=0):
self._register_with_iocp(conn)
@@ -510,17 +524,8 @@ class IocpProactor:
except BrokenPipeError:
return self._result((0, None))
- def finish_recv(trans, key, ov):
- try:
- return ov.getresult()
- except OSError as exc:
- if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
- _overlapped.ERROR_OPERATION_ABORTED):
- raise ConnectionResetError(*exc.args)
- else:
- raise
-
- return self._register(ov, conn, finish_recv)
+ return self._register(ov, conn, partial(self._finish_recvfrom,
+ empty_result=0))
def sendto(self, conn, buf, flags=0, addr=None):
self._register_with_iocp(conn)