diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/multiprocessing | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/multiprocessing')
21 files changed, 1535 insertions, 1535 deletions
diff --git a/contrib/tools/python3/src/Lib/multiprocessing/__init__.py b/contrib/tools/python3/src/Lib/multiprocessing/__init__.py index 8336f381de..d412235024 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/__init__.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/__init__.py @@ -19,8 +19,8 @@ from . import context # Copy stuff from default context # -__all__ = [x for x in dir(context._default_context) if not x.startswith('_')] -globals().update((name, getattr(context._default_context, name)) for name in __all__) +__all__ = [x for x in dir(context._default_context) if not x.startswith('_')] +globals().update((name, getattr(context._default_context, name)) for name in __all__) # # XXX These should not really be documented or public. diff --git a/contrib/tools/python3/src/Lib/multiprocessing/connection.py b/contrib/tools/python3/src/Lib/multiprocessing/connection.py index 510e4b5aba..850c317da5 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/connection.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/connection.py @@ -73,11 +73,11 @@ def arbitrary_address(family): if family == 'AF_INET': return ('localhost', 0) elif family == 'AF_UNIX': - # Prefer abstract sockets if possible to avoid problems with the address - # size. When coding portable applications, some implementations have - # sun_path as short as 92 bytes in the sockaddr_un struct. - if util.abstract_sockets_supported: - return f"\0listener-{os.getpid()}-{next(_mmap_counter)}" + # Prefer abstract sockets if possible to avoid problems with the address + # size. When coding portable applications, some implementations have + # sun_path as short as 92 bytes in the sockaddr_un struct. + if util.abstract_sockets_supported: + return f"\0listener-{os.getpid()}-{next(_mmap_counter)}" return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir()) elif family == 'AF_PIPE': return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % @@ -107,7 +107,7 @@ def address_type(address): return 'AF_INET' elif type(address) is str and address.startswith('\\\\'): return 'AF_PIPE' - elif type(address) is str or util.is_abstract_socket_namespace(address): + elif type(address) is str or util.is_abstract_socket_namespace(address): return 'AF_UNIX' else: raise ValueError('address type of %r unrecognized' % address) @@ -394,33 +394,33 @@ class Connection(_ConnectionBase): def _send_bytes(self, buf): n = len(buf) - if n > 0x7fffffff: - pre_header = struct.pack("!i", -1) - header = struct.pack("!Q", n) - self._send(pre_header) + if n > 0x7fffffff: + pre_header = struct.pack("!i", -1) + header = struct.pack("!Q", n) + self._send(pre_header) self._send(header) self._send(buf) else: - # For wire compatibility with 3.7 and lower - header = struct.pack("!i", n) - if n > 16384: - # The payload is large so Nagle's algorithm won't be triggered - # and we'd better avoid the cost of concatenation. - self._send(header) - self._send(buf) - else: - # Issue #20540: concatenate before sending, to avoid delays due - # to Nagle's algorithm on a TCP socket. - # Also note we want to avoid sending a 0-length buffer separately, - # to avoid "broken pipe" errors if the other end closed the pipe. - self._send(header + buf) + # For wire compatibility with 3.7 and lower + header = struct.pack("!i", n) + if n > 16384: + # The payload is large so Nagle's algorithm won't be triggered + # and we'd better avoid the cost of concatenation. + self._send(header) + self._send(buf) + else: + # Issue #20540: concatenate before sending, to avoid delays due + # to Nagle's algorithm on a TCP socket. + # Also note we want to avoid sending a 0-length buffer separately, + # to avoid "broken pipe" errors if the other end closed the pipe. + self._send(header + buf) def _recv_bytes(self, maxsize=None): buf = self._recv(4) size, = struct.unpack("!i", buf.getvalue()) - if size == -1: - buf = self._recv(8) - size, = struct.unpack("!Q", buf.getvalue()) + if size == -1: + buf = self._recv(8) + size, = struct.unpack("!Q", buf.getvalue()) if maxsize is not None and size > maxsize: return None return self._recv(size) @@ -602,8 +602,8 @@ class SocketListener(object): self._family = family self._last_accepted = None - if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address): - # Linux abstract socket namespaces do not need to be explicitly unlinked + if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address): + # Linux abstract socket namespaces do not need to be explicitly unlinked self._unlink = util.Finalize( self, os.unlink, args=(address,), exitpriority=0 ) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/context.py b/contrib/tools/python3/src/Lib/multiprocessing/context.py index 8d0525d5d6..eb1917bcbf 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/context.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/context.py @@ -5,7 +5,7 @@ import threading from . import process from . import reduction -__all__ = () +__all__ = () # # Exceptions @@ -24,7 +24,7 @@ class AuthenticationError(ProcessError): pass # -# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py +# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py # class BaseContext(object): @@ -35,7 +35,7 @@ class BaseContext(object): AuthenticationError = AuthenticationError current_process = staticmethod(process.current_process) - parent_process = staticmethod(process.parent_process) + parent_process = staticmethod(process.parent_process) active_children = staticmethod(process.active_children) def cpu_count(self): @@ -257,10 +257,10 @@ class DefaultContext(BaseContext): if sys.platform == 'win32': return ['spawn'] else: - methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn'] + methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn'] if reduction.HAVE_SEND_HANDLE: - methods.append('forkserver') - return methods + methods.append('forkserver') + return methods # @@ -310,12 +310,12 @@ if sys.platform != 'win32': 'spawn': SpawnContext(), 'forkserver': ForkServerContext(), } - if sys.platform == 'darwin': - # bpo-33725: running arbitrary code after fork() is no longer reliable - # on macOS since macOS 10.14 (Mojave). Use spawn by default instead. - _default_context = DefaultContext(_concrete_contexts['spawn']) - else: - _default_context = DefaultContext(_concrete_contexts['fork']) + if sys.platform == 'darwin': + # bpo-33725: running arbitrary code after fork() is no longer reliable + # on macOS since macOS 10.14 (Mojave). Use spawn by default instead. + _default_context = DefaultContext(_concrete_contexts['spawn']) + else: + _default_context = DefaultContext(_concrete_contexts['fork']) else: diff --git a/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py b/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py index 6a1468609e..1ecf8376d3 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/dummy/__init__.py @@ -80,7 +80,7 @@ def freeze_support(): # class Namespace(object): - def __init__(self, /, **kwds): + def __init__(self, /, **kwds): self.__dict__.update(kwds) def __repr__(self): items = list(self.__dict__.items()) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py b/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py index 22a911a7a2..45193a1235 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/forkserver.py @@ -11,7 +11,7 @@ import warnings from . import connection from . import process from .context import reduction -from . import resource_tracker +from . import resource_tracker from . import spawn from . import util @@ -39,26 +39,26 @@ class ForkServer(object): self._lock = threading.Lock() self._preload_modules = ['__main__'] - def _stop(self): - # Method used by unit tests to stop the server - with self._lock: - self._stop_unlocked() - - def _stop_unlocked(self): - if self._forkserver_pid is None: - return - - # close the "alive" file descriptor asks the server to stop - os.close(self._forkserver_alive_fd) - self._forkserver_alive_fd = None - - os.waitpid(self._forkserver_pid, 0) - self._forkserver_pid = None - - if not util.is_abstract_socket_namespace(self._forkserver_address): - os.unlink(self._forkserver_address) - self._forkserver_address = None - + def _stop(self): + # Method used by unit tests to stop the server + with self._lock: + self._stop_unlocked() + + def _stop_unlocked(self): + if self._forkserver_pid is None: + return + + # close the "alive" file descriptor asks the server to stop + os.close(self._forkserver_alive_fd) + self._forkserver_alive_fd = None + + os.waitpid(self._forkserver_pid, 0) + self._forkserver_pid = None + + if not util.is_abstract_socket_namespace(self._forkserver_address): + os.unlink(self._forkserver_address) + self._forkserver_address = None + def set_forkserver_preload(self, modules_names): '''Set list of module names to try to load in forkserver process.''' if not all(type(mod) is str for mod in self._preload_modules): @@ -89,7 +89,7 @@ class ForkServer(object): parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() allfds = [child_r, child_w, self._forkserver_alive_fd, - resource_tracker.getfd()] + resource_tracker.getfd()] allfds += fds try: reduction.sendfds(client, allfds) @@ -110,7 +110,7 @@ class ForkServer(object): ensure_running() will do nothing. ''' with self._lock: - resource_tracker.ensure_running() + resource_tracker.ensure_running() if self._forkserver_pid is not None: # forkserver was launched before, is it still running? pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) @@ -136,8 +136,8 @@ class ForkServer(object): with socket.socket(socket.AF_UNIX) as listener: address = connection.arbitrary_address('AF_UNIX') listener.bind(address) - if not util.is_abstract_socket_namespace(address): - os.chmod(address, 0o600) + if not util.is_abstract_socket_namespace(address): + os.chmod(address, 0o600) listener.listen() # all client processes own the write end of the "alive" pipe; @@ -237,8 +237,8 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): break child_w = pid_to_fd.pop(pid, None) if child_w is not None: - returncode = os.waitstatus_to_exitcode(sts) - + returncode = os.waitstatus_to_exitcode(sts) + # Send exit code to client process try: write_signed(child_w, returncode) @@ -305,12 +305,12 @@ def _serve_one(child_r, fds, unused_fds, handlers): os.close(fd) (_forkserver._forkserver_alive_fd, - resource_tracker._resource_tracker._fd, + resource_tracker._resource_tracker._fd, *_forkserver._inherited_fds) = fds # Run process object received over pipe - parent_sentinel = os.dup(child_r) - code = spawn._main(child_r, parent_sentinel) + parent_sentinel = os.dup(child_r) + code = spawn._main(child_r, parent_sentinel) return code diff --git a/contrib/tools/python3/src/Lib/multiprocessing/heap.py b/contrib/tools/python3/src/Lib/multiprocessing/heap.py index 6217dfe126..909c1e1df9 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/heap.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/heap.py @@ -8,7 +8,7 @@ # import bisect -from collections import defaultdict +from collections import defaultdict import mmap import os import sys @@ -29,9 +29,9 @@ if sys.platform == 'win32': import _winapi class Arena(object): - """ - A shared memory area backed by anonymous memory (Windows). - """ + """ + A shared memory area backed by anonymous memory (Windows). + """ _rand = tempfile._RandomNameSequence() @@ -56,7 +56,7 @@ if sys.platform == 'win32': def __setstate__(self, state): self.size, self.name = self._state = state - # Reopen existing mmap + # Reopen existing mmap self.buffer = mmap.mmap(-1, self.size, tagname=self.name) # XXX Temporarily preventing buildbot failures while determining # XXX the correct long-term fix. See issue 23060 @@ -65,10 +65,10 @@ if sys.platform == 'win32': else: class Arena(object): - """ - A shared memory area backed by a temporary file (POSIX). - """ - + """ + A shared memory area backed by a temporary file (POSIX). + """ + if sys.platform == 'linux': _dir_candidates = ['/dev/shm'] else: @@ -78,8 +78,8 @@ else: self.size = size self.fd = fd if fd == -1: - # Arena is created anew (if fd != -1, it means we're coming - # from rebuild_arena() below) + # Arena is created anew (if fd != -1, it means we're coming + # from rebuild_arena() below) self.fd, name = tempfile.mkstemp( prefix='pym-%d-'%os.getpid(), dir=self._choose_dir(size)) @@ -114,82 +114,82 @@ else: class Heap(object): - # Minimum malloc() alignment + # Minimum malloc() alignment _alignment = 8 - _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2 # 4 MB - _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2 - + _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2 # 4 MB + _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2 + def __init__(self, size=mmap.PAGESIZE): self._lastpid = os.getpid() self._lock = threading.Lock() - # Current arena allocation size + # Current arena allocation size self._size = size - # A sorted list of available block sizes in arenas + # A sorted list of available block sizes in arenas self._lengths = [] - - # Free block management: - # - map each block size to a list of `(Arena, start, stop)` blocks + + # Free block management: + # - map each block size to a list of `(Arena, start, stop)` blocks self._len_to_seq = {} - # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block - # starting at that offset + # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block + # starting at that offset self._start_to_block = {} - # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block - # ending at that offset + # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block + # ending at that offset self._stop_to_block = {} - - # Map arenas to their `(Arena, start, stop)` blocks in use - self._allocated_blocks = defaultdict(set) + + # Map arenas to their `(Arena, start, stop)` blocks in use + self._allocated_blocks = defaultdict(set) self._arenas = [] - - # List of pending blocks to free - see comment in free() below + + # List of pending blocks to free - see comment in free() below self._pending_free_blocks = [] - # Statistics - self._n_mallocs = 0 - self._n_frees = 0 - + # Statistics + self._n_mallocs = 0 + self._n_frees = 0 + @staticmethod def _roundup(n, alignment): # alignment must be a power of 2 mask = alignment - 1 return (n + mask) & ~mask - def _new_arena(self, size): - # Create a new arena with at least the given *size* - length = self._roundup(max(self._size, size), mmap.PAGESIZE) - # We carve larger and larger arenas, for efficiency, until we - # reach a large-ish size (roughly L3 cache-sized) - if self._size < self._DOUBLE_ARENA_SIZE_UNTIL: - self._size *= 2 - util.info('allocating a new mmap of length %d', length) - arena = Arena(length) - self._arenas.append(arena) - return (arena, 0, length) - - def _discard_arena(self, arena): - # Possibly delete the given (unused) arena - length = arena.size - # Reusing an existing arena is faster than creating a new one, so - # we only reclaim space if it's large enough. - if length < self._DISCARD_FREE_SPACE_LARGER_THAN: - return - blocks = self._allocated_blocks.pop(arena) - assert not blocks - del self._start_to_block[(arena, 0)] - del self._stop_to_block[(arena, length)] - self._arenas.remove(arena) - seq = self._len_to_seq[length] - seq.remove((arena, 0, length)) - if not seq: - del self._len_to_seq[length] - self._lengths.remove(length) - + def _new_arena(self, size): + # Create a new arena with at least the given *size* + length = self._roundup(max(self._size, size), mmap.PAGESIZE) + # We carve larger and larger arenas, for efficiency, until we + # reach a large-ish size (roughly L3 cache-sized) + if self._size < self._DOUBLE_ARENA_SIZE_UNTIL: + self._size *= 2 + util.info('allocating a new mmap of length %d', length) + arena = Arena(length) + self._arenas.append(arena) + return (arena, 0, length) + + def _discard_arena(self, arena): + # Possibly delete the given (unused) arena + length = arena.size + # Reusing an existing arena is faster than creating a new one, so + # we only reclaim space if it's large enough. + if length < self._DISCARD_FREE_SPACE_LARGER_THAN: + return + blocks = self._allocated_blocks.pop(arena) + assert not blocks + del self._start_to_block[(arena, 0)] + del self._stop_to_block[(arena, length)] + self._arenas.remove(arena) + seq = self._len_to_seq[length] + seq.remove((arena, 0, length)) + if not seq: + del self._len_to_seq[length] + self._lengths.remove(length) + def _malloc(self, size): # returns a large enough block -- it might be much larger i = bisect.bisect_left(self._lengths, size) if i == len(self._lengths): - return self._new_arena(size) + return self._new_arena(size) else: length = self._lengths[i] seq = self._len_to_seq[length] @@ -202,8 +202,8 @@ class Heap(object): del self._stop_to_block[(arena, stop)] return block - def _add_free_block(self, block): - # make block available and try to merge with its neighbours in the arena + def _add_free_block(self, block): + # make block available and try to merge with its neighbours in the arena (arena, start, stop) = block try: @@ -247,14 +247,14 @@ class Heap(object): return start, stop - def _remove_allocated_block(self, block): - arena, start, stop = block - blocks = self._allocated_blocks[arena] - blocks.remove((start, stop)) - if not blocks: - # Arena is entirely free, discard it from this process - self._discard_arena(arena) - + def _remove_allocated_block(self, block): + arena, start, stop = block + blocks = self._allocated_blocks[arena] + blocks.remove((start, stop)) + if not blocks: + # Arena is entirely free, discard it from this process + self._discard_arena(arena) + def _free_pending_blocks(self): # Free all the blocks in the pending list - called with the lock held. while True: @@ -262,8 +262,8 @@ class Heap(object): block = self._pending_free_blocks.pop() except IndexError: break - self._add_free_block(block) - self._remove_allocated_block(block) + self._add_free_block(block) + self._remove_allocated_block(block) def free(self, block): # free a block returned by malloc() @@ -274,7 +274,7 @@ class Heap(object): # immediately, the block is added to a list of blocks to be freed # synchronously sometimes later from malloc() or free(), by calling # _free_pending_blocks() (appending and retrieving from a list is not - # strictly thread-safe but under CPython it's atomic thanks to the GIL). + # strictly thread-safe but under CPython it's atomic thanks to the GIL). if os.getpid() != self._lastpid: raise ValueError( "My pid ({0:n}) is not last pid {1:n}".format( @@ -286,10 +286,10 @@ class Heap(object): else: # we hold the lock try: - self._n_frees += 1 + self._n_frees += 1 self._free_pending_blocks() - self._add_free_block(block) - self._remove_allocated_block(block) + self._add_free_block(block) + self._remove_allocated_block(block) finally: self._lock.release() @@ -302,21 +302,21 @@ class Heap(object): if os.getpid() != self._lastpid: self.__init__() # reinitialize after fork with self._lock: - self._n_mallocs += 1 - # allow pending blocks to be marked available + self._n_mallocs += 1 + # allow pending blocks to be marked available self._free_pending_blocks() - size = self._roundup(max(size, 1), self._alignment) + size = self._roundup(max(size, 1), self._alignment) (arena, start, stop) = self._malloc(size) - real_stop = start + size - if real_stop < stop: - # if the returned block is larger than necessary, mark - # the remainder available - self._add_free_block((arena, real_stop, stop)) - self._allocated_blocks[arena].add((start, real_stop)) - return (arena, start, real_stop) + real_stop = start + size + if real_stop < stop: + # if the returned block is larger than necessary, mark + # the remainder available + self._add_free_block((arena, real_stop, stop)) + self._allocated_blocks[arena].add((start, real_stop)) + return (arena, start, real_stop) # -# Class wrapping a block allocated out of a Heap -- can be inherited by child process +# Class wrapping a block allocated out of a Heap -- can be inherited by child process # class BufferWrapper(object): diff --git a/contrib/tools/python3/src/Lib/multiprocessing/managers.py b/contrib/tools/python3/src/Lib/multiprocessing/managers.py index dfa566c6fc..047ea27672 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/managers.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/managers.py @@ -1,5 +1,5 @@ # -# Module providing manager classes for dealing +# Module providing manager classes for dealing # with shared objects # # multiprocessing/managers.py @@ -8,7 +8,7 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] +__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] # # Imports @@ -16,13 +16,13 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] import sys import threading -import signal +import signal import array import queue import time -import types -import os -from os import getpid +import types +import os +from os import getpid from traceback import format_exc @@ -32,13 +32,13 @@ from . import pool from . import process from . import util from . import get_context -try: - from . import shared_memory -except ImportError: - HAS_SHMEM = False -else: - HAS_SHMEM = True - __all__.append('SharedMemoryManager') +try: + from . import shared_memory +except ImportError: + HAS_SHMEM = False +else: + HAS_SHMEM = True + __all__.append('SharedMemoryManager') # # Register some things for pickling @@ -61,7 +61,7 @@ if view_types[0] is not list: # only needed in Py3.0 class Token(object): ''' - Type to uniquely identify a shared object + Type to uniquely identify a shared object ''' __slots__ = ('typeid', 'address', 'id') @@ -250,7 +250,7 @@ class Server(object): try: obj, exposed, gettypeid = \ self.id_to_local_proxy_obj[ident] - except KeyError: + except KeyError: raise ke if methodname not in exposed: @@ -298,7 +298,7 @@ class Server(object): try: try: send(msg) - except Exception: + except Exception: send(('#UNSERIALIZABLE', format_exc())) except Exception as e: util.info('exception in thread serving %r', @@ -362,7 +362,7 @@ class Server(object): finally: self.stop_event.set() - def create(self, c, typeid, /, *args, **kwds): + def create(self, c, typeid, /, *args, **kwds): ''' Create a new shared object and return its id ''' @@ -573,9 +573,9 @@ class BaseManager(object): ''' Create a server, report its address and run it ''' - # bpo-36368: protect server process from KeyboardInterrupt signals - signal.signal(signal.SIGINT, signal.SIG_IGN) - + # bpo-36368: protect server process from KeyboardInterrupt signals + signal.signal(signal.SIGINT, signal.SIG_IGN) + if initializer is not None: initializer(*initargs) @@ -590,7 +590,7 @@ class BaseManager(object): util.info('manager serving at %r', server.address) server.serve_forever() - def _create(self, typeid, /, *args, **kwds): + def _create(self, typeid, /, *args, **kwds): ''' Create a new shared object; return the token and exposed tuple ''' @@ -710,7 +710,7 @@ class BaseManager(object): ) if create_method: - def temp(self, /, *args, **kwds): + def temp(self, /, *args, **kwds): util.debug('requesting creation of a shared %r object', typeid) token, exp = self._create(typeid, *args, **kwds) proxy = proxytype( @@ -796,7 +796,7 @@ class BaseProxy(object): def _callmethod(self, methodname, args=(), kwds={}): ''' - Try to call a method of the referent and return a copy of the result + Try to call a method of the referent and return a copy of the result ''' try: conn = self._tls.connection @@ -950,7 +950,7 @@ def MakeProxyType(name, exposed, _cache={}): dic = {} for meth in exposed: - exec('''def %s(self, /, *args, **kwds): + exec('''def %s(self, /, *args, **kwds): return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) ProxyType = type(name, (BaseProxy,), dic) @@ -989,7 +989,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None, # class Namespace(object): - def __init__(self, /, **kwds): + def __init__(self, /, **kwds): self.__dict__.update(kwds) def __repr__(self): items = list(self.__dict__.items()) @@ -1131,9 +1131,9 @@ class ValueProxy(BaseProxy): return self._callmethod('set', (value,)) value = property(get, set) - __class_getitem__ = classmethod(types.GenericAlias) - + __class_getitem__ = classmethod(types.GenericAlias) + BaseListProxy = MakeProxyType('BaseListProxy', ( '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', '__mul__', '__reversed__', '__rmul__', '__setitem__', @@ -1216,155 +1216,155 @@ SyncManager.register('Namespace', Namespace, NamespaceProxy) # types returned by methods of PoolProxy SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) SyncManager.register('AsyncResult', create_method=False) - -# -# Definition of SharedMemoryManager and SharedMemoryServer -# - -if HAS_SHMEM: - class _SharedMemoryTracker: - "Manages one or more shared memory segments." - - def __init__(self, name, segment_names=[]): - self.shared_memory_context_name = name - self.segment_names = segment_names - - def register_segment(self, segment_name): - "Adds the supplied shared memory block name to tracker." - util.debug(f"Register segment {segment_name!r} in pid {getpid()}") - self.segment_names.append(segment_name) - - def destroy_segment(self, segment_name): - """Calls unlink() on the shared memory block with the supplied name - and removes it from the list of blocks being tracked.""" - util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") - self.segment_names.remove(segment_name) - segment = shared_memory.SharedMemory(segment_name) - segment.close() - segment.unlink() - - def unlink(self): - "Calls destroy_segment() on all tracked shared memory blocks." - for segment_name in self.segment_names[:]: - self.destroy_segment(segment_name) - - def __del__(self): - util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") - self.unlink() - - def __getstate__(self): - return (self.shared_memory_context_name, self.segment_names) - - def __setstate__(self, state): - self.__init__(*state) - - - class SharedMemoryServer(Server): - - public = Server.public + \ - ['track_segment', 'release_segment', 'list_segments'] - - def __init__(self, *args, **kwargs): - Server.__init__(self, *args, **kwargs) - address = self.address - # The address of Linux abstract namespaces can be bytes - if isinstance(address, bytes): - address = os.fsdecode(address) - self.shared_memory_context = \ - _SharedMemoryTracker(f"shm_{address}_{getpid()}") - util.debug(f"SharedMemoryServer started by pid {getpid()}") - - def create(self, c, typeid, /, *args, **kwargs): - """Create a new distributed-shared object (not backed by a shared - memory block) and return its id to be used in a Proxy Object.""" - # Unless set up as a shared proxy, don't make shared_memory_context - # a standard part of kwargs. This makes things easier for supplying - # simple functions. - if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): - kwargs['shared_memory_context'] = self.shared_memory_context - return Server.create(self, c, typeid, *args, **kwargs) - - def shutdown(self, c): - "Call unlink() on all tracked shared memory, terminate the Server." - self.shared_memory_context.unlink() - return Server.shutdown(self, c) - - def track_segment(self, c, segment_name): - "Adds the supplied shared memory block name to Server's tracker." - self.shared_memory_context.register_segment(segment_name) - - def release_segment(self, c, segment_name): - """Calls unlink() on the shared memory block with the supplied name - and removes it from the tracker instance inside the Server.""" - self.shared_memory_context.destroy_segment(segment_name) - - def list_segments(self, c): - """Returns a list of names of shared memory blocks that the Server - is currently tracking.""" - return self.shared_memory_context.segment_names - - - class SharedMemoryManager(BaseManager): - """Like SyncManager but uses SharedMemoryServer instead of Server. - - It provides methods for creating and returning SharedMemory instances - and for creating a list-like object (ShareableList) backed by shared - memory. It also provides methods that create and return Proxy Objects - that support synchronization across processes (i.e. multi-process-safe - locks and semaphores). - """ - - _Server = SharedMemoryServer - - def __init__(self, *args, **kwargs): - if os.name == "posix": - # bpo-36867: Ensure the resource_tracker is running before - # launching the manager process, so that concurrent - # shared_memory manipulation both in the manager and in the - # current process does not create two resource_tracker - # processes. - from . import resource_tracker - resource_tracker.ensure_running() - BaseManager.__init__(self, *args, **kwargs) - util.debug(f"{self.__class__.__name__} created by pid {getpid()}") - - def __del__(self): - util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") - pass - - def get_server(self): - 'Better than monkeypatching for now; merge into Server ultimately' - if self._state.value != State.INITIAL: - if self._state.value == State.STARTED: - raise ProcessError("Already started SharedMemoryServer") - elif self._state.value == State.SHUTDOWN: - raise ProcessError("SharedMemoryManager has shut down") - else: - raise ProcessError( - "Unknown state {!r}".format(self._state.value)) - return self._Server(self._registry, self._address, - self._authkey, self._serializer) - - def SharedMemory(self, size): - """Returns a new SharedMemory instance with the specified size in - bytes, to be tracked by the manager.""" - with self._Client(self._address, authkey=self._authkey) as conn: - sms = shared_memory.SharedMemory(None, create=True, size=size) - try: - dispatch(conn, None, 'track_segment', (sms.name,)) - except BaseException as e: - sms.unlink() - raise e - return sms - - def ShareableList(self, sequence): - """Returns a new ShareableList instance populated with the values - from the input sequence, to be tracked by the manager.""" - with self._Client(self._address, authkey=self._authkey) as conn: - sl = shared_memory.ShareableList(sequence) - try: - dispatch(conn, None, 'track_segment', (sl.shm.name,)) - except BaseException as e: - sl.shm.unlink() - raise e - return sl + +# +# Definition of SharedMemoryManager and SharedMemoryServer +# + +if HAS_SHMEM: + class _SharedMemoryTracker: + "Manages one or more shared memory segments." + + def __init__(self, name, segment_names=[]): + self.shared_memory_context_name = name + self.segment_names = segment_names + + def register_segment(self, segment_name): + "Adds the supplied shared memory block name to tracker." + util.debug(f"Register segment {segment_name!r} in pid {getpid()}") + self.segment_names.append(segment_name) + + def destroy_segment(self, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the list of blocks being tracked.""" + util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") + self.segment_names.remove(segment_name) + segment = shared_memory.SharedMemory(segment_name) + segment.close() + segment.unlink() + + def unlink(self): + "Calls destroy_segment() on all tracked shared memory blocks." + for segment_name in self.segment_names[:]: + self.destroy_segment(segment_name) + + def __del__(self): + util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") + self.unlink() + + def __getstate__(self): + return (self.shared_memory_context_name, self.segment_names) + + def __setstate__(self, state): + self.__init__(*state) + + + class SharedMemoryServer(Server): + + public = Server.public + \ + ['track_segment', 'release_segment', 'list_segments'] + + def __init__(self, *args, **kwargs): + Server.__init__(self, *args, **kwargs) + address = self.address + # The address of Linux abstract namespaces can be bytes + if isinstance(address, bytes): + address = os.fsdecode(address) + self.shared_memory_context = \ + _SharedMemoryTracker(f"shm_{address}_{getpid()}") + util.debug(f"SharedMemoryServer started by pid {getpid()}") + + def create(self, c, typeid, /, *args, **kwargs): + """Create a new distributed-shared object (not backed by a shared + memory block) and return its id to be used in a Proxy Object.""" + # Unless set up as a shared proxy, don't make shared_memory_context + # a standard part of kwargs. This makes things easier for supplying + # simple functions. + if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): + kwargs['shared_memory_context'] = self.shared_memory_context + return Server.create(self, c, typeid, *args, **kwargs) + + def shutdown(self, c): + "Call unlink() on all tracked shared memory, terminate the Server." + self.shared_memory_context.unlink() + return Server.shutdown(self, c) + + def track_segment(self, c, segment_name): + "Adds the supplied shared memory block name to Server's tracker." + self.shared_memory_context.register_segment(segment_name) + + def release_segment(self, c, segment_name): + """Calls unlink() on the shared memory block with the supplied name + and removes it from the tracker instance inside the Server.""" + self.shared_memory_context.destroy_segment(segment_name) + + def list_segments(self, c): + """Returns a list of names of shared memory blocks that the Server + is currently tracking.""" + return self.shared_memory_context.segment_names + + + class SharedMemoryManager(BaseManager): + """Like SyncManager but uses SharedMemoryServer instead of Server. + + It provides methods for creating and returning SharedMemory instances + and for creating a list-like object (ShareableList) backed by shared + memory. It also provides methods that create and return Proxy Objects + that support synchronization across processes (i.e. multi-process-safe + locks and semaphores). + """ + + _Server = SharedMemoryServer + + def __init__(self, *args, **kwargs): + if os.name == "posix": + # bpo-36867: Ensure the resource_tracker is running before + # launching the manager process, so that concurrent + # shared_memory manipulation both in the manager and in the + # current process does not create two resource_tracker + # processes. + from . import resource_tracker + resource_tracker.ensure_running() + BaseManager.__init__(self, *args, **kwargs) + util.debug(f"{self.__class__.__name__} created by pid {getpid()}") + + def __del__(self): + util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") + pass + + def get_server(self): + 'Better than monkeypatching for now; merge into Server ultimately' + if self._state.value != State.INITIAL: + if self._state.value == State.STARTED: + raise ProcessError("Already started SharedMemoryServer") + elif self._state.value == State.SHUTDOWN: + raise ProcessError("SharedMemoryManager has shut down") + else: + raise ProcessError( + "Unknown state {!r}".format(self._state.value)) + return self._Server(self._registry, self._address, + self._authkey, self._serializer) + + def SharedMemory(self, size): + """Returns a new SharedMemory instance with the specified size in + bytes, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sms = shared_memory.SharedMemory(None, create=True, size=size) + try: + dispatch(conn, None, 'track_segment', (sms.name,)) + except BaseException as e: + sms.unlink() + raise e + return sms + + def ShareableList(self, sequence): + """Returns a new ShareableList instance populated with the values + from the input sequence, to be tracked by the manager.""" + with self._Client(self._address, authkey=self._authkey) as conn: + sl = shared_memory.ShareableList(sequence) + try: + dispatch(conn, None, 'track_segment', (sl.shm.name,)) + except BaseException as e: + sl.shm.unlink() + raise e + return sl diff --git a/contrib/tools/python3/src/Lib/multiprocessing/pool.py b/contrib/tools/python3/src/Lib/multiprocessing/pool.py index bbe05a550c..5ae69156f7 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/pool.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/pool.py @@ -13,30 +13,30 @@ __all__ = ['Pool', 'ThreadPool'] # Imports # -import collections +import collections import itertools import os -import queue -import threading +import queue +import threading import time import traceback -import types -import warnings +import types +import warnings # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util from . import get_context, TimeoutError -from .connection import wait +from .connection import wait # # Constants representing the state of a pool # -INIT = "INIT" -RUN = "RUN" -CLOSE = "CLOSE" -TERMINATE = "TERMINATE" +INIT = "INIT" +RUN = "RUN" +CLOSE = "CLOSE" +TERMINATE = "TERMINATE" # # Miscellaneous @@ -147,54 +147,54 @@ def _helper_reraises_exception(ex): # Class representing a process pool # -class _PoolCache(dict): - """ - Class that implements a cache for the Pool class that will notify - the pool management threads every time the cache is emptied. The - notification is done by the use of a queue that is provided when - instantiating the cache. - """ - def __init__(self, /, *args, notifier=None, **kwds): - self.notifier = notifier - super().__init__(*args, **kwds) - - def __delitem__(self, item): - super().__delitem__(item) - - # Notify that the cache is empty. This is important because the - # pool keeps maintaining workers until the cache gets drained. This - # eliminates a race condition in which a task is finished after the - # the pool's _handle_workers method has enter another iteration of the - # loop. In this situation, the only event that can wake up the pool - # is the cache to be emptied (no more tasks available). - if not self: - self.notifier.put(None) - +class _PoolCache(dict): + """ + Class that implements a cache for the Pool class that will notify + the pool management threads every time the cache is emptied. The + notification is done by the use of a queue that is provided when + instantiating the cache. + """ + def __init__(self, /, *args, notifier=None, **kwds): + self.notifier = notifier + super().__init__(*args, **kwds) + + def __delitem__(self, item): + super().__delitem__(item) + + # Notify that the cache is empty. This is important because the + # pool keeps maintaining workers until the cache gets drained. This + # eliminates a race condition in which a task is finished after the + # the pool's _handle_workers method has enter another iteration of the + # loop. In this situation, the only event that can wake up the pool + # is the cache to be emptied (no more tasks available). + if not self: + self.notifier.put(None) + class Pool(object): ''' Class which supports an async version of applying functions to arguments. ''' _wrap_exception = True - @staticmethod - def Process(ctx, *args, **kwds): - return ctx.Process(*args, **kwds) + @staticmethod + def Process(ctx, *args, **kwds): + return ctx.Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): - # Attributes initialized early to make sure that they exist in - # __del__() if __init__() raises an exception - self._pool = [] - self._state = INIT - + # Attributes initialized early to make sure that they exist in + # __del__() if __init__() raises an exception + self._pool = [] + self._state = INIT + self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.SimpleQueue() - # The _change_notifier queue exist to wake up self._handle_workers() - # when the cache (self._cache) is empty or when there is a change in - # the _state variable of the thread that runs _handle_workers. - self._change_notifier = self._ctx.SimpleQueue() - self._cache = _PoolCache(notifier=self._change_notifier) + # The _change_notifier queue exist to wake up self._handle_workers() + # when the cache (self._cache) is empty or when there is a change in + # the _state variable of the thread that runs _handle_workers. + self._change_notifier = self._ctx.SimpleQueue() + self._cache = _PoolCache(notifier=self._change_notifier) self._maxtasksperchild = maxtasksperchild self._initializer = initializer self._initargs = initargs @@ -208,24 +208,24 @@ class Pool(object): raise TypeError('initializer must be a callable') self._processes = processes - try: - self._repopulate_pool() - except Exception: - for p in self._pool: - if p.exitcode is None: - p.terminate() - for p in self._pool: - p.join() - raise - - sentinels = self._get_sentinels() - + try: + self._repopulate_pool() + except Exception: + for p in self._pool: + if p.exitcode is None: + p.terminate() + for p in self._pool: + p.join() + raise + + sentinels = self._get_sentinels() + self._worker_handler = threading.Thread( target=Pool._handle_workers, - args=(self._cache, self._taskqueue, self._ctx, self.Process, - self._processes, self._pool, self._inqueue, self._outqueue, - self._initializer, self._initargs, self._maxtasksperchild, - self._wrap_exception, sentinels, self._change_notifier) + args=(self._cache, self._taskqueue, self._ctx, self.Process, + self._processes, self._pool, self._inqueue, self._outqueue, + self._initializer, self._initargs, self._maxtasksperchild, + self._wrap_exception, sentinels, self._change_notifier) ) self._worker_handler.daemon = True self._worker_handler._state = RUN @@ -252,92 +252,92 @@ class Pool(object): self._terminate = util.Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, - self._change_notifier, self._worker_handler, self._task_handler, + self._change_notifier, self._worker_handler, self._task_handler, self._result_handler, self._cache), exitpriority=15 ) - self._state = RUN - - # Copy globals as function locals to make sure that they are available - # during Python shutdown when the Pool is destroyed. - def __del__(self, _warn=warnings.warn, RUN=RUN): - if self._state == RUN: - _warn(f"unclosed running multiprocessing pool {self!r}", - ResourceWarning, source=self) - if getattr(self, '_change_notifier', None) is not None: - self._change_notifier.put(None) - - def __repr__(self): - cls = self.__class__ - return (f'<{cls.__module__}.{cls.__qualname__} ' - f'state={self._state} ' - f'pool_size={len(self._pool)}>') - - def _get_sentinels(self): - task_queue_sentinels = [self._outqueue._reader] - self_notifier_sentinels = [self._change_notifier._reader] - return [*task_queue_sentinels, *self_notifier_sentinels] - - @staticmethod - def _get_worker_sentinels(workers): - return [worker.sentinel for worker in - workers if hasattr(worker, "sentinel")] - - @staticmethod - def _join_exited_workers(pool): + self._state = RUN + + # Copy globals as function locals to make sure that they are available + # during Python shutdown when the Pool is destroyed. + def __del__(self, _warn=warnings.warn, RUN=RUN): + if self._state == RUN: + _warn(f"unclosed running multiprocessing pool {self!r}", + ResourceWarning, source=self) + if getattr(self, '_change_notifier', None) is not None: + self._change_notifier.put(None) + + def __repr__(self): + cls = self.__class__ + return (f'<{cls.__module__}.{cls.__qualname__} ' + f'state={self._state} ' + f'pool_size={len(self._pool)}>') + + def _get_sentinels(self): + task_queue_sentinels = [self._outqueue._reader] + self_notifier_sentinels = [self._change_notifier._reader] + return [*task_queue_sentinels, *self_notifier_sentinels] + + @staticmethod + def _get_worker_sentinels(workers): + return [worker.sentinel for worker in + workers if hasattr(worker, "sentinel")] + + @staticmethod + def _join_exited_workers(pool): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. """ cleaned = False - for i in reversed(range(len(pool))): - worker = pool[i] + for i in reversed(range(len(pool))): + worker = pool[i] if worker.exitcode is not None: # worker exited util.debug('cleaning up worker %d' % i) worker.join() cleaned = True - del pool[i] + del pool[i] return cleaned def _repopulate_pool(self): - return self._repopulate_pool_static(self._ctx, self.Process, - self._processes, - self._pool, self._inqueue, - self._outqueue, self._initializer, - self._initargs, - self._maxtasksperchild, - self._wrap_exception) - - @staticmethod - def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, - outqueue, initializer, initargs, - maxtasksperchild, wrap_exception): + return self._repopulate_pool_static(self._ctx, self.Process, + self._processes, + self._pool, self._inqueue, + self._outqueue, self._initializer, + self._initargs, + self._maxtasksperchild, + self._wrap_exception) + + @staticmethod + def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ - for i in range(processes - len(pool)): - w = Process(ctx, target=worker, - args=(inqueue, outqueue, - initializer, - initargs, maxtasksperchild, - wrap_exception)) + for i in range(processes - len(pool)): + w = Process(ctx, target=worker, + args=(inqueue, outqueue, + initializer, + initargs, maxtasksperchild, + wrap_exception)) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() - pool.append(w) + pool.append(w) util.debug('added worker') - @staticmethod - def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, - initializer, initargs, maxtasksperchild, - wrap_exception): + @staticmethod + def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, + initializer, initargs, maxtasksperchild, + wrap_exception): """Clean up any exited workers and start replacements for them. """ - if Pool._join_exited_workers(pool): - Pool._repopulate_pool_static(ctx, Process, processes, pool, - inqueue, outqueue, initializer, - initargs, maxtasksperchild, - wrap_exception) + if Pool._join_exited_workers(pool): + Pool._repopulate_pool_static(ctx, Process, processes, pool, + inqueue, outqueue, initializer, + initargs, maxtasksperchild, + wrap_exception) def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() @@ -345,10 +345,10 @@ class Pool(object): self._quick_put = self._inqueue._writer.send self._quick_get = self._outqueue._reader.recv - def _check_running(self): - if self._state != RUN: - raise ValueError("Pool not running") - + def _check_running(self): + if self._state != RUN: + raise ValueError("Pool not running") + def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. @@ -394,9 +394,9 @@ class Pool(object): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - self._check_running() + self._check_running() if chunksize == 1: - result = IMapIterator(self) + result = IMapIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, func, iterable), @@ -409,7 +409,7 @@ class Pool(object): "Chunksize must be 1+, not {0:n}".format( chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self) + result = IMapIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, @@ -423,9 +423,9 @@ class Pool(object): ''' Like `imap()` method but ordering of results is arbitrary. ''' - self._check_running() + self._check_running() if chunksize == 1: - result = IMapUnorderedIterator(self) + result = IMapUnorderedIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, func, iterable), @@ -437,7 +437,7 @@ class Pool(object): raise ValueError( "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self) + result = IMapUnorderedIterator(self) self._taskqueue.put( ( self._guarded_task_generation(result._job, @@ -452,8 +452,8 @@ class Pool(object): ''' Asynchronous version of `apply()` method. ''' - self._check_running() - result = ApplyResult(self, callback, error_callback) + self._check_running() + result = ApplyResult(self, callback, error_callback) self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) return result @@ -470,7 +470,7 @@ class Pool(object): ''' Helper function to implement map, starmap and their async counterparts. ''' - self._check_running() + self._check_running() if not hasattr(iterable, '__len__'): iterable = list(iterable) @@ -482,7 +482,7 @@ class Pool(object): chunksize = 0 task_batches = Pool._get_tasks(func, iterable, chunksize) - result = MapResult(self, chunksize, len(iterable), callback, + result = MapResult(self, chunksize, len(iterable), callback, error_callback=error_callback) self._taskqueue.put( ( @@ -495,30 +495,30 @@ class Pool(object): return result @staticmethod - def _wait_for_updates(sentinels, change_notifier, timeout=None): - wait(sentinels, timeout=timeout) - while not change_notifier.empty(): - change_notifier.get() - - @classmethod - def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, - pool, inqueue, outqueue, initializer, initargs, - maxtasksperchild, wrap_exception, sentinels, - change_notifier): + def _wait_for_updates(sentinels, change_notifier, timeout=None): + wait(sentinels, timeout=timeout) + while not change_notifier.empty(): + change_notifier.get() + + @classmethod + def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, + pool, inqueue, outqueue, initializer, initargs, + maxtasksperchild, wrap_exception, sentinels, + change_notifier): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. - while thread._state == RUN or (cache and thread._state != TERMINATE): - cls._maintain_pool(ctx, Process, processes, pool, inqueue, - outqueue, initializer, initargs, - maxtasksperchild, wrap_exception) - - current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] - - cls._wait_for_updates(current_sentinels, change_notifier) + while thread._state == RUN or (cache and thread._state != TERMINATE): + cls._maintain_pool(ctx, Process, processes, pool, inqueue, + outqueue, initializer, initargs, + maxtasksperchild, wrap_exception) + + current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] + + cls._wait_for_updates(current_sentinels, change_notifier) # send sentinel to stop workers - taskqueue.put(None) + taskqueue.put(None) util.debug('worker handler exiting') @staticmethod @@ -530,7 +530,7 @@ class Pool(object): try: # iterating taskseq cannot fail for task in taskseq: - if thread._state != RUN: + if thread._state != RUN: util.debug('task handler found thread._state != RUN') break try: @@ -578,7 +578,7 @@ class Pool(object): util.debug('result handler got EOFError/OSError -- exiting') return - if thread._state != RUN: + if thread._state != RUN: assert thread._state == TERMINATE, "Thread not in TERMINATE" util.debug('result handler found thread._state=TERMINATE') break @@ -646,7 +646,7 @@ class Pool(object): if self._state == RUN: self._state = CLOSE self._worker_handler._state = CLOSE - self._change_notifier.put(None) + self._change_notifier.put(None) def terminate(self): util.debug('terminating pool') @@ -675,17 +675,17 @@ class Pool(object): time.sleep(0) @classmethod - def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, + def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once util.debug('finalizing pool') - # Notify that the worker_handler state has been changed so the - # _handle_workers loop can be unblocked (and exited) in order to - # send the finalization sentinel all the workers. + # Notify that the worker_handler state has been changed so the + # _handle_workers loop can be unblocked (and exited) in order to + # send the finalization sentinel all the workers. worker_handler._state = TERMINATE - change_notifier.put(None) - + change_notifier.put(None) + task_handler._state = TERMINATE util.debug('helping task handler/workers to finish') @@ -696,7 +696,7 @@ class Pool(object): "Cannot have cache with result_hander not alive") result_handler._state = TERMINATE - change_notifier.put(None) + change_notifier.put(None) outqueue.put(None) # sentinel # We must wait for the worker handler to exit before terminating @@ -729,7 +729,7 @@ class Pool(object): p.join() def __enter__(self): - self._check_running() + self._check_running() return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -741,14 +741,14 @@ class Pool(object): class ApplyResult(object): - def __init__(self, pool, callback, error_callback): - self._pool = pool + def __init__(self, pool, callback, error_callback): + self._pool = pool self._event = threading.Event() self._job = next(job_counter) - self._cache = pool._cache + self._cache = pool._cache self._callback = callback self._error_callback = error_callback - self._cache[self._job] = self + self._cache[self._job] = self def ready(self): return self._event.is_set() @@ -778,10 +778,10 @@ class ApplyResult(object): self._error_callback(self._value) self._event.set() del self._cache[self._job] - self._pool = None - - __class_getitem__ = classmethod(types.GenericAlias) + self._pool = None + __class_getitem__ = classmethod(types.GenericAlias) + AsyncResult = ApplyResult # create alias -- see #17805 # @@ -790,8 +790,8 @@ AsyncResult = ApplyResult # create alias -- see #17805 class MapResult(ApplyResult): - def __init__(self, pool, chunksize, length, callback, error_callback): - ApplyResult.__init__(self, pool, callback, + def __init__(self, pool, chunksize, length, callback, error_callback): + ApplyResult.__init__(self, pool, callback, error_callback=error_callback) self._success = True self._value = [None] * length @@ -799,7 +799,7 @@ class MapResult(ApplyResult): if chunksize <= 0: self._number_left = 0 self._event.set() - del self._cache[self._job] + del self._cache[self._job] else: self._number_left = length//chunksize + bool(length % chunksize) @@ -813,7 +813,7 @@ class MapResult(ApplyResult): self._callback(self._value) del self._cache[self._job] self._event.set() - self._pool = None + self._pool = None else: if not success and self._success: # only store first exception @@ -825,7 +825,7 @@ class MapResult(ApplyResult): self._error_callback(self._value) del self._cache[self._job] self._event.set() - self._pool = None + self._pool = None # # Class whose instances are returned by `Pool.imap()` @@ -833,16 +833,16 @@ class MapResult(ApplyResult): class IMapIterator(object): - def __init__(self, pool): - self._pool = pool + def __init__(self, pool): + self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) - self._cache = pool._cache + self._cache = pool._cache self._items = collections.deque() self._index = 0 self._length = None self._unsorted = {} - self._cache[self._job] = self + self._cache[self._job] = self def __iter__(self): return self @@ -853,14 +853,14 @@ class IMapIterator(object): item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None + self._pool = None raise StopIteration from None self._cond.wait(timeout) try: item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None + self._pool = None raise StopIteration from None raise TimeoutError from None @@ -886,7 +886,7 @@ class IMapIterator(object): if self._index == self._length: del self._cache[self._job] - self._pool = None + self._pool = None def _set_length(self, length): with self._cond: @@ -894,7 +894,7 @@ class IMapIterator(object): if self._index == self._length: self._cond.notify() del self._cache[self._job] - self._pool = None + self._pool = None # # Class whose instances are returned by `Pool.imap_unordered()` @@ -909,7 +909,7 @@ class IMapUnorderedIterator(IMapIterator): self._cond.notify() if self._index == self._length: del self._cache[self._job] - self._pool = None + self._pool = None # # @@ -919,7 +919,7 @@ class ThreadPool(Pool): _wrap_exception = False @staticmethod - def Process(ctx, *args, **kwds): + def Process(ctx, *args, **kwds): from .dummy import Process return Process(*args, **kwds) @@ -932,14 +932,14 @@ class ThreadPool(Pool): self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get - def _get_sentinels(self): - return [self._change_notifier._reader] - - @staticmethod - def _get_worker_sentinels(workers): - return [] - + def _get_sentinels(self): + return [self._change_notifier._reader] + @staticmethod + def _get_worker_sentinels(workers): + return [] + + @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # drain inqueue, and put sentinels at its head to make workers finish try: @@ -949,6 +949,6 @@ class ThreadPool(Pool): pass for i in range(size): inqueue.put(None) - - def _wait_for_updates(self, sentinels, change_notifier, timeout): - time.sleep(timeout) + + def _wait_for_updates(self, sentinels, change_notifier, timeout): + time.sleep(timeout) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py index 625981cf47..a3da92ef0a 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_fork.py @@ -25,12 +25,12 @@ class Popen(object): if self.returncode is None: try: pid, sts = os.waitpid(self.pid, flag) - except OSError: + except OSError: # Child process not yet created. See #1731717 # e.errno == errno.ECHILD == 10 return None if pid == self.pid: - self.returncode = os.waitstatus_to_exitcode(sts) + self.returncode = os.waitstatus_to_exitcode(sts) return self.returncode def wait(self, timeout=None): @@ -62,20 +62,20 @@ class Popen(object): def _launch(self, process_obj): code = 1 parent_r, child_w = os.pipe() - child_r, parent_w = os.pipe() + child_r, parent_w = os.pipe() self.pid = os.fork() if self.pid == 0: try: os.close(parent_r) - os.close(parent_w) - code = process_obj._bootstrap(parent_sentinel=child_r) + os.close(parent_w) + code = process_obj._bootstrap(parent_sentinel=child_r) finally: os._exit(code) else: os.close(child_w) - os.close(child_r) - self.finalizer = util.Finalize(self, util.close_fds, - (parent_r, parent_w,)) + os.close(child_r) + self.finalizer = util.Finalize(self, util.close_fds, + (parent_r, parent_w,)) self.sentinel = parent_r def close(self): diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py index a56eb9bf11..4cd65bad76 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_forkserver.py @@ -49,11 +49,11 @@ class Popen(popen_fork.Popen): set_spawning_popen(None) self.sentinel, w = forkserver.connect_to_new_process(self._fds) - # Keep a duplicate of the data pipe's write end as a sentinel of the - # parent process used by the child process. - _parent_w = os.dup(w) - self.finalizer = util.Finalize(self, util.close_fds, - (_parent_w, self.sentinel)) + # Keep a duplicate of the data pipe's write end as a sentinel of the + # parent process used by the child process. + _parent_w = os.dup(w) + self.finalizer = util.Finalize(self, util.close_fds, + (_parent_w, self.sentinel)) with open(w, 'wb', closefd=True) as f: f.write(buf.getbuffer()) self.pid = forkserver.read_signed(self.sentinel) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py index 24b8634523..898b689c38 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_posix.py @@ -36,8 +36,8 @@ class Popen(popen_fork.Popen): return fd def _launch(self, process_obj): - from . import resource_tracker - tracker_fd = resource_tracker.getfd() + from . import resource_tracker + tracker_fd = resource_tracker.getfd() self._fds.append(tracker_fd) prep_data = spawn.get_preparation_data(process_obj._name) fp = io.BytesIO() @@ -61,12 +61,12 @@ class Popen(popen_fork.Popen): with open(parent_w, 'wb', closefd=False) as f: f.write(fp.getbuffer()) finally: - fds_to_close = [] - for fd in (parent_r, parent_w): - if fd is not None: - fds_to_close.append(fd) - self.finalizer = util.Finalize(self, util.close_fds, fds_to_close) - - for fd in (child_r, child_w): + fds_to_close = [] + for fd in (parent_r, parent_w): if fd is not None: + fds_to_close.append(fd) + self.finalizer = util.Finalize(self, util.close_fds, fds_to_close) + + for fd in (child_r, child_w): + if fd is not None: os.close(fd) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py index 27fe064290..f866c59743 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/popen_spawn_win32.py @@ -22,7 +22,7 @@ WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") def _path_eq(p1, p2): return p1 == p2 or os.path.normcase(p1) == os.path.normcase(p2) -WINENV = not _path_eq(sys.executable, sys._base_executable) +WINENV = not _path_eq(sys.executable, sys._base_executable) def _close_handles(*handles): @@ -44,12 +44,12 @@ class Popen(object): def __init__(self, process_obj): prep_data = spawn.get_preparation_data(process_obj._name) - # read end of pipe will be duplicated by the child process + # read end of pipe will be duplicated by the child process # -- see spawn_main() in spawn.py. - # - # bpo-33929: Previously, the read end of pipe was "stolen" by the child - # process, but it leaked a handle if the child process had been - # terminated before it could steal the handle from the parent process. + # + # bpo-33929: Previously, the read end of pipe was "stolen" by the child + # process, but it leaked a handle if the child process had been + # terminated before it could steal the handle from the parent process. rhandle, whandle = _winapi.CreatePipe(None, 0) wfd = msvcrt.open_osfhandle(whandle, 0) cmd = spawn.get_command_line(parent_pid=os.getpid(), @@ -73,7 +73,7 @@ class Popen(object): try: hp, ht, pid, tid = _winapi.CreateProcess( python_exe, cmd, - None, None, False, 0, env, None, None) + None, None, False, 0, env, None, None) _winapi.CloseHandle(ht) except: _winapi.CloseHandle(rhandle) @@ -84,8 +84,8 @@ class Popen(object): self.returncode = None self._handle = hp self.sentinel = int(hp) - self.finalizer = util.Finalize(self, _close_handles, - (self.sentinel, int(rhandle))) + self.finalizer = util.Finalize(self, _close_handles, + (self.sentinel, int(rhandle))) # send information to child set_spawning_popen(self) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/process.py b/contrib/tools/python3/src/Lib/multiprocessing/process.py index 0b2e0b45b2..8177dcd7be 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/process.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/process.py @@ -7,8 +7,8 @@ # Licensed to PSF under a Contributor Agreement. # -__all__ = ['BaseProcess', 'current_process', 'active_children', - 'parent_process'] +__all__ = ['BaseProcess', 'current_process', 'active_children', + 'parent_process'] # # Imports @@ -47,13 +47,13 @@ def active_children(): _cleanup() return list(_children) - -def parent_process(): - ''' - Return process object representing the parent process - ''' - return _parent_process - + +def parent_process(): + ''' + Return process object representing the parent process + ''' + return _parent_process + # # # @@ -84,7 +84,7 @@ class BaseProcess(object): self._identity = _current_process._identity + (count,) self._config = _current_process._config.copy() self._parent_pid = os.getpid() - self._parent_name = _current_process.name + self._parent_name = _current_process.name self._popen = None self._closed = False self._target = target @@ -257,7 +257,7 @@ class BaseProcess(object): raise ValueError("process not started") from None def __repr__(self): - exitcode = None + exitcode = None if self is _current_process: status = 'started' elif self._closed: @@ -267,29 +267,29 @@ class BaseProcess(object): elif self._popen is None: status = 'initial' else: - exitcode = self._popen.poll() - if exitcode is not None: - status = 'stopped' + exitcode = self._popen.poll() + if exitcode is not None: + status = 'stopped' else: status = 'started' - info = [type(self).__name__, 'name=%r' % self._name] - if self._popen is not None: - info.append('pid=%s' % self._popen.pid) - info.append('parent=%s' % self._parent_pid) - info.append(status) - if exitcode is not None: - exitcode = _exitcode_to_name.get(exitcode, exitcode) - info.append('exitcode=%s' % exitcode) - if self.daemon: - info.append('daemon') - return '<%s>' % ' '.join(info) + info = [type(self).__name__, 'name=%r' % self._name] + if self._popen is not None: + info.append('pid=%s' % self._popen.pid) + info.append('parent=%s' % self._parent_pid) + info.append(status) + if exitcode is not None: + exitcode = _exitcode_to_name.get(exitcode, exitcode) + info.append('exitcode=%s' % exitcode) + if self.daemon: + info.append('daemon') + return '<%s>' % ' '.join(info) ## - def _bootstrap(self, parent_sentinel=None): + def _bootstrap(self, parent_sentinel=None): from . import util, context - global _current_process, _parent_process, _process_counter, _children + global _current_process, _parent_process, _process_counter, _children try: if self._start_method is not None: @@ -299,10 +299,10 @@ class BaseProcess(object): util._close_stdin() old_process = _current_process _current_process = self - _parent_process = _ParentProcess( - self._parent_name, self._parent_pid, parent_sentinel) - if threading._HAVE_THREAD_NATIVE_ID: - threading.main_thread()._set_native_id() + _parent_process = _ParentProcess( + self._parent_name, self._parent_pid, parent_sentinel) + if threading._HAVE_THREAD_NATIVE_ID: + threading.main_thread()._set_native_id() try: util._finalizer_registry.clear() util._run_after_forkers() @@ -317,12 +317,12 @@ class BaseProcess(object): finally: util._exit_function() except SystemExit as e: - if e.code is None: - exitcode = 0 - elif isinstance(e.code, int): - exitcode = e.code + if e.code is None: + exitcode = 0 + elif isinstance(e.code, int): + exitcode = e.code else: - sys.stderr.write(str(e.code) + '\n') + sys.stderr.write(str(e.code) + '\n') exitcode = 1 except: exitcode = 1 @@ -350,41 +350,41 @@ class AuthenticationString(bytes): ) return AuthenticationString, (bytes(self),) - -# -# Create object representing the parent process -# - -class _ParentProcess(BaseProcess): - - def __init__(self, name, pid, sentinel): - self._identity = () - self._name = name - self._pid = pid - self._parent_pid = None - self._popen = None - self._closed = False - self._sentinel = sentinel - self._config = {} - - def is_alive(self): - from multiprocessing.connection import wait - return not wait([self._sentinel], timeout=0) - - @property - def ident(self): - return self._pid - - def join(self, timeout=None): - ''' - Wait until parent process terminates - ''' - from multiprocessing.connection import wait - wait([self._sentinel], timeout=timeout) - - pid = ident - + # +# Create object representing the parent process +# + +class _ParentProcess(BaseProcess): + + def __init__(self, name, pid, sentinel): + self._identity = () + self._name = name + self._pid = pid + self._parent_pid = None + self._popen = None + self._closed = False + self._sentinel = sentinel + self._config = {} + + def is_alive(self): + from multiprocessing.connection import wait + return not wait([self._sentinel], timeout=0) + + @property + def ident(self): + return self._pid + + def join(self, timeout=None): + ''' + Wait until parent process terminates + ''' + from multiprocessing.connection import wait + wait([self._sentinel], timeout=timeout) + + pid = ident + +# # Create object representing the main process # @@ -412,7 +412,7 @@ class _MainProcess(BaseProcess): pass -_parent_process = None +_parent_process = None _current_process = _MainProcess() _process_counter = itertools.count(1) _children = set() @@ -426,7 +426,7 @@ _exitcode_to_name = {} for name, signum in list(signal.__dict__.items()): if name[:3]=='SIG' and '_' not in name: - _exitcode_to_name[-signum] = f'-{name}' + _exitcode_to_name[-signum] = f'-{name}' # For debug and leak testing _dangling = WeakSet() diff --git a/contrib/tools/python3/src/Lib/multiprocessing/queues.py b/contrib/tools/python3/src/Lib/multiprocessing/queues.py index a290181487..1646d270ec 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/queues.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/queues.py @@ -14,7 +14,7 @@ import os import threading import collections import time -import types +import types import weakref import errno @@ -49,7 +49,7 @@ class Queue(object): self._sem = ctx.BoundedSemaphore(maxsize) # For use by concurrent.futures self._ignore_epipe = False - self._reset() + self._reset() if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) @@ -62,17 +62,17 @@ class Queue(object): def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state - self._reset() + self._reset() def _after_fork(self): debug('Queue._after_fork()') - self._reset(after_fork=True) - - def _reset(self, after_fork=False): - if after_fork: - self._notempty._at_fork_reinit() - else: - self._notempty = threading.Condition(threading.Lock()) + self._reset(after_fork=True) + + def _reset(self, after_fork=False): + if after_fork: + self._notempty._at_fork_reinit() + else: + self._notempty = threading.Condition(threading.Lock()) self._buffer = collections.deque() self._thread = None self._jointhread = None @@ -84,8 +84,8 @@ class Queue(object): self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): - if self._closed: - raise ValueError(f"Queue {self!r} is closed") + if self._closed: + raise ValueError(f"Queue {self!r} is closed") if not self._sem.acquire(block, timeout): raise Full @@ -96,8 +96,8 @@ class Queue(object): self._notempty.notify() def get(self, block=True, timeout=None): - if self._closed: - raise ValueError(f"Queue {self!r} is closed") + if self._closed: + raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock: res = self._recv_bytes() @@ -307,8 +307,8 @@ class JoinableQueue(Queue): self._cond, self._unfinished_tasks = state[-2:] def put(self, obj, block=True, timeout=None): - if self._closed: - raise ValueError(f"Queue {self!r} is closed") + if self._closed: + raise ValueError(f"Queue {self!r} is closed") if not self._sem.acquire(block, timeout): raise Full @@ -346,10 +346,10 @@ class SimpleQueue(object): else: self._wlock = ctx.Lock() - def close(self): - self._reader.close() - self._writer.close() - + def close(self): + self._reader.close() + self._writer.close() + def empty(self): return not self._poll() @@ -376,5 +376,5 @@ class SimpleQueue(object): else: with self._wlock: self._writer.send_bytes(obj) - - __class_getitem__ = classmethod(types.GenericAlias) + + __class_getitem__ = classmethod(types.GenericAlias) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/reduction.py b/contrib/tools/python3/src/Lib/multiprocessing/reduction.py index 5593f0682f..d4a7f802ba 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/reduction.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/reduction.py @@ -68,16 +68,16 @@ if sys.platform == 'win32': __all__ += ['DupHandle', 'duplicate', 'steal_handle'] import _winapi - def duplicate(handle, target_process=None, inheritable=False, - *, source_process=None): + def duplicate(handle, target_process=None, inheritable=False, + *, source_process=None): '''Duplicate a handle. (target_process is a handle not a pid!)''' - current_process = _winapi.GetCurrentProcess() - if source_process is None: - source_process = current_process + current_process = _winapi.GetCurrentProcess() + if source_process is None: + source_process = current_process if target_process is None: - target_process = current_process + target_process = current_process return _winapi.DuplicateHandle( - source_process, handle, target_process, + source_process, handle, target_process, 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS) def steal_handle(source_pid, handle): diff --git a/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py b/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py index 66076509a1..aef0c1e84f 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/resource_sharer.py @@ -59,7 +59,7 @@ else: class _ResourceSharer(object): - '''Manager for resources using background thread.''' + '''Manager for resources using background thread.''' def __init__(self): self._key = 0 self._cache = {} @@ -112,7 +112,7 @@ class _ResourceSharer(object): for key, (send, close) in self._cache.items(): close() self._cache.clear() - self._lock._at_fork_reinit() + self._lock._at_fork_reinit() if self._listener is not None: self._listener.close() self._listener = None @@ -132,7 +132,7 @@ class _ResourceSharer(object): def _serve(self): if hasattr(signal, 'pthread_sigmask'): - signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) + signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) while 1: try: with self._listener.accept() as conn: diff --git a/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py b/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py index c9bfa9b82b..dc24abcae1 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/resource_tracker.py @@ -1,231 +1,231 @@ -############################################################################### -# Server process to keep track of unlinked resources (like shared memory -# segments, semaphores etc.) and clean them. -# -# On Unix we run a server process which keeps track of unlinked -# resources. The server ignores SIGINT and SIGTERM and reads from a -# pipe. Every other process of the program has a copy of the writable -# end of the pipe, so we get EOF when all other processes have exited. -# Then the server process unlinks any remaining resource names. -# -# This is important because there may be system limits for such resources: for -# instance, the system only supports a limited number of named semaphores, and -# shared-memory segments live in the RAM. If a python process leaks such a -# resource, this resource will not be removed till the next reboot. Without -# this resource tracker process, "killall python" would probably leave unlinked -# resources. - -import os -import signal -import sys -import threading -import warnings - -from . import spawn -from . import util - -__all__ = ['ensure_running', 'register', 'unregister'] - -_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') -_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) - -_CLEANUP_FUNCS = { - 'noop': lambda: None, -} - -if os.name == 'posix': - import _multiprocessing - import _posixshmem - - _CLEANUP_FUNCS.update({ - 'semaphore': _multiprocessing.sem_unlink, - 'shared_memory': _posixshmem.shm_unlink, - }) - - -class ResourceTracker(object): - - def __init__(self): - self._lock = threading.Lock() - self._fd = None - self._pid = None - - def _stop(self): - with self._lock: - if self._fd is None: - # not running - return - - # closing the "alive" file descriptor stops main() - os.close(self._fd) - self._fd = None - - os.waitpid(self._pid, 0) - self._pid = None - - def getfd(self): - self.ensure_running() - return self._fd - - def ensure_running(self): - '''Make sure that resource tracker process is running. - - This can be run from any process. Usually a child process will use - the resource created by its parent.''' - with self._lock: - if self._fd is not None: - # resource tracker was launched before, is it still running? - if self._check_alive(): - # => still alive - return - # => dead, launch it again - os.close(self._fd) - - # Clean-up to avoid dangling processes. - try: - # _pid can be None if this process is a child from another - # python process, which has started the resource_tracker. - if self._pid is not None: - os.waitpid(self._pid, 0) - except ChildProcessError: - # The resource_tracker has already been terminated. - pass - self._fd = None - self._pid = None - - warnings.warn('resource_tracker: process died unexpectedly, ' - 'relaunching. Some resources might leak.') - - fds_to_pass = [] - try: - fds_to_pass.append(sys.stderr.fileno()) - except Exception: - pass - cmd = 'from multiprocessing.resource_tracker import main;main(%d)' - r, w = os.pipe() - try: - fds_to_pass.append(r) - # process will out live us, so no need to wait on pid - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - args += ['-c', cmd % r] - # bpo-33613: Register a signal mask that will block the signals. - # This signal mask will be inherited by the child that is going - # to be spawned and will protect the child from a race condition - # that can make the child die before it registers signal handlers - # for SIGINT and SIGTERM. The mask is unregistered after spawning - # the child. - try: - if _HAVE_SIGMASK: - signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) - pid = util.spawnv_passfds(exe, args, fds_to_pass) - finally: - if _HAVE_SIGMASK: - signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) - except: - os.close(w) - raise - else: - self._fd = w - self._pid = pid - finally: - os.close(r) - - def _check_alive(self): - '''Check that the pipe has not been closed by sending a probe.''' - try: - # We cannot use send here as it calls ensure_running, creating - # a cycle. - os.write(self._fd, b'PROBE:0:noop\n') - except OSError: - return False - else: - return True - - def register(self, name, rtype): - '''Register name of resource with resource tracker.''' - self._send('REGISTER', name, rtype) - - def unregister(self, name, rtype): - '''Unregister name of resource with resource tracker.''' - self._send('UNREGISTER', name, rtype) - - def _send(self, cmd, name, rtype): - self.ensure_running() - msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') - if len(name) > 512: - # posix guarantees that writes to a pipe of less than PIPE_BUF - # bytes are atomic, and that PIPE_BUF >= 512 - raise ValueError('name too long') - nbytes = os.write(self._fd, msg) - assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( - nbytes, len(msg)) - - -_resource_tracker = ResourceTracker() -ensure_running = _resource_tracker.ensure_running -register = _resource_tracker.register -unregister = _resource_tracker.unregister -getfd = _resource_tracker.getfd - -def main(fd): - '''Run resource tracker.''' - # protect the process from ^C and "killall python" etc - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_IGN) - if _HAVE_SIGMASK: - signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) - - for f in (sys.stdin, sys.stdout): - try: - f.close() - except Exception: - pass - - cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} - try: - # keep track of registered/unregistered resources - with open(fd, 'rb') as f: - for line in f: - try: - cmd, name, rtype = line.strip().decode('ascii').split(':') - cleanup_func = _CLEANUP_FUNCS.get(rtype, None) - if cleanup_func is None: - raise ValueError( - f'Cannot register {name} for automatic cleanup: ' - f'unknown resource type {rtype}') - - if cmd == 'REGISTER': - cache[rtype].add(name) - elif cmd == 'UNREGISTER': - cache[rtype].remove(name) - elif cmd == 'PROBE': - pass - else: - raise RuntimeError('unrecognized command %r' % cmd) - except Exception: - try: - sys.excepthook(*sys.exc_info()) - except: - pass - finally: - # all processes have terminated; cleanup any remaining resources - for rtype, rtype_cache in cache.items(): - if rtype_cache: - try: - warnings.warn('resource_tracker: There appear to be %d ' - 'leaked %s objects to clean up at shutdown' % - (len(rtype_cache), rtype)) - except Exception: - pass - for name in rtype_cache: - # For some reason the process which created and registered this - # resource has failed to unregister it. Presumably it has - # died. We therefore unlink it. - try: - try: - _CLEANUP_FUNCS[rtype](name) - except Exception as e: - warnings.warn('resource_tracker: %r: %s' % (name, e)) - finally: - pass +############################################################################### +# Server process to keep track of unlinked resources (like shared memory +# segments, semaphores etc.) and clean them. +# +# On Unix we run a server process which keeps track of unlinked +# resources. The server ignores SIGINT and SIGTERM and reads from a +# pipe. Every other process of the program has a copy of the writable +# end of the pipe, so we get EOF when all other processes have exited. +# Then the server process unlinks any remaining resource names. +# +# This is important because there may be system limits for such resources: for +# instance, the system only supports a limited number of named semaphores, and +# shared-memory segments live in the RAM. If a python process leaks such a +# resource, this resource will not be removed till the next reboot. Without +# this resource tracker process, "killall python" would probably leave unlinked +# resources. + +import os +import signal +import sys +import threading +import warnings + +from . import spawn +from . import util + +__all__ = ['ensure_running', 'register', 'unregister'] + +_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') +_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) + +_CLEANUP_FUNCS = { + 'noop': lambda: None, +} + +if os.name == 'posix': + import _multiprocessing + import _posixshmem + + _CLEANUP_FUNCS.update({ + 'semaphore': _multiprocessing.sem_unlink, + 'shared_memory': _posixshmem.shm_unlink, + }) + + +class ResourceTracker(object): + + def __init__(self): + self._lock = threading.Lock() + self._fd = None + self._pid = None + + def _stop(self): + with self._lock: + if self._fd is None: + # not running + return + + # closing the "alive" file descriptor stops main() + os.close(self._fd) + self._fd = None + + os.waitpid(self._pid, 0) + self._pid = None + + def getfd(self): + self.ensure_running() + return self._fd + + def ensure_running(self): + '''Make sure that resource tracker process is running. + + This can be run from any process. Usually a child process will use + the resource created by its parent.''' + with self._lock: + if self._fd is not None: + # resource tracker was launched before, is it still running? + if self._check_alive(): + # => still alive + return + # => dead, launch it again + os.close(self._fd) + + # Clean-up to avoid dangling processes. + try: + # _pid can be None if this process is a child from another + # python process, which has started the resource_tracker. + if self._pid is not None: + os.waitpid(self._pid, 0) + except ChildProcessError: + # The resource_tracker has already been terminated. + pass + self._fd = None + self._pid = None + + warnings.warn('resource_tracker: process died unexpectedly, ' + 'relaunching. Some resources might leak.') + + fds_to_pass = [] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass + cmd = 'from multiprocessing.resource_tracker import main;main(%d)' + r, w = os.pipe() + try: + fds_to_pass.append(r) + # process will out live us, so no need to wait on pid + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd % r] + # bpo-33613: Register a signal mask that will block the signals. + # This signal mask will be inherited by the child that is going + # to be spawned and will protect the child from a race condition + # that can make the child die before it registers signal handlers + # for SIGINT and SIGTERM. The mask is unregistered after spawning + # the child. + try: + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) + pid = util.spawnv_passfds(exe, args, fds_to_pass) + finally: + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) + except: + os.close(w) + raise + else: + self._fd = w + self._pid = pid + finally: + os.close(r) + + def _check_alive(self): + '''Check that the pipe has not been closed by sending a probe.''' + try: + # We cannot use send here as it calls ensure_running, creating + # a cycle. + os.write(self._fd, b'PROBE:0:noop\n') + except OSError: + return False + else: + return True + + def register(self, name, rtype): + '''Register name of resource with resource tracker.''' + self._send('REGISTER', name, rtype) + + def unregister(self, name, rtype): + '''Unregister name of resource with resource tracker.''' + self._send('UNREGISTER', name, rtype) + + def _send(self, cmd, name, rtype): + self.ensure_running() + msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') + if len(name) > 512: + # posix guarantees that writes to a pipe of less than PIPE_BUF + # bytes are atomic, and that PIPE_BUF >= 512 + raise ValueError('name too long') + nbytes = os.write(self._fd, msg) + assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( + nbytes, len(msg)) + + +_resource_tracker = ResourceTracker() +ensure_running = _resource_tracker.ensure_running +register = _resource_tracker.register +unregister = _resource_tracker.unregister +getfd = _resource_tracker.getfd + +def main(fd): + '''Run resource tracker.''' + # protect the process from ^C and "killall python" etc + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) + + for f in (sys.stdin, sys.stdout): + try: + f.close() + except Exception: + pass + + cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} + try: + # keep track of registered/unregistered resources + with open(fd, 'rb') as f: + for line in f: + try: + cmd, name, rtype = line.strip().decode('ascii').split(':') + cleanup_func = _CLEANUP_FUNCS.get(rtype, None) + if cleanup_func is None: + raise ValueError( + f'Cannot register {name} for automatic cleanup: ' + f'unknown resource type {rtype}') + + if cmd == 'REGISTER': + cache[rtype].add(name) + elif cmd == 'UNREGISTER': + cache[rtype].remove(name) + elif cmd == 'PROBE': + pass + else: + raise RuntimeError('unrecognized command %r' % cmd) + except Exception: + try: + sys.excepthook(*sys.exc_info()) + except: + pass + finally: + # all processes have terminated; cleanup any remaining resources + for rtype, rtype_cache in cache.items(): + if rtype_cache: + try: + warnings.warn('resource_tracker: There appear to be %d ' + 'leaked %s objects to clean up at shutdown' % + (len(rtype_cache), rtype)) + except Exception: + pass + for name in rtype_cache: + # For some reason the process which created and registered this + # resource has failed to unregister it. Presumably it has + # died. We therefore unlink it. + try: + try: + _CLEANUP_FUNCS[rtype](name) + except Exception as e: + warnings.warn('resource_tracker: %r: %s' % (name, e)) + finally: + pass diff --git a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py index 122b3fcebf..db0516a993 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/shared_memory.py @@ -1,532 +1,532 @@ -"""Provides shared memory for direct access across processes. - -The API of this package is currently provisional. Refer to the -documentation for details. -""" - - -__all__ = [ 'SharedMemory', 'ShareableList' ] - - -from functools import partial -import mmap -import os -import errno -import struct -import secrets -import types - -if os.name == "nt": - import _winapi - _USE_POSIX = False -else: - import _posixshmem - _USE_POSIX = True - - -_O_CREX = os.O_CREAT | os.O_EXCL - -# FreeBSD (and perhaps other BSDs) limit names to 14 characters. -_SHM_SAFE_NAME_LENGTH = 14 - -# Shared memory block name prefix -if _USE_POSIX: - _SHM_NAME_PREFIX = '/psm_' -else: - _SHM_NAME_PREFIX = 'wnsm_' - - -def _make_filename(): - "Create a random filename for the shared memory object." - # number of random bytes to use for name - nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2 - assert nbytes >= 2, '_SHM_NAME_PREFIX too long' - name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes) - assert len(name) <= _SHM_SAFE_NAME_LENGTH - return name - - -class SharedMemory: - """Creates a new shared memory block or attaches to an existing - shared memory block. - - Every shared memory block is assigned a unique name. This enables - one process to create a shared memory block with a particular name - so that a different process can attach to that same shared memory - block using that same name. - - As a resource for sharing data across processes, shared memory blocks - may outlive the original process that created them. When one process - no longer needs access to a shared memory block that might still be - needed by other processes, the close() method should be called. - When a shared memory block is no longer needed by any process, the - unlink() method should be called to ensure proper cleanup.""" - - # Defaults; enables close() and unlink() to run without errors. - _name = None - _fd = -1 - _mmap = None - _buf = None - _flags = os.O_RDWR - _mode = 0o600 - _prepend_leading_slash = True if _USE_POSIX else False - - def __init__(self, name=None, create=False, size=0): - if not size >= 0: - raise ValueError("'size' must be a positive integer") - if create: - self._flags = _O_CREX | os.O_RDWR - if size == 0: - raise ValueError("'size' must be a positive number different from zero") - if name is None and not self._flags & os.O_EXCL: - raise ValueError("'name' can only be None if create=True") - - if _USE_POSIX: - - # POSIX Shared Memory - - if name is None: - while True: - name = _make_filename() - try: - self._fd = _posixshmem.shm_open( - name, - self._flags, - mode=self._mode - ) - except FileExistsError: - continue - self._name = name - break - else: - name = "/" + name if self._prepend_leading_slash else name - self._fd = _posixshmem.shm_open( - name, - self._flags, - mode=self._mode - ) - self._name = name - try: - if create and size: - os.ftruncate(self._fd, size) - stats = os.fstat(self._fd) - size = stats.st_size - self._mmap = mmap.mmap(self._fd, size) - except OSError: - self.unlink() - raise - - from .resource_tracker import register - register(self._name, "shared_memory") - - else: - - # Windows Named Shared Memory - - if create: - while True: - temp_name = _make_filename() if name is None else name - # Create and reserve shared memory block with this name - # until it can be attached to by mmap. - h_map = _winapi.CreateFileMapping( - _winapi.INVALID_HANDLE_VALUE, - _winapi.NULL, - _winapi.PAGE_READWRITE, - (size >> 32) & 0xFFFFFFFF, - size & 0xFFFFFFFF, - temp_name - ) - try: - last_error_code = _winapi.GetLastError() - if last_error_code == _winapi.ERROR_ALREADY_EXISTS: - if name is not None: - raise FileExistsError( - errno.EEXIST, - os.strerror(errno.EEXIST), - name, - _winapi.ERROR_ALREADY_EXISTS - ) - else: - continue - self._mmap = mmap.mmap(-1, size, tagname=temp_name) - finally: - _winapi.CloseHandle(h_map) - self._name = temp_name - break - - else: - self._name = name - # Dynamically determine the existing named shared memory - # block's size which is likely a multiple of mmap.PAGESIZE. - h_map = _winapi.OpenFileMapping( - _winapi.FILE_MAP_READ, - False, - name - ) - try: - p_buf = _winapi.MapViewOfFile( - h_map, - _winapi.FILE_MAP_READ, - 0, - 0, - 0 - ) - finally: - _winapi.CloseHandle(h_map) - size = _winapi.VirtualQuerySize(p_buf) - self._mmap = mmap.mmap(-1, size, tagname=name) - - self._size = size - self._buf = memoryview(self._mmap) - - def __del__(self): - try: - self.close() - except OSError: - pass - - def __reduce__(self): - return ( - self.__class__, - ( - self.name, - False, - self.size, - ), - ) - - def __repr__(self): - return f'{self.__class__.__name__}({self.name!r}, size={self.size})' - - @property - def buf(self): - "A memoryview of contents of the shared memory block." - return self._buf - - @property - def name(self): - "Unique name that identifies the shared memory block." - reported_name = self._name - if _USE_POSIX and self._prepend_leading_slash: - if self._name.startswith("/"): - reported_name = self._name[1:] - return reported_name - - @property - def size(self): - "Size in bytes." - return self._size - - def close(self): - """Closes access to the shared memory from this instance but does - not destroy the shared memory block.""" - if self._buf is not None: - self._buf.release() - self._buf = None - if self._mmap is not None: - self._mmap.close() - self._mmap = None - if _USE_POSIX and self._fd >= 0: - os.close(self._fd) - self._fd = -1 - - def unlink(self): - """Requests that the underlying shared memory block be destroyed. - - In order to ensure proper cleanup of resources, unlink should be - called once (and only once) across all processes which have access - to the shared memory block.""" - if _USE_POSIX and self._name: - from .resource_tracker import unregister - _posixshmem.shm_unlink(self._name) - unregister(self._name, "shared_memory") - - -_encoding = "utf8" - -class ShareableList: - """Pattern for a mutable list-like object shareable via a shared - memory block. It differs from the built-in list type in that these - lists can not change their overall length (i.e. no append, insert, - etc.) - - Because values are packed into a memoryview as bytes, the struct - packing format for any storable value must require no more than 8 - characters to describe its format.""" - - # The shared memory area is organized as follows: - # - 8 bytes: number of items (N) as a 64-bit integer - # - (N + 1) * 8 bytes: offsets of each element from the start of the - # data area - # - K bytes: the data area storing item values (with encoding and size - # depending on their respective types) - # - N * 8 bytes: `struct` format string for each element - # - N bytes: index into _back_transforms_mapping for each element - # (for reconstructing the corresponding Python value) - _types_mapping = { - int: "q", - float: "d", - bool: "xxxxxxx?", - str: "%ds", - bytes: "%ds", - None.__class__: "xxxxxx?x", - } - _alignment = 8 - _back_transforms_mapping = { - 0: lambda value: value, # int, float, bool - 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str - 2: lambda value: value.rstrip(b'\x00'), # bytes - 3: lambda _value: None, # None - } - - @staticmethod - def _extract_recreation_code(value): - """Used in concert with _back_transforms_mapping to convert values - into the appropriate Python objects when retrieving them from - the list as well as when storing them.""" - if not isinstance(value, (str, bytes, None.__class__)): - return 0 - elif isinstance(value, str): - return 1 - elif isinstance(value, bytes): - return 2 - else: - return 3 # NoneType - - def __init__(self, sequence=None, *, name=None): - if name is None or sequence is not None: - sequence = sequence or () - _formats = [ - self._types_mapping[type(item)] - if not isinstance(item, (str, bytes)) - else self._types_mapping[type(item)] % ( - self._alignment * (len(item) // self._alignment + 1), - ) - for item in sequence - ] - self._list_len = len(_formats) - assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len - offset = 0 - # The offsets of each list element into the shared memory's - # data area (0 meaning the start of the data area, not the start - # of the shared memory area). - self._allocated_offsets = [0] - for fmt in _formats: - offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1]) - self._allocated_offsets.append(offset) - _recreation_codes = [ - self._extract_recreation_code(item) for item in sequence - ] - requested_size = struct.calcsize( - "q" + self._format_size_metainfo + - "".join(_formats) + - self._format_packing_metainfo + - self._format_back_transform_codes - ) - - self.shm = SharedMemory(name, create=True, size=requested_size) - else: - self.shm = SharedMemory(name) - - if sequence is not None: - _enc = _encoding - struct.pack_into( - "q" + self._format_size_metainfo, - self.shm.buf, - 0, - self._list_len, - *(self._allocated_offsets) - ) - struct.pack_into( - "".join(_formats), - self.shm.buf, - self._offset_data_start, - *(v.encode(_enc) if isinstance(v, str) else v for v in sequence) - ) - struct.pack_into( - self._format_packing_metainfo, - self.shm.buf, - self._offset_packing_formats, - *(v.encode(_enc) for v in _formats) - ) - struct.pack_into( - self._format_back_transform_codes, - self.shm.buf, - self._offset_back_transform_codes, - *(_recreation_codes) - ) - - else: - self._list_len = len(self) # Obtains size from offset 0 in buffer. - self._allocated_offsets = list( - struct.unpack_from( - self._format_size_metainfo, - self.shm.buf, - 1 * 8 - ) - ) - - def _get_packing_format(self, position): - "Gets the packing format for a single value stored in the list." - position = position if position >= 0 else position + self._list_len - if (position >= self._list_len) or (self._list_len < 0): - raise IndexError("Requested position out of range.") - - v = struct.unpack_from( - "8s", - self.shm.buf, - self._offset_packing_formats + position * 8 - )[0] - fmt = v.rstrip(b'\x00') - fmt_as_str = fmt.decode(_encoding) - - return fmt_as_str - - def _get_back_transform(self, position): - "Gets the back transformation function for a single value." - - if (position >= self._list_len) or (self._list_len < 0): - raise IndexError("Requested position out of range.") - - transform_code = struct.unpack_from( - "b", - self.shm.buf, - self._offset_back_transform_codes + position - )[0] - transform_function = self._back_transforms_mapping[transform_code] - - return transform_function - - def _set_packing_format_and_transform(self, position, fmt_as_str, value): - """Sets the packing format and back transformation code for a - single value in the list at the specified position.""" - - if (position >= self._list_len) or (self._list_len < 0): - raise IndexError("Requested position out of range.") - - struct.pack_into( - "8s", - self.shm.buf, - self._offset_packing_formats + position * 8, - fmt_as_str.encode(_encoding) - ) - - transform_code = self._extract_recreation_code(value) - struct.pack_into( - "b", - self.shm.buf, - self._offset_back_transform_codes + position, - transform_code - ) - - def __getitem__(self, position): - position = position if position >= 0 else position + self._list_len - try: - offset = self._offset_data_start + self._allocated_offsets[position] - (v,) = struct.unpack_from( - self._get_packing_format(position), - self.shm.buf, - offset - ) - except IndexError: - raise IndexError("index out of range") - - back_transform = self._get_back_transform(position) - v = back_transform(v) - - return v - - def __setitem__(self, position, value): - position = position if position >= 0 else position + self._list_len - try: - item_offset = self._allocated_offsets[position] - offset = self._offset_data_start + item_offset - current_format = self._get_packing_format(position) - except IndexError: - raise IndexError("assignment index out of range") - - if not isinstance(value, (str, bytes)): - new_format = self._types_mapping[type(value)] - encoded_value = value - else: - allocated_length = self._allocated_offsets[position + 1] - item_offset - - encoded_value = (value.encode(_encoding) - if isinstance(value, str) else value) - if len(encoded_value) > allocated_length: - raise ValueError("bytes/str item exceeds available storage") - if current_format[-1] == "s": - new_format = current_format - else: - new_format = self._types_mapping[str] % ( - allocated_length, - ) - - self._set_packing_format_and_transform( - position, - new_format, - value - ) - struct.pack_into(new_format, self.shm.buf, offset, encoded_value) - - def __reduce__(self): - return partial(self.__class__, name=self.shm.name), () - - def __len__(self): - return struct.unpack_from("q", self.shm.buf, 0)[0] - - def __repr__(self): - return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})' - - @property - def format(self): - "The struct packing format used by all currently stored items." - return "".join( - self._get_packing_format(i) for i in range(self._list_len) - ) - - @property - def _format_size_metainfo(self): - "The struct packing format used for the items' storage offsets." - return "q" * (self._list_len + 1) - - @property - def _format_packing_metainfo(self): - "The struct packing format used for the items' packing formats." - return "8s" * self._list_len - - @property - def _format_back_transform_codes(self): - "The struct packing format used for the items' back transforms." - return "b" * self._list_len - - @property - def _offset_data_start(self): - # - 8 bytes for the list length - # - (N + 1) * 8 bytes for the element offsets - return (self._list_len + 2) * 8 - - @property - def _offset_packing_formats(self): - return self._offset_data_start + self._allocated_offsets[-1] - - @property - def _offset_back_transform_codes(self): - return self._offset_packing_formats + self._list_len * 8 - - def count(self, value): - "L.count(value) -> integer -- return number of occurrences of value." - - return sum(value == entry for entry in self) - - def index(self, value): - """L.index(value) -> integer -- return first index of value. - Raises ValueError if the value is not present.""" - - for position, entry in enumerate(self): - if value == entry: - return position - else: - raise ValueError(f"{value!r} not in this container") - - __class_getitem__ = classmethod(types.GenericAlias) +"""Provides shared memory for direct access across processes. + +The API of this package is currently provisional. Refer to the +documentation for details. +""" + + +__all__ = [ 'SharedMemory', 'ShareableList' ] + + +from functools import partial +import mmap +import os +import errno +import struct +import secrets +import types + +if os.name == "nt": + import _winapi + _USE_POSIX = False +else: + import _posixshmem + _USE_POSIX = True + + +_O_CREX = os.O_CREAT | os.O_EXCL + +# FreeBSD (and perhaps other BSDs) limit names to 14 characters. +_SHM_SAFE_NAME_LENGTH = 14 + +# Shared memory block name prefix +if _USE_POSIX: + _SHM_NAME_PREFIX = '/psm_' +else: + _SHM_NAME_PREFIX = 'wnsm_' + + +def _make_filename(): + "Create a random filename for the shared memory object." + # number of random bytes to use for name + nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2 + assert nbytes >= 2, '_SHM_NAME_PREFIX too long' + name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes) + assert len(name) <= _SHM_SAFE_NAME_LENGTH + return name + + +class SharedMemory: + """Creates a new shared memory block or attaches to an existing + shared memory block. + + Every shared memory block is assigned a unique name. This enables + one process to create a shared memory block with a particular name + so that a different process can attach to that same shared memory + block using that same name. + + As a resource for sharing data across processes, shared memory blocks + may outlive the original process that created them. When one process + no longer needs access to a shared memory block that might still be + needed by other processes, the close() method should be called. + When a shared memory block is no longer needed by any process, the + unlink() method should be called to ensure proper cleanup.""" + + # Defaults; enables close() and unlink() to run without errors. + _name = None + _fd = -1 + _mmap = None + _buf = None + _flags = os.O_RDWR + _mode = 0o600 + _prepend_leading_slash = True if _USE_POSIX else False + + def __init__(self, name=None, create=False, size=0): + if not size >= 0: + raise ValueError("'size' must be a positive integer") + if create: + self._flags = _O_CREX | os.O_RDWR + if size == 0: + raise ValueError("'size' must be a positive number different from zero") + if name is None and not self._flags & os.O_EXCL: + raise ValueError("'name' can only be None if create=True") + + if _USE_POSIX: + + # POSIX Shared Memory + + if name is None: + while True: + name = _make_filename() + try: + self._fd = _posixshmem.shm_open( + name, + self._flags, + mode=self._mode + ) + except FileExistsError: + continue + self._name = name + break + else: + name = "/" + name if self._prepend_leading_slash else name + self._fd = _posixshmem.shm_open( + name, + self._flags, + mode=self._mode + ) + self._name = name + try: + if create and size: + os.ftruncate(self._fd, size) + stats = os.fstat(self._fd) + size = stats.st_size + self._mmap = mmap.mmap(self._fd, size) + except OSError: + self.unlink() + raise + + from .resource_tracker import register + register(self._name, "shared_memory") + + else: + + # Windows Named Shared Memory + + if create: + while True: + temp_name = _make_filename() if name is None else name + # Create and reserve shared memory block with this name + # until it can be attached to by mmap. + h_map = _winapi.CreateFileMapping( + _winapi.INVALID_HANDLE_VALUE, + _winapi.NULL, + _winapi.PAGE_READWRITE, + (size >> 32) & 0xFFFFFFFF, + size & 0xFFFFFFFF, + temp_name + ) + try: + last_error_code = _winapi.GetLastError() + if last_error_code == _winapi.ERROR_ALREADY_EXISTS: + if name is not None: + raise FileExistsError( + errno.EEXIST, + os.strerror(errno.EEXIST), + name, + _winapi.ERROR_ALREADY_EXISTS + ) + else: + continue + self._mmap = mmap.mmap(-1, size, tagname=temp_name) + finally: + _winapi.CloseHandle(h_map) + self._name = temp_name + break + + else: + self._name = name + # Dynamically determine the existing named shared memory + # block's size which is likely a multiple of mmap.PAGESIZE. + h_map = _winapi.OpenFileMapping( + _winapi.FILE_MAP_READ, + False, + name + ) + try: + p_buf = _winapi.MapViewOfFile( + h_map, + _winapi.FILE_MAP_READ, + 0, + 0, + 0 + ) + finally: + _winapi.CloseHandle(h_map) + size = _winapi.VirtualQuerySize(p_buf) + self._mmap = mmap.mmap(-1, size, tagname=name) + + self._size = size + self._buf = memoryview(self._mmap) + + def __del__(self): + try: + self.close() + except OSError: + pass + + def __reduce__(self): + return ( + self.__class__, + ( + self.name, + False, + self.size, + ), + ) + + def __repr__(self): + return f'{self.__class__.__name__}({self.name!r}, size={self.size})' + + @property + def buf(self): + "A memoryview of contents of the shared memory block." + return self._buf + + @property + def name(self): + "Unique name that identifies the shared memory block." + reported_name = self._name + if _USE_POSIX and self._prepend_leading_slash: + if self._name.startswith("/"): + reported_name = self._name[1:] + return reported_name + + @property + def size(self): + "Size in bytes." + return self._size + + def close(self): + """Closes access to the shared memory from this instance but does + not destroy the shared memory block.""" + if self._buf is not None: + self._buf.release() + self._buf = None + if self._mmap is not None: + self._mmap.close() + self._mmap = None + if _USE_POSIX and self._fd >= 0: + os.close(self._fd) + self._fd = -1 + + def unlink(self): + """Requests that the underlying shared memory block be destroyed. + + In order to ensure proper cleanup of resources, unlink should be + called once (and only once) across all processes which have access + to the shared memory block.""" + if _USE_POSIX and self._name: + from .resource_tracker import unregister + _posixshmem.shm_unlink(self._name) + unregister(self._name, "shared_memory") + + +_encoding = "utf8" + +class ShareableList: + """Pattern for a mutable list-like object shareable via a shared + memory block. It differs from the built-in list type in that these + lists can not change their overall length (i.e. no append, insert, + etc.) + + Because values are packed into a memoryview as bytes, the struct + packing format for any storable value must require no more than 8 + characters to describe its format.""" + + # The shared memory area is organized as follows: + # - 8 bytes: number of items (N) as a 64-bit integer + # - (N + 1) * 8 bytes: offsets of each element from the start of the + # data area + # - K bytes: the data area storing item values (with encoding and size + # depending on their respective types) + # - N * 8 bytes: `struct` format string for each element + # - N bytes: index into _back_transforms_mapping for each element + # (for reconstructing the corresponding Python value) + _types_mapping = { + int: "q", + float: "d", + bool: "xxxxxxx?", + str: "%ds", + bytes: "%ds", + None.__class__: "xxxxxx?x", + } + _alignment = 8 + _back_transforms_mapping = { + 0: lambda value: value, # int, float, bool + 1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str + 2: lambda value: value.rstrip(b'\x00'), # bytes + 3: lambda _value: None, # None + } + + @staticmethod + def _extract_recreation_code(value): + """Used in concert with _back_transforms_mapping to convert values + into the appropriate Python objects when retrieving them from + the list as well as when storing them.""" + if not isinstance(value, (str, bytes, None.__class__)): + return 0 + elif isinstance(value, str): + return 1 + elif isinstance(value, bytes): + return 2 + else: + return 3 # NoneType + + def __init__(self, sequence=None, *, name=None): + if name is None or sequence is not None: + sequence = sequence or () + _formats = [ + self._types_mapping[type(item)] + if not isinstance(item, (str, bytes)) + else self._types_mapping[type(item)] % ( + self._alignment * (len(item) // self._alignment + 1), + ) + for item in sequence + ] + self._list_len = len(_formats) + assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len + offset = 0 + # The offsets of each list element into the shared memory's + # data area (0 meaning the start of the data area, not the start + # of the shared memory area). + self._allocated_offsets = [0] + for fmt in _formats: + offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1]) + self._allocated_offsets.append(offset) + _recreation_codes = [ + self._extract_recreation_code(item) for item in sequence + ] + requested_size = struct.calcsize( + "q" + self._format_size_metainfo + + "".join(_formats) + + self._format_packing_metainfo + + self._format_back_transform_codes + ) + + self.shm = SharedMemory(name, create=True, size=requested_size) + else: + self.shm = SharedMemory(name) + + if sequence is not None: + _enc = _encoding + struct.pack_into( + "q" + self._format_size_metainfo, + self.shm.buf, + 0, + self._list_len, + *(self._allocated_offsets) + ) + struct.pack_into( + "".join(_formats), + self.shm.buf, + self._offset_data_start, + *(v.encode(_enc) if isinstance(v, str) else v for v in sequence) + ) + struct.pack_into( + self._format_packing_metainfo, + self.shm.buf, + self._offset_packing_formats, + *(v.encode(_enc) for v in _formats) + ) + struct.pack_into( + self._format_back_transform_codes, + self.shm.buf, + self._offset_back_transform_codes, + *(_recreation_codes) + ) + + else: + self._list_len = len(self) # Obtains size from offset 0 in buffer. + self._allocated_offsets = list( + struct.unpack_from( + self._format_size_metainfo, + self.shm.buf, + 1 * 8 + ) + ) + + def _get_packing_format(self, position): + "Gets the packing format for a single value stored in the list." + position = position if position >= 0 else position + self._list_len + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + v = struct.unpack_from( + "8s", + self.shm.buf, + self._offset_packing_formats + position * 8 + )[0] + fmt = v.rstrip(b'\x00') + fmt_as_str = fmt.decode(_encoding) + + return fmt_as_str + + def _get_back_transform(self, position): + "Gets the back transformation function for a single value." + + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + transform_code = struct.unpack_from( + "b", + self.shm.buf, + self._offset_back_transform_codes + position + )[0] + transform_function = self._back_transforms_mapping[transform_code] + + return transform_function + + def _set_packing_format_and_transform(self, position, fmt_as_str, value): + """Sets the packing format and back transformation code for a + single value in the list at the specified position.""" + + if (position >= self._list_len) or (self._list_len < 0): + raise IndexError("Requested position out of range.") + + struct.pack_into( + "8s", + self.shm.buf, + self._offset_packing_formats + position * 8, + fmt_as_str.encode(_encoding) + ) + + transform_code = self._extract_recreation_code(value) + struct.pack_into( + "b", + self.shm.buf, + self._offset_back_transform_codes + position, + transform_code + ) + + def __getitem__(self, position): + position = position if position >= 0 else position + self._list_len + try: + offset = self._offset_data_start + self._allocated_offsets[position] + (v,) = struct.unpack_from( + self._get_packing_format(position), + self.shm.buf, + offset + ) + except IndexError: + raise IndexError("index out of range") + + back_transform = self._get_back_transform(position) + v = back_transform(v) + + return v + + def __setitem__(self, position, value): + position = position if position >= 0 else position + self._list_len + try: + item_offset = self._allocated_offsets[position] + offset = self._offset_data_start + item_offset + current_format = self._get_packing_format(position) + except IndexError: + raise IndexError("assignment index out of range") + + if not isinstance(value, (str, bytes)): + new_format = self._types_mapping[type(value)] + encoded_value = value + else: + allocated_length = self._allocated_offsets[position + 1] - item_offset + + encoded_value = (value.encode(_encoding) + if isinstance(value, str) else value) + if len(encoded_value) > allocated_length: + raise ValueError("bytes/str item exceeds available storage") + if current_format[-1] == "s": + new_format = current_format + else: + new_format = self._types_mapping[str] % ( + allocated_length, + ) + + self._set_packing_format_and_transform( + position, + new_format, + value + ) + struct.pack_into(new_format, self.shm.buf, offset, encoded_value) + + def __reduce__(self): + return partial(self.__class__, name=self.shm.name), () + + def __len__(self): + return struct.unpack_from("q", self.shm.buf, 0)[0] + + def __repr__(self): + return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})' + + @property + def format(self): + "The struct packing format used by all currently stored items." + return "".join( + self._get_packing_format(i) for i in range(self._list_len) + ) + + @property + def _format_size_metainfo(self): + "The struct packing format used for the items' storage offsets." + return "q" * (self._list_len + 1) + + @property + def _format_packing_metainfo(self): + "The struct packing format used for the items' packing formats." + return "8s" * self._list_len + + @property + def _format_back_transform_codes(self): + "The struct packing format used for the items' back transforms." + return "b" * self._list_len + + @property + def _offset_data_start(self): + # - 8 bytes for the list length + # - (N + 1) * 8 bytes for the element offsets + return (self._list_len + 2) * 8 + + @property + def _offset_packing_formats(self): + return self._offset_data_start + self._allocated_offsets[-1] + + @property + def _offset_back_transform_codes(self): + return self._offset_packing_formats + self._list_len * 8 + + def count(self, value): + "L.count(value) -> integer -- return number of occurrences of value." + + return sum(value == entry for entry in self) + + def index(self, value): + """L.index(value) -> integer -- return first index of value. + Raises ValueError if the value is not present.""" + + for position, entry in enumerate(self): + if value == entry: + return position + else: + raise ValueError(f"{value!r} not in this container") + + __class_getitem__ = classmethod(types.GenericAlias) diff --git a/contrib/tools/python3/src/Lib/multiprocessing/spawn.py b/contrib/tools/python3/src/Lib/multiprocessing/spawn.py index f7f8deb246..2ac0b653b6 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/spawn.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/spawn.py @@ -96,28 +96,28 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): assert is_forking(sys.argv), "Not forking" if sys.platform == 'win32': import msvcrt - import _winapi - - if parent_pid is not None: - source_process = _winapi.OpenProcess( - _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE, - False, parent_pid) - else: - source_process = None - new_handle = reduction.duplicate(pipe_handle, - source_process=source_process) + import _winapi + + if parent_pid is not None: + source_process = _winapi.OpenProcess( + _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE, + False, parent_pid) + else: + source_process = None + new_handle = reduction.duplicate(pipe_handle, + source_process=source_process) fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) - parent_sentinel = source_process + parent_sentinel = source_process else: - from . import resource_tracker - resource_tracker._resource_tracker._fd = tracker_fd + from . import resource_tracker + resource_tracker._resource_tracker._fd = tracker_fd fd = pipe_handle - parent_sentinel = os.dup(pipe_handle) - exitcode = _main(fd, parent_sentinel) + parent_sentinel = os.dup(pipe_handle) + exitcode = _main(fd, parent_sentinel) sys.exit(exitcode) -def _main(fd, parent_sentinel): +def _main(fd, parent_sentinel): with os.fdopen(fd, 'rb', closefd=True) as from_parent: process.current_process()._inheriting = True try: @@ -126,7 +126,7 @@ def _main(fd, parent_sentinel): self = reduction.pickle.load(from_parent) finally: del process.current_process()._inheriting - return self._bootstrap(parent_sentinel) + return self._bootstrap(parent_sentinel) def _check_not_importing_main(): diff --git a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py b/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py index d0be48f1fd..881c823d28 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/synchronize.py @@ -76,16 +76,16 @@ class SemLock(object): # We only get here if we are on Unix with forking # disabled. When the object is garbage collected or the # process shuts down we unlink the semaphore name - from .resource_tracker import register - register(self._semlock.name, "semaphore") + from .resource_tracker import register + register(self._semlock.name, "semaphore") util.Finalize(self, SemLock._cleanup, (self._semlock.name,), exitpriority=0) @staticmethod def _cleanup(name): - from .resource_tracker import unregister + from .resource_tracker import unregister sem_unlink(name) - unregister(name, "semaphore") + unregister(name, "semaphore") def _make_methods(self): self.acquire = self._semlock.acquire @@ -270,7 +270,7 @@ class Condition(object): def notify(self, n=1): assert self._lock._semlock._is_mine(), 'lock is not owned' assert not self._wait_semaphore.acquire( - False), ('notify: Should not have been able to acquire ' + False), ('notify: Should not have been able to acquire ' + '_wait_semaphore') # to take account of timeouts since last notify*() we subtract diff --git a/contrib/tools/python3/src/Lib/multiprocessing/util.py b/contrib/tools/python3/src/Lib/multiprocessing/util.py index e94466be8e..fed9ddf6ee 100644 --- a/contrib/tools/python3/src/Lib/multiprocessing/util.py +++ b/contrib/tools/python3/src/Lib/multiprocessing/util.py @@ -102,42 +102,42 @@ def log_to_stderr(level=None): _log_to_stderr = True return _logger - -# Abstract socket support - -def _platform_supports_abstract_sockets(): - if sys.platform == "linux": - return True - if hasattr(sys, 'getandroidapilevel'): - return True - return False - - -def is_abstract_socket_namespace(address): - if not address: - return False - if isinstance(address, bytes): - return address[0] == 0 - elif isinstance(address, str): - return address[0] == "\0" - raise TypeError('address type of {address!r} unrecognized') - - -abstract_sockets_supported = _platform_supports_abstract_sockets() - + +# Abstract socket support + +def _platform_supports_abstract_sockets(): + if sys.platform == "linux": + return True + if hasattr(sys, 'getandroidapilevel'): + return True + return False + + +def is_abstract_socket_namespace(address): + if not address: + return False + if isinstance(address, bytes): + return address[0] == 0 + elif isinstance(address, str): + return address[0] == "\0" + raise TypeError('address type of {address!r} unrecognized') + + +abstract_sockets_supported = _platform_supports_abstract_sockets() + # # Function returning a temp directory which will be removed on exit # -def _remove_temp_dir(rmtree, tempdir): - rmtree(tempdir) - - current_process = process.current_process() - # current_process() can be None if the finalizer is called - # late during Python finalization - if current_process is not None: - current_process._config['tempdir'] = None - +def _remove_temp_dir(rmtree, tempdir): + rmtree(tempdir) + + current_process = process.current_process() + # current_process() can be None if the finalizer is called + # late during Python finalization + if current_process is not None: + current_process._config['tempdir'] = None + def get_temp_dir(): # get name of a temp directory which will be automatically cleaned up tempdir = process.current_process()._config.get('tempdir') @@ -145,10 +145,10 @@ def get_temp_dir(): import shutil, tempfile tempdir = tempfile.mkdtemp(prefix='pymp-') info('created temp directory %s', tempdir) - # keep a strong reference to shutil.rmtree(), since the finalizer - # can be called late during Python shutdown - Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir), - exitpriority=-100) + # keep a strong reference to shutil.rmtree(), since the finalizer + # can be called late during Python shutdown + Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir), + exitpriority=-100) process.current_process()._config['tempdir'] = tempdir return tempdir @@ -261,7 +261,7 @@ class Finalize(object): if self._kwargs: x += ', kwargs=' + str(self._kwargs) if self._key[0] is not None: - x += ', exitpriority=' + str(self._key[0]) + x += ', exitpriority=' + str(self._key[0]) return x + '>' @@ -370,11 +370,11 @@ class ForkAwareThreadLock(object): self._lock = threading.Lock() self.acquire = self._lock.acquire self.release = self._lock.release - register_after_fork(self, ForkAwareThreadLock._at_fork_reinit) - - def _at_fork_reinit(self): - self._lock._at_fork_reinit() + register_after_fork(self, ForkAwareThreadLock._at_fork_reinit) + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + def __enter__(self): return self._lock.__enter__() @@ -467,38 +467,38 @@ def spawnv_passfds(path, args, passfds): return _posixsubprocess.fork_exec( args, [os.fsencode(path)], True, passfds, None, _env_list(), -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write, - False, False, None, None, None, -1, None) + False, False, None, None, None, -1, None) finally: os.close(errpipe_read) os.close(errpipe_write) - - -def close_fds(*fds): - """Close each file descriptor given as an argument""" - for fd in fds: - os.close(fd) - - -def _cleanup_tests(): - """Cleanup multiprocessing resources when multiprocessing tests - completed.""" - - from test import support - - # cleanup multiprocessing - process._cleanup() - - # Stop the ForkServer process if it's running - from multiprocessing import forkserver - forkserver._forkserver._stop() - - # Stop the ResourceTracker process if it's running - from multiprocessing import resource_tracker - resource_tracker._resource_tracker._stop() - - # bpo-37421: Explicitly call _run_finalizers() to remove immediately - # temporary directories created by multiprocessing.util.get_temp_dir(). - _run_finalizers() - support.gc_collect() - - support.reap_children() + + +def close_fds(*fds): + """Close each file descriptor given as an argument""" + for fd in fds: + os.close(fd) + + +def _cleanup_tests(): + """Cleanup multiprocessing resources when multiprocessing tests + completed.""" + + from test import support + + # cleanup multiprocessing + process._cleanup() + + # Stop the ForkServer process if it's running + from multiprocessing import forkserver + forkserver._forkserver._stop() + + # Stop the ResourceTracker process if it's running + from multiprocessing import resource_tracker + resource_tracker._resource_tracker._stop() + + # bpo-37421: Explicitly call _run_finalizers() to remove immediately + # temporary directories created by multiprocessing.util.get_temp_dir(). + _run_finalizers() + support.gc_collect() + + support.reap_children() |