summary | refs | log | tree | commit | diff | stats
path: root/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/tools/python3/Lib/multiprocessing/resource_tracker.py')
-rw-r--r-- contrib/tools/python3/Lib/multiprocessing/resource_tracker.py 332
1 files changed, 241 insertions, 91 deletions
diff --git a/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py b/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py
index 23fea295c35..22e3bbcf21b 100644
--- a/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py
+++ b/contrib/tools/python3/Lib/multiprocessing/resource_tracker.py
@@ -15,11 +15,15 @@
# this resource tracker process, "killall python" would probably leave unlinked
# resources.
+import base64
import os
import signal
import sys
import threading
import warnings
+from collections import deque
+
+import json
from . import spawn
from . import util
@@ -29,8 +33,12 @@ __all__ = ['ensure_running', 'register', 'unregister']
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
+def cleanup_noop(name):
+ raise RuntimeError('noop should never be registered or cleaned up')
+
_CLEANUP_FUNCS = {
- 'noop': lambda: None,
+ 'noop': cleanup_noop,
+ 'dummy': lambda name: None, # Dummy resource used in tests
}
if os.name == 'posix':
@@ -61,6 +69,15 @@ class ResourceTracker(object):
self._lock = threading.RLock()
self._fd = None
self._pid = None
+ self._exitcode = None
+ self._reentrant_messages = deque()
+
+ # True to use colon-separated lines, rather than JSON lines,
+ # for internal communication. (Mainly for testing).
+ # Filenames not supported by the simple format will always be sent
+ # using JSON.
+ # The reader should understand all formats.
+ self._use_simple_format = True
def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
@@ -70,22 +87,58 @@ class ResourceTracker(object):
raise ReentrantCallError(
"Reentrant call into the multiprocessing resource tracker")
- def _stop(self):
- with self._lock:
- # This should not happen (_stop() isn't called by a finalizer)
- # but we check for it anyway.
- if self._lock._recursion_count() > 1:
- return self._reentrant_call_error()
- if self._fd is None:
- # not running
- return
+ def __del__(self):
+ # Make sure child processes are cleaned up before the ResourceTracker
+ # is destroyed.
+ # see https://github.com/python/cpython/issues/88887
+ self._stop(use_blocking_lock=False)
- # closing the "alive" file descriptor stops main()
- os.close(self._fd)
- self._fd = None
+ def _stop(self, use_blocking_lock=True):
+ if use_blocking_lock:
+ with self._lock:
+ self._stop_locked()
+ else:
+ acquired = self._lock.acquire(blocking=False)
+ try:
+ self._stop_locked()
+ finally:
+ if acquired:
+ self._lock.release()
- os.waitpid(self._pid, 0)
+ def _stop_locked(
+ self,
+ close=os.close,
+ waitpid=os.waitpid,
+ waitstatus_to_exitcode=os.waitstatus_to_exitcode,
+ ):
+ # This shouldn't happen (it might when called by a finalizer)
+ # so we check for it anyway.
+ if self._lock._recursion_count() > 1:
+ raise self._reentrant_call_error()
+ if self._fd is None:
+ # not running
+ return
+ if self._pid is None:
+ return
+
+ # closing the "alive" file descriptor stops main()
+ close(self._fd)
+ self._fd = None
+
+ try:
+ _, status = waitpid(self._pid, 0)
+ except ChildProcessError:
self._pid = None
+ self._exitcode = None
+ return
+
+ self._pid = None
+
+ try:
+ self._exitcode = waitstatus_to_exitcode(status)
+ except ValueError:
+ # os.waitstatus_to_exitcode may raise an exception for invalid values
+ self._exitcode = None
def getfd(self):
self.ensure_running()
@@ -96,75 +149,119 @@ class ResourceTracker(object):
This can be run from any process. Usually a child process will use
the resource created by its parent.'''
+ return self._ensure_running_and_write()
+
+ def _teardown_dead_process(self):
+ os.close(self._fd)
+
+ # Clean-up to avoid dangling processes.
+ try:
+ # _pid can be None if this process is a child from another
+ # python process, which has started the resource_tracker.
+ if self._pid is not None:
+ os.waitpid(self._pid, 0)
+ except ChildProcessError:
+ # The resource_tracker has already been terminated.
+ pass
+ self._fd = None
+ self._pid = None
+ self._exitcode = None
+
+ warnings.warn('resource_tracker: process died unexpectedly, '
+ 'relaunching. Some resources might leak.')
+
+ def _launch(self):
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ r, w = os.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will outlive us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [
+ exe,
+ *util._args_from_interpreter_flags(),
+ '-c',
+ f'from multiprocessing.resource_tracker import main;main({r})',
+ ]
+ # bpo-33613: Register a signal mask that will block the signals.
+ # This signal mask will be inherited by the child that is going
+ # to be spawned and will protect the child from a race condition
+ # that can make the child die before it registers signal handlers
+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
+ # the child.
+ prev_sigmask = None
+ try:
+ if _HAVE_SIGMASK:
+ prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+ finally:
+ if prev_sigmask is not None:
+ signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
+ except:
+ os.close(w)
+ raise
+ else:
+ self._fd = w
+ self._pid = pid
+ finally:
+ os.close(r)
+
+ def _make_probe_message(self):
+ """Return a probe message."""
+ if self._use_simple_format:
+ return b'PROBE:0:noop\n'
+ return (
+ json.dumps(
+ {"cmd": "PROBE", "rtype": "noop"},
+ ensure_ascii=True,
+ separators=(",", ":"),
+ )
+ + "\n"
+ ).encode("ascii")
+
+ def _ensure_running_and_write(self, msg=None):
with self._lock:
if self._lock._recursion_count() > 1:
# The code below is certainly not reentrant-safe, so bail out
- return self._reentrant_call_error()
+ if msg is None:
+ raise self._reentrant_call_error()
+ return self._reentrant_messages.append(msg)
+
if self._fd is not None:
# resource tracker was launched before, is it still running?
- if self._check_alive():
- # => still alive
- return
- # => dead, launch it again
- os.close(self._fd)
-
- # Clean-up to avoid dangling processes.
+ if msg is None:
+ to_send = self._make_probe_message()
+ else:
+ to_send = msg
try:
- # _pid can be None if this process is a child from another
- # python process, which has started the resource_tracker.
- if self._pid is not None:
- os.waitpid(self._pid, 0)
- except ChildProcessError:
- # The resource_tracker has already been terminated.
- pass
- self._fd = None
- self._pid = None
+ self._write(to_send)
+ except OSError:
+ self._teardown_dead_process()
+ self._launch()
- warnings.warn('resource_tracker: process died unexpectedly, '
- 'relaunching. Some resources might leak.')
+ msg = None # message was sent in probe
+ else:
+ self._launch()
- fds_to_pass = []
- try:
- fds_to_pass.append(sys.stderr.fileno())
- except Exception:
- pass
- cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
- r, w = os.pipe()
+ while True:
try:
- fds_to_pass.append(r)
- # process will out live us, so no need to wait on pid
- exe = spawn.get_executable()
- args = [exe] + util._args_from_interpreter_flags()
- args += ['-c', cmd % r]
- # bpo-33613: Register a signal mask that will block the signals.
- # This signal mask will be inherited by the child that is going
- # to be spawned and will protect the child from a race condition
- # that can make the child die before it registers signal handlers
- # for SIGINT and SIGTERM. The mask is unregistered after spawning
- # the child.
- prev_sigmask = None
- try:
- if _HAVE_SIGMASK:
- prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
- pid = util.spawnv_passfds(exe, args, fds_to_pass)
- finally:
- if prev_sigmask is not None:
- signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
- except:
- os.close(w)
- raise
- else:
- self._fd = w
- self._pid = pid
- finally:
- os.close(r)
+ reentrant_msg = self._reentrant_messages.popleft()
+ except IndexError:
+ break
+ self._write(reentrant_msg)
+ if msg is not None:
+ self._write(msg)
def _check_alive(self):
'''Check that the pipe has not been closed by sending a probe.'''
try:
# We cannot use send here as it calls ensure_running, creating
# a cycle.
- os.write(self._fd, b'PROBE:0:noop\n')
+ os.write(self._fd, self._make_probe_message())
except OSError:
return False
else:
@@ -178,27 +275,42 @@ class ResourceTracker(object):
'''Unregister name of resource with resource tracker.'''
self._send('UNREGISTER', name, rtype)
- def _send(self, cmd, name, rtype):
- try:
- self.ensure_running()
- except ReentrantCallError:
- # The code below might or might not work, depending on whether
- # the resource tracker was already running and still alive.
- # Better warn the user.
- # (XXX is warnings.warn itself reentrant-safe? :-)
- warnings.warn(
- f"ResourceTracker called reentrantly for resource cleanup, "
- f"which is unsupported. "
- f"The {rtype} object {name!r} might leak.")
- msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
- if len(msg) > 512:
- # posix guarantees that writes to a pipe of less than PIPE_BUF
- # bytes are atomic, and that PIPE_BUF >= 512
- raise ValueError('msg too long')
+ def _write(self, msg):
nbytes = os.write(self._fd, msg)
- assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
- nbytes, len(msg))
+ assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
+
+ def _send(self, cmd, name, rtype):
+ if self._use_simple_format and '\n' not in name:
+ msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
+ if len(msg) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('msg too long')
+ self._ensure_running_and_write(msg)
+ return
+
+ # POSIX guarantees that writes to a pipe of less than PIPE_BUF bytes are atomic,
+ # and that PIPE_BUF >= 512. Therefore, we want the message to be shorter than 512 bytes.
+ # POSIX shm_open() and sem_open() require the name, including its leading slash,
+ # to be at most NAME_MAX bytes (255 on Linux).
+ # With json.dumps(..., ensure_ascii=True) every non-ASCII byte becomes a 6-char
+ # escape like \uDC80.
+ # As we want the overall message to be kept atomic and therefore smaller than 512,
+ # we encode the raw name bytes with URL-safe Base64 - so a 255-byte name
+ # will not exceed 340 bytes.
+ b = name.encode('utf-8', 'surrogateescape')
+ if len(b) > 255:
+ raise ValueError('shared memory name too long (max 255 bytes)')
+ b64 = base64.urlsafe_b64encode(b).decode('ascii')
+
+ payload = {"cmd": cmd, "rtype": rtype, "base64_name": b64}
+ msg = (json.dumps(payload, ensure_ascii=True, separators=(",", ":")) + "\n").encode("ascii")
+
+ # The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
+ assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
+ assert msg.startswith(b'{')
+ self._ensure_running_and_write(msg)
_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
@@ -207,6 +319,30 @@ unregister = _resource_tracker.unregister
getfd = _resource_tracker.getfd
+def _decode_message(line):
+ if line.startswith(b'{'):
+ try:
+ obj = json.loads(line.decode('ascii'))
+ except Exception as e:
+ raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
+
+ cmd = obj["cmd"]
+ rtype = obj["rtype"]
+ b64 = obj.get("base64_name", "")
+
+ if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
+ raise ValueError("malformed resource_tracker fields: %r" % (obj,))
+
+ try:
+ name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
+ except ValueError as e:
+ raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
+ else:
+ cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
+ name, rtype = rest.rsplit(':', maxsplit=1)
+ return cmd, rtype, name
+
+
def main(fd):
'''Run resource tracker.'''
# protect the process from ^C and "killall python" etc
@@ -222,12 +358,14 @@ def main(fd):
pass
cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
+ exit_code = 0
+
try:
# keep track of registered/unregistered resources
with open(fd, 'rb') as f:
for line in f:
try:
- cmd, name, rtype = line.strip().decode('ascii').split(':')
+ cmd, rtype, name = _decode_message(line)
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
if cleanup_func is None:
raise ValueError(
@@ -243,6 +381,7 @@ def main(fd):
else:
raise RuntimeError('unrecognized command %r' % cmd)
except Exception:
+ exit_code = 3
try:
sys.excepthook(*sys.exc_info())
except:
@@ -252,9 +391,17 @@ def main(fd):
for rtype, rtype_cache in cache.items():
if rtype_cache:
try:
- warnings.warn('resource_tracker: There appear to be %d '
- 'leaked %s objects to clean up at shutdown' %
- (len(rtype_cache), rtype))
+ exit_code = 1
+ if rtype == 'dummy':
+ # The test 'dummy' resource is expected to leak.
+ # We skip the warning (and *only* the warning) for it.
+ pass
+ else:
+ warnings.warn(
+ f'resource_tracker: There appear to be '
+ f'{len(rtype_cache)} leaked {rtype} objects to '
+ f'clean up at shutdown: {rtype_cache}'
+ )
except Exception:
pass
for name in rtype_cache:
@@ -265,6 +412,9 @@ def main(fd):
try:
_CLEANUP_FUNCS[rtype](name)
except Exception as e:
+ exit_code = 2
warnings.warn('resource_tracker: %r: %s' % (name, e))
finally:
pass
+
+ sys.exit(exit_code)