diff options
| author | shadchin <[email protected]> | 2023-10-03 23:32:21 +0300 |
|---|---|---|
| committer | shadchin <[email protected]> | 2023-10-03 23:48:51 +0300 |
| commit | 01ffd024041ac933854c367fb8d1b5682d19883f (patch) | |
| tree | b70aa497ba132a133ccece49f7763427dcd0743f /contrib/tools/python3/src/Lib/multiprocessing | |
| parent | a33fdb9a34581fd124e92535153b1f1fdeca6aaf (diff) | |
Update Python 3 to 3.11.6
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing')
4 files changed, 63 insertions, 7 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/connection.py b/contrib/tools/python3/src/Lib/multiprocessing/connection.py index b08144f7a1a..8b81f9954e4 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/connection.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/connection.py @@ -9,6 +9,7 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] +import errno import io import os import sys @@ -271,12 +272,22 @@ if _winapi: with FILE_FLAG_OVERLAPPED. """ _got_empty_message = False + _send_ov = None def _close(self, _CloseHandle=_winapi.CloseHandle): + ov = self._send_ov + if ov is not None: + # Interrupt WaitForMultipleObjects() in _send_bytes() + ov.cancel() _CloseHandle(self._handle) def _send_bytes(self, buf): + if self._send_ov is not None: + # A connection should only be used by a single thread + raise ValueError("concurrent send_bytes() calls " + "are not supported") ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) + self._send_ov = ov try: if err == _winapi.ERROR_IO_PENDING: waitres = _winapi.WaitForMultipleObjects( @@ -286,7 +297,13 @@ if _winapi: ov.cancel() raise finally: + self._send_ov = None nwritten, err = ov.GetOverlappedResult(True) + if err == _winapi.ERROR_OPERATION_ABORTED: + # close() was called by another thread while + # WaitForMultipleObjects() was waiting for the overlapped + # operation. + raise OSError(errno.EPIPE, "handle is closed") assert err == 0 assert nwritten == len(buf) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py index 5f25eaf68ec..f968968189b 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py @@ -14,6 +14,7 @@ __all__ = ['Popen'] # # +# Exit code used by Popen.terminate() TERMINATE = 0x10000 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") @@ -123,9 +124,15 @@ class Popen(object): if self.returncode is None: try: _winapi.TerminateProcess(int(self._handle), TERMINATE) - except OSError: - if self.wait(timeout=1.0) is None: + except PermissionError: + # ERROR_ACCESS_DENIED (winerror 5) is received when the + # process already died. + code = _winapi.GetExitCodeProcess(int(self._handle)) + if code == _winapi.STILL_ACTIVE: raise + self.returncode = code + else: + self.returncode = -signal.SIGTERM kill = terminate diff --git a/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py b/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py index ea369507297..79e96ecf324 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py @@ -51,15 +51,31 @@ if os.name == 'posix': }) +class ReentrantCallError(RuntimeError): + pass + + class ResourceTracker(object): def __init__(self): - self._lock = threading.Lock() + self._lock = threading.RLock() self._fd = None self._pid = None + def _reentrant_call_error(self): + # gh-109629: this happens if an explicit call to the ResourceTracker + # gets interrupted by a garbage collection, invoking a finalizer (*) + # that itself calls back into ResourceTracker. + # (*) for example the SemLock finalizer + raise ReentrantCallError( + "Reentrant call into the multiprocessing resource tracker") + def _stop(self): with self._lock: + # This should not happen (_stop() isn't called by a finalizer) + # but we check for it anyway. + if self._lock._recursion_count() > 1: + return self._reentrant_call_error() if self._fd is None: # not running return @@ -81,6 +97,9 @@ class ResourceTracker(object): This can be run from any process. Usually a child process will use the resource created by its parent.''' with self._lock: + if self._lock._recursion_count() > 1: + # The code below is certainly not reentrant-safe, so bail out + return self._reentrant_call_error() if self._fd is not None: # resource tracker was launched before, is it still running? if self._check_alive(): @@ -159,7 +178,17 @@ class ResourceTracker(object): self._send('UNREGISTER', name, rtype) def _send(self, cmd, name, rtype): - self.ensure_running() + try: + self.ensure_running() + except ReentrantCallError: + # The code below might or might not work, depending on whether + # the resource tracker was already running and still alive. + # Better warn the user. + # (XXX is warnings.warn itself reentrant-safe? :-) + warnings.warn( + f"ResourceTracker called reentrantly for resource cleanup, " + f"which is unsupported. " + f"The {rtype} object {name!r} might leak.") msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') if len(msg) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF @@ -176,6 +205,7 @@ register = _resource_tracker.register unregister = _resource_tracker.unregister getfd = _resource_tracker.getfd + def main(fd): '''Run resource tracker.''' # protect the process from ^C and "killall python" etc diff --git a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py b/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py index 2328d332123..3ccbfe311c7 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py @@ -50,8 +50,8 @@ class SemLock(object): def __init__(self, kind, value, maxvalue, *, ctx): if ctx is None: ctx = context._default_context.get_context() - self.is_fork_ctx = ctx.get_start_method() == 'fork' - unlink_now = sys.platform == 'win32' or self.is_fork_ctx + self._is_fork_ctx = ctx.get_start_method() == 'fork' + unlink_now = sys.platform == 'win32' or self._is_fork_ctx for i in range(100): try: sl = self._semlock = _multiprocessing.SemLock( @@ -103,7 +103,7 @@ class SemLock(object): if sys.platform == 'win32': h = context.get_spawning_popen().duplicate_for_child(sl.handle) else: - if self.is_fork_ctx: + if self._is_fork_ctx: raise RuntimeError('A SemLock created in a fork context is being ' 'shared with a process in a spawn context. This is ' 'not supported. Please use the same context to create ' @@ -115,6 +115,8 @@ class SemLock(object): self._semlock = _multiprocessing.SemLock._rebuild(*state) util.debug('recreated blocker with handle %r' % state[0]) self._make_methods() + # Ensure that deserialized SemLock can be serialized again (gh-108520). + self._is_fork_ctx = False @staticmethod def _make_name(): |
